Combine global/local logic into one unified flush and parallelize sinks. #292
flusher.go
Outdated
span := tracer.StartSpan("flush").(*trace.Span)
defer span.Finish()

// TODO Move this to an independent ticker routine or something?
So, I don't think we can do this in a separate goroutine, because we want this to be reporting the memory at the time of maximum usage (right before we clear and flush everything).
The alternate way of looking at it: we need to do it in a separate ticker, and choose our interval for that ticker such that we are sampling the underlying, sinusoidal data accurately.
Either way, this would also be a great place to report softnet data, once this is merged: #291
This is separately addressed in #294
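The ticker alternative described above could be sketched roughly as follows. This is an illustration under stated assumptions, not Veneur's implementation: `peakTracker`, `observe`, `takeAndReset`, and `sample` are hypothetical names. A sampler goroutine records the largest heap size seen since the last flush, and the flush path reads and resets that peak. The sampling interval would need to be much shorter than the flush interval so that the high point just before a flush is not missed.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"time"
)

// peakTracker (hypothetical) remembers the largest value observed
// since the last call to takeAndReset.
type peakTracker struct {
	mu   sync.Mutex
	peak uint64
}

// observe records v if it exceeds the current peak.
func (p *peakTracker) observe(v uint64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if v > p.peak {
		p.peak = v
	}
}

// takeAndReset returns the peak seen so far and resets it to zero.
// A flush routine would call this once per flush.
func (p *peakTracker) takeAndReset() uint64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	v := p.peak
	p.peak = 0
	return v
}

// sample reads heap usage on every tick until ctx is cancelled.
func (p *peakTracker) sample(ctx context.Context, every time.Duration) {
	t := time.NewTicker(every)
	defer t.Stop()
	var ms runtime.MemStats
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			runtime.ReadMemStats(&ms)
			p.observe(ms.HeapAlloc)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	p := &peakTracker{}
	go p.sample(ctx, 10*time.Millisecond)

	time.Sleep(50 * time.Millisecond) // stand-in for one flush interval
	fmt.Println("peak heap bytes since last flush:", p.takeAndReset())
}
```

Tracking a running maximum sidesteps part of the sampling-rate concern: the flush reports the highest sample rather than whatever value happens to be current at flush time.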
flusher.go
Outdated
go s.metricSinks[0].FlushEventsChecks(span.Attach(ctx), events, checks) // we can do all of this separately
go s.flushTraces(span.Attach(ctx))
go s.flushTraces(span.Attach(ctx)) // this too!
I think this comment no longer makes sense?
flusher_test.go
Outdated
@@ -174,7 +175,7 @@ func testFlushTraceDatadog(t *testing.T, protobuf, jsn io.Reader) {
assert.NoError(t, err)

server.HandleTracePacket(packet)
server.Flush()
server.Flush(context.TODO())
I think we actually want context.Background() for all of these, no?
Ha! Yes, I was doing TODO out of habit for a temporary fix when in fact Background was the permanent answer here. :) Thanks!
flusher.go
Outdated
go func(ms metricSink) {
err := ms.Flush(span.Attach(ctx), finalMetrics)
if err != nil {
log.WithError(err).WithField("sink", ms.Name()).Warn("Error flushin sink")
flushing :)
for _, sink := range s.metricSinks {
wg.Add(1)
go func(ms metricSink) {
err := ms.Flush(span.Attach(ctx), finalMetrics)
Now that this is happening asynchronously, we'll want to make doubly-sure that our interface definition specifies that the Flush method is not allowed to mutate the slice it receives.
Definitely! How do we do that? :)
I'd add it to the interface definition in metric_sink.go.
Gerald Rule: Copy Observability on Veneur and Unilog pull requests
cc @stripe/observability
b70830c to 979f3bb
re-r? @aditya-stripe
plugin_test.go
Outdated
@@ -82,7 +82,7 @@ func TestGlobalServerPluginFlush(t *testing.T) {
})
}

f.server.Flush()
f.server.Flush(context.TODO())
nit: we want Background() here too. context.TODO is intended for functions that are not yet fully traced, and for static analysis tools that can track whether a context originates with a TODO or a Background.
One comment. lgtm after that, so feel free to self-approve :)
4262f3c to 142613d
142613d to a137e5c
Self-approval post rebase
Summary
Refactor the Server's Flush so that it no longer works by calling FlushLocal and FlushGlobal, which was a poorly compartmentalized implementation that led to code duplication and confusion.

Motivation
At the end of the Datadog sink PR we are in a world with a single, hardcoded sink. This isn't the intent! We want to convert plugins to sinks and execute all of them.
This patch aims to do just that, but also takes the opportunity to refactor away the FlushLocal and FlushGlobal functions. This was done by first inlining them, removing duplicate code, and then refactoring the special cases away, notably the tallyMetrics size calculation.

Notes
context.TODO() is sprinkled around because we want to implement a deadline for sinks. I will fix this.

Test plan
Existing tests.
r? @aditya-stripe