From 25209ded335f1fe8e5a0a1959527a64b5e9dddac Mon Sep 17 00:00:00 2001 From: JESUS RUIZ HERNANDEZ Date: Wed, 27 Nov 2024 17:17:12 +0100 Subject: [PATCH] Fix: Produce memory leak --- src/KafkaFlow/Producers/MessageProducer.cs | 35 ++++++++++++++-------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index 4894023ea..e3248d693 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -1,9 +1,9 @@ -using System; -using System.Text; -using System.Threading.Tasks; using Confluent.Kafka; using KafkaFlow.Authentication; using KafkaFlow.Configuration; +using System; +using System.Text; +using System.Threading.Tasks; namespace KafkaFlow.Producers; @@ -334,20 +334,31 @@ private void InternalProduce( var localProducer = this.EnsureProducer(); var message = CreateMessage(context); - if (partition.HasValue) + try { + if (partition.HasValue) + { + localProducer.Produce( + new TopicPartition(context.ProducerContext.Topic, partition.Value), + message, + Handler); + + return; + } + localProducer.Produce( - new TopicPartition(context.ProducerContext.Topic, partition.Value), + context.ProducerContext.Topic, message, Handler); - - return; } - - localProducer.Produce( - context.ProducerContext.Topic, - message, - Handler); + catch (Exception ex) + { + DeliveryReport report = new() + { + Error = new Error(ErrorCode.Local_Fatal, ex.Message, isFatal: true), + }; + Handler(report); + } void Handler(DeliveryReport report) {