Skip to content
This repository has been archived by the owner on Dec 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #121 from librato/feature/md
Browse files Browse the repository at this point in the history
Support for tagged measurements
  • Loading branch information
Chance Feick committed Nov 4, 2016
2 parents 7e5ce85 + 9c04838 commit 42407f2
Show file tree
Hide file tree
Showing 13 changed files with 623 additions and 65 deletions.
22 changes: 11 additions & 11 deletions lib/librato/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require 'metrics/persistence'
require 'metrics/queue'
require 'metrics/smart_json'
require 'metrics/util'
require 'metrics/version'

module Librato
Expand Down Expand Up @@ -65,23 +66,22 @@ module Metrics
extend SingleForwardable

TYPES = [:counter, :gauge]
PLURAL_TYPES = [:counters, :gauges]
PLURAL_TYPES = TYPES.map { |type| "#{type}s".to_sym }
MIN_MEASURE_TIME = (Time.now-(3600*24*365)).to_i

# Most of the singleton methods of Librato::Metrics are actually
# being called on a global Client instance. See further docs on
# Client.
#
def_delegators :client, :agent_identifier, :annotate, :api_endpoint,
:api_endpoint=, :authenticate, :connection,
:proxy, :proxy=,
:faraday_adapter, :faraday_adapter=,
:persistence, :persistence=, :persister,
:get_composite, :get_metric, :get_measurements, :metrics,
:delete_metrics, :update_metric, :update_metrics,
:submit,
:sources, :get_source, :update_source,
:create_snapshot, :get_snapshot
def_delegators :client, :agent_identifier, :annotate,
:api_endpoint, :api_endpoint=, :authenticate,
:connection, :create_snapshot, :delete_metrics,
:faraday_adapter, :faraday_adapter=, :get_composite,
:get_measurements, :get_metric, :get_series,
:get_snapshot, :get_source, :metrics,
:persistence, :persistence=, :persister, :proxy, :proxy=,
:sources, :submit, :update_metric, :update_metrics,
:update_source

# The Librato::Metrics::Client being used by module-level
# access.
Expand Down
60 changes: 38 additions & 22 deletions lib/librato/metrics/aggregator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module Metrics
# queue.merge!(aggregator)
#
class Aggregator
SOURCE_SEPARATOR = '%%' # must not be in valid source name criteria
SEPARATOR = '%%' # must not be in valid tags and/or source criteria

include Processor

Expand Down Expand Up @@ -52,20 +52,29 @@ def initialize(opts={})
# @return [Aggregator] returns self
def add(measurements)
measurements.each do |metric, data|
entry = {}
if @prefix
metric = "#{@prefix}.#{metric}"
end
entry[:name] = metric.to_s
if data.respond_to?(:each) # hash form
validate_parameters(data)
value = data[:value]
if data[:source]
metric = "#{metric}#{SOURCE_SEPARATOR}#{data[:source]}"
metric = "#{metric}#{SEPARATOR}#{data[:source]}"
entry[:source] = data[:source].to_s
elsif data[:tags] && data[:tags].respond_to?(:each)
metric = Librato::Metrics::Util.build_key_for(metric.to_s, data[:tags])
entry[:tags] = data[:tags]
end
else
value = data
end

@aggregated[metric] ||= Aggregate.new
@aggregated[metric] << value
@aggregated[metric] = {} unless @aggregated[metric]
@aggregated[metric][:aggregate] ||= Aggregate.new
@aggregated[metric][:aggregate] << value
@aggregated[metric].merge!(entry)
end
autosubmit_check
self
Expand All @@ -88,31 +97,38 @@ def clear
# Returns currently queued data
#
def queued
gauges = []
entries = []
multidimensional = has_tags?

@aggregated.each do |metric, data|
source = nil
metric = metric.to_s
if metric.include?(SOURCE_SEPARATOR)
metric, source = metric.split(SOURCE_SEPARATOR)
end
@aggregated.each_value do |data|
entry = {
name: metric,
count: data.count,
sum: data.sum,

name: data[:name],
count: data[:aggregate].count,
sum: data[:aggregate].sum,
# TODO: make float/non-float consistent in the gem
min: data.min.to_f,
max: data.max.to_f
min: data[:aggregate].min.to_f,
max: data[:aggregate].max.to_f
# TODO: expose v.sum2 and include
}
entry[:source] = source if source
gauges << entry
if data[:source]
entry[:source] = data[:source]
elsif data[:tags]
multidimensional = true
entry[:tags] = data[:tags]
end
multidimensional = true if data[:time]
entries << entry
end

req = { gauges: gauges }
time = multidimensional ? :time : :measure_time
req =
if multidimensional
{ measurements: entries }
else
{ gauges: entries }
end
req[:source] = @source if @source
req[:measure_time] = @measure_time if @measure_time
req[:tags] = @tags if has_tags?
req[time] = @time if @time

req
end
Expand Down
34 changes: 34 additions & 0 deletions lib/librato/metrics/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,40 @@ def get_metric(name, options = {})
parsed
end

# Retrieve series of measurements for a given metric
#
# @example Get series for metric
# series = Librato::Metrics.get_series :requests, resolution: 1, duration: 3600
#
# @example Get series for metric grouped by tag
# query = { duration: 3600, resolution: 1, group_by: "environment", group_by_function: "sum" }
# series = Librato::Metrics.get_series :requests, query
#
# @example Get series for metric grouped by tag and negated by tag filter
# query = { duration: 3600, resolution: 1, group_by: "environment", group_by_function: "sum", tags_search: "environment=!staging" }
# series = Librato::Metrics.get_series :requests, query
#
# @param [Symbol|String] metric_name Metric name
# @param [Hash] options Query options
def get_series(metric_name, options={})
raise ArgumentError, ":resolution and :duration or :start_time must be set" if options.empty?
query = options.dup
if query[:start_time].respond_to?(:year)
query[:start_time] = query[:start_time].to_i
end
if query[:end_time].respond_to?(:year)
query[:end_time] = query[:end_time].to_i
end
query[:resolution] ||= 1
unless query[:start_time] || query[:end_time]
query[:duration] ||= 3600
end
url = connection.build_url("measurements/#{metric_name}", query)
response = connection.get(url)
parsed = SmartJSON.read(response.body)
parsed["series"]
end

# Retrieve data points for a specific metric
#
# @example Get 20 most recent data points for metric
Expand Down
1 change: 1 addition & 0 deletions lib/librato/metrics/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class NoMetricsProvided < MetricsError; end
class NoClientProvided < MetricsError; end
class InvalidMeasureTime < MetricsError; end
class NotMergeable < MetricsError; end
class InvalidParameters < MetricsError; end

class NetworkError < StandardError
attr_reader :response
Expand Down
26 changes: 17 additions & 9 deletions lib/librato/metrics/persistence/direct.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ module Librato
module Metrics
module Persistence
class Direct
MEASUREMENT_TYPES = [:gauges, :counters]

# Persist the queued metrics directly to the
# Metrics web API.
#
Expand All @@ -18,9 +16,15 @@ def persist(client, queued, options={})
requests = [queued]
end
requests.each do |request|
resource =
if queued[:gauges] || queued[:counters]
"metrics"
else
"measurements"
end
payload = SmartJSON.write(request)
# expects 200
client.connection.post('metrics', payload)
client.connection.post(resource, payload)
end
end

Expand All @@ -31,16 +35,16 @@ def chunk_queued(queued, per_request)
reqs = []
# separate metric-containing values from global values
globals = fetch_globals(queued)
MEASUREMENT_TYPES.each do |metric_type|
metrics = queued[metric_type]
top_level_keys.each do |key|
metrics = queued[key]
next unless metrics
if metrics.size <= per_request
# we can fit all of this metric type in a single request
reqs << build_request(metric_type, metrics, globals)
reqs << build_request(key, metrics, globals)
else
# going to have to split things up
metrics.each_slice(per_request) do |elements|
reqs << build_request(metric_type, elements, globals)
reqs << build_request(key, elements, globals)
end
end
end
Expand All @@ -51,8 +55,12 @@ def build_request(type, metrics, globals)
{type => metrics}.merge(globals)
end

def top_level_keys
[Librato::Metrics::PLURAL_TYPES, :measurements].flatten
end

def fetch_globals(queued)
queued.reject {|k, v| MEASUREMENT_TYPES.include?(k)}
queued.reject { |k, v| top_level_keys.include?(k) }
end

def queue_count(queued)
Expand All @@ -62,4 +70,4 @@ def queue_count(queued)
end
end
end
end
end
29 changes: 27 additions & 2 deletions lib/librato/metrics/processor.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require "set"

module Librato
module Metrics

Expand All @@ -7,7 +9,11 @@ module Processor
MEASUREMENTS_PER_REQUEST = 500

attr_reader :per_request, :last_submit_time
attr_accessor :prefix
attr_accessor :prefix, :tags

def tags
@tags ||= {}
end

# The current Client instance this queue is using to authenticate
# and connect to Librato Metrics. This will default to the primary
Expand All @@ -19,6 +25,11 @@ def client
@client ||= Librato::Metrics.client
end

def has_tags?
!@tags.empty?
end
alias :tags? :has_tags?

# The object this MetricSet will use to persist
#
def persister
Expand Down Expand Up @@ -82,11 +93,13 @@ def epoch_time
end

def setup_common_options(options)
validate_parameters(options)
@autosubmit_interval = options[:autosubmit_interval]
@client = options[:client] || Librato::Metrics.client
@per_request = options[:per_request] || MEASUREMENTS_PER_REQUEST
@source = options[:source]
@measure_time = options[:measure_time] && options[:measure_time].to_i
@tags = options.fetch(:tags, {})
@time = (options[:time] && options[:time].to_i || options[:measure_time] && options[:measure_time].to_i)
@create_time = Time.now
@clear_on_failure = options[:clear_failures] || false
@prefix = options[:prefix]
Expand All @@ -99,6 +112,18 @@ def autosubmit_check
end
end

def validate_parameters(options)
invalid_combinations = [
[:source, :tags],
]
opts = options.keys.to_set
invalid_combinations.each do |combo|
if combo.to_set.subset?(opts)
raise InvalidParameters, "#{combo} cannot be simultaneously set"
end
end
end

end

end
Expand Down
Loading

0 comments on commit 42407f2

Please sign in to comment.