Skip to content

Commit

Permalink
Add interceptor for broker's state & update to librdkafka v2.0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
kenneth-jia committed Mar 7, 2023
1 parent 6b8ec71 commit 71c6a9e
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 103 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/kafka_api_bazel_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
env:
KAFKA_SRC_LINK: https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
CPU_CORE_NUM: 2
LIBRDKAFKA_TAG: v1.9.2
LIBRDKAFKA_TAG: v2.0.2

jobs:
kafka-api-bazel-build:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/kafka_api_ci_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
env:
KAFKA_SRC_LINK: https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
CPU_CORE_NUM: 2
LIBRDKAFKA_TAG: v1.9.2
LIBRDKAFKA_TAG: v2.0.2
BUILD_SUB_DIR: builds/sub-build

jobs:
Expand Down
28 changes: 6 additions & 22 deletions .github/workflows/kafka_api_demo_conan_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,9 @@ jobs:
steps:
- uses: actions/checkout@v2

- name: Prepare (non-windows)
if: ${{!contains(matrix.os, 'windows')}}
run: |
if [[ ${OS_VERSION} == 'macos'* ]]; then
brew install conan
else
pip3 install conan
fi
- name: Prepare (windows)
if: ${{contains(matrix.os, 'windows')}}
- name: Prepare
run: |
pip3 install conan
pip3 install conan==1.59.0
- name: Build (non-windows)
if: ${{!contains(matrix.os, 'windows')}}
Expand All @@ -52,11 +42,8 @@ jobs:
cmake .. -G "Unix Makefiles"
cmake --build .
bin/kafka_sync_producer
bin/kafka_async_producer_copy_payload
bin/kafka_async_producer_not_copy_payload
bin/kafka_auto_commit_consumer
bin/kafka_manual_commit_consumer
bin/kafka_producer
bin/kafka_consumer
- name: Build (windows)
if: contains(matrix.os, 'windows')
Expand All @@ -70,9 +57,6 @@ jobs:
cmake ..
cmake --build .
bin/kafka_sync_producer.exe
bin/kafka_async_producer_copy_payload.exe
bin/kafka_async_producer_not_copy_payload.exe
bin/kafka_auto_commit_consumer.exe
bin/kafka_manual_commit_consumer.exe
bin/kafka_producer.exe
bin/kafka_consumer.exe
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ About the *Modern C++ Kafka API*

The [modern-cpp-kafka API](http://opensource.morganstanley.com/modern-cpp-kafka/doxygen/annotated.html) is a layer of ***C++*** wrapper based on [librdkafka](https://github.com/confluentinc/librdkafka) (the ***C*** part only), with high quality, but more friendly to users.

- By now, [modern-cpp-kafka](https://github.com/morganstanley/modern-cpp-kafka) is compatible with [librdkafka v1.9.2](https://github.com/confluentinc/librdkafka/releases/tag/v1.9.2).
- By now, [modern-cpp-kafka](https://github.com/morganstanley/modern-cpp-kafka) is compatible with [librdkafka v2.0.2](https://github.com/confluentinc/librdkafka/releases/tag/v2.0.2).


```
Expand Down
23 changes: 6 additions & 17 deletions demo_projects_for_build/conan_build/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,11 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup()

# Target: kafka_sync_producer
add_executable("kafka_sync_producer" "../../examples/kafka_sync_producer.cc")
target_link_libraries("kafka_sync_producer" ${CONAN_LIBS})
# Target: kafka_producer
add_executable("kafka_producer" "../../examples/kafka_async_producer_not_copy_payload.cc")
target_link_libraries("kafka_producer" ${CONAN_LIBS})

# Target: kafka_async_producer_copy_payload
add_executable("kafka_async_producer_copy_payload" "../../examples/kafka_async_producer_copy_payload.cc")
target_link_libraries("kafka_async_producer_copy_payload" ${CONAN_LIBS})
# Target: kafka_consumer
add_executable("kafka_consumer" "../../examples/kafka_auto_commit_consumer.cc")
target_link_libraries("kafka_consumer" ${CONAN_LIBS})

# Target: kafka_async_producer_not_copy_payload
add_executable("kafka_async_producer_not_copy_payload" "../../examples/kafka_async_producer_not_copy_payload.cc")
target_link_libraries("kafka_async_producer_not_copy_payload" ${CONAN_LIBS})

# Target: kafka_auto_commit_consumer
add_executable("kafka_auto_commit_consumer" "../../examples/kafka_auto_commit_consumer.cc")
target_link_libraries("kafka_auto_commit_consumer" ${CONAN_LIBS})

# Target: kafka_manual_commit_consumer
add_executable("kafka_manual_commit_consumer" "../../examples/kafka_manual_commit_consumer.cc")
target_link_libraries("kafka_manual_commit_consumer" ${CONAN_LIBS})
2 changes: 1 addition & 1 deletion demo_projects_for_build/conan_build/conanfile.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[requires]
modern-cpp-kafka/2022.06.15
modern-cpp-kafka/2023.01.05

[generators]
cmake
35 changes: 26 additions & 9 deletions include/kafka/Interceptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,59 @@ class Interceptors
/**
* Callback type for thread-start interceptor.
*/
using ThreadStartCallback = std::function<void(const std::string&, const std::string&)>;
using ThreadStartCb = std::function<void(const std::string&, const std::string&)>;

/**
* Callback type for thread-exit interceptor.
*/
using ThreadExitCallback = std::function<void(const std::string&, const std::string&)>;
using ThreadExitCb = std::function<void(const std::string&, const std::string&)>;

/**
* Callback type for broker-state-change interceptor.
*/
using BrokerStateChangeCb = std::function<void(int, const std::string&, const std::string&, int, const std::string&)>;

/**
* Set interceptor for thread start.
*/
Interceptors& onThreadStart(ThreadStartCallback cb) { _valid = true; _threadStartCb = std::move(cb); return *this; }
Interceptors& onThreadStart(ThreadStartCb cb) { _valid = true; _threadStartCb = std::move(cb); return *this; }

/**
* Set interceptor for thread exit.
*/
Interceptors& onThreadExit(ThreadExitCallback cb) { _valid = true; _threadExitCb = std::move(cb); return *this; }
Interceptors& onThreadExit(ThreadExitCb cb) { _valid = true; _threadExitCb = std::move(cb); return *this; }

/**
* Set interceptor for broker state change.
*/
Interceptors& onBrokerStateChange(BrokerStateChangeCb cb) { _valid = true; _brokerStateChangeCb = std::move(cb); return *this; }

/**
* Get interceptor for thread start.
*/
ThreadStartCallback onThreadStart() const { return _threadStartCb; }
ThreadStartCb onThreadStart() const { return _threadStartCb; }

/**
* Get interceptor for thread exit.
*/
ThreadExitCallback onThreadExit() const { return _threadExitCb; }
ThreadExitCb onThreadExit() const { return _threadExitCb; }

/**
* Get interceptor for broker state change.
*/
BrokerStateChangeCb onBrokerStateChange() const { return _brokerStateChangeCb; }

/**
* Check if there's no interceptor.
*/
bool empty() const { return !_valid; }

private:
ThreadStartCallback _threadStartCb;
ThreadExitCallback _threadExitCb;
bool _valid = false;
ThreadStartCb _threadStartCb;
ThreadExitCb _threadExitCb;
BrokerStateChangeCb _brokerStateChangeCb;

bool _valid = false;
};

} } // end of KAFKA_API::clients
Expand Down
25 changes: 23 additions & 2 deletions include/kafka/KafkaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class KafkaClient
static rd_kafka_resp_err_t configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* conf, void* opaque, char* errStr, std::size_t maxErrStrSize);
static rd_kafka_resp_err_t interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque);
static rd_kafka_resp_err_t interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque);
static rd_kafka_resp_err_t interceptorOnBrokerStateChange(rd_kafka_t* rk, int id, const char* secproto, const char* host, int port, const char* state, void* opaque);

// Log callback (for class instance)
void onLog(int level, const char* fac, const char* buf) const;
Expand All @@ -205,6 +206,7 @@ class KafkaClient
// Interceptor callback (for class instance)
void interceptThreadStart(const std::string& threadName, const std::string& threadType);
void interceptThreadExit(const std::string& threadName, const std::string& threadType);
void interceptBrokerStateChange(int id, const std::string& secproto, const std::string& host, int port, const std::string& state);

protected:
struct Pollable
Expand Down Expand Up @@ -608,6 +610,12 @@ KafkaClient::interceptThreadExit(const std::string& threadName, const std::strin
if (const auto& cb = _interceptors.onThreadExit()) cb(threadName, threadType);
}

inline void
KafkaClient::interceptBrokerStateChange(int id, const std::string& secproto, const std::string& host, int port, const std::string& state)
{
if (const auto& cb = _interceptors.onBrokerStateChange()) cb(id, secproto, host, port, state);
}

inline rd_kafka_resp_err_t
KafkaClient::configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* /*conf*/, void* opaque, char* /*errStr*/, std::size_t /*maxErrStrSize*/)
{
Expand All @@ -621,25 +629,38 @@ KafkaClient::configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* /*con
return result;
}

if (auto result = rd_kafka_interceptor_add_on_broker_state_change(rk, "on_broker_state_change", KafkaClient::interceptorOnBrokerStateChange, opaque))
{
return result;
}

return RD_KAFKA_RESP_ERR_NO_ERROR;
}

inline rd_kafka_resp_err_t
KafkaClient::interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /*opaque*/)
KafkaClient::interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /* opaque */)
{
kafkaClient(rk).interceptThreadStart(threadName, toString(threadType));

return RD_KAFKA_RESP_ERR_NO_ERROR;
}

inline rd_kafka_resp_err_t
KafkaClient::interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /*opaque*/)
KafkaClient::interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /* opaque */)
{
kafkaClient(rk).interceptThreadExit(threadName, toString(threadType));

return RD_KAFKA_RESP_ERR_NO_ERROR;
}

inline rd_kafka_resp_err_t
KafkaClient::interceptorOnBrokerStateChange(rd_kafka_t* rk, int id, const char* secproto, const char* host, int port, const char* state, void* /* opaque */)
{
kafkaClient(rk).interceptBrokerStateChange(id, secproto, host, port, state);

return RD_KAFKA_RESP_ERR_NO_ERROR;
}

inline Optional<BrokerMetadata>
KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::milliseconds timeout, bool disableErrorLogging)
{
Expand Down
128 changes: 79 additions & 49 deletions tests/integration/TestKafkaConsumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,67 +23,97 @@ TEST(KafkaConsumer, BasicPoll)

KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

// The auto-commit consumer
kafka::clients::consumer::KafkaConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig());
std::cout << "[" << kafka::utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl;
std::map<std::string, std::string> brokersState;

kafka::clients::Interceptors interceptors;
interceptors.onBrokerStateChange([&brokersState](int id, const std::string& proto, const std::string& name, int port, const std::string& state) {
const std::string brokerDescription = (std::to_string(id) + " - " + proto + "://" + name + ":" + std::to_string(port));
std::cout << "Broker[" << brokerDescription << "] ==> " << state << std::endl;
if (!name.empty() && name != "GroupCoordinator")
{
brokersState[name + ":" + std::to_string(port)] = state;
}
});

// Subscribe topics
consumer.subscribe({topic},
[](kafka::clients::consumer::RebalanceEventType et, const kafka::TopicPartitions& tps) {
if (et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned) {
// assignment finished
std::cout << "[" << kafka::utility::getCurrentTime() << "] assigned partitions: " << kafka::toString(tps) << std::endl;
}
});
EXPECT_FALSE(consumer.subscription().empty());
{
// Config the consumer with interceptors
kafka::clients::consumer::KafkaConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig()
.put(kafka::clients::Config::INTERCEPTORS, interceptors));

// No message yet
auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer, std::chrono::seconds(1));
EXPECT_EQ(0, records.size());
std::cout << "[" << kafka::utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl;

// Try to get the beginning offsets
const kafka::TopicPartition tp{topic, partition};
std::cout << "[" << kafka::utility::getCurrentTime() << "] Consumer get the beginningOffset[" << consumer.beginningOffsets({tp})[tp] << "]" << std::endl;
// Subscribe topics
consumer.subscribe({topic},
[](kafka::clients::consumer::RebalanceEventType et, const kafka::TopicPartitions& tps) {
if (et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned) {
// assignment finished
std::cout << "[" << kafka::utility::getCurrentTime() << "] assigned partitions: " << kafka::toString(tps) << std::endl;
}
});
EXPECT_FALSE(consumer.subscription().empty());

// Prepare some messages to send
const std::vector<std::tuple<kafka::Headers, std::string, std::string>> messages = {
{kafka::Headers{}, "key1", "value1"},
{kafka::Headers{}, "key2", "value2"},
{kafka::Headers{}, "key3", "value3"},
};
// No message yet
auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer, std::chrono::seconds(1));
EXPECT_EQ(0, records.size());

// Send the messages
KafkaTestUtility::ProduceMessages(topic, partition, messages);
// Should be able to get all brokers' state
EXPECT_EQ(KafkaTestUtility::GetNumberOfKafkaBrokers(), brokersState.size());
// All brokers' state should be "UP"
for (const auto& brokerState: brokersState)
{
EXPECT_EQ("UP", brokerState.second);
}

// Poll these messages
records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer);
EXPECT_EQ(messages.size(), records.size());
// Try to get the beginning offsets
const kafka::TopicPartition tp{topic, partition};
std::cout << "[" << kafka::utility::getCurrentTime() << "] Consumer get the beginningOffset[" << consumer.beginningOffsets({tp})[tp] << "]" << std::endl;

// Copyable ConsumerRecord
{
auto recordsCopy = records;
recordsCopy.clear();
}
// Prepare some messages to send
const std::vector<std::tuple<kafka::Headers, std::string, std::string>> messages = {
{kafka::Headers{}, "key1", "value1"},
{kafka::Headers{}, "key2", "value2"},
{kafka::Headers{}, "key3", "value3"},
};

// Check messages
std::size_t rcvMsgCount = 0;
for (auto& record: records)
{
ASSERT_TRUE(rcvMsgCount < messages.size());
// Send the messages
KafkaTestUtility::ProduceMessages(topic, partition, messages);

EXPECT_EQ(topic, record.topic());
EXPECT_EQ(partition, record.partition());
EXPECT_EQ(0, record.headers().size());
EXPECT_EQ(std::get<1>(messages[rcvMsgCount]).size(), record.key().size());
EXPECT_EQ(0, std::memcmp(std::get<1>(messages[rcvMsgCount]).c_str(), record.key().data(), record.key().size()));
EXPECT_EQ(std::get<2>(messages[rcvMsgCount]).size(), record.value().size());
EXPECT_EQ(0, std::memcmp(std::get<2>(messages[rcvMsgCount]).c_str(), record.value().data(), record.value().size()));
// Poll these messages
records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer);
EXPECT_EQ(messages.size(), records.size());

++rcvMsgCount;
// Copyable ConsumerRecord
{
auto recordsCopy = records;
recordsCopy.clear();
}

// Check messages
std::size_t rcvMsgCount = 0;
for (auto& record: records)
{
ASSERT_TRUE(rcvMsgCount < messages.size());

EXPECT_EQ(topic, record.topic());
EXPECT_EQ(partition, record.partition());
EXPECT_EQ(0, record.headers().size());
EXPECT_EQ(std::get<1>(messages[rcvMsgCount]).size(), record.key().size());
EXPECT_EQ(0, std::memcmp(std::get<1>(messages[rcvMsgCount]).c_str(), record.key().data(), record.key().size()));
EXPECT_EQ(std::get<2>(messages[rcvMsgCount]).size(), record.value().size());
EXPECT_EQ(0, std::memcmp(std::get<2>(messages[rcvMsgCount]).c_str(), record.value().data(), record.value().size()));

++rcvMsgCount;
}

// Close the consumer
consumer.close();
}

// Close the consumer
consumer.close();
// All brokers' state should be "DOWN"
for (const auto& brokerState: brokersState)
{
EXPECT_EQ("DOWN", brokerState.second);
}
}

TEST(KafkaConsumer, PollWithHeaders)
Expand Down

0 comments on commit 71c6a9e

Please sign in to comment.