Skip to content

Commit

Permalink
Merge pull request #6 from meetcleo/PLT-158-histo
Browse files Browse the repository at this point in the history
[PLT-158] Ability to publish histogram buckets
  • Loading branch information
joshuafleck authored Dec 12, 2023
2 parents fc102f8 + 4b42c0c commit 226703b
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 73 deletions.
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ StatsD/MeasureAsDistArgument:

StatsD/MetricPrefixArgument:
Enabled: true

Layout/LineLength:
Enabled: false
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ The following environment variables are supported:
- `STATSD_PROMETHEUS_AUTH`: The API key (or password for basic auth) for the prometheus endpoint. If set, will send batch stats in prometheus-compatible format
- `STATSD_PROMETHEUS_BASIC_AUTH_USER`: The user to use if basic auth is required.
- `STATSD_PROMETHEUS_PERCENTILES`: The percentiles that will be calculated when aggregating timers for prometheus. 95,99 are the default.
- `STATSD_PROMETHEUS_HISTOGRAMS`: The histogram buckets that will be used when aggregating timers for prometheus. Buckets are expected to be defined in milliseconds. 5,10,25,50,75,100,250,500,750,1000,2500,5000,7500,10000 are the default buckets.
- `STATSD_PROMETHEUS_APPLICATION_NAME`: The application name that will be included as a tag in all metrics sent to prometheus.
- `STATSD_PROMETHEUS_SUBSYSTEM`: The subsystem that will be included as a tag in all metrics sent to prometheus.
- `STATSD_PROMETHEUS_OPEN_TIMEOUT`: The timeout for connecting to the Prometheus backend (default is 2s).
Expand Down
2 changes: 1 addition & 1 deletion lib/statsd/instrument.rb
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def singleton_client
# @!method distribution(name, value = nil, sample_rate: nil, tags: nil, &block)
# (see StatsD::Instrument::Client#distribution)
#
# @!method event(title, text, tags: nil, hostname: nil, timestamp: nil, aggregation_key: nil, priority: nil, source_type_name: nil, alert_type: nil) # rubocop:disable Layout/LineLength
# @!method event(title, text, tags: nil, hostname: nil, timestamp: nil, aggregation_key: nil, priority: nil, source_type_name: nil, alert_type: nil)
# (see StatsD::Instrument::Client#event)
#
# @!method service_check(name, status, tags: nil, hostname: nil, timestamp: nil, message: nil)
Expand Down
5 changes: 5 additions & 0 deletions lib/statsd/instrument/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ def prometheus_percentiles
env.fetch("STATSD_PROMETHEUS_PERCENTILES", "95,99").split(",").map(&:to_i)
end

# Histogram bucket boundaries (in milliseconds) used when aggregating timers
# for Prometheus. Read from STATSD_PROMETHEUS_HISTOGRAMS as a comma-separated
# list; falls back to the defaults below when the variable is unset.
def prometheus_histograms
  raw_buckets = env.fetch(
    "STATSD_PROMETHEUS_HISTOGRAMS",
    "5,10,25,50,75,100,250,500,750,1000,2500,5000,7500,10000",
  )
  raw_buckets.split(",").map(&:to_i)
end

def statsd_max_packet_size
default_statsd_max_packet_size = prometheus? ? StatsD::Instrument::Prometheus::BatchedPrometheusSink::DEFAULT_MAX_PACKET_SIZE : StatsD::Instrument::BatchedUDPSink::DEFAULT_MAX_PACKET_SIZE
Float(env.fetch("STATSD_MAX_PACKET_SIZE", default_statsd_max_packet_size))
Expand Down Expand Up @@ -171,6 +175,7 @@ def default_sink_for_environment
seconds_between_flushes: prometheus_seconds_between_flushes,
max_fill_ratio: prometheus_max_fill_ratio,
basic_auth_user: prometheus_basic_auth_user,
histograms: prometheus_histograms,
)
elsif statsd_batching?
StatsD::Instrument::BatchedUDPSink.for_addr(
Expand Down
7 changes: 4 additions & 3 deletions lib/statsd/instrument/prometheus/aggregator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ module StatsD
module Instrument
module Prometheus
class Aggregator
def initialize(datagrams, percentiles = nil)
# Sets up aggregation of raw StatsD datagrams into Prometheus-ready form.
#
# @param datagrams [String] whitespace-separated StatsD datagrams (split later during aggregation)
# @param percentiles [Array<Integer>, nil] percentile thresholds to compute when aggregating timers
# @param histograms [Array<Integer>, nil] histogram bucket boundaries to compute when aggregating timers
def initialize(datagrams, percentiles = nil, histograms = nil)
  @datagrams = datagrams
  @percentiles = percentiles
  @histograms = histograms
  # NOTE(review): counters presumably incremented while parsing/aggregating
  # datagrams elsewhere in this class — confirm against the full class body.
  @pre_aggregation_number_of_metrics = 0
  @number_of_metrics_failed_to_parse = 0
end
Expand All @@ -19,7 +20,7 @@ def run

private

attr_reader :datagrams, :percentiles
attr_reader :datagrams, :percentiles, :histograms

def datagrams_by_type_then_key
datagrams.split.map do |datagram|
Expand All @@ -33,7 +34,7 @@ def datagrams_by_type_then_key
def aggregated_datagrams
datagrams_by_type_then_key.flat_map do |type, datagrams_by_key|
datagrams_by_key.flat_map do |_, datagrams_for_key|
aggregation_class_for_type(type).new(datagrams_for_key, percentiles: percentiles).aggregate
aggregation_class_for_type(type).new(datagrams_for_key, percentiles: percentiles, histograms: histograms).aggregate
end
end
end
Expand Down
97 changes: 52 additions & 45 deletions lib/statsd/instrument/prometheus/aggregators/timing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,74 +13,63 @@ def aggregate
values = datagrams.map(&:value).sort
count = values.length
min = values.first
max = values.last

cumulative_values = [min]
cumulative_sum_squares_values = [min * min]
for i in 1..(count - 1) do # rubocop:disable Style/for
cumulative_values.push(values[i] + cumulative_values[i - 1])
cumulative_sum_squares_values.push((values[i] * values[i]) + cumulative_sum_squares_values[i - 1])
end

sum = min
sum_squares = min * min
mean = min
threshold_boundary = max
calculate_percentiles(count, cumulative_values, current_timer_data, current_timer_count_data)
calculate_base(count, cumulative_values, current_timer_data, current_timer_count_data)
current_timer_histogram_buckets = calculate_histograms(values)

percentiles.each do |percentile_threshold|
last_datagram = datagrams.last
timer_data_to_datagrams(current_timer_data, last_datagram) +
timer_count_data_to_datagrams(current_timer_count_data, last_datagram) +
timer_histogram_data_to_datagrams(current_timer_histogram_buckets, last_datagram)
end

private

def calculate_base(count, cumulative_values, current_timer_data, current_timer_count_data)
sum = cumulative_values[count - 1]
current_timer_count_data["count"] = count.to_i
current_timer_data["sum"] = sum
end

def calculate_percentiles(count, cumulative_values, current_timer_data, current_timer_count_data)
sum = cumulative_values[0]
percentiles.sort.each do |percentile_threshold|
count_within_percentile_threshold = count.to_f

if count > 1
count_within_percentile_threshold = (percentile_threshold.abs / 100.0 * count).round
next if count_within_percentile_threshold == 0

if percentile_threshold > 0
threshold_boundary = values[count_within_percentile_threshold - 1]
sum = cumulative_values[count_within_percentile_threshold - 1]
sum_squares = cumulative_sum_squares_values[count_within_percentile_threshold - 1]
sum = if percentile_threshold > 0
cumulative_values[count_within_percentile_threshold - 1]
else
threshold_boundary = values[count - count_within_percentile_threshold]
sum = cumulative_values[count - 1] - cumulative_values[count - count_within_percentile_threshold - 1]
sum_squares = cumulative_sum_squares_values[count - 1] - cumulative_sum_squares_values[count - count_within_percentile_threshold - 1]
cumulative_values[count - 1] - cumulative_values[count - count_within_percentile_threshold - 1]
end
mean = sum / count_within_percentile_threshold
end

clean_percentile_threshold = percentile_threshold.to_s
clean_percentile_threshold = clean_percentile_threshold.gsub(".", "_").gsub("-", "top")
current_timer_count_data["count_" + clean_percentile_threshold] = count_within_percentile_threshold.to_i
current_timer_data["mean_" + clean_percentile_threshold] = mean
current_timer_data[(percentile_threshold > 0 ? "upper_" : "lower_") + clean_percentile_threshold] =
threshold_boundary
current_timer_data["sum_" + clean_percentile_threshold] = sum
current_timer_data["sum_squares_" + clean_percentile_threshold] = sum_squares
end
end

sum = cumulative_values[count - 1]
sum_squares = cumulative_sum_squares_values[count - 1]
mean = sum / count.to_f

sum_of_diffs = 0
for i in 0..(count - 1) do # rubocop:disable Style/for
sum_of_diffs += (values[i] - mean) * (values[i] - mean)
def calculate_histograms(values)
result = histograms.sort.each_with_object({}) do |bucket, current_timer_histograms|
current_timer_histograms[bucket] = values.select { |value| value <= bucket }.count
end
result["+Inf"] = values.count if histograms.any?
result
end

mid = (count / 2.0).floor
median = count % 2 ? values[mid] : (values[mid - 1] + values[mid]) / 2.0

stddev = Math.sqrt(sum_of_diffs / count.to_f)
current_timer_data["std"] = stddev
current_timer_data["upper"] = max
current_timer_data["lower"] = min
current_timer_count_data["count"] = count.to_i
# current_timer_data["count_ps"] = count / (flushInterval / 1000.0)
current_timer_data["sum"] = sum
current_timer_data["sum_squares"] = sum_squares
current_timer_data["mean"] = mean
current_timer_data["median"] = median

last_datagram = datagrams.last
output = current_timer_data.map do |name, value|
def timer_data_to_datagrams(current_timer_data, last_datagram)
current_timer_data.map do |name, value|
DogStatsDDatagram.new(
DogStatsDDatagramBuilder.new.ms(
"#{last_datagram.name}.#{name}",
Expand All @@ -90,7 +79,10 @@ def aggregate
),
)
end
output + current_timer_count_data.map do |name, value|
end

def timer_count_data_to_datagrams(current_timer_count_data, last_datagram)
current_timer_count_data.map do |name, value|
DogStatsDDatagram.new(
DogStatsDDatagramBuilder.new.c(
"#{last_datagram.name}.#{name}",
Expand All @@ -102,11 +94,26 @@ def aggregate
end
end

private
# Converts cumulative histogram buckets into counter datagrams, one per
# bucket, tagged with the Prometheus "le" (less-than-or-equal) boundary.
# Any tags already present on the source datagram are preserved.
def timer_histogram_data_to_datagrams(current_timer_histogram_buckets, last_datagram)
  current_timer_histogram_buckets.map do |bucket, bucket_count|
    tags_with_boundary = (last_datagram.tags || []) + ["le:#{bucket}"]
    counter = DogStatsDDatagramBuilder.new.c(
      "#{last_datagram.name}.bucket",
      bucket_count,
      last_datagram.sample_rate,
      tags_with_boundary,
    )
    DogStatsDDatagram.new(counter)
  end
end

# Percentile thresholds configured for this aggregation; empty when unset.
def percentiles
  options.fetch(:percentiles, nil) || []
end

# Histogram bucket boundaries configured for this aggregation; empty when unset.
def histograms
  options.fetch(:histograms, nil) || []
end
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/statsd/instrument/prometheus/batched_prometheus_sink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def initialize(
seconds_to_sleep:,
seconds_between_flushes:,
max_fill_ratio:,
basic_auth_user:
basic_auth_user:,
histograms:
)
dispatcher = PeriodicDispatcher.new(
nil,
Expand All @@ -52,6 +53,7 @@ def initialize(
read_timeout,
write_timeout,
basic_auth_user,
histograms,
),
seconds_to_sleep,
seconds_between_flushes,
Expand Down
8 changes: 5 additions & 3 deletions lib/statsd/instrument/prometheus/prometheus_sink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ def thread_name
:number_of_requests_succeeded,
:number_of_metrics_dropped_due_to_buffer_full,
:last_flush_initiated_time,
:basic_auth_user
:basic_auth_user,
:histograms

def initialize(addr, auth_key, percentiles, application_name, subsystem, default_tags, open_timeout, read_timeout, write_timeout, basic_auth_user) # rubocop:disable Lint/MissingSuper
def initialize(addr, auth_key, percentiles, application_name, subsystem, default_tags, open_timeout, read_timeout, write_timeout, basic_auth_user, histograms) # rubocop:disable Lint/MissingSuper
ObjectSpace.define_finalizer(self, FINALIZER)
@uri = URI(addr)
@auth_key = auth_key
Expand All @@ -53,6 +54,7 @@ def initialize(addr, auth_key, percentiles, application_name, subsystem, default
@number_of_metrics_dropped_due_to_buffer_full = 0
@last_flush_initiated_time = Time.now
@basic_auth_user = basic_auth_user
@histograms = histograms
end

def <<(datagram)
Expand All @@ -79,7 +81,7 @@ def failed_to_push!
private

def request_body(datagram)
aggregator = StatsD::Instrument::Prometheus::Aggregator.new(datagram, percentiles)
aggregator = StatsD::Instrument::Prometheus::Aggregator.new(datagram, percentiles, histograms)
aggregated = aggregator.run
aggregated_with_flush_stats = StatsD::Instrument::Prometheus::FlushStats.new(
aggregated,
Expand Down
72 changes: 52 additions & 20 deletions test/prometheus/aggregator_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,49 +43,81 @@ def test_run_aggregates_by_type_and_key
def test_run_with_timer_and_percentiles
values = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95]
aggregator = described_class.new(values.map { |value| "foo:#{value}|ms" }.join("\n"), [90, 95])
assert_equal(18, aggregator.run.length)
assert_equal(6, aggregator.run.length)
actual = aggregator.run
expected = [
"foo.mean_90:42.5|ms",
"foo.upper_90:85.0|ms",
"foo.sum_90:765.0|ms",
"foo.sum_squares_90:44625.0|ms",
"foo.mean_95:45.0|ms",
"foo.upper_95:90.0|ms",
"foo.sum_95:855.0|ms",
"foo.sum_squares_95:52725.0|ms",
"foo.std:28.83140648667699|ms",
"foo.upper:95.0|ms",
"foo.lower:0.0|ms",
"foo.sum:950.0|ms",
"foo.sum_squares:61750.0|ms",
"foo.mean:47.5|ms",
"foo.median:50.0|ms",
"foo.count_90:18|c",
"foo.count_95:19|c",
"foo.count:20|c",
]
assert_equal(expected, actual.map(&:source))
end

def test_run_with_timer_and_percentiles_one_value
  # With a single sample, every percentile degenerates to that one value.
  source = [5].map { |value| "foo:#{value}|ms" }.join("\n")
  aggregator = described_class.new(source, [90, 95])
  assert_equal(6, aggregator.run.length)
  expected = [
    "foo.sum_90:5.0|ms",
    "foo.sum_95:5.0|ms",
    "foo.sum:5.0|ms",
    "foo.count_90:1|c",
    "foo.count_95:1|c",
    "foo.count:1|c",
  ]
  assert_equal(expected, aggregator.run.map(&:source))
end

def test_run_with_timer
values = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95]
aggregator = described_class.new(values.map { |value| "foo:#{value}|ms" }.join("\n"), [])
assert_equal(8, aggregator.run.length)
assert_equal(2, aggregator.run.length)
actual = aggregator.run
expected = [
"foo.std:28.83140648667699|ms",
"foo.upper:95.0|ms",
"foo.lower:0.0|ms",
"foo.sum:950.0|ms",
"foo.sum_squares:61750.0|ms",
"foo.mean:47.5|ms",
"foo.median:50.0|ms",
"foo.count:20|c",
]
assert_equal(expected, actual.map(&:source))
end

def test_run_with_timer_and_histograms
  # Buckets are cumulative: each "le" bucket counts every value <= its
  # boundary, and "+Inf" always counts everything.
  values = (0..95).step(5).to_a
  aggregator = described_class.new(values.map { |value| "foo:#{value}|ms" }.join("\n"), [], [10, 30, 50, 90])
  assert_equal(7, aggregator.run.length)
  expected = [
    "foo.sum:950.0|ms",
    "foo.count:20|c",
    "foo.bucket:3|c|#le:10",
    "foo.bucket:7|c|#le:30",
    "foo.bucket:11|c|#le:50",
    "foo.bucket:19|c|#le:90",
    "foo.bucket:20|c|#le:+Inf",
  ]
  assert_equal(expected, aggregator.run.map(&:source))
end

def test_run_with_timer_and_histograms_existing_tag
  # Tags already on the source datagrams must be preserved, with the
  # "le" bucket tag appended after them.
  values = (0..95).step(5).to_a
  source = values.map { |value| "foo:#{value}|ms|#host:abc" }.join("\n")
  aggregator = described_class.new(source, [], [10, 30, 50, 90])
  assert_equal(7, aggregator.run.length)
  expected = [
    "foo.sum:950.0|ms|#host:abc",
    "foo.count:20|c|#host:abc",
    "foo.bucket:3|c|#host:abc,le:10",
    "foo.bucket:7|c|#host:abc,le:30",
    "foo.bucket:11|c|#host:abc,le:50",
    "foo.bucket:19|c|#host:abc,le:90",
    "foo.bucket:20|c|#host:abc,le:+Inf",
  ]
  assert_equal(expected, aggregator.run.map(&:source))
end

private

def described_class
Expand Down

0 comments on commit 226703b

Please sign in to comment.