diff --git a/config.go b/config.go index 506425899..d74e6e83b 100644 --- a/config.go +++ b/config.go @@ -1,5 +1,7 @@ package veneur +import "github.com/stripe/veneur/metricingester" + type Config struct { Aggregates []string `yaml:"aggregates"` AwsAccessKeyID string `yaml:"aws_access_key_id"` @@ -95,4 +97,6 @@ type Config struct { TraceLightstepNumClients int `yaml:"trace_lightstep_num_clients"` TraceLightstepReconnectPeriod string `yaml:"trace_lightstep_reconnect_period"` TraceMaxLengthBytes int `yaml:"trace_max_length_bytes"` + + AdditionalMetricSinks []metricingester.Sink } diff --git a/forward_grpc_test.go b/forward_grpc_test.go index 4603fc1c7..872c70959 100644 --- a/forward_grpc_test.go +++ b/forward_grpc_test.go @@ -226,7 +226,7 @@ func TestE2EForwardingGRPCMetrics(t *testing.T) { } assert.ElementsMatch(t, expectedNames, actualNames, - "The global Veneur didn't flush the right metrics") + "The global Veneur didn't flush the right metrics.\nEXPECTED: %v\nACTUAL: %v", expectedNames, actualNames) close(done) }() ff.local.Flush(context.TODO()) diff --git a/importsrv/server.go b/importsrv/server.go index 2d64cc9b6..bffb2c8be 100644 --- a/importsrv/server.go +++ b/importsrv/server.go @@ -122,14 +122,25 @@ func (s *Server) SendMetrics(ctx context.Context, mlist *forwardrpc.MetricList) // min/max/count values locally. // // We need a special metric type to represent this: the MixedHistogram. + // + // We also need to distinguish between the behavior before all metrics were forwarded + // to the behavior after. The before case is represented by Scope_Mixed, which makes that host's + // "min/max/count" not get emitted by the central veneur, since the local veneur is emitting it + // still. Scoped_MixedGlobal makes the central veneur emit the min/max/count. + // + // TODO(clin): After we completely migrate to the new veneur, delete support for this latter distinction! switch m.GetScope() { - case metricpb.Scope_Local, metricpb.Scope_Global: + case metricpb.Scope_Mixed: s.ingester.Merge( - metricingester.NewHistogramDigest(m.Name, v.Histogram, tags, hostname), + metricingester.NewMixedHistogramDigest(m.Name, v.Histogram, tags, hostname, false), ) - case metricpb.Scope_Mixed: + case metricpb.Scope_MixedGlobal: s.ingester.Merge( - metricingester.NewMixedHistogramDigest(m.Name, v.Histogram, tags, hostname), + metricingester.NewMixedHistogramDigest(m.Name, v.Histogram, tags, hostname, true), + ) + case metricpb.Scope_Local, metricpb.Scope_Global: + s.ingester.Merge( + metricingester.NewHistogramDigest(m.Name, v.Histogram, tags, hostname), ) } case nil: diff --git a/importsrv/server_test.go b/importsrv/server_test.go index 828dd8c50..762db0ead 100644 --- a/importsrv/server_test.go +++ b/importsrv/server_test.go @@ -34,7 +34,7 @@ var testE2EFlushingCases = []struct { "gauge not present", }, { - pbmetrics(pbhisto("test", []float64{1, 2, 3})), + pbmetrics(pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_MixedGlobal))), samplers.TMetrics( samplers.TGauge("test.min", 1), samplers.TGauge("test.max", 3), @@ -42,7 +42,15 @@ var testE2EFlushingCases = []struct { samplers.TGauge("test.50percentile", 2), samplers.TGauge("test.95percentile", 2.925), ), - "histo not present", + "mixed global histo not present", + }, + { + pbmetrics(pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed))), + samplers.TMetrics( + samplers.TGauge("test.50percentile", 2), + samplers.TGauge("test.95percentile", 2.925), + ), + "mixed histo not correct", }, { pbmetrics(pbset("test", []string{"asf", "clin", "aditya"})), @@ -131,7 +139,7 @@ var testE2EFlushingCases = []struct { { pbmetrics( - pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed), pbhostname("a")), + pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_MixedGlobal), pbhostname("a")), ), samplers.TMetrics( samplers.TGauge("test.min", 1, samplers.OptHostname("a")), @@ -140,12 +148,22 @@ var testE2EFlushingCases = []struct { samplers.TGauge("test.50percentile", 2), samplers.TGauge("test.95percentile", 2.925), ), - "mixed histos not reporting host level aggregates for one host", + "global mixed histos not reporting host level aggregates for one host", }, { pbmetrics( pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed), pbhostname("a")), - pbhisto("test", []float64{4, 5, 6}, pbscope(metricpb.Scope_Mixed), pbhostname("b")), + ), + samplers.TMetrics( + samplers.TGauge("test.50percentile", 2), + samplers.TGauge("test.95percentile", 2.925), + ), + "mixed histos should not report host metrics", + }, + { + pbmetrics( + pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_MixedGlobal), pbhostname("a")), + pbhisto("test", []float64{4, 5, 6}, pbscope(metricpb.Scope_MixedGlobal), pbhostname("b")), ), samplers.TMetrics( samplers.TGauge("test.min", 1, samplers.OptHostname("a")), @@ -157,6 +175,17 @@ var testE2EFlushingCases = []struct { samplers.TGauge("test.50percentile", 3.5), samplers.TGauge("test.95percentile", 5.85), ), + "global mixed histos not reporting host level aggregates for two hosts", + }, + { + pbmetrics( + pbhisto("test", []float64{1, 2, 3}, pbscope(metricpb.Scope_Mixed), pbhostname("a")), + pbhisto("test", []float64{4, 5, 6}, pbscope(metricpb.Scope_Mixed), pbhostname("b")), + ), + samplers.TMetrics( + samplers.TGauge("test.50percentile", 3.5), + samplers.TGauge("test.95percentile", 5.85), + ), "mixed histos not reporting host level aggregates for two hosts", }, diff --git a/metricingester/aggingestor.go b/metricingester/aggingestor.go index 1e8aa1d38..8e66796d5 100644 --- a/metricingester/aggingestor.go +++ b/metricingester/aggingestor.go @@ -38,10 +38,6 @@ func NewFlushingIngester( aggregates samplers.Aggregate, options ...ingesterOption, ) AggregatingIngestor { - if workers < 1 { - panic("more than one worker required") - } - var aggW []aggWorker for i := 0; i < workers; i++ { aggW = append(aggW, newAggWorker()) @@ -74,7 +70,7 @@ func (a AggregatingIngestor) Ingest(m Metric) error { func (a AggregatingIngestor) Merge(d Digest) error { var workerid metricHash - if d.digestType == mixedHistoDigest { + if d.digestType == mixedHistoDigest || d.digestType == mixedGlobalHistoDigest { workerid = d.MixedHash() % metricHash(len(a.workers)) } else { workerid = d.Hash() % metricHash(len(a.workers)) diff --git a/metricingester/aggworker.go b/metricingester/aggworker.go index c65e820c2..a669eff47 100644 --- a/metricingester/aggworker.go +++ b/metricingester/aggworker.go @@ -104,6 +104,9 @@ func (a aggWorker) merge(d Digest) { a.samplers.mixedHistograms[key] = samplers.NewMixedHisto(d.name, d.tags, samplers.OptMixedHistoHostname(d.hostname)) } a.samplers.mixedHistograms[key].Merge(d.hostname, d.histodigest) + if d.flushMixed { + a.samplers.mixedHosts[d.hostname] = struct{}{} + } case histoDigest: key := d.Key() if _, present := a.samplers.histograms[key]; !present { diff --git a/metricingester/sinkflusher.go b/metricingester/sinkflusher.go index 52e614f14..afe89104f 100644 --- a/metricingester/sinkflusher.go +++ b/metricingester/sinkflusher.go @@ -36,7 +36,7 @@ func (s sinkFlusher) Flush(ctx context.Context, envelope samplerEnvelope) { metrics = append(metrics, sampler.Flush(time.Second, s.percentiles, s.aggregates, true)...) } for _, sampler := range envelope.mixedHistograms { - metrics = append(metrics, sampler.Flush(s.percentiles, s.aggregates)...) + metrics = append(metrics, sampler.Flush(s.percentiles, s.aggregates, envelope.mixedHosts)...) } if len(metrics) == 0 { diff --git a/metricingester/types.go b/metricingester/types.go index ca4009b77..69d1d369e 100644 --- a/metricingester/types.go +++ b/metricingester/types.go @@ -96,6 +96,7 @@ type digestType int const ( histoDigest digestType = iota + 1 mixedHistoDigest + mixedGlobalHistoDigest setDigest ) @@ -109,6 +110,8 @@ type Digest struct { histodigest *metricpb.HistogramValue setdigest *metricpb.SetValue + + flushMixed bool } // TODO(clin): Ideally merge this with Hash when we choose an algorithm with sufficiently @@ -164,6 +167,7 @@ func NewMixedHistogramDigest( digest *metricpb.HistogramValue, tags []string, hostname string, + flushMixed bool, ) Digest { sort.Sort(sort.StringSlice(tags)) return Digest{ @@ -172,6 +176,23 @@ func NewMixedHistogramDigest( digestType: mixedHistoDigest, hostname: hostname, histodigest: digest, + flushMixed: flushMixed, + } +} + +func NewMixedGlobalHistogramDigest( + name string, + digest *metricpb.HistogramValue, + tags []string, + hostname string, +) Digest { + sort.Sort(sort.StringSlice(tags)) + return Digest{ + name: name, + tags: tags, + digestType: mixedGlobalHistoDigest, + hostname: hostname, + histodigest: digest, } } @@ -214,6 +235,8 @@ type samplerEnvelope struct { histograms map[metricKey]*samplers.Histo mixedHistograms map[metricKey]samplers.MixedHisto sets map[metricKey]*samplers.Set + + mixedHosts map[string]struct{} } func newSamplerEnvelope() samplerEnvelope { @@ -223,5 +246,7 @@ func newSamplerEnvelope() samplerEnvelope { histograms: make(map[metricKey]*samplers.Histo), mixedHistograms: make(map[metricKey]samplers.MixedHisto), sets: make(map[metricKey]*samplers.Set), + + mixedHosts: make(map[string]struct{}), } } diff --git a/samplers/metricpb/metric.pb.go b/samplers/metricpb/metric.pb.go index 737be3e99..0d1b288ae 100644 --- a/samplers/metricpb/metric.pb.go +++ b/samplers/metricpb/metric.pb.go @@ -40,20 +40,23 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type Scope int32 const ( - Scope_Mixed Scope = 0 - Scope_Local Scope = 1 - Scope_Global Scope = 2 + Scope_Mixed Scope = 0 + Scope_Local Scope = 1 + Scope_Global Scope = 2 + Scope_MixedGlobal Scope = 3 ) var Scope_name = map[int32]string{ 0: "Mixed", 1: "Local", 2: "Global", + 3: "MixedGlobal", } var Scope_value = map[string]int32{ - "Mixed": 0, - "Local": 1, - "Global": 2, + "Mixed": 0, + "Local": 1, + "Global": 2, + "MixedGlobal": 3, } func (x Scope) String() string { @@ -1445,34 +1448,34 @@ var ( func init() { proto.RegisterFile("samplers/metricpb/metric.proto", fileDescriptorMetric) } var fileDescriptorMetric = []byte{ - // 451 bytes of a gzipped FileDescriptorProto + // 459 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x92, 0xd1, 0x6a, 0xdb, 0x30, - 0x14, 0x86, 0xa3, 0x38, 0x8e, 0xed, 0x93, 0x36, 0x33, 0x87, 0x6e, 0x88, 0x5c, 0x98, 0x60, 0xb6, - 0x91, 0x95, 0xe1, 0x42, 0xc6, 0x60, 0xb7, 0xeb, 0x0a, 0xe9, 0x45, 0x72, 0xe3, 0x96, 0xdd, 0x16, - 0x25, 0x15, 0x8a, 0xc1, 0x8e, 0x8c, 0xad, 0x8c, 0xe5, 0x2d, 0xf6, 0x58, 0xbb, 0xdc, 0x23, 0x8c, - 0x6c, 0x0f, 0x52, 0x24, 0x5b, 0x75, 0x73, 0x11, 0x72, 0xce, 0x7f, 0xbe, 0x5f, 0xe6, 0x3f, 0x12, - 0x44, 0x35, 0x2b, 0xca, 0x9c, 0x57, 0xf5, 0x55, 0xc1, 0x55, 0x95, 0x6d, 0xca, 0x75, 0x5b, 0x24, - 0x65, 0x25, 0x95, 0x44, 0xdf, 0xca, 0x93, 0xd7, 0xea, 0x31, 0x13, 0xbc, 0x56, 0x57, 0xed, 0x7f, - 0x03, 0xc4, 0xff, 0xfb, 0x30, 0x5c, 0x19, 0x06, 0x11, 0x06, 0x3b, 0x56, 0x70, 0x4a, 0xa6, 0x64, - 0x16, 0xa4, 0xa6, 0xd6, 0x9a, 0x62, 0xa2, 0xa6, 0xfd, 0xa9, 0xa3, 0x35, 0x5d, 0x63, 0x0c, 0x03, - 0x75, 0x28, 0x39, 0x75, 0xa6, 0x64, 0x36, 0x9e, 0x8f, 0x13, 0xfb, 0x89, 0xe4, 0xfe, 0x50, 0xf2, - 0xd4, 0xcc, 0x70, 0x0e, 0xde, 0x46, 0xee, 0x77, 0x8a, 0x57, 0xd4, 0x9d, 0x92, 0xd9, 0x68, 0xfe, - 0xa6, 0xc3, 0xbe, 0x35, 0x83, 0xef, 0x2c, 0xdf, 0xf3, 0xdb, 0x5e, 0x6a, 0x41, 0xfc, 0x08, 0xae, - 0x60, 0x7b, 0xc1, 0xe9, 0xd0, 0x38, 0x2e, 0x3a, 0xc7, 0x42, 0xcb, 0x96, 0x6f, 0x20, 0xfc, 0x02, - 0xc1, 0x36, 0xab, 0x95, 0x14, 0x15, 0x2b, 0xa8, 0x67, 0x1c, 0xb4, 0x73, 0xdc, 0xda, 0x91, 0x75, - 0x75, 0x30, 0xbe, 0x07, 0xa7, 0xe6, 0x8a, 0xfa, 0xc6, 0x83, 0x9d, 0xe7, 0x8e, 0x2b, 0x4b, 0x6b, - 0x00, 0xdf, 0x81, 0x5b, 0x6f, 0x64, 0xc9, 0x69, 0x60, 0x82, 0xbe, 0x7a, 0x41, 0x6a, 0x39, 0x6d, - 0xa6, 0x38, 0x01, 0x7f, 0x2b, 0x6b, 0x65, 0x56, 0x07, 0x66, 0x75, 0xcf, 0xfd, 0xb5, 0x07, 0xee, - 0x0f, 0x7d, 0x64, 0xfc, 0x16, 0xce, 0x5e, 0xc6, 0xc6, 0x8b, 0x76, 0x60, 0x96, 0xed, 0xa4, 0x2d, - 0x15, 0x03, 0x74, 0x51, 0x4f, 0x19, 0x62, 0x99, 0x05, 0x8c, 0x4f, 0xc3, 0xe1, 0x67, 0xf0, 0xd5, - 0x43, 0x73, 0xa9, 0x06, 0x1d, 0xcd, 0x27, 0x89, 0xbd, 0xe4, 0x15, 0xaf, 0x44, 0xb6, 0x13, 0x37, - 0xa6, 0xbb, 0x61, 0x8a, 0xa5, 0x9e, 0x6a, 0x9a, 0x38, 0x01, 0xdf, 0x26, 0xc6, 0x18, 0xce, 0xb7, - 0x87, 0x92, 0x57, 0x0f, 0xb9, 0x14, 0xfa, 0x67, 0xce, 0x39, 0x4b, 0x47, 0x46, 0x5c, 0x4a, 0xb1, - 0x94, 0xe2, 0xf2, 0x03, 0xb8, 0x26, 0x37, 0x06, 0xe0, 0xae, 0xb2, 0x9f, 0xfc, 0x31, 0xec, 0xe9, - 0x72, 0x29, 0x37, 0x2c, 0x0f, 0x09, 0x02, 0x0c, 0x17, 0xb9, 0x5c, 0xb3, 0x3c, 0xec, 0x5f, 0x7e, - 0x85, 0x81, 0x7e, 0x0b, 0x38, 0x02, 0xaf, 0x4d, 0xdd, 0xb0, 0x26, 0x5c, 0x48, 0xf0, 0x1c, 0x82, - 0xe7, 0x0c, 0x61, 0x1f, 0x3d, 0x70, 0xee, 0xb8, 0x0a, 0x1d, 0x8d, 0xdc, 0x67, 0x05, 0xaf, 0xc2, - 0xc1, 0x75, 0xf8, 0xfb, 0x18, 0x91, 0x3f, 0xc7, 0x88, 0xfc, 0x3d, 0x46, 0xe4, 0xd7, 0xbf, 0xa8, - 0xb7, 0x1e, 0x9a, 0x07, 0xfb, 0xe9, 0x29, 0x00, 0x00, 0xff, 0xff, 0x7b, 0x6d, 0x88, 0x2f, 0xf3, - 0x02, 0x00, 0x00, + 0x14, 0x86, 0xab, 0x38, 0x8e, 0xed, 0xe3, 0x36, 0x35, 0x87, 0x6e, 0x88, 0x5c, 0x98, 0x60, 0xb6, + 0x11, 0xca, 0x70, 0x21, 0x63, 0x30, 0x76, 0xb7, 0xae, 0x90, 0x5e, 0x24, 0x37, 0x6e, 0xd9, 0x6d, + 0x51, 0x52, 0xa1, 0x18, 0xec, 0xc8, 0xd8, 0xca, 0x58, 0xde, 0x62, 0x8f, 0xb5, 0xcb, 0x3d, 0xc2, + 0xc8, 0xf6, 0x20, 0x43, 0xb2, 0x55, 0xb7, 0x17, 0x21, 0xe7, 0xfc, 0xe7, 0xfb, 0x65, 0xfe, 0x23, + 0x41, 0xdc, 0xb0, 0xb2, 0x2a, 0x78, 0xdd, 0x5c, 0x95, 0x5c, 0xd5, 0xf9, 0xa6, 0x5a, 0x77, 0x45, + 0x5a, 0xd5, 0x52, 0x49, 0xf4, 0xad, 0x3c, 0x79, 0xa5, 0x1e, 0x73, 0xc1, 0x1b, 0x75, 0xd5, 0xfd, + 0xb7, 0x40, 0xf2, 0x6f, 0x00, 0xa3, 0x95, 0x61, 0x10, 0x61, 0xb8, 0x63, 0x25, 0xa7, 0x64, 0x4a, + 0x66, 0x41, 0x66, 0x6a, 0xad, 0x29, 0x26, 0x1a, 0x3a, 0x98, 0x3a, 0x5a, 0xd3, 0x35, 0x26, 0x30, + 0x54, 0x87, 0x8a, 0x53, 0x67, 0x4a, 0x66, 0xe3, 0xf9, 0x38, 0xb5, 0x9f, 0x48, 0xef, 0x0f, 0x15, + 0xcf, 0xcc, 0x0c, 0xe7, 0xe0, 0x6d, 0xe4, 0x7e, 0xa7, 0x78, 0x4d, 0xdd, 0x29, 0x99, 0x85, 0xf3, + 0xd7, 0x3d, 0xf6, 0xb5, 0x1d, 0x7c, 0x63, 0xc5, 0x9e, 0xdf, 0x9e, 0x64, 0x16, 0xc4, 0xf7, 0xe0, + 0x0a, 0xb6, 0x17, 0x9c, 0x8e, 0x8c, 0xe3, 0xa2, 0x77, 0x2c, 0xb4, 0x6c, 0xf9, 0x16, 0xc2, 0x4f, + 0x10, 0x6c, 0xf3, 0x46, 0x49, 0x51, 0xb3, 0x92, 0x7a, 0xc6, 0x41, 0x7b, 0xc7, 0xad, 0x1d, 0x59, + 0x57, 0x0f, 0xe3, 0x3b, 0x70, 0x1a, 0xae, 0xa8, 0x6f, 0x3c, 0xd8, 0x7b, 0xee, 0xb8, 0xb2, 0xb4, + 0x06, 0xf0, 0x2d, 0xb8, 0xcd, 0x46, 0x56, 0x9c, 0x06, 0x26, 0xe8, 0xf9, 0x33, 0x52, 0xcb, 0x59, + 0x3b, 0xc5, 0x09, 0xf8, 0x5b, 0xd9, 0x28, 0xb3, 0x3a, 0x30, 0xab, 0x7b, 0xea, 0xaf, 0x3d, 0x70, + 0xbf, 0xeb, 0x23, 0x93, 0x37, 0x70, 0xfa, 0x3c, 0x36, 0x5e, 0x74, 0x03, 0xb3, 0x6c, 0x27, 0xeb, + 0xa8, 0x04, 0xa0, 0x8f, 0xfa, 0x92, 0x21, 0x96, 0x59, 0xc0, 0xf8, 0x65, 0x38, 0xfc, 0x08, 0xbe, + 0x7a, 0x68, 0x2f, 0xd5, 0xa0, 0xe1, 0x7c, 0x92, 0xda, 0x4b, 0x5e, 0xf1, 0x5a, 0xe4, 0x3b, 0x71, + 0x63, 0xba, 0x1b, 0xa6, 0x58, 0xe6, 0xa9, 0xb6, 0x49, 0x52, 0xf0, 0x6d, 0x62, 0x4c, 0xe0, 0x6c, + 0x7b, 0xa8, 0x78, 0xfd, 0x50, 0x48, 0xa1, 0x7f, 0xe6, 0x9c, 0xd3, 0x2c, 0x34, 0xe2, 0x52, 0x8a, + 0xa5, 0x14, 0x97, 0x9f, 0xc1, 0x35, 0xb9, 0x31, 0x00, 0x77, 0x95, 0xff, 0xe0, 0x8f, 0xd1, 0x89, + 0x2e, 0x97, 0x72, 0xc3, 0x8a, 0x88, 0x20, 0xc0, 0x68, 0x51, 0xc8, 0x35, 0x2b, 0xa2, 0x01, 0x9e, + 0x43, 0x68, 0x88, 0x4e, 0x70, 0x2e, 0xbf, 0xc0, 0x50, 0x3f, 0x0e, 0x0c, 0xc1, 0xeb, 0xd6, 0xd0, + 0x9a, 0x4d, 0xda, 0x88, 0xe0, 0x19, 0x04, 0x4f, 0xa1, 0xa2, 0x01, 0x7a, 0xe0, 0xdc, 0x71, 0x15, + 0x39, 0x1a, 0xb9, 0xcf, 0x4b, 0x5e, 0x47, 0xc3, 0xeb, 0xe8, 0xd7, 0x31, 0x26, 0xbf, 0x8f, 0x31, + 0xf9, 0x73, 0x8c, 0xc9, 0xcf, 0xbf, 0xf1, 0xc9, 0x7a, 0x64, 0x5e, 0xf0, 0x87, 0xff, 0x01, 0x00, + 0x00, 0xff, 0xff, 0xd3, 0x80, 0x0a, 0x59, 0x04, 0x03, 0x00, 0x00, } diff --git a/samplers/metricpb/metric.proto b/samplers/metricpb/metric.proto index dd56c918c..90e4b9820 100644 --- a/samplers/metricpb/metric.proto +++ b/samplers/metricpb/metric.proto @@ -27,6 +27,7 @@ enum Scope { Mixed = 0; Local = 1; Global = 2; + MixedGlobal = 3; } // Type can be any of the valid metric types recognized by Veneur. diff --git a/samplers/mixedhisto.go b/samplers/mixedhisto.go index a887ddf79..593933fa2 100644 --- a/samplers/mixedhisto.go +++ b/samplers/mixedhisto.go @@ -67,7 +67,7 @@ func (m MixedHisto) Sample(sample float64, sampleRate float32, host string) { } // Flush returns metrics for the specified percentiles and aggregates. -func (m MixedHisto) Flush(percentiles []float64, aggregates HistogramAggregates) []InterMetric { +func (m MixedHisto) Flush(percentiles []float64, aggregates HistogramAggregates, mixedHosts map[string]struct{}) []InterMetric { ms := m.histo.Flush(0, percentiles, HistogramAggregates{}, false) // doesn't support median! Would require implementing separated digests. @@ -84,6 +84,9 @@ func (m MixedHisto) Flush(percentiles []float64, aggregates HistogramAggregates) } } for host, _ := range m.max { + if _, ok := mixedHosts[host]; !ok { + continue + } if (aggregates.Value & AggregateMax) != 0 { ms = append(ms, metric("max", m.max[host], host)) } diff --git a/samplers/mixedhisto_test.go b/samplers/mixedhisto_test.go index 3894fd9fd..4cd2accbd 100644 --- a/samplers/mixedhisto_test.go +++ b/samplers/mixedhisto_test.go @@ -100,11 +100,13 @@ func testSample(ps []float64, aggs Aggregate, inms []sampleCase, ts []TestMetric return func(t *testing.T) { t.Parallel() mh := NewMixedHisto("test", nil) + mixedServers := make(map[string]struct{}) for _, inm := range inms { mh.Sample(inm.val, 1, inm.host) + mixedServers[inm.host] = struct{}{} } - results := ToTestMetrics(mh.Flush(ps, HistogramAggregates{aggs, 0})) + results := ToTestMetrics(mh.Flush(ps, HistogramAggregates{aggs, 0}, mixedServers)) assert.ElementsMatch( t, results, @@ -283,11 +285,13 @@ func testMerge(ps []float64, aggs Aggregate, mergeCase []mergeCase, ts []TestMet return func(t *testing.T) { t.Parallel() mh := NewMixedHisto("test", nil) + mixedServers := make(map[string]struct{}) for _, c := range mergeCase { mh.Merge(c.host, histvalue(c.samples)) + mixedServers[c.host] = struct{}{} } - results := ToTestMetrics(mh.Flush(ps, HistogramAggregates{aggs, 0})) + results := ToTestMetrics(mh.Flush(ps, HistogramAggregates{aggs, 0}, mixedServers)) assert.ElementsMatch( t, results, diff --git a/server.go b/server.go index 89d67de10..c349774e9 100644 --- a/server.go +++ b/server.go @@ -585,9 +585,29 @@ func NewFromConfig(logger *logrus.Logger, conf Config) (*Server, error) { ret.grpcListenAddress = conf.GrpcAddress if ret.grpcListenAddress != "" { // convert all the workers to the proper interface + var sinks []metricingester.Sink - ret.grpcServer = importsrv.New(metricingester.AggregatingIngestor{}, - importsrv.WithTraceClient(ret.TraceClient)) + sinks = append(sinks, conf.AdditionalMetricSinks...) + + for _, s := range ret.metricSinks { + sinks = append(sinks, s) + } + for _, s := range ret.plugins { + sinks = append(sinks, s) + } + + ing := metricingester.NewFlushingIngester( + conf.NumWorkers, + ret.interval, + sinks, + conf.Percentiles, + ret.HistogramAggregates.Value, + ) + ing.Start() + ret.grpcServer = importsrv.New( + ing, + importsrv.WithTraceClient(ret.TraceClient), + ) } logger.WithField("config", conf).Debug("Initialized server") diff --git a/server_test.go b/server_test.go index 3a438178e..f24b3555b 100644 --- a/server_test.go +++ b/server_test.go @@ -134,6 +134,12 @@ func generateMetrics() (metricValues []float64, expectedMetrics map[string]float // so that flushes to these sinks do "nothing". func setupVeneurServer(t testing.TB, config Config, transport http.RoundTripper, mSink sinks.MetricSink, sSink sinks.SpanSink) *Server { logger := logrus.New() + if mSink == nil { + // Install a blackhole sink if we have no other sinks + bhs, _ := blackhole.NewBlackholeMetricSink() + mSink = bhs + } + config.AdditionalMetricSinks = append(config.AdditionalMetricSinks, mSink) server, err := NewFromConfig(logger, config) if err != nil { t.Fatal(err) @@ -150,11 +156,6 @@ func setupVeneurServer(t testing.TB, config Config, transport http.RoundTripper, trace.NeutralizeClient(server.TraceClient) server.TraceClient = nil - if mSink == nil { - // Install a blackhole sink if we have no other sinks - bhs, _ := blackhole.NewBlackholeMetricSink() - mSink = bhs - } server.metricSinks = append(server.metricSinks, mSink) if sSink == nil {