Skip to content

Commit

Permalink
Tests don't rely on auto.create.topics.enable=true from brokers
Browse files Browse the repository at this point in the history
  • Loading branch information
kenneth-jia committed May 24, 2021
1 parent 0762ec2 commit 6460715
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 13 deletions.
1 change: 1 addition & 0 deletions scripts/start-local-kafka-cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def GenerateBrokerConfig(brokerId, brokerPort, zookeeperPort, logDir):
offsets.commit.timeout.ms=10000
unclean.leader.election.enable=false
min.insync.replicas=2
auto.create.topics.enable=false
''')
properties = brokerTemplate.substitute(broker_id=brokerId, listener_port=brokerPort, zookeeper_port=zookeeperPort, log_dir=logDir)
return properties
Expand Down
21 changes: 19 additions & 2 deletions tests/integration/TestKafkaConsumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ TEST(KafkaManualCommitConsumer, NoOffsetCommitCallback)

std::cout << "[" << Utility::getCurrentTime() << "] Topic[" << topic << "] would be used" << std::endl;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Prepare some messages to send
const std::vector<std::tuple<Headers, std::string, std::string>> messages = {
{Headers{}, "key1", "value1"},
Expand Down Expand Up @@ -423,6 +425,8 @@ TEST(KafkaManualCommitConsumer, OffsetCommitCallback)

std::cout << "[" << Utility::getCurrentTime() << "] Topic[" << topic << "] would be used" << std::endl;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Prepare some messages to send
const std::vector<std::tuple<Headers, std::string, std::string>> messages = {
{Headers{}, "key1", "value1"},
Expand Down Expand Up @@ -485,6 +489,8 @@ TEST(KafkaManualCommitConsumer, OffsetCommitCallbackTriggeredBeforeClose)

std::cout << "[" << Utility::getCurrentTime() << "] Topic[" << topic << "] would be used" << std::endl;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Prepare some messages to send
const std::vector<std::tuple<Headers, std::string, std::string>> messages = {
{Headers{}, "key1", "value1"},
Expand Down Expand Up @@ -544,6 +550,8 @@ TEST(KafkaManualCommitConsumer, OffsetCommitCallback_ManuallyPollEvents)

std::cout << "[" << Utility::getCurrentTime() << "] Topic[" << topic << "] would be used" << std::endl;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Prepare some messages to send
const std::vector<std::tuple<Headers, std::string, std::string>> messages = {
{Headers{}, "key1", "value1"},
Expand Down Expand Up @@ -610,6 +618,8 @@ TEST(KafkaManualCommitConsumer, OffsetCommitAndPosition)

std::cout << "[" << Utility::getCurrentTime() << "] Topic[" << topic << "] would be used" << std::endl;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Prepare some messages to send
const std::vector<std::tuple<Headers, std::string, std::string>> messages = {
{Headers{}, "key1", "value1"},
Expand Down Expand Up @@ -854,6 +864,9 @@ TEST(KafkaAutoCommitConsumer, OffsetCommitAndPosition)
const Partition partition = 0;

std::cout << "[" << Utility::getCurrentTime() << "] Topic[" << topic << "] would be used" << std::endl;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Prepare some messages to send
std::vector<std::tuple<Headers, std::string, std::string>> messages = {
{Headers{}, "key1", "value1"},
Expand Down Expand Up @@ -1234,7 +1247,6 @@ TEST(KafkaAutoCommitConsumer, WrongOperation_AssignThenSubscribe)
TEST(KafkaClient, GetBrokerMetadata)
{
const Topic topic = Utility::getRandomString();

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Start consumer
Expand Down Expand Up @@ -1269,7 +1281,6 @@ TEST(KafkaClient, GetBrokerMetadata)
TEST(KafkaAutoCommitConsumer, SubscribeAndPoll)
{
const Topic topic = Utility::getRandomString();

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

const auto props = KafkaTestUtility::GetKafkaClientCommonConfig().put(ConsumerConfig::ENABLE_PARTITION_EOF, "true");
Expand Down Expand Up @@ -1464,6 +1475,9 @@ TEST(KafkaManualCommitConsumer, OffsetsForTime)
const Topic topic2 = Utility::getRandomString();
const Partition partition2 = 1;

KafkaTestUtility::CreateKafkaTopic(topic1, 5, 3);
KafkaTestUtility::CreateKafkaTopic(topic2, 5, 3);

using namespace std::chrono;

constexpr int MESSAGES_NUM = 5;
Expand Down Expand Up @@ -1574,6 +1588,8 @@ TEST(KafkaManualCommitConsumer, RecoverByTime)
const Topic topic = Utility::getRandomString();
const Partition partition = 0;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Prepare some messages to send
const std::vector<std::pair<std::string, std::string>> messages = {
{"key1", "value1"},
Expand Down Expand Up @@ -1680,6 +1696,7 @@ TEST(KafkaManualCommitConsumer, RecoverByTime)
TEST(KafkaAutoCommitConsumer, AutoCreateTopics)
{
const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

KafkaAutoCommitConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig()
.put("allow.auto.create.topics", "true"));
Expand Down
21 changes: 20 additions & 1 deletion tests/integration/TestKafkaProducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ TEST(KafkaSyncProducer, SendMessagesWithAcks1)
const Topic topic = Utility::getRandomString();
const Partition partition = 0;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Properties for the producer
const auto props = KafkaTestUtility::GetKafkaClientCommonConfig().put(ProducerConfig::ACKS, "1");

Expand Down Expand Up @@ -67,6 +69,8 @@ TEST(KafkaSyncProducer, SendMessagesWithAcksAll)
const Topic topic = Utility::getRandomString();
const Partition partition = 0;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Properties for the producer
const auto props = KafkaTestUtility::GetKafkaClientCommonConfig().put(ProducerConfig::ACKS, "all");

Expand Down Expand Up @@ -143,6 +147,8 @@ TEST(KafkaSyncProducer, InSyncBrokersAckTimeout)
const Topic topic = Utility::getRandomString();
const Partition partition = 0;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

const auto key = std::string(100000, 'a');
const auto value = std::string(100000, 'a');
const auto record = ProducerRecord(topic, partition, Key(key.c_str(), key.size()), Value(value.c_str(), value.size()));
Expand Down Expand Up @@ -178,6 +184,7 @@ TEST(KafkaSyncProducer, DefaultPartitioner)
KafkaSyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig());

const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

std::map<Partition, int> partitionCounts;
constexpr int MSG_NUM = 20;
Expand All @@ -199,6 +206,9 @@ TEST(KafkaSyncProducer, DefaultPartitioner)

TEST(KafkaSyncProducer, TryOtherPartitioners)
{
const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Try another "partitioner" instead of the default one
{
auto props = KafkaTestUtility::GetKafkaClientCommonConfig();
Expand All @@ -214,7 +224,7 @@ TEST(KafkaSyncProducer, TryOtherPartitioners)
std::string key;
std::string value = "v" + std::to_string(i);

auto record = ProducerRecord(Utility::getRandomString(), Key(key.c_str(), key.size()), Value(value.c_str(), value.size()));
auto record = ProducerRecord(topic, Key(key.c_str(), key.size()), Value(value.c_str(), value.size()));

auto metadata = producer.send(record);
std::cout << metadata.toString() << std::endl;
Expand Down Expand Up @@ -265,6 +275,8 @@ TEST(KafkaAsyncProducer, MessageDeliveryCallback)
const Topic topic = Utility::getRandomString();
const Partition partition = 0;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Properties for the producer
std::set<ProducerRecord::Id> msgIdsSent;

Expand Down Expand Up @@ -313,6 +325,8 @@ TEST(KafkaAsyncProducer, DeliveryCallback_ManuallyPollEvents)
const Topic topic = Utility::getRandomString();
const Partition partition = 0;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Properties for the producer
std::set<ProducerRecord::Id> msgIdsSent;

Expand Down Expand Up @@ -364,6 +378,8 @@ TEST(KafkaAsyncProducer, NoBlockSendingWhileQueueIsFull_ManuallyPollEvents)
const Topic topic = Utility::getRandomString();
const auto appThreadId = std::this_thread::get_id();

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

int msgSentCnt = 0;

Producer::Callback drCallback =
Expand Down Expand Up @@ -435,6 +451,8 @@ TEST(KafkaAsyncProducer, TooLargeMessageForBroker)
const Topic topic = Utility::getRandomString();
const Partition partition = 0;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

const auto value = std::string(2048, 'a');
const auto record = ProducerRecord(topic, partition, Key(nullptr, 0), Value(value.c_str(), value.size()));

Expand Down Expand Up @@ -466,6 +484,7 @@ TEST(KafkaAsyncProducer, TooLargeMessageForBroker)
TEST(KafkaAsyncProducer, CopyRecordValueWithinSend)
{
const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

const auto props = KafkaTestUtility::GetKafkaClientCommonConfig()
.put(ProducerConfig::PARTITIONER, "murmur2"); // `ProducerRecord`s with empty key are mapped to a single partition
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/TestTransaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ TEST(Transaction, ContinueTheTransaction)
const std::string transactionId = Utility::getRandomString();
const std::string messageToSent = "message to sent";

KafkaTestUtility::CreateKafkaTopic(topic, 1, 3);

// Start a producer to send the message, but fail to commit
{
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig()
Expand Down
11 changes: 7 additions & 4 deletions tests/robustness/TestKafkaConsumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ TEST(KafkaManualCommitConsumer, AlwaysFinishClosing_ManuallyPollEvents)
Topic topic = Utility::getRandomString();
Partition partition = 0;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Producer some messages
std::vector<std::tuple<Headers, std::string, std::string>> messages = {
{Headers{}, "key1", "value1"},
Expand Down Expand Up @@ -86,6 +88,8 @@ TEST(KafkaManualCommitConsumer, CommitOffsetWhileBrokersStop)
const Topic topic = Utility::getRandomString();
const Partition partition = 0;

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Producer some messages
std::vector<std::tuple<Headers, std::string, std::string>> messages = {
{Headers{}, "key1", "value1"}
Expand Down Expand Up @@ -149,7 +153,6 @@ TEST(KafkaManualCommitConsumer, CommitOffsetWhileBrokersStop)
TEST(KafkaAutoCommitConsumer, BrokerStopBeforeConsumerStart)
{
const Topic topic = Utility::getRandomString();

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Pause the brokers for a while
Expand Down Expand Up @@ -212,7 +215,6 @@ TEST(KafkaAutoCommitConsumer, BrokerStopBeforeSubscription)
auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile(std::chrono::seconds(5));

const Topic topic = Utility::getRandomString();

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

TopicPartitions assignment;
Expand Down Expand Up @@ -250,7 +252,6 @@ TEST(KafkaAutoCommitConsumer, BrokerStopBeforeSubscription)
TEST(KafkaAutoCommitConsumer, BrokerStopBeforeSeek)
{
const Topic topic = Utility::getRandomString();

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Consumer properties
Expand Down Expand Up @@ -298,13 +299,14 @@ TEST(KafkaAutoCommitConsumer, BrokerStopBeforeSeek)
auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer, std::chrono::seconds(10));
EXPECT_FALSE(records.empty());
ASSERT_TRUE(std::all_of(records.cbegin(), records.cend(), [](const auto& record){ return record.error().value() == RD_KAFKA_RESP_ERR__PARTITION_EOF; }));

std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " polled " << records.size() << " EOFs" << std::endl;
}

TEST(KafkaAutoCommitConsumer, BrokerStopDuringMsgPoll)
{
const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// Prepare messages to test
const std::vector<std::tuple<Headers, std::string, std::string>> messages = {
Expand Down Expand Up @@ -347,3 +349,4 @@ TEST(KafkaAutoCommitConsumer, BrokerStopDuringMsgPoll)

std::cout << "[" << Utility::getCurrentTime() << "] " << consumer.name() << " polled " << records.size() << " messages" << std::endl;
}

25 changes: 19 additions & 6 deletions tests/robustness/TestKafkaProducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ TEST(KafkaSyncProducer, RecordTimestamp)
TEST(KafkaAsyncProducer, NoMissedDeliveryCallback)
{
const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

std::mutex inFlightMutex;
std::set<ProducerRecord::Id> inFlightIds;
Expand Down Expand Up @@ -151,6 +152,7 @@ TEST(KafkaAsyncProducer, NoMissedDeliveryCallback)
TEST(KafkaAsyncProducer, MightMissDeliveryCallbackIfCloseWithLimitedTimeout)
{
const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

std::size_t deliveryCount = 0;
{
Expand Down Expand Up @@ -195,6 +197,7 @@ TEST(KafkaAsyncProducer, BrokerStopWhileSendingMessages)
};

const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

std::size_t deliveryCount = 0;
{
Expand Down Expand Up @@ -241,6 +244,8 @@ TEST(KafkaAsyncProducer, Send_AckTimeout)
};

const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

{
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig()
.put(ProducerConfig::MESSAGE_TIMEOUT_MS, "3000")); // If with no response, the delivery would fail in a short time
Expand All @@ -252,7 +257,7 @@ TEST(KafkaAsyncProducer, Send_AckTimeout)
for (const auto& msg: messages)
{
auto record = ProducerRecord(topic, Key(msg.first.c_str(), msg.first.size()), Value(msg.second.c_str(), msg.second.size()));

producer.send(record, [&failureCount](const Producer::RecordMetadata& metadata, std::error_code ec) {
std::cout << "[" << Utility::getCurrentTime() << "] delivery callback: result[" << ec.message() << "], metadata[" << metadata.toString() << "]" << std::endl;
EXPECT_EQ(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, ec.value());
Expand All @@ -276,6 +281,8 @@ TEST(KafkaAsyncProducer, ManuallyPollEvents_AckTimeout)
};

const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

{
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig()
.put(ProducerConfig::MESSAGE_TIMEOUT_MS, "3000"), // If with no response, the delivery would fail in a short time
Expand All @@ -288,7 +295,7 @@ TEST(KafkaAsyncProducer, ManuallyPollEvents_AckTimeout)
for (const auto& msg: messages)
{
auto record = ProducerRecord(topic, Key(msg.first.c_str(), msg.first.size()), Value(msg.second.c_str(), msg.second.size()));

producer.send(record, [&failureCount](const Producer::RecordMetadata& metadata, std::error_code ec) {
std::cout << "[" << Utility::getCurrentTime() << "] delivery callback: result[" << ec.message() << "], metadata[" << metadata.toString() << "]" << std::endl;
EXPECT_EQ(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, ec.value());
Expand All @@ -299,7 +306,7 @@ TEST(KafkaAsyncProducer, ManuallyPollEvents_AckTimeout)

const auto timeout = std::chrono::seconds(10);
const auto interval = std::chrono::milliseconds(100);

for (const auto end = std::chrono::steady_clock::now() + timeout; std::chrono::steady_clock::now() < end;)
{
// Keep polling for the delivery-callbacks
Expand All @@ -318,6 +325,9 @@ TEST(KafkaAsyncProducer, ManuallyPollEvents_AlwaysFinishClosing)
{"3", "value3"},
};

const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

std::size_t failureCount = 0;
{
KafkaAsyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig()
Expand All @@ -330,8 +340,8 @@ TEST(KafkaAsyncProducer, ManuallyPollEvents_AlwaysFinishClosing)
const auto appThreadId = std::this_thread::get_id();
for (const auto& msg: messages)
{
auto record = ProducerRecord(Utility::getRandomString(), Key(msg.first.c_str(), msg.first.size()), Value(msg.second.c_str(), msg.second.size()));
auto record = ProducerRecord(topic, Key(msg.first.c_str(), msg.first.size()), Value(msg.second.c_str(), msg.second.size()));

producer.send(record, [&failureCount, appThreadId](const Producer::RecordMetadata& metadata, std::error_code ec) {
std::cout << "[" << Utility::getCurrentTime() << "] delivery callback: result[" << ec.message() << "], metadata[" << metadata.toString() << "]" << std::endl;
EXPECT_EQ(RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, ec.value());
Expand All @@ -348,12 +358,15 @@ TEST(KafkaAsyncProducer, ManuallyPollEvents_AlwaysFinishClosing)

TEST(KafkaSyncProducer, Send_AckTimeout)
{
const Topic topic = Utility::getRandomString();
KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

KafkaSyncProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig().put(ProducerConfig::MESSAGE_TIMEOUT_MS, "3000"));

// Pause the brokers for a while
auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile(std::chrono::seconds(5));

auto record = ProducerRecord(Utility::getRandomString(), NullKey, NullValue);
auto record = ProducerRecord(topic, NullKey, NullValue);
std::cout << "[" << Utility::getCurrentTime() << "] About to send record: " << record.toString() << std::endl;

EXPECT_KAFKA_THROW(producer.send(record), RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
Expand Down

0 comments on commit 6460715

Please sign in to comment.