diff --git a/CHANGELOG.md b/CHANGELOG.md index 876877200..0a2f95433 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ ## Added * Added a timeout for sink ingestion to all sinks, which prevents a single slow sink from blocking ingestion on other span sinks indefinitely. Thanks, [aditya](https://github.com/chimeracoder)! +## Changes +* `kafka_span_sample_rate_percent` now allows floating-point values. `kafka.NewKafkaSpanSink` now takes a floating point sample rate, accordingly. Thanks [chimeracoder](https://github.com/chimeracoder)! + + ## Bugfixes * Added a timeout to the Kafka sink, which prevents the Kafka client from blocking other span sinks. Thanks, [aditya](https://github.com/chimeracoder)! diff --git a/config.go b/config.go index 331e05b18..63d24803e 100644 --- a/config.go +++ b/config.go @@ -36,7 +36,7 @@ type Config struct { KafkaSpanBufferFrequency string `yaml:"kafka_span_buffer_frequency"` KafkaSpanBufferMesages int `yaml:"kafka_span_buffer_mesages"` KafkaSpanRequireAcks string `yaml:"kafka_span_require_acks"` - KafkaSpanSampleRatePercent int `yaml:"kafka_span_sample_rate_percent"` + KafkaSpanSampleRatePercent float64 `yaml:"kafka_span_sample_rate_percent"` KafkaSpanSampleTag string `yaml:"kafka_span_sample_tag"` KafkaSpanSerializationFormat string `yaml:"kafka_span_serialization_format"` KafkaSpanTopic string `yaml:"kafka_span_topic"` diff --git a/example.yaml b/example.yaml index c25800ef6..e87c8ac52 100644 --- a/example.yaml +++ b/example.yaml @@ -292,10 +292,8 @@ kafka_span_topic: "veneur_spans" # of traceID kafka_span_sample_tag: "" -# Sample rate in percent (as an integer) -# This should ideally be a floating point number, but at the time this was -# written, gojson interpreted whole-number floats in yaml as integers. -kafka_span_sample_rate_percent: 100 +# Percentage of spans that will be sent to the Kafka sink +kafka_span_sample_rate_percent: 2.0 kafka_metric_buffer_bytes: 0 diff --git a/sinks/kafka/kafka.go b/sinks/kafka/kafka.go index 98815ac1d..1bfb09824 100644 --- a/sinks/kafka/kafka.go +++ b/sinks/kafka/kafka.go @@ -225,7 +225,7 @@ func (k *KafkaMetricSink) FlushOtherSamples(ctx context.Context, samples []ssf.S } // NewKafkaSpanSink creates a new Kafka Plugin. -func NewKafkaSpanSink(logger *logrus.Logger, cl *trace.Client, brokers string, topic string, partitioner string, ackRequirement string, retries int, bufferBytes int, bufferMessages int, bufferDuration string, serializationFormat string, sampleTag string, sampleRatePercentage int) (*KafkaSpanSink, error) { +func NewKafkaSpanSink(logger *logrus.Logger, cl *trace.Client, brokers string, topic string, partitioner string, ackRequirement string, retries int, bufferBytes int, bufferMessages int, bufferDuration string, serializationFormat string, sampleTag string, sampleRatePercentage float64) (*KafkaSpanSink, error) { if logger == nil { logger = &logrus.Logger{Out: ioutil.Discard} }