diff --git a/CHANGELOG.md b/CHANGELOG.md index 136dc7b689..54a95b632a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - Ingest profiler_id in the profile context and in spans. ([#3714](https://github.com/getsentry/relay/pull/3714), [#3784](https://github.com/getsentry/relay/pull/3784)) - Support extrapolation of metrics extracted from sampled data, as long as the sample rate is set in the DynamicSamplingContext. ([#3753](https://github.com/getsentry/relay/pull/3753)) - Extract thread ID and name in spans. ([#3771](https://github.com/getsentry/relay/pull/3771)) +- Compute metrics summary on the extracted custom metrics. ([#3769](https://github.com/getsentry/relay/pull/3769)) ## 24.6.0 diff --git a/relay-dynamic-config/src/global.rs b/relay-dynamic-config/src/global.rs index 35e950da7a..ecb972a929 100644 --- a/relay-dynamic-config/src/global.rs +++ b/relay-dynamic-config/src/global.rs @@ -214,6 +214,14 @@ pub struct Options { )] pub span_extraction_sample_rate: Option, + /// Overall sampling of metrics summaries computation. + #[serde( + rename = "relay.compute-metrics-summaries.sample-rate", + deserialize_with = "default_on_error", + skip_serializing_if = "is_default" + )] + pub compute_metrics_summaries_sample_rate: Option, + /// The maximum duplication factor used to extrapolate distribution metrics from sampled data. /// /// This applies as long as Relay duplicates distribution values to extrapolate. The default is diff --git a/relay-event-schema/src/protocol/metrics_summary.rs b/relay-event-schema/src/protocol/metrics_summary.rs index 30987d8327..8c2a29f9d4 100644 --- a/relay-event-schema/src/protocol/metrics_summary.rs +++ b/relay-event-schema/src/protocol/metrics_summary.rs @@ -40,3 +40,13 @@ pub struct MetricSummary { /// Tags of the metric. pub tags: Annotated>, } + +impl MetricSummary { + /// Merges another [`MetricSummary`] into this [`MetricSummary`].
+ pub fn merge(&mut self, other: MetricSummary) { + self.min.merge(other.min, |l, r| *l = l.min(r)); + self.max.merge(other.max, |l, r| *l = l.max(r)); + self.sum.merge(other.sum, |l, r| *l += r); + self.count.merge(other.count, |l, r| *l += r); + } +} diff --git a/relay-protocol/src/annotated.rs b/relay-protocol/src/annotated.rs index 02244b8e5e..09db50f776 100644 --- a/relay-protocol/src/annotated.rs +++ b/relay-protocol/src/annotated.rs @@ -190,6 +190,15 @@ impl Annotated { { self.value_mut().get_or_insert_with(f) } + + /// Merges the supplied [`Annotated`] in the left [`Annotated`]. + pub fn merge(&mut self, other: Annotated, block: impl FnOnce(&mut T, T)) { + match (self.value_mut(), other.into_value()) { + (Some(left), Some(right)) => block(left, right), + (None, Some(right)) => self.set_value(Some(right)), + _ => {} + } + } } impl Annotated diff --git a/relay-server/src/metrics_extraction/event.rs b/relay-server/src/metrics_extraction/event.rs index 05c808a9ac..2fb5a22037 100644 --- a/relay-server/src/metrics_extraction/event.rs +++ b/relay-server/src/metrics_extraction/event.rs @@ -5,6 +5,7 @@ use relay_metrics::Bucket; use relay_quotas::DataCategory; use crate::metrics_extraction::generic::{self, Extractable}; +use crate::metrics_extraction::metrics_summary; use crate::services::processor::extract_transaction_span; use crate::statsd::RelayTimers; use crate::utils::sample; @@ -46,38 +47,58 @@ impl Extractable for Span { /// /// If this is a transaction event with spans, metrics will also be extracted from the spans. pub fn extract_metrics( - event: &Event, + event: &mut Event, spans_extracted: bool, config: CombinedMetricExtractionConfig<'_>, max_tag_value_size: usize, span_extraction_sample_rate: Option, + compute_metrics_summaries_sample_rate: Option, ) -> Vec { let mut metrics = generic::extract_metrics(event, config); - - // If spans were already extracted for an event, - // we rely on span processing to extract metrics. 
+ // If spans were already extracted for an event, we rely on span processing to extract metrics. if !spans_extracted && sample(span_extraction_sample_rate.unwrap_or(1.0)) { - extract_span_metrics_for_event(event, config, max_tag_value_size, &mut metrics); + let compute_metrics_summaries_sample_rate = + compute_metrics_summaries_sample_rate.unwrap_or(1.0); + extract_span_metrics_for_event( + event, + config, + max_tag_value_size, + &mut metrics, + compute_metrics_summaries_sample_rate, + ); } metrics } fn extract_span_metrics_for_event( - event: &Event, + event: &mut Event, config: CombinedMetricExtractionConfig<'_>, max_tag_value_size: usize, output: &mut Vec, + compute_metrics_summaries_sample_rate: f32, ) { + let compute_metrics_summaries = sample(compute_metrics_summaries_sample_rate); + relay_statsd::metric!(timer(RelayTimers::EventProcessingSpanMetricsExtraction), { if let Some(transaction_span) = extract_transaction_span(event, max_tag_value_size) { - output.extend(generic::extract_metrics(&transaction_span, config)); + let (metrics, metrics_summary) = + metrics_summary::extract_and_summarize_metrics(&transaction_span, config); + if compute_metrics_summaries { + metrics_summary.apply_on(&mut event._metrics_summary); + } + output.extend(metrics); } - if let Some(spans) = event.spans.value() { + if let Some(spans) = event.spans.value_mut() { for annotated_span in spans { - if let Some(span) = annotated_span.value() { - output.extend(generic::extract_metrics(span, config)); + if let Some(span) = annotated_span.value_mut() { + let (metrics, metrics_summary) = + metrics_summary::extract_and_summarize_metrics(span, config); + if compute_metrics_summaries { + metrics_summary.apply_on(&mut span._metrics_summary); + } + output.extend(metrics); } } } @@ -91,8 +112,8 @@ mod tests { use chrono::{DateTime, Utc}; use insta::assert_debug_snapshot; use relay_dynamic_config::{ - Feature, FeatureSet, GlobalConfig, MetricExtractionConfig, MetricExtractionGroups, - 
ProjectConfig, + ErrorBoundary, Feature, FeatureSet, GlobalConfig, MetricExtractionConfig, + MetricExtractionGroups, MetricSpec, ProjectConfig, }; use relay_event_normalization::{normalize_event, NormalizationConfig}; use relay_event_schema::protocol::Timestamp; @@ -111,7 +132,10 @@ mod tests { } } - fn combined_config(features: impl Into>) -> OwnedConfig { + fn combined_config( + features: impl Into>, + metric_extraction: Option, + ) -> OwnedConfig { let mut global = GlobalConfig::default(); global.normalize(); // defines metrics extraction rules let global = global.metric_extraction.ok().unwrap(); @@ -120,6 +144,7 @@ mod tests { let mut project = ProjectConfig { features, + metric_extraction: metric_extraction.map(ErrorBoundary::Ok).unwrap_or_default(), ..ProjectConfig::default() }; project.sanitize(); // enables metrics extraction rules @@ -1181,11 +1206,12 @@ mod tests { ); extract_metrics( - event.value().unwrap(), + event.value_mut().as_mut().unwrap(), false, - combined_config(features).combined(), + combined_config(features, None).combined(), 200, None, + None, ) } @@ -1383,11 +1409,12 @@ mod tests { ); let metrics = extract_metrics( - event.value().unwrap(), + event.value_mut().as_mut().unwrap(), false, - combined_config([Feature::ExtractCommonSpanMetricsFromEvent]).combined(), + combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, + None, ); insta::assert_debug_snapshot!((&event.value().unwrap().spans, metrics)); } @@ -1437,14 +1464,15 @@ mod tests { ] } "#; - let event = Annotated::from_json(json).unwrap(); + let mut event = Annotated::from_json(json).unwrap(); let metrics = extract_metrics( - event.value().unwrap(), + event.value_mut().as_mut().unwrap(), false, - combined_config([Feature::ExtractCommonSpanMetricsFromEvent]).combined(), + combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, + None, ); // When transaction.op:ui.load and mobile:true, HTTP spans still get both @@ 
-1472,11 +1500,12 @@ mod tests { ); let metrics = extract_metrics( - event.value().unwrap(), + event.value_mut().as_mut().unwrap(), false, - combined_config([Feature::ExtractCommonSpanMetricsFromEvent]).combined(), + combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, + None, ); let usage_metrics = metrics @@ -1504,7 +1533,7 @@ mod tests { generic::extract_metrics( &span, - combined_config([Feature::ExtractCommonSpanMetricsFromEvent]).combined(), + combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), ) } @@ -1617,7 +1646,7 @@ mod tests { .unwrap(); let metrics = generic::extract_metrics( &span, - combined_config([Feature::ExtractCommonSpanMetricsFromEvent]).combined(), + combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), ); assert!(!metrics.is_empty()); @@ -1660,7 +1689,7 @@ mod tests { let span = Annotated::::from_json(json).unwrap(); let metrics = generic::extract_metrics( span.value().unwrap(), - combined_config([Feature::ExtractCommonSpanMetricsFromEvent]).combined(), + combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), ); for mri in [ @@ -1692,14 +1721,15 @@ mod tests { } } "#; - let event = Annotated::::from_json(event).unwrap(); + let mut event = Annotated::::from_json(event).unwrap(); let metrics = extract_metrics( - event.value().unwrap(), + event.value_mut().as_mut().unwrap(), false, - combined_config([Feature::ExtractCommonSpanMetricsFromEvent]).combined(), + combined_config([Feature::ExtractCommonSpanMetricsFromEvent], None).combined(), 200, None, + None, ); assert_eq!(metrics.len(), 4); @@ -1722,4 +1752,135 @@ mod tests { assert_eq!(&*metrics[3].name, "d:spans/duration@millisecond"); } + + #[test] + fn test_metrics_summaries_on_transaction_and_spans() { + let mut event = Annotated::from_json( + r#" + { + "type": "transaction", + "sdk": {"name": "sentry.javascript.react-native"}, + "start_timestamp": "2021-04-26T07:59:01+0100", + 
"timestamp": "2021-04-26T08:00:00+0100", + "release": "1.2.3", + "transaction": "gEt /api/:version/users/", + "transaction_info": {"source": "custom"}, + "platform": "cocoa", + "contexts": { + "trace": { + "trace_id": "ff62a8b040f340bda5d830223def1d81", + "span_id": "bd429c44b67a3eb4", + "op": "ui.load" + }, + "device": { + "family": "iOS", + "model": "iPhone1,1" + }, + "app": { + "app_identifier": "org.reactjs.native.example.RnDiffApp", + "app_name": "RnDiffApp" + }, + "os": { + "name": "iOS", + "version": "16.2" + } + }, + "measurements": { + "app_start_warm": { + "value": 1.0, + "unit": "millisecond" + } + }, + "spans": [ + { + "op": "ui.load.initial_display", + "span_id": "bd429c44b67a3eb2", + "start_timestamp": 1597976300.0000000, + "timestamp": 1597976303.0000000, + "trace_id": "ff62a8b040f340bda5d830223def1d81", + "data": { + "frames.slow": 1, + "frames.frozen": 2, + "frames.total": 9, + "frames.delay": 0.1 + }, + "_metrics_summary": { + "d:spans/duration@millisecond": [ + { + "min": 50.0, + "max": 60.0, + "sum": 100.0, + "count": 2, + "tags": { + "app_start_type": "warm", + "device.class": "1" + } + } + ] + } + } + ], + "_metrics_summary": { + "d:spans/duration@millisecond": [ + { + "min": 50.0, + "max": 100.0, + "sum": 150.0, + "count": 2, + "tags": { + "app_start_type": "warm", + "device.class": "1" + } + } + ] + } + } + "#, + ) + .unwrap(); + + // Normalize first, to make sure that all things are correct as in the real pipeline: + normalize_event( + &mut event, + &NormalizationConfig { + enrich_spans: true, + device_class_synthesis_config: true, + ..Default::default() + }, + ); + + let metric_extraction = MetricExtractionConfig { + version: 4, + metrics: vec![MetricSpec { + category: DataCategory::Span, + mri: "d:custom/my_metric@millisecond".to_owned(), + field: Some("span.duration".to_owned()), + condition: None, + tags: vec![], + }], + ..MetricExtractionConfig::default() + }; + let binding = combined_config( + 
[Feature::ExtractCommonSpanMetricsFromEvent], + Some(metric_extraction), + ); + let config = binding.combined(); + + let _ = extract_metrics( + event.value_mut().as_mut().unwrap(), + false, + config, + 200, + None, + None, + ); + + insta::assert_debug_snapshot!(&event.value().unwrap()._metrics_summary); + insta::assert_debug_snapshot!( + &event.value().unwrap().spans.value().unwrap()[0] + .value() + .unwrap() + ._metrics_summary + ); + } } diff --git a/relay-server/src/metrics_extraction/generic.rs b/relay-server/src/metrics_extraction/generic.rs index f4215e00c7..c4f1994e71 100644 --- a/relay-server/src/metrics_extraction/generic.rs +++ b/relay-server/src/metrics_extraction/generic.rs @@ -2,7 +2,6 @@ use std::collections::BTreeMap; use relay_common::time::UnixTimestamp; use relay_dynamic_config::{CombinedMetricExtractionConfig, TagMapping, TagSource, TagSpec}; - use relay_metrics::{ Bucket, BucketMetadata, BucketValue, FiniteF64, MetricResourceIdentifier, MetricType, }; diff --git a/relay-server/src/metrics_extraction/metrics_summary.rs b/relay-server/src/metrics_extraction/metrics_summary.rs new file mode 100644 index 0000000000..e784a78e34 --- /dev/null +++ b/relay-server/src/metrics_extraction/metrics_summary.rs @@ -0,0 +1,715 @@ +use crate::metrics_extraction::generic; +use crate::metrics_extraction::generic::Extractable; +use relay_base_schema::metrics::{MetricName, MetricNamespace}; +use relay_dynamic_config::CombinedMetricExtractionConfig; +use relay_event_schema::protocol as event; +use relay_metrics::{ + Bucket, BucketValue, CounterType, DistributionValue, FiniteF64, GaugeValue, SetValue, +}; +use relay_protocol::Annotated; +use std::collections::BTreeMap; +use std::ops::AddAssign; + +/// Maps two [`Option`] values using a provided function. 
+fn map_multiple(a: Option, b: Option, f: fn(T, T) -> T) -> Option { + match (a, b) { + (Some(x), Some(y)) => Some(f(x, y)), + (Some(x), None) => Some(x), + (None, Some(y)) => Some(y), + (None, None) => None, + } +} + +/// Key of a bucket used to keep track of aggregates for the [`MetricsSummary`]. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +struct MetricsSummaryBucketKey { + /// Name of the metric. + metric_name: MetricName, + /// Tags of the bucket. + tags: BTreeMap, +} + +/// Value of a bucket used to keep track of aggregates for the [`MetricsSummary`]. +#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)] +struct MetricsSummaryBucketValue { + /// The minimum value reported in the bucket. + min: Option, + /// The maximum value reported in the bucket. + max: Option, + /// The sum of all values reported in the bucket. + sum: Option, + /// The number of times this bucket was updated with a new value. + count: u64, +} + +impl AddAssign for MetricsSummaryBucketValue { + fn add_assign(&mut self, rhs: Self) { + *self = MetricsSummaryBucketValue { + min: map_multiple(self.min, rhs.min, std::cmp::min), + max: map_multiple(self.max, rhs.max, std::cmp::max), + sum: map_multiple(self.sum, rhs.sum, |l, r| l.saturating_add(r)), + count: self.count + rhs.count, + } + } +} + +impl<'a> From<&'a BucketValue> for MetricsSummaryBucketValue { + fn from(value: &'a BucketValue) -> Self { + match value { + BucketValue::Counter(counter) => counter.into(), + BucketValue::Distribution(distribution) => distribution.into(), + BucketValue::Set(set) => set.into(), + BucketValue::Gauge(gauge) => gauge.into(), + } + } +} + +impl<'a> From<&'a CounterType> for MetricsSummaryBucketValue { + fn from(counter: &'a CounterType) -> Self { + MetricsSummaryBucketValue { + min: Some(*counter), + max: Some(*counter), + sum: Some(*counter), + count: 1, + } + } +} + +impl<'a> From<&'a DistributionValue> for MetricsSummaryBucketValue { + fn from(distribution: &'a DistributionValue) -> 
Self { + let mut min = FiniteF64::MAX; + let mut max = FiniteF64::MIN; + let mut sum = FiniteF64::new(0.0).unwrap(); + + for value in distribution { + min = std::cmp::min(min, *value); + max = std::cmp::max(max, *value); + sum = sum.saturating_add(*value); + } + + MetricsSummaryBucketValue { + min: Some(min), + max: Some(max), + sum: Some(sum), + count: distribution.len() as u64, + } + } +} + +impl<'a> From<&'a SetValue> for MetricsSummaryBucketValue { + fn from(set: &'a SetValue) -> Self { + // For sets, we limit to counting the number of occurrences. + MetricsSummaryBucketValue { + min: None, + max: None, + sum: None, + count: set.len() as u64, + } + } +} + +impl<'a> From<&'a GaugeValue> for MetricsSummaryBucketValue { + fn from(gauge: &'a GaugeValue) -> Self { + MetricsSummaryBucketValue { + min: Some(gauge.min), + max: Some(gauge.max), + sum: Some(gauge.sum), + count: gauge.count, + } + } +} + +/// [`MetricsSummary`] that tracks all the buckets containing the summaries for each +/// [`MetricsSummaryBucketKey`]. +/// +/// The need for a [`MetricsSummary`] arises from the fact that we want to compute metrics summaries +/// generically on any slice of [`Bucket`]s meaning that we need to handle cases in which +/// the same metrics as identified by the [`MetricsSummaryBucketKey`] have to be merged. +/// +/// The [`MetricsSummary`] is a different in-memory representation of a metric summary from the +/// [`event::MetricsSummary`]. +#[derive(Debug, Default)] +pub struct MetricsSummary { + buckets: BTreeMap, +} + +impl MetricsSummary { + fn new() -> MetricsSummary { + MetricsSummary { + buckets: BTreeMap::new(), + } + } + + /// Merges into the [`MetricsSummary`] a slice of [`Bucket`]s. + fn from_buckets<'a>(buckets: impl Iterator) -> MetricsSummary { + let mut metrics_summary_spec = MetricsSummary::new(); + + for bucket in buckets { + metrics_summary_spec.merge_bucket(bucket); + } + + metrics_summary_spec + } + + /// Merges a [`Bucket`] into the [`MetricsSummary`]. 
+ fn merge_bucket(&mut self, bucket: &Bucket) { + let key = MetricsSummaryBucketKey { + metric_name: bucket.name.clone(), + tags: bucket.tags.clone(), + }; + + let value = (&bucket.value).into(); + self.buckets + .entry(key) + .and_modify(|e| *e += value) + .or_insert(value); + } + + /// Applies the [`MetricsSummary`] on a receiving [`Annotated`]. + pub fn apply_on(self, receiver: &mut Annotated) { + if self.buckets.is_empty() { + return; + } + + let event::MetricsSummary(metrics_summary_mapping) = + &mut receiver.get_or_insert_with(|| event::MetricsSummary(BTreeMap::new())); + + for (key, value) in self.buckets { + let metric_summary = event::MetricSummary { + min: value.min.map(|m| m.to_f64()).into(), + max: value.max.map(|m| m.to_f64()).into(), + sum: value.sum.map(|m| m.to_f64()).into(), + count: Annotated::new(value.count), + tags: Annotated::new( + key.tags + .into_iter() + .map(|(tag_key, tag_value)| (tag_key, Annotated::new(tag_value))) + .collect(), + ), + }; + + let existing_summary = metrics_summary_mapping + .get_mut(key.metric_name.as_ref()) + .and_then(|v| v.value_mut().as_mut()); + + if let Some(summaries) = existing_summary { + let found = summaries.iter_mut().find_map(|s| { + s.value_mut() + .as_mut() + .filter(|v| v.tags == metric_summary.tags) + }); + + if let Some(found) = found { + found.merge(metric_summary); + } else { + summaries.push(Annotated::new(metric_summary)); + } + } else { + metrics_summary_mapping.insert( + key.metric_name.to_string(), + Annotated::new(vec![Annotated::new(metric_summary)]), + ); + } + } + } +} + +/// Computes the [`MetricsSummary`] from a slice of [`Bucket`]s that belong to +/// [`MetricNamespace::Custom`]. +fn compute(buckets: &[Bucket]) -> MetricsSummary { + // For now, we only want metrics summaries to be extracted for custom metrics. 
+ let filtered_buckets = buckets + .iter() + .filter(|b| matches!(b.name.namespace(), MetricNamespace::Custom)); + + MetricsSummary::from_buckets(filtered_buckets) +} + +/// Extracts metrics and summarizes them. +pub fn extract_and_summarize_metrics( + instance: &T, + config: CombinedMetricExtractionConfig<'_>, +) -> (Vec, MetricsSummary) +where + T: Extractable, +{ + let metrics = generic::extract_metrics(instance, config); + let metrics_summaries = compute(&metrics); + + (metrics, metrics_summaries) +} + +#[cfg(test)] +mod tests { + use crate::metrics_extraction::metrics_summary::{MetricsSummary, MetricsSummaryBucketValue}; + use relay_common::time::UnixTimestamp; + use relay_event_schema::protocol as event; + use relay_metrics::{Bucket, FiniteF64}; + use relay_protocol::Annotated; + use std::collections::BTreeMap; + + fn build_buckets(slice: &[u8]) -> Vec { + Bucket::parse_all(slice, UnixTimestamp::now()) + .map(|b| b.unwrap()) + .collect() + } + + #[test] + fn test_with_counter_buckets() { + let buckets = + build_buckets(b"my_counter:3|c|#platform:ios\nmy_counter:2|c|#platform:android"); + + let metrics_summary_spec = MetricsSummary::from_buckets(buckets.iter()); + let mut metrics_summary = Annotated::new(event::MetricsSummary(BTreeMap::new())); + metrics_summary_spec.apply_on(&mut metrics_summary); + + insta::assert_debug_snapshot!(metrics_summary.value().unwrap(), @r###" + MetricsSummary( + { + "c:custom/my_counter@none": [ + MetricSummary { + min: 2.0, + max: 2.0, + sum: 2.0, + count: 1, + tags: { + "platform": "android", + }, + }, + MetricSummary { + min: 3.0, + max: 3.0, + sum: 3.0, + count: 1, + tags: { + "platform": "ios", + }, + }, + ], + }, + ) + "###); + } + + #[test] + fn test_with_distribution_buckets() { + let buckets = + build_buckets(b"my_dist:3.0:5.0|d|#platform:ios\nmy_dist:2.0:4.0|d|#platform:android"); + + let metrics_summary_spec = MetricsSummary::from_buckets(buckets.iter()); + let mut metrics_summary = 
Annotated::new(event::MetricsSummary(BTreeMap::new())); + metrics_summary_spec.apply_on(&mut metrics_summary); + + insta::assert_debug_snapshot!(metrics_summary.value().unwrap(), @r###" + MetricsSummary( + { + "d:custom/my_dist@none": [ + MetricSummary { + min: 2.0, + max: 4.0, + sum: 6.0, + count: 2, + tags: { + "platform": "android", + }, + }, + MetricSummary { + min: 3.0, + max: 5.0, + sum: 8.0, + count: 2, + tags: { + "platform": "ios", + }, + }, + ], + }, + ) + "###); + } + + #[test] + fn test_with_set_buckets() { + let buckets = + build_buckets(b"my_set:3.0:5.0|s|#platform:ios\nmy_set:2.0:4.0|s|#platform:android"); + + let metrics_summary_spec = MetricsSummary::from_buckets(buckets.iter()); + let mut metrics_summary = Annotated::new(event::MetricsSummary(BTreeMap::new())); + metrics_summary_spec.apply_on(&mut metrics_summary); + + insta::assert_debug_snapshot!(metrics_summary.value().unwrap(), @r###" + MetricsSummary( + { + "s:custom/my_set@none": [ + MetricSummary { + min: ~, + max: ~, + sum: ~, + count: 2, + tags: { + "platform": "android", + }, + }, + MetricSummary { + min: ~, + max: ~, + sum: ~, + count: 2, + tags: { + "platform": "ios", + }, + }, + ], + }, + ) + "###); + } + + #[test] + fn test_with_gauge_buckets() { + let buckets = + build_buckets(b"my_gauge:3.0|g|#platform:ios\nmy_gauge:2.0|g|#platform:android"); + + let metrics_summary_spec = MetricsSummary::from_buckets(buckets.iter()); + let mut metrics_summary = Annotated::new(event::MetricsSummary(BTreeMap::new())); + metrics_summary_spec.apply_on(&mut metrics_summary); + + insta::assert_debug_snapshot!(metrics_summary.value().unwrap(), @r###" + MetricsSummary( + { + "g:custom/my_gauge@none": [ + MetricSummary { + min: 2.0, + max: 2.0, + sum: 2.0, + count: 1, + tags: { + "platform": "android", + }, + }, + MetricSummary { + min: 3.0, + max: 3.0, + sum: 3.0, + count: 1, + tags: { + "platform": "ios", + }, + }, + ], + }, + ) + "###); + } + + #[test] + fn test_merge_buckets() { + let mut buckets = + 
build_buckets(b"my_counter:3|c|#platform:ios\nmy_counter:2|c|#platform:ios"); + buckets.extend(build_buckets( + b"my_dist:3.0:5.0|d|#platform:ios\nmy_dist:2.0:4.0|d|#platform:ios", + )); + buckets.extend(build_buckets( + b"my_set:3.0:5.0|s|#platform:ios\nmy_set:2.0:4.0|s|#platform:ios", + )); + buckets.extend(build_buckets( + b"my_gauge:3.0|g|#platform:ios\nmy_gauge:2.0|g|#platform:ios", + )); + + let metrics_summary_spec = MetricsSummary::from_buckets(buckets.iter()); + let mut metrics_summary = Annotated::new(event::MetricsSummary(BTreeMap::new())); + metrics_summary_spec.apply_on(&mut metrics_summary); + + insta::assert_debug_snapshot!(metrics_summary.value().unwrap(), @r###" + MetricsSummary( + { + "c:custom/my_counter@none": [ + MetricSummary { + min: 2.0, + max: 3.0, + sum: 5.0, + count: 2, + tags: { + "platform": "ios", + }, + }, + ], + "d:custom/my_dist@none": [ + MetricSummary { + min: 2.0, + max: 5.0, + sum: 14.0, + count: 4, + tags: { + "platform": "ios", + }, + }, + ], + "g:custom/my_gauge@none": [ + MetricSummary { + min: 2.0, + max: 3.0, + sum: 5.0, + count: 2, + tags: { + "platform": "ios", + }, + }, + ], + "s:custom/my_set@none": [ + MetricSummary { + min: ~, + max: ~, + sum: ~, + count: 4, + tags: { + "platform": "ios", + }, + }, + ], + }, + ) + "###); + } + + #[test] + fn test_apply_on_with_different_metric() { + let buckets = build_buckets(b"my_counter:3|c|#platform:ios"); + let mut tags_map = BTreeMap::new(); + tags_map.insert("region".to_owned(), Annotated::new("us".to_owned())); + let mut summary_map = BTreeMap::new(); + summary_map.insert( + "c:custom/my_other_counter@none".to_owned(), + Annotated::new(vec![Annotated::new(event::MetricSummary { + min: Annotated::new(5.0), + max: Annotated::new(10.0), + sum: Annotated::new(15.0), + count: Annotated::new(2), + tags: Annotated::new(tags_map), + })]), + ); + + let metrics_summary_spec = MetricsSummary::from_buckets(buckets.iter()); + let mut metrics_summary = 
Annotated::new(event::MetricsSummary(summary_map)); + metrics_summary_spec.apply_on(&mut metrics_summary); + + insta::assert_debug_snapshot!(metrics_summary.value().unwrap(), @r###" + MetricsSummary( + { + "c:custom/my_counter@none": [ + MetricSummary { + min: 3.0, + max: 3.0, + sum: 3.0, + count: 1, + tags: { + "platform": "ios", + }, + }, + ], + "c:custom/my_other_counter@none": [ + MetricSummary { + min: 5.0, + max: 10.0, + sum: 15.0, + count: 2, + tags: { + "region": "us", + }, + }, + ], + }, + ) + "###); + } + + #[test] + fn test_apply_on_with_same_metric_and_different_tags() { + let buckets = build_buckets(b"my_counter:3|c|#platform:ios"); + let mut tags_map = BTreeMap::new(); + tags_map.insert("region".to_owned(), Annotated::new("us".to_owned())); + let mut summary_map = BTreeMap::new(); + summary_map.insert( + "c:custom/my_counter@none".to_owned(), + Annotated::new(vec![Annotated::new(event::MetricSummary { + min: Annotated::new(5.0), + max: Annotated::new(10.0), + sum: Annotated::new(15.0), + count: Annotated::new(2), + tags: Annotated::new(tags_map), + })]), + ); + + let metrics_summary_spec = MetricsSummary::from_buckets(buckets.iter()); + let mut metrics_summary = Annotated::new(event::MetricsSummary(summary_map)); + metrics_summary_spec.apply_on(&mut metrics_summary); + + insta::assert_debug_snapshot!(metrics_summary.value().unwrap(), @r###" + MetricsSummary( + { + "c:custom/my_counter@none": [ + MetricSummary { + min: 5.0, + max: 10.0, + sum: 15.0, + count: 2, + tags: { + "region": "us", + }, + }, + MetricSummary { + min: 3.0, + max: 3.0, + sum: 3.0, + count: 1, + tags: { + "platform": "ios", + }, + }, + ], + }, + ) + "###); + } + + #[test] + fn test_apply_on_with_same_metric_and_same_tags() { + let buckets = build_buckets(b"my_counter:3|c|#platform:ios"); + let mut tags_map = BTreeMap::new(); + tags_map.insert("platform".to_owned(), Annotated::new("ios".to_owned())); + let mut summary_map = BTreeMap::new(); + summary_map.insert( + 
"c:custom/my_counter@none".to_owned(), + Annotated::new(vec![Annotated::new(event::MetricSummary { + min: Annotated::new(5.0), + max: Annotated::new(10.0), + sum: Annotated::new(15.0), + count: Annotated::new(2), + tags: Annotated::new(tags_map), + })]), + ); + + let metrics_summary_spec = MetricsSummary::from_buckets(buckets.iter()); + let mut metrics_summary = Annotated::new(event::MetricsSummary(summary_map)); + metrics_summary_spec.apply_on(&mut metrics_summary); + + insta::assert_debug_snapshot!(metrics_summary.value().unwrap(), @r###" + MetricsSummary( + { + "c:custom/my_counter@none": [ + MetricSummary { + min: 3.0, + max: 10.0, + sum: 18.0, + count: 3, + tags: { + "platform": "ios", + }, + }, + ], + }, + ) + "###); + } + + #[test] + fn test_apply_on_with_same_metric_and_same_empty_tags() { + let buckets = build_buckets(b"my_counter:3|c"); + let mut summary_map = BTreeMap::new(); + summary_map.insert( + "c:custom/my_counter@none".to_owned(), + Annotated::new(vec![Annotated::new(event::MetricSummary { + min: Annotated::new(5.0), + max: Annotated::new(10.0), + sum: Annotated::new(15.0), + count: Annotated::new(2), + tags: Annotated::new(BTreeMap::new()), + })]), + ); + + let metrics_summary_spec = MetricsSummary::from_buckets(buckets.iter()); + let mut metrics_summary = Annotated::new(event::MetricsSummary(summary_map)); + metrics_summary_spec.apply_on(&mut metrics_summary); + + insta::assert_debug_snapshot!(metrics_summary.value().unwrap(), @r###" + MetricsSummary( + { + "c:custom/my_counter@none": [ + MetricSummary { + min: 3.0, + max: 10.0, + sum: 18.0, + count: 3, + tags: {}, + }, + ], + }, + ) + "###); + } + + #[test] + fn test_apply_on_with_same_metric_and_same_empty_tags_and_none_values() { + let buckets = build_buckets(b"my_counter:3|c"); + let mut summary_map = BTreeMap::new(); + summary_map.insert( + "c:custom/my_counter@none".to_owned(), + Annotated::new(vec![Annotated::new(event::MetricSummary { + min: Annotated::empty(), + max: 
Annotated::new(10.0), + sum: Annotated::empty(), + count: Annotated::new(2), + tags: Annotated::new(BTreeMap::new()), + })]), + ); + + let metrics_summary_spec = MetricsSummary::from_buckets(buckets.iter()); + let mut metrics_summary = Annotated::new(event::MetricsSummary(summary_map)); + metrics_summary_spec.apply_on(&mut metrics_summary); + + insta::assert_debug_snapshot!(metrics_summary.value().unwrap(), @r###" + MetricsSummary( + { + "c:custom/my_counter@none": [ + MetricSummary { + min: 3.0, + max: 10.0, + sum: 3.0, + count: 3, + tags: {}, + }, + ], + }, + ) + "###); + } + + #[test] + fn test_bucket_value_merge() { + let mut value_1 = MetricsSummaryBucketValue { + min: Some(FiniteF64::MIN), + max: None, + sum: None, + count: 0, + }; + + let value_2 = MetricsSummaryBucketValue { + min: None, + max: Some(FiniteF64::MAX), + sum: Some(FiniteF64::from(10)), + count: 10, + }; + + value_1 += value_2; + + assert_eq!( + value_1, + MetricsSummaryBucketValue { + min: Some(FiniteF64::MIN), + max: Some(FiniteF64::MAX), + sum: Some(FiniteF64::from(10)), + count: 10 + } + ); + } +} diff --git a/relay-server/src/metrics_extraction/mod.rs b/relay-server/src/metrics_extraction/mod.rs index 4980fe0107..7ba6cd6db8 100644 --- a/relay-server/src/metrics_extraction/mod.rs +++ b/relay-server/src/metrics_extraction/mod.rs @@ -3,6 +3,7 @@ use relay_metrics::Bucket; pub mod event; pub mod generic; +pub mod metrics_summary; pub mod sessions; pub mod transactions; diff --git a/relay-server/src/metrics_extraction/snapshots/relay_server__metrics_extraction__event__tests__metrics_summaries_on_transaction_and_spans-2.snap b/relay-server/src/metrics_extraction/snapshots/relay_server__metrics_extraction__event__tests__metrics_summaries_on_transaction_and_spans-2.snap new file mode 100644 index 0000000000..4b5768c937 --- /dev/null +++ b/relay-server/src/metrics_extraction/snapshots/relay_server__metrics_extraction__event__tests__metrics_summaries_on_transaction_and_spans-2.snap @@ -0,0 +1,29 @@ 
+--- +source: relay-server/src/metrics_extraction/event.rs +expression: "&event.value().unwrap().spans.value().unwrap()[0].value().unwrap()._metrics_summary" +--- +MetricsSummary( + { + "d:custom/my_metric@millisecond": [ + MetricSummary { + min: 3000.0, + max: 3000.0, + sum: 3000.0, + count: 1, + tags: {}, + }, + ], + "d:spans/duration@millisecond": [ + MetricSummary { + min: 50.0, + max: 60.0, + sum: 100.0, + count: 2, + tags: { + "app_start_type": "warm", + "device.class": "1", + }, + }, + ], + }, +) diff --git a/relay-server/src/metrics_extraction/snapshots/relay_server__metrics_extraction__event__tests__metrics_summaries_on_transaction_and_spans.snap b/relay-server/src/metrics_extraction/snapshots/relay_server__metrics_extraction__event__tests__metrics_summaries_on_transaction_and_spans.snap new file mode 100644 index 0000000000..56f4f7b5c7 --- /dev/null +++ b/relay-server/src/metrics_extraction/snapshots/relay_server__metrics_extraction__event__tests__metrics_summaries_on_transaction_and_spans.snap @@ -0,0 +1,29 @@ +--- +source: relay-server/src/metrics_extraction/event.rs +expression: "&event.value().unwrap()._metrics_summary" +--- +MetricsSummary( + { + "d:custom/my_metric@millisecond": [ + MetricSummary { + min: 59000.0, + max: 59000.0, + sum: 59000.0, + count: 1, + tags: {}, + }, + ], + "d:spans/duration@millisecond": [ + MetricSummary { + min: 50.0, + max: 100.0, + sum: 150.0, + count: 2, + tags: { + "app_start_type": "warm", + "device.class": "1", + }, + }, + ], + }, +) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 0e7f814e6b..3b41c4f690 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -1358,7 +1358,7 @@ impl EnvelopeProcessorService { if state.event_metrics_extracted { return Ok(()); } - let Some(event) = state.event.value() else { + let Some(event) = state.event.value_mut() else { return Ok(()); }; @@ -1428,6 +1428,7 @@ impl EnvelopeProcessorService 
{ .aggregator .max_tag_value_length, global.options.span_extraction_sample_rate, + global.options.compute_metrics_summaries_sample_rate, ); state diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index d4aaa664d2..f57238f0f5 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -25,7 +25,7 @@ use relay_quotas::DataCategory; use relay_spans::{otel_to_sentry_span, otel_trace::Span as OtelSpan}; use crate::envelope::{ContentType, Envelope, Item, ItemType}; -use crate::metrics_extraction::generic::extract_metrics; +use crate::metrics_extraction::metrics_summary; use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::processor::span::extract_transaction_span; use crate::services::processor::{ @@ -154,10 +154,19 @@ pub fn process( return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); }; - let metrics = extract_metrics( + let (metrics, metrics_summary) = metrics_summary::extract_and_summarize_metrics( span, CombinedMetricExtractionConfig::new(global_metrics_config, config), ); + if sample( + global_config + .options + .compute_metrics_summaries_sample_rate + .unwrap_or(1.0), + ) { + metrics_summary.apply_on(&mut span._metrics_summary) + } + state .extracted_metrics .extend_project_metrics(metrics, Some(sampling_result.decision())); diff --git a/tests/integration/consts.py b/tests/integration/consts.py index 63d7e8065d..d69a0f9a4c 100644 --- a/tests/integration/consts.py +++ b/tests/integration/consts.py @@ -1,4 +1,6 @@ # Minimum supported version for metric transaction by Relay. TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION = 3 +# Minimum supported version for generic metrics extraction by Relay. +METRICS_EXTRACTION_MIN_SUPPORTED_VERSION = 4 # Maximum supported version for metric transaction by Relay. 
TRANSACTION_EXTRACT_MAX_SUPPORTED_VERSION = 6 diff --git a/tests/integration/test_spans.py b/tests/integration/test_spans.py index 2e39a07589..9f61a95b9c 100644 --- a/tests/integration/test_spans.py +++ b/tests/integration/test_spans.py @@ -3,7 +3,10 @@ import uuid from collections import Counter from datetime import datetime, timedelta, timezone, UTC -from .consts import TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION +from .consts import ( + TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, + METRICS_EXTRACTION_MIN_SUPPORTED_VERSION, +) import pytest from opentelemetry.proto.common.v1.common_pb2 import AnyValue, KeyValue @@ -2045,3 +2048,125 @@ def summarize_outcomes(outcomes): spans_consumer.assert_empty() outcomes_consumer.assert_empty() + + +def test_metrics_summary_with_extracted_spans( + mini_sentry, + relay_with_processing, + metrics_summaries_consumer, +): + mini_sentry.global_config["options"] = { + "relay.compute-metrics-summaries.sample-rate": 1.0 + } + + metrics_summaries_consumer = metrics_summaries_consumer() + + relay = relay_with_processing() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "organizations:custom-metrics", + "projects:span-metrics-extraction", + "organizations:indexed-spans-extraction", + ] + project_config["config"]["transactionMetrics"] = { + "version": TRANSACTION_EXTRACT_MIN_SUPPORTED_VERSION, + } + project_config["config"]["metricExtraction"] = { + "version": METRICS_EXTRACTION_MIN_SUPPORTED_VERSION, + "metrics": [ + { + "category": "span", + "mri": "d:custom/my_metric@millisecond", + "field": "span.duration", + } + ], + } + + event = make_transaction({"event_id": "cbf6960622e14a45abc1f03b2055b186"}) + end = datetime.now(timezone.utc) - timedelta(seconds=1) + duration = timedelta(milliseconds=500) + start = end - duration + event["spans"] = [ + { + "description": "GET /api/0/organizations/?member=1", + "op": "http", + "origin": "manual", + "parent_span_id": 
"aaaaaaaaaaaaaaaa", + "span_id": "bbbbbbbbbbbbbbbb", + "start_timestamp": start.isoformat(), + "status": "success", + "timestamp": end.isoformat(), + "trace_id": "ff62a8b040f340bda5d830223def1d81", + }, + ] + + mri = "c:spans/some_metric@none" + metrics_summary = { + mri: [ + { + "min": 1.0, + "max": 2.0, + "sum": 3.0, + "count": 4, + "tags": { + "environment": "test", + }, + }, + ], + } + event["_metrics_summary"] = metrics_summary + + relay.send_event(project_id, event) + + metrics_summaries = metrics_summaries_consumer.get_metrics_summaries( + timeout=10.0, n=3 + ) + expected_mris = ["c:spans/some_metric@none", "d:custom/my_metric@millisecond"] + for metric_summary in metrics_summaries: + assert metric_summary["mri"] in expected_mris + + +def test_metrics_summary_with_standalone_spans( + mini_sentry, + relay_with_processing, + metrics_summaries_consumer, +): + mini_sentry.global_config["options"] = { + "relay.compute-metrics-summaries.sample-rate": 1.0 + } + + metrics_summaries_consumer = metrics_summaries_consumer() + + relay = relay_with_processing() + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["features"] = [ + "projects:span-metrics-extraction", + "organizations:standalone-span-ingestion", + ] + project_config["config"]["metricExtraction"] = { + "version": METRICS_EXTRACTION_MIN_SUPPORTED_VERSION, + "metrics": [ + { + "category": "span", + "mri": "d:custom/my_metric@millisecond", + "field": "span.duration", + } + ], + } + + duration = timedelta(milliseconds=500) + now = datetime.now(timezone.utc) + end = now - timedelta(seconds=1) + start = end - duration + + envelope = envelope_with_spans(start, end) + relay.send_envelope(project_id, envelope) + + metrics_summaries = metrics_summaries_consumer.get_metrics_summaries( + timeout=10.0, n=4 + ) + expected_mris = ["c:spans/some_metric@none", "d:custom/my_metric@millisecond"] + for metric_summary in metrics_summaries: + assert metric_summary["mri"] 
in expected_mris