Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move runtime metrics reporting to a separate timer routine. #294

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
OmitEmptyHostname bool `yaml:"omit_empty_hostname"`
Percentiles []float64 `yaml:"percentiles"`
ReadBufferSizeBytes int `yaml:"read_buffer_size_bytes"`
RuntimeMetricsFlushInterval string `yaml:"runtime_metrics_flush_interval"`
SentryDsn string `yaml:"sentry_dsn"`
SsfAddress string `yaml:"ssf_address"`
SsfBufferSize int `yaml:"ssf_buffer_size"`
Expand Down
11 changes: 5 additions & 6 deletions config_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io/ioutil"
"net/url"
"os"
"time"

"github.com/kelseyhightower/envconfig"

Expand Down Expand Up @@ -137,10 +136,10 @@ func readConfig(r io.Reader) (c Config, err error) {
}
c.SsfListenAddresses = append(c.SsfListenAddresses, ssfAddrs...)

return c, nil
}
if c.RuntimeMetricsFlushInterval == "" {
// Default the runtime metrics flush interval to the main flush interval if not overridden
c.RuntimeMetricsFlushInterval = c.Interval
}

// ParseInterval handles parsing the flush interval as a time.Duration
func (c Config) ParseInterval() (time.Duration, error) {
return time.ParseDuration(c.Interval)
return c, nil
}
2 changes: 1 addition & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestReadConfig(t *testing.T) {
assert.Equal(t, "https://app.datadoghq.com", c.DatadogAPIHostname)
assert.Equal(t, 96, c.NumWorkers)

interval, err := c.ParseInterval()
interval, err := time.ParseDuration(c.Interval)
assert.NoError(t, err)
assert.Equal(t, interval, 10*time.Second)

Expand Down
5 changes: 3 additions & 2 deletions example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ num_workers: 96
num_readers: 1



# == LIMITS ==

# How big of a buffer to allocate for incoming metrics. Metrics longer than this
Expand All @@ -127,7 +126,6 @@ read_buffer_size_bytes: 2097152
# will post multiple times in parallel if the limit is exceeded.
flush_max_per_body: 25000


# == DIAGNOSTICS ==

# Sets the log level to DEBUG
Expand All @@ -139,6 +137,9 @@ sentry_dsn: ""
# Enables Go profiling
enable_profiling: false

# Veneur emits runtime metrics about itself (heap usage, GC, goroutine count).
# By default they are reported every `interval`; set this to use a different period.
runtime_metrics_flush_interval: "10s"


# == SINKS ==
Expand Down
8 changes: 0 additions & 8 deletions flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"net"
"net/http"
"net/url"
"runtime"
"time"

"github.com/DataDog/datadog-go/statsd"
Expand All @@ -26,13 +25,6 @@ func (s *Server) Flush() {
span := tracer.StartSpan("flush").(*trace.Span)
defer span.Finish()

mem := &runtime.MemStats{}
runtime.ReadMemStats(mem)

s.Statsd.Gauge("mem.heap_alloc_bytes", float64(mem.HeapAlloc), nil, 1.0)
s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0)
s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0)

// right now we have only one destination plugin
// but eventually, this is where we would loop over our supported
// destinations
Expand Down
40 changes: 36 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"net"
"net/http"
"runtime"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -87,10 +88,11 @@ type Server struct {
SSFListenAddrs []net.Addr
RcvbufBytes int

interval time.Duration
numReaders int
metricMaxLength int
traceMaxLengthBytes int
interval time.Duration
runtimeMetricInterval time.Duration
numReaders int
metricMaxLength int
traceMaxLengthBytes int

tlsConfig *tls.Config
tcpReadTimeout time.Duration
Expand Down Expand Up @@ -146,6 +148,12 @@ func NewFromConfig(conf Config) (ret Server, err error) {
if err != nil {
return
}

ret.runtimeMetricInterval, err = time.ParseDuration(conf.RuntimeMetricsFlushInterval)
if err != nil {
return
}

ret.HTTPClient = &http.Client{
// make sure that POSTs to datadog do not overflow the flush interval
Timeout: ret.interval * 9 / 10,
Expand Down Expand Up @@ -429,6 +437,30 @@ func (s *Server) Start() {
logrus.Info("Tracing sockets are configured - not reading trace socket")
}

// Emit runtime metrics about ourselves on a timer.
go func() {
defer func() {
ConsumePanic(s.Sentry, s.Statsd, s.Hostname, recover())
}()
ticker := time.NewTicker(s.runtimeMetricInterval)
for {
select {
case <-s.shutdown:
// stop emitting runtime metrics on graceful shutdown
ticker.Stop()
return
case <-ticker.C:
mem := &runtime.MemStats{}
runtime.ReadMemStats(mem)

s.Statsd.Gauge("mem.heap_alloc_bytes", float64(mem.HeapAlloc), nil, 1.0)
s.Statsd.Gauge("gc.number", float64(mem.NumGC), nil, 1.0)
s.Statsd.Gauge("gc.pause_total_ns", float64(mem.PauseTotalNs), nil, 1.0)
s.Statsd.Gauge("goroutines", float64(runtime.NumGoroutine()), nil, 1.0)
}
}
}()

// Flush every Interval forever!
go func() {
defer func() {
Expand Down
5 changes: 3 additions & 2 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func generateConfig(forwardAddr string) Config {
Hostname: "localhost",

// Use a shorter interval for tests
Interval: DefaultFlushInterval.String(),
Interval: DefaultFlushInterval.String(),
RuntimeMetricsFlushInterval: DefaultFlushInterval.String(),
Key: "",
MetricMaxLength: 4096,
Percentiles: []float64{.5, .75, .99},
Expand Down Expand Up @@ -189,7 +190,7 @@ type fixture struct {
}

func newFixture(t *testing.T, config Config) *fixture {
interval, err := config.ParseInterval()
interval, err := time.ParseDuration(config.Interval)
assert.NoError(t, err)

// Set up a remote server (the API that we're sending the data to)
Expand Down