Skip to content

Commit

Permalink
Unify tracking of metrics and events in CloudHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
tiedotguy committed Feb 6, 2022
1 parent 74931dd commit 89112f4
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 57 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Next
----
- Breaking change to internal metrics: the `cloudprovider.items_queued` metric is removed, and
`cloudprovider.hosts_queued` now tracks the absolute number of hosts to look up, regardless of type.

34.0.1
------
- Forcing a new tag release to allow for docker release
Expand Down
4 changes: 1 addition & 3 deletions METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ Metrics:
| cloudprovider.cache_refresh_negative | gauge (cumulative) | | The cumulative number of refreshes which had an error refreshing and used old data
| cloudprovider.cache_hit | gauge (cumulative) | | The cumulative number of cache hits (host was in the cache)
| cloudprovider.cache_miss | gauge (cumulative) | | The cumulative number of cache misses
| cloudprovider.hosts_queued | gauge (flush) | type | The absolute number of hosts waiting to be looked up
| cloudprovider.items_queued | gauge (flush) | type | The absolute number of metrics or events waiting for a host lookup to complete
| cloudprovider.hosts_queued | gauge (flush) | | The absolute number of hosts waiting to be looked up
| http.forwarder.invalid | counter | | The number of failures to prepare a batch of metrics to forward
| http.forwarder.created | counter | | The number of batches prepared for forwarding
| http.forwarder.sent | counter | | The number of batches successfully forwarded
Expand All @@ -70,7 +69,6 @@ Metrics:
| version | The git tag of the build
| commit | The short git commit of the build
| backend | The backend sending a particular metric
| type | Either metric or event for cloudprovider.hosts_queued, or event for cloudprovider.items_queued
| result | Success to indicate a batch of metrics was successfully processed, failure to indicate a batch of metrics was not processed (with an additional failure tag for why)
| failure | The reason a batch of metrics was not processed
| server-name | The name of an http-server as specified in the config file
Expand Down
2 changes: 1 addition & 1 deletion pkg/statsd/handler_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (bh *BackendHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)
for _, backend := range bh.backends {
select {
case <-ctx.Done():
// Not all backends got the event, should decrement the wg counter
// Not all backends got the event, should decrement the wg counter to account for it
bh.eventWg.Add(eventsDispatched - len(bh.backends))
return
case bh.concurrentEvents <- struct{}{}:
Expand Down
102 changes: 49 additions & 53 deletions pkg/statsd/handler_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@ import (
"github.com/atlassian/gostatsd/pkg/stats"
)

// pendingMetricsAndEvents groups everything queued for a single source while
// the lookup of that source's instance is still outstanding.
type pendingMetricsAndEvents struct {
// metrics accumulates the metrics queued for the source. It stays nil until
// metrics are first queued for that source.
metrics *gostatsd.MetricMap
// events holds the events queued for the source, appended as they arrive.
events []*gostatsd.Event
}

// CloudHandler enriches metrics and events with additional information fetched from cloud provider.
type CloudHandler struct {
// These fields are accessed by any goroutine, must use atomic ops
statsCacheHit uint64 // Cumulative number of cache hits
statsCacheMiss uint64 // Cumulative number of cache misses

// All other stats fields may only be read or written by the main CloudHandler.Run goroutine
statsMetricHostsQueued uint64 // Absolute number of IPs waiting for a CP to respond for metrics
statsEventItemsQueued uint64 // Absolute number of events queued, waiting for a CP to respond
statsEventHostsQueued uint64 // Absolute number of IPs waiting for a CP to respond for events

cachedInstances gostatsd.CachedInstances
handler gostatsd.PipelineHandler
incomingMetrics chan *gostatsd.MetricMap
incomingEvents chan *gostatsd.Event

// emitChan triggers a write of all the current stats when it is given a Statser
emitChan chan stats.Statser
awaitingEvents map[gostatsd.Source][]*gostatsd.Event
awaitingMetrics map[gostatsd.Source]*gostatsd.MetricMap
emitChan chan stats.Statser

perHostPending map[gostatsd.Source]*pendingMetricsAndEvents
toLookupIPs []gostatsd.Source
wg sync.WaitGroup
wgPendingEvents sync.WaitGroup

estimatedTags int
}
Expand All @@ -45,8 +45,7 @@ func NewCloudHandler(cachedInstances gostatsd.CachedInstances, handler gostatsd.
incomingMetrics: make(chan *gostatsd.MetricMap),
incomingEvents: make(chan *gostatsd.Event),
emitChan: make(chan stats.Statser),
awaitingEvents: make(map[gostatsd.Source][]*gostatsd.Event),
awaitingMetrics: make(map[gostatsd.Source]*gostatsd.MetricMap),
perHostPending: make(map[gostatsd.Source]*pendingMetricsAndEvents),
estimatedTags: handler.EstimatedTags() + cachedInstances.EstimatedTags(),
}
}
Expand Down Expand Up @@ -105,17 +104,17 @@ func (ch *CloudHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) {
ch.handler.DispatchEvent(ctx, e)
return
}
ch.wg.Add(1) // Increment before sending to the channel
ch.wgPendingEvents.Add(1) // Increment before sending to the channel
select {
case <-ctx.Done():
ch.wg.Done()
ch.wgPendingEvents.Done()
case ch.incomingEvents <- e:
}
}

// WaitForEvents waits for all event-dispatching goroutines to finish.
func (ch *CloudHandler) WaitForEvents() {
ch.wg.Wait()
ch.wgPendingEvents.Wait()
ch.handler.WaitForEvents()
}

Expand Down Expand Up @@ -160,11 +159,8 @@ func (ch *CloudHandler) emit(statser stats.Statser) {
// atomic
statser.Gauge("cloudprovider.cache_hit", float64(atomic.LoadUint64(&ch.statsCacheHit)), nil)
statser.Gauge("cloudprovider.cache_miss", float64(atomic.LoadUint64(&ch.statsCacheMiss)), nil)
t := gostatsd.Tags{"type:metric"}
statser.Gauge("cloudprovider.hosts_queued", float64(ch.statsMetricHostsQueued), t)
t = gostatsd.Tags{"type:event"}
statser.Gauge("cloudprovider.hosts_queued", float64(ch.statsEventHostsQueued), t)
statser.Gauge("cloudprovider.items_queued", float64(ch.statsEventItemsQueued), t)
// non-atomic
statser.Gauge("cloudprovider.hosts_queued", float64(len(ch.perHostPending)), nil)
}

func (ch *CloudHandler) Run(ctx context.Context) {
Expand All @@ -184,10 +180,8 @@ func (ch *CloudHandler) Run(ctx context.Context) {
case info := <-infoSource:
ch.handleInstanceInfo(ctx, info)
case metrics := <-ch.incomingMetrics:
// Add metrics to awaitingMetrics, accumulate IPs to lookup
ch.handleIncomingMetrics(metrics)
case e := <-ch.incomingEvents:
// Add event to awaitingEvents, accumulate IPs to lookup
ch.handleIncomingEvent(e)
case statser := <-ch.emitChan:
ch.emit(statser)
Expand All @@ -203,38 +197,46 @@ func (ch *CloudHandler) Run(ctx context.Context) {
}

func (ch *CloudHandler) handleInstanceInfo(ctx context.Context, info gostatsd.InstanceInfo) {
mm := ch.awaitingMetrics[info.IP]
if mm != nil {
delete(ch.awaitingMetrics, info.IP)
ch.statsMetricHostsQueued--
go ch.updateAndDispatchMetrics(ctx, info.Instance, mm)
pending, ok := ch.perHostPending[info.IP]
if !ok {
return // got an instance for something we didn't request, ignore it.
}
events := ch.awaitingEvents[info.IP]
if len(events) > 0 {
delete(ch.awaitingEvents, info.IP)
ch.statsEventItemsQueued -= uint64(len(events))
ch.statsEventHostsQueued--
go ch.updateAndDispatchEvents(ctx, info.Instance, events)

delete(ch.perHostPending, info.IP)
if pending.metrics != nil {
go ch.updateAndDispatchMetrics(ctx, info.Instance, pending.metrics)
}
if len(pending.events) > 0 {
go ch.updateAndDispatchEvents(ctx, info.Instance, pending.events)
}
}

// prepareMetricQueue will ensure that ch.awaitingMetrics has a matching MetricMap for
// source, and return it. If it did not have one initially, it will also enqueue source
// for lookup. The functionality is overloaded to minimize code duplication.
func (ch *CloudHandler) prepareMetricQueue(source gostatsd.Source) *gostatsd.MetricMap {
if queue, ok := ch.awaitingMetrics[source]; ok {
return queue
}
if len(ch.awaitingEvents[source]) == 0 {
// preparePending will return a place to queue things that are waiting to be processed,
// and ensure that source will be looked up if it wasn't already.
func (ch *CloudHandler) preparePending(source gostatsd.Source) *pendingMetricsAndEvents {
if _, ok := ch.perHostPending[source]; !ok {
ch.perHostPending[source] = &pendingMetricsAndEvents{}
ch.toLookupIPs = append(ch.toLookupIPs, source)
ch.statsMetricHostsQueued++
}
queue := gostatsd.NewMetricMap()
ch.awaitingMetrics[source] = queue
return queue
return ch.perHostPending[source]
}

// prepareMetricQueue returns the MetricMap in which metrics for the provided
// source are accumulated in ch.perHostPending, creating it on first use.
func (ch *CloudHandler) prepareMetricQueue(source gostatsd.Source) *gostatsd.MetricMap {
	pending := ch.preparePending(source)
	if pending.metrics != nil {
		return pending.metrics
	}
	// The MetricMap is allocated lazily here rather than in preparePending.
	// That split only pays off for hosts which send events but no metrics,
	// and it costs one extra nil comparison on every lookup, so moving the
	// allocation into preparePending may be worthwhile.
	pending.metrics = gostatsd.NewMetricMap()
	return pending.metrics
}

func (ch *CloudHandler) handleIncomingMetrics(mm *gostatsd.MetricMap) {
// The <metric>.Source values could be from different hosts if they were
// forwarded, therefore we need to do a lookup each time.
mm.Counters.Each(func(metricName string, tagsKey string, c gostatsd.Counter) {
ch.prepareMetricQueue(c.Source).MergeCounter(metricName, tagsKey, c)
})
Expand All @@ -250,14 +252,8 @@ func (ch *CloudHandler) handleIncomingMetrics(mm *gostatsd.MetricMap) {
}

func (ch *CloudHandler) handleIncomingEvent(e *gostatsd.Event) {
queue := ch.awaitingEvents[e.Source]
ch.awaitingEvents[e.Source] = append(queue, e)
if len(queue) == 0 && ch.awaitingMetrics[e.Source] == nil {
// This is the first event for that IP in the queue. Need to fetch an Instance for this IP.
ch.toLookupIPs = append(ch.toLookupIPs, e.Source)
ch.statsEventHostsQueued++
}
ch.statsEventItemsQueued++
queue := ch.preparePending(e.Source)
queue.events = append(queue.events, e)
}

func (ch *CloudHandler) updateAndDispatchMetrics(ctx context.Context, instance *gostatsd.Instance, mmIn *gostatsd.MetricMap) {
Expand All @@ -284,7 +280,7 @@ func (ch *CloudHandler) updateAndDispatchMetrics(ctx context.Context, instance *
func (ch *CloudHandler) updateAndDispatchEvents(ctx context.Context, instance *gostatsd.Instance, events []*gostatsd.Event) {
var dispatched int
defer func() {
ch.wg.Add(-dispatched)
ch.wgPendingEvents.Add(-dispatched)
}()
for _, e := range events {
updateInplace(e, instance)
Expand Down

0 comments on commit 89112f4

Please sign in to comment.