Skip to content

Commit

Permalink
Merge pull request #4 from meetcleo/PLT-93-grafana-adds
Browse files Browse the repository at this point in the history
[PLT-93] Make it work for Grafana
  • Loading branch information
joshuafleck authored Oct 17, 2023
2 parents 146bed7 + d098f9d commit 4a5a052
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 14 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ The following environment variables are supported:
If your network is properly configured to handle larger packets you may try
to increase this value for better performance, but most networks can't handle
larger packets.
- `STATSD_PROMETHEUS_AUTH`: The API key for the prometheus endpoint. If set, will send batch stats in prometheus-compatible format
- `STATSD_PROMETHEUS_AUTH`: The API key (or password for basic auth) for the prometheus endpoint. If set, will send batch stats in prometheus-compatible format
- `STATSD_PROMETHEUS_BASIC_AUTH_USER`: The user to use if basic auth is required.
- `STATSD_PROMETHEUS_PERCENTILES`: The percentiles that will be calculated when aggregating timers for prometheus. 95,99 are the default.
- `STATSD_PROMETHEUS_APPLICATION_NAME`: The application name that will be included as a tag in all metrics sent to prometheus.
- `STATSD_PROMETHEUS_SUBSYSTEM`: The subsystem that will be included as a tag in all metrics sent to prometheus.
Expand Down
5 changes: 5 additions & 0 deletions lib/statsd/instrument/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ def prometheus_auth
env.fetch("STATSD_PROMETHEUS_AUTH", nil)
end

# Username for HTTP basic auth against the prometheus endpoint, read from
# the environment; nil when STATSD_PROMETHEUS_BASIC_AUTH_USER is not set.
def prometheus_basic_auth_user
  variable_name = "STATSD_PROMETHEUS_BASIC_AUTH_USER"
  env.fetch(variable_name, nil)
end

# Application name included as a tag on every metric sent to prometheus,
# read from the environment; nil when STATSD_PROMETHEUS_APPLICATION_NAME
# is not set.
def prometheus_application_name
  variable_name = "STATSD_PROMETHEUS_APPLICATION_NAME"
  env.fetch(variable_name, nil)
end
Expand Down Expand Up @@ -166,6 +170,7 @@ def default_sink_for_environment
seconds_to_sleep: prometheus_seconds_to_sleep,
seconds_between_flushes: prometheus_seconds_between_flushes,
max_fill_ratio: prometheus_max_fill_ratio,
basic_auth_user: prometheus_basic_auth_user,
)
elsif statsd_batching?
StatsD::Instrument::BatchedUDPSink.for_addr(
Expand Down
16 changes: 13 additions & 3 deletions lib/statsd/instrument/prometheus/aggregator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ def initialize(datagrams, percentiles = nil)
@datagrams = datagrams
@percentiles = percentiles
@pre_aggregation_number_of_metrics = 0
@number_of_metrics_failed_to_parse = 0
end

# Aggregates the datagrams and returns the resulting list with any nil
# entries removed (e.g. placeholders for metrics that failed to parse).
def run
  aggregated_datagrams.reject(&:nil?)
end

attr_reader :pre_aggregation_number_of_metrics
attr_reader :pre_aggregation_number_of_metrics, :number_of_metrics_failed_to_parse

private

Expand All @@ -23,8 +24,8 @@ def run
def datagrams_by_type_then_key
datagrams.split.map do |datagram|
@pre_aggregation_number_of_metrics += 1
DogStatsDDatagram.new(datagram)
end.group_by(&:type).to_h do |type, parsed_datagrams|
try_parse_metric(datagram)
end.compact.group_by(&:type).to_h do |type, parsed_datagrams|
[type, parsed_datagrams.group_by(&:key).to_h]
end
end
Expand All @@ -37,6 +38,15 @@ def aggregated_datagrams
end
end

# Wraps datagram parsing so malformed input is counted and dropped instead
# of raising. Returns the parsed datagram, or nil when parsing fails.
def try_parse_metric(datagram)
  # The datagram parses lazily; touching #key forces the parse so any
  # ArgumentError surfaces here rather than later during aggregation.
  DogStatsDDatagram.new(datagram).tap(&:key)
rescue ArgumentError
  @number_of_metrics_failed_to_parse += 1
  nil
end

def aggregation_class_for_type(type)
case type
when :c
Expand Down
4 changes: 3 additions & 1 deletion lib/statsd/instrument/prometheus/batched_prometheus_sink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def initialize(
write_timeout:,
seconds_to_sleep:,
seconds_between_flushes:,
max_fill_ratio:
max_fill_ratio:,
basic_auth_user:
)
dispatcher = PeriodicDispatcher.new(
nil,
Expand All @@ -50,6 +51,7 @@ def initialize(
open_timeout,
read_timeout,
write_timeout,
basic_auth_user,
),
seconds_to_sleep,
seconds_between_flushes,
Expand Down
14 changes: 12 additions & 2 deletions lib/statsd/instrument/prometheus/flush_stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ module Instrument
module Prometheus
class FlushStats
def initialize(datagrams, default_tags, pre_aggregation_number_of_metrics, number_of_requests_attempted,
number_of_requests_succeeded, number_of_metrics_dropped_due_to_buffer_full, last_flush_initiated_time)
number_of_requests_succeeded, number_of_metrics_dropped_due_to_buffer_full, last_flush_initiated_time, number_of_metrics_dropped_due_to_parsing_failure)
@datagrams = datagrams
@default_tags = default_tags
@pre_aggregation_number_of_metrics = pre_aggregation_number_of_metrics
@number_of_requests_attempted = number_of_requests_attempted
@number_of_requests_succeeded = number_of_requests_succeeded
@number_of_metrics_dropped_due_to_buffer_full = number_of_metrics_dropped_due_to_buffer_full
@last_flush_initiated_time = last_flush_initiated_time
@number_of_metrics_dropped_due_to_parsing_failure = number_of_metrics_dropped_due_to_parsing_failure
end

def run
Expand All @@ -27,7 +28,8 @@ def run
:number_of_requests_attempted,
:number_of_requests_succeeded,
:number_of_metrics_dropped_due_to_buffer_full,
:last_flush_initiated_time
:last_flush_initiated_time,
:number_of_metrics_dropped_due_to_parsing_failure

def flush_stats
[
Expand Down Expand Up @@ -79,6 +81,14 @@ def flush_stats
nil,
),
),
DogStatsDDatagram.new(
DogStatsDDatagramBuilder.new(default_tags: default_tags).c(
"number_of_metrics_dropped_due_to_parsing_failure.total",
number_of_metrics_dropped_due_to_parsing_failure,
nil,
nil,
),
),
]
end
end
Expand Down
15 changes: 11 additions & 4 deletions lib/statsd/instrument/prometheus/prometheus_sink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ def thread_name
:number_of_requests_attempted,
:number_of_requests_succeeded,
:number_of_metrics_dropped_due_to_buffer_full,
:last_flush_initiated_time
:last_flush_initiated_time,
:basic_auth_user

def initialize(addr, auth_key, percentiles, application_name, subsystem, default_tags, open_timeout, read_timeout, write_timeout) # rubocop:disable Lint/MissingSuper
def initialize(addr, auth_key, percentiles, application_name, subsystem, default_tags, open_timeout, read_timeout, write_timeout, basic_auth_user) # rubocop:disable Lint/MissingSuper
ObjectSpace.define_finalizer(self, FINALIZER)
@uri = URI(addr)
@auth_key = auth_key
Expand All @@ -51,14 +52,15 @@ def initialize(addr, auth_key, percentiles, application_name, subsystem, default
@number_of_requests_succeeded = 0
@number_of_metrics_dropped_due_to_buffer_full = 0
@last_flush_initiated_time = Time.now
@basic_auth_user = basic_auth_user
end

def <<(datagram)
current_flush_initiated_time = Time.now
invalidate_socket_and_retry_if_error do
@number_of_requests_attempted += 1
response = make_request(datagram)
if response.code == "201"
if ["201", "200"].include?(response.code)
@number_of_requests_succeeded += 1
else
StatsD.logger.warn do
Expand Down Expand Up @@ -87,6 +89,7 @@ def request_body(datagram)
number_of_requests_succeeded,
number_of_metrics_dropped_due_to_buffer_full,
last_flush_initiated_time,
aggregator.number_of_metrics_failed_to_parse,
).run
serialized = StatsD::Instrument::Prometheus::Serializer.new(
aggregated_with_flush_stats,
Expand All @@ -98,7 +101,11 @@ def request_body(datagram)

def make_request(datagram)
request = Net::HTTP::Post.new(uri.request_uri)
request["Authorization"] = "Bearer #{auth_key}"
if basic_auth_user
request.basic_auth(basic_auth_user, auth_key)
else
request["Authorization"] = "Bearer #{auth_key}"
end
request.body = request_body(datagram)
socket.request(request)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/statsd/instrument/udp_sink.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def invalidate_socket_and_retry_if_error
retried = false
begin
yield
rescue SocketError, IOError, SystemCallError => error
rescue SocketError, IOError, SystemCallError, Net::OpenTimeout => error
StatsD.logger.debug do
"[StatsD::Instrument::UDPSink] Resetting connection because of #{error.class}: #{error.message}"
end
Expand Down
6 changes: 6 additions & 0 deletions test/prometheus/aggregator_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ def test_run_with_sums
assert_equal(14, aggregator.run.last.value)
end

# Verifies that a malformed entry ("bar::1|c") is dropped while the
# remaining valid counters still aggregate into a single summed metric.
def test_run_with_failed_parse
  subject = described_class.new("bar::1|c\nfoo:10|c\nfoo:1|c\nfoo:1|c\nfoo:2|c")
  assert_equal(1, subject.run.length)
  assert_equal(14, subject.run.last.value)
end

def test_run_with_last_value
aggregator = described_class.new("foo:10|g\nfoo:1|g\nfoo:1|g\nfoo:2|g")
assert_equal(1, aggregator.run.length)
Expand Down
22 changes: 20 additions & 2 deletions test/prometheus/integration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,36 @@ def test_mocked_request
],
exemplars: [],
},
expected_metric("metrics_since_last_flush", 1.0),
expected_metric("pre_aggregation_number_of_metrics_since_last_flush", 2.0),
{
labels: [
{ name: "__meta_applicationname", value: "app-name" },
{ name: "__meta_subsystem", value: "subsystem" },
{ name: "host", value: "" },
{ name: "pid", value: "" },
{ name: "__name__", value: "will_fail_total" },
{ name: "source", value: "App::Main::Controller" },
{ name: "env", value: "test" },
],
samples: [
{ value: 1.0, timestamp: -1 },
],
exemplars: [],
},
expected_metric("metrics_since_last_flush", 2.0),
expected_metric("pre_aggregation_number_of_metrics_since_last_flush", 4.0),
expected_metric("number_of_requests_attempted_total", 1.0),
expected_metric("number_of_requests_succeeded_upto_previous_flush_total", 0.0),
expected_metric("number_of_metrics_dropped_due_to_buffer_full_total", 0.0),
expected_metric("time_since_last_flush_initiated", -1),
expected_metric("number_of_metrics_dropped_due_to_parsing_failure_total", 1.0),
],
metadata: [],
}
stub_request(:post, TEST_URL).to_return(status: 201)
StatsD.increment("counter", tags: { source: "App::Main::Controller", host: "localhost" })
StatsD.increment("counter", tags: { source: "App::Main::Controller", host: "localhost" })
# Will treat the newline as its own metric that will fail to parse
StatsD.increment(":\nwill_fail", tags: { source: "App::Main::Controller", host: "localhost" })
StatsD.singleton_client.sink.shutdown
assert_request_contents(TEST_URL, expected, expected_headers: { "Authorization" => "Bearer abc" })
end
Expand Down

0 comments on commit 4a5a052

Please sign in to comment.