offset commits on Async tasks #243
Hi, I have a question regarding the async (sink) operations in kafka-streams-dotnet.

```csharp
var builder = new StreamBuilder();
builder.Stream<string, string>("my-topic")
    .MapValues(JsonConvert.DeserializeObject<MyObject>)
    .ForeachAsync(async (record, _) => {
        db.MyObjects.Add(record.Value);
        await db.SaveChangesAsync();
    },
    RetryPolicy.NewBuilder()
        .NumberOfRetry(10)
        .RetryBackOffMs(100)
        .RetriableException<DatabaseException>()
        .RetryBehavior(EndRetryBehavior.FAIL)
        .Build());
```

So, writing to a database asynchronously. What happens under the hood in kafka-streams-dotnet? What about offset commits? Also, assume I have successfully processed messages 1 to 5 and have messages 6 to 20 in-flight in the async block.

And a related question:
So this would mean that using ForEachAsync will not create such request/response topics?
Hi @duke-bartholomew,

What happens under the hood in kafka-streams-dotnet?

No, a single thread is created and dedicated to consuming the internal request topic, calling the external processor (with the retry logic and so on), and publishing the result to the internal response topic. This thread has pause/resume logic in case the processor takes too long compared to `max.poll.interval.ms`.

What about offset commits? Also, assume I have successfully processed messages 1 to 5 and have message 6 to 20 in-flight in the async block.

The external DSL allows async/await syntax but, for reasons like the ones you mentioned, I can't execute the processing fully asynchronously. The commit is called every …

A new feature could be to execute the whole batch of records fetched from the consumer asynchronously, waiting for all the tasks to finish before committing the offsets and fetching the next batch of records. I'll keep it in mind.

So this would mean that using ForEachAsync will not create such request/response topics?

When you use …

Hope it's clear.

Best regards,
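The idea raised in the reply (run a whole fetched batch asynchronously, wait for every task, and only then commit) can be sketched in plain C#. This is only an illustration under stated assumptions, not Streamiz code: `SaveAsync` and `ProcessBatchAsync` are hypothetical names, the records are assumed to come from a single partition in offset order, and the actual commit call is left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the database write in the question.
var saved = new ConcurrentBag<long>();

async Task SaveAsync(long offset, string value)
{
    await Task.Delay(10); // simulate I/O latency
    saved.Add(offset);
}

// Hypothetical helper: start every record of the batch concurrently and
// return the offset to commit only once all of them have completed.
// Assumes the records belong to one partition and are sorted by offset.
async Task<long> ProcessBatchAsync(IReadOnlyList<(long Offset, string Value)> records)
{
    var tasks = records.Select(r => SaveAsync(r.Offset, r.Value));

    // If any task fails, WhenAll throws and no offset is returned,
    // so nothing from this batch would be committed.
    await Task.WhenAll(tasks);

    // Kafka convention: the committed offset is the next offset to read.
    return records[records.Count - 1].Offset + 1;
}

// Messages 6 to 20 from the question, all in flight at the same time.
var inFlight = Enumerable.Range(6, 15)
    .Select(i => ((long)i, $"msg-{i}"))
    .ToList();

long commitOffset = await ProcessBatchAsync(inFlight);
Console.WriteLine($"would commit offset {commitOffset}");
```

The trade-off in this sketch is that a single failed record blocks the commit for the whole batch, so the batch would be reprocessed after a restart and the write has to tolerate duplicates.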
Hi @duke-bartholomew,
What happens under the hood in kafka-streams-dotnet?
Does this mean a new thread per message in this case? (I come from the JVM world, so my perception of threads and the impact of them might be skewed when it comes to .NET threads)
No, a single thread is created and dedicated to consuming the internal request topic, calling the external processor (with the retry logic and so on), and publishing the result to the internal response topic. This thread has pause/resume logic in case the processor takes too long compared to `max.poll.interval.ms`. Btw, in Java the consumer has `max.poll.interval.ms` to consume a batch of between 0 and `max.poll.records` records, whereas in .NET the consu…