diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index 4894023ea..e3248d693 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -1,9 +1,9 @@ -using System; -using System.Text; -using System.Threading.Tasks; using Confluent.Kafka; using KafkaFlow.Authentication; using KafkaFlow.Configuration; +using System; +using System.Text; +using System.Threading.Tasks; namespace KafkaFlow.Producers; @@ -334,20 +334,31 @@ private void InternalProduce( var localProducer = this.EnsureProducer(); var message = CreateMessage(context); - if (partition.HasValue) + try { + if (partition.HasValue) + { + localProducer.Produce( + new TopicPartition(context.ProducerContext.Topic, partition.Value), + message, + Handler); + + return; + } + localProducer.Produce( - new TopicPartition(context.ProducerContext.Topic, partition.Value), + context.ProducerContext.Topic, message, Handler); - - return; } - - localProducer.Produce( - context.ProducerContext.Topic, - message, - Handler); + catch (Exception ex) + { + DeliveryReport report = new() + { + Error = new Error(ErrorCode.Local_Fatal, ex.Message, isFatal: true), + }; + Handler(report); + } void Handler(DeliveryReport report) {