From 82cbd27441a2425930171bbd7cb9ec8847dbe300 Mon Sep 17 00:00:00 2001 From: Kenneth Jia <48558845+kenneth-jia@users.noreply.github.com> Date: Thu, 21 Oct 2021 08:58:39 +0800 Subject: [PATCH] Fix KafkaRecoverableProducer.close() --- .../kafka/addons/KafkaRecoverableProducer.h | 10 +-- .../TestKafkaRecoverableProducer.cc | 84 ++++++++++--------- 2 files changed, 49 insertions(+), 45 deletions(-) diff --git a/include/kafka/addons/KafkaRecoverableProducer.h b/include/kafka/addons/KafkaRecoverableProducer.h index 3134a8b98..348622595 100644 --- a/include/kafka/addons/KafkaRecoverableProducer.h +++ b/include/kafka/addons/KafkaRecoverableProducer.h @@ -29,12 +29,7 @@ class KafkaRecoverableProducer ~KafkaRecoverableProducer() { - std::lock_guard lock(_producerMutex); - - _running = false; - if (_pollThread.joinable()) _pollThread.join(); - - _producer->close(); + if (_running) close(); } /** @@ -167,6 +162,9 @@ class KafkaRecoverableProducer { std::lock_guard lock(_producerMutex); + _running = false; + if (_pollThread.joinable()) _pollThread.join(); + _producer->close(timeout); } diff --git a/tests/integration/TestKafkaRecoverableProducer.cc b/tests/integration/TestKafkaRecoverableProducer.cc index ea4edd33c..fef8099f2 100644 --- a/tests/integration/TestKafkaRecoverableProducer.cc +++ b/tests/integration/TestKafkaRecoverableProducer.cc @@ -20,53 +20,59 @@ TEST(KafkaRecoverableProducer, SendMessages) KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); - // Properties for the producer - const auto props = KafkaTestUtility::GetKafkaClientCommonConfig().put(kafka::clients::producer::Config::ACKS, "all"); - - // Recoverable producer - kafka::clients::KafkaRecoverableProducer producer(props); - - // Send messages - kafka::clients::producer::ProducerRecord::Id id = 0; - for (const auto& msg: messages) { - auto record = kafka::clients::producer::ProducerRecord(topic, partition, - kafka::Key(msg.first.c_str(), msg.first.size()), - kafka::Value(msg.second.c_str(), msg.second.size()), - id++); - std::cout << "[" <(records[i].key().data()), records[i].key().size())); - EXPECT_EQ(messages[i/2].second, std::string(static_cast(records[i].value().data()), records[i].value().size())); + // Prepare a consumer + const auto consumerProps = KafkaTestUtility::GetKafkaClientCommonConfig().put(kafka::clients::consumer::Config::AUTO_OFFSET_RESET, "earliest"); + kafka::clients::KafkaConsumer consumer(consumerProps); + consumer.setLogLevel(kafka::Log::Level::Crit); + consumer.subscribe({topic}); + + // Poll these messages + auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer); + + // Check the messages + EXPECT_EQ(messages.size() * 2, records.size()); + for (std::size_t i = 0; i < records.size(); ++i) + { + EXPECT_EQ(messages[i/2].first, std::string(static_cast(records[i].key().data()), records[i].key().size())); + EXPECT_EQ(messages[i/2].second, std::string(static_cast(records[i].value().data()), records[i].value().size())); + } } }