Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(common.socket): parallel parsing #15891

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

LarsStegman
Copy link
Contributor

Summary

This is a proof of concept for the issue I raised in #15884. I feel like there might be a better way to implement this, than my way. I was thinking of something like the Options pattern, but I am not sure about this.

I am very open to discuss this! @srebhan do you have time to discuss this? I am not even sure if it is something you're open to add to Telegraf, but I think it is worth it considering the increased performance we see in our application.

Checklist

  • No AI generated code was used in this PR

Related issues

resolves #15884

@srebhan srebhan marked this pull request as draft September 16, 2024 13:40
@srebhan
Copy link
Member

srebhan commented Sep 16, 2024

Hey! Thanks for being so active and contributing your findings and fixes! We recently added something similar to the kafka consumer plugin allowing to override the timestamp provided by parsing.
I see two options:

  1. Implementing the override in the input plugin using metric.SetTime() to overwrite the timestamp after parsing.
  2. Implementing the override as a general option for inputs in the running_input model.

Option two was discussed a while ago but we never found the time to implement it. Let me know if you are interested in this and I will detail more on what needs to be done.

@srebhan srebhan self-assigned this Sep 16, 2024
@LarsStegman
Copy link
Contributor Author

You're welcome, it helps me too, since I now only need to maintain some proprietary plugins. Maintaining a fork that touches a lot of the internals is very time consuming. And most importantly, I enjoy contributing. I am also glad that my company allows me to.

I'm up to working on it. I wasn't really aware of how the internals for Telegraf worked, since I worked mostly on plugins. This weekend of profiling and optimizing really made things a lot clearer. I think I see how it would be implemented on the running_input, but please elaborate.

Regarding moving parsing for inputs.socket_listener to a separate goroutine. Should I make that a separate PR after the override has been added to the running_input? Is that something that stands a chance of getting merged?

@srebhan
Copy link
Member

srebhan commented Sep 19, 2024

I think I see how it would be implemented on the running_input, but please elaborate.

The idea is to add a general timesource option with the choices

  • metric: use the timestamp of the metric as provided by the input plugin e.g. through parsing (default)
  • collection-start: use the timestamp when gathering started for all metrics
  • collection-end: use the timestamp when gathering finished for all metrics

to all inputs by adding a new config option (and parse it) to the inputs.

Then you adapt the metric using the start time (stored in the model struct) or end time (stored in the model struct) and call SetTime() on all metrics, or do nothing dependent on the config option.

Does this make things more clear?

Regarding moving parsing for inputs.socket_listener to a separate goroutine. Should I make that a separate PR after the override has been added to the running_input? Is that something that stands a chance of getting merged?

I think we might accept this if there is a compelling argumentation why this is needed and what the benefit will be. I guess you are already having those numbers as you are asking... ;-)
One thing, please use a thread pool like github.com/alitto/pond (see e.g. the snmp_lookup processor) to allow the user to keep control over the number of threads spawned as otherwise it might be easily possible to DoS machines...

@LarsStegman
Copy link
Contributor Author

Yes, that makes it clear and is about what I expected. I'll try to work on it tomorrow!

I'll also work on the async parsing.

@srebhan
Copy link
Member

srebhan commented Sep 19, 2024

Just to be clear, async parsing should be an own PR! Please also double check that parser calls are thread-safe! I remember that we had this a while back with Avro but I want to urge you to double check! :-)

@LarsStegman
Copy link
Contributor Author

LarsStegman commented Sep 19, 2024

Yes, that was my plan as well :) Easier to get things merged when the changes are small and isolated.

I thought the "contract" requires calls to Parse to be thread safe, right?

@LarsStegman
Copy link
Contributor Author

The idea is to add a general timesource option with the choices

  • metric: use the timestamp of the metric as provided by the input plugin e.g. through parsing (default)
  • collection-start: use the timestamp when gathering started for all metrics
  • collection-end: use the timestamp when gathering finished for all metrics

I am working on implementing it, but I am a little uncertain how this would work for Service plugins. Those often don't do anything in Gather, so overriding it with a collection-start/end time is useless.

I was thinking of maybe adding the same feature to the RunningParser, but even then there is no guarantee this is the correct time. The time when the Parse function is called, does not need to be the same as the time the original data comes in. For example, I read the UDP buffer, and then send the data into a goroutine to be parsed. The goroutine is not guaranteed to be started immediately.

@srebhan
Copy link
Member

srebhan commented Sep 19, 2024

@LarsStegman right, service plugins will need to take care of timestamp overriding themselves...

@LarsStegman LarsStegman changed the title WIP: feat(parsers): support passing receive timestamp to parser feat(inputs.socket_listener): parallel parsing Sep 20, 2024
@telegraf-tiger telegraf-tiger bot added feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin plugin/input 1. Request for new input plugins 2. Issues/PRs that are related to input plugins labels Sep 20, 2024
@LarsStegman LarsStegman marked this pull request as ready for review September 20, 2024 12:47
@LarsStegman LarsStegman force-pushed the feat/parser-with-timestamp branch 2 times, most recently from c41db77 to 9eaa313 Compare September 21, 2024 06:12
Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @LarsStegman for your contribution! A few comments from my side.

plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/datagram.go Show resolved Hide resolved
plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/socket.go Outdated Show resolved Hide resolved
plugins/common/socket/socket.go Outdated Show resolved Hide resolved
plugins/common/socket/stream.go Show resolved Hide resolved
Comment on lines 55 to 77
onData := func(_ net.Addr, data []byte) {
onData := func(_ net.Addr, data []byte, receiveTime time.Time) {
metrics, err := sl.parser.Parse(data)

if err != nil {
acc.AddError(err)
return
}

if len(metrics) == 0 {
once.Do(func() {
sl.Log.Debug(internal.NoMetricsCreatedMsg)
})
}

for _, m := range metrics {
switch sl.TimeSource {
case "", "metric":
case "receive_time":
m.SetTime(receiveTime)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not be here... :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should, since service plugins do not work with #15917

#15891 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but please make this an own PR to separate feature introduction!

Copy link
Contributor Author

@LarsStegman LarsStegman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I answered some questions and gave some explanations. I have not yet implemented any changes, I will do that tomorrow.

plugins/common/socket/datagram.go Show resolved Hide resolved
plugins/common/socket/datagram.go Outdated Show resolved Hide resolved
plugins/common/socket/stream.go Show resolved Hide resolved
Comment on lines 55 to 77
onData := func(_ net.Addr, data []byte) {
onData := func(_ net.Addr, data []byte, receiveTime time.Time) {
metrics, err := sl.parser.Parse(data)

if err != nil {
acc.AddError(err)
return
}

if len(metrics) == 0 {
once.Do(func() {
sl.Log.Debug(internal.NoMetricsCreatedMsg)
})
}

for _, m := range metrics {
switch sl.TimeSource {
case "", "metric":
case "receive_time":
m.SetTime(receiveTime)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should, since service plugins do not work with #15917

#15891 (comment)

plugins/common/socket/socket.go Outdated Show resolved Hide resolved
@@ -20,6 +21,7 @@ var once sync.Once

type SocketListener struct {
ServiceAddress string `toml:"service_address"`
TimeSource string `toml:"time_source"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay to use the same name as in #15917 ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might cause issues if the terms are not the same and checked somewhere. Either make sure there is no collision or (easier) use a different option.

@telegraf-tiger
Copy link
Contributor

telegraf-tiger bot commented Oct 1, 2024

@srebhan
Copy link
Member

srebhan commented Oct 2, 2024

Please move the time-source part to another PR and keep the parallelization in here!

@LarsStegman LarsStegman changed the title feat(inputs.socket_listener): parallel parsing feat(common.socket): parallel parsing Oct 3, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin plugin/input 1. Request for new input plugins 2. Issues/PRs that are related to input plugins
Projects
None yet
Development

Successfully merging this pull request may close these issues.

perf: Async parsing
2 participants