Skip to content

Commit

Permalink
New addon -- KafkaMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
kenneth-jia committed Dec 1, 2021
1 parent d7ccbee commit 12bae97
Show file tree
Hide file tree
Showing 5 changed files with 773 additions and 4 deletions.
17 changes: 13 additions & 4 deletions .github/workflows/kafka_api_ci_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
- master

env:
KAFKA_SRC_LINK: https://apache.newfountain.nl/kafka/2.7.1/kafka_2.13-2.7.1.tgz
KAFKA_SRC_LINK: https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
CPU_CORE_NUM: 2
LIBRDKAFKA_VERSION: 1.7.0
BUILD_SUB_DIR: build/sub-build
Expand Down Expand Up @@ -134,7 +134,11 @@ jobs:
make -j${CPU_CORE_NUM} && sudo make install
cd ../
# 5. Install tools to generate document
# 5. Install rapidjson (for `addons/KafkaMetrics.h`)
wget -nv https://github.com/Tencent/rapidjson/archive/refs/tags/v1.1.0.tar.gz
tar -xzf v1.1.0.tar.gz
# 6. Install tools to generate document
if [ ${GENERATE_DOC} ]; then
sudo apt install -y python3-pip
sudo pip3 install markdown
Expand Down Expand Up @@ -184,11 +188,13 @@ jobs:
export BUILD_OPTION="${BUILD_OPTION} -DBUILD_OPTION_GEN_DOC=ON"
fi
export RAPIDJSON_INCLUDE_DIRS=`pwd`/rapidjson-1.1.0/include
env CXX=${BUILD_CXX} cmake ../.. ${CMAKE_CXX_STANDARD} ${CMAKE_BUILD_TYPE} ${BUILD_OPTION}
- name: Build
run: |
cd ${BUILD_SUB_DIR}
make -j${CPU_CORE_NUM} VERBOSE=1
- name: Install
Expand Down Expand Up @@ -275,6 +281,9 @@ jobs:
cp -v "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options-vc142-mt-x32-1_77.dll" "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options.dll"
cp -v "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options-vc142-mt-x32-1_77.pdb" "C:\VCPKG\INSTALLED\x86-windows\bin\boost_program_options.pdb"
# Install rapidjson
vcpkg install rapidjson
vcpkg integrate install
- name: Config
Expand Down Expand Up @@ -303,8 +312,8 @@ jobs:
tree "tests"
# Install kafka
Invoke-WebRequest -Uri $Env:KAFKA_SRC_LINK -OutFile kafka_2.13-2.7.1.tgz
tar xvzf kafka_2.13-2.7.1.tgz
Invoke-WebRequest -Uri $Env:KAFKA_SRC_LINK -OutFile kafka_2.13-2.8.1.tgz
tar xvzf kafka_2.13-2.8.1.tgz
ctest -VV -L $Env:TEST_LABELS
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a header-only library t

* `SASL_LIBRARYDIR`/`SASL_LIBRARY` -- if SASL connection support is wanted

* `RAPIDJSON_INCLUDE_DIRS` -- `addons/KafkaMetrics` requires **rapidjson** headers

* Create an empty directory for the build, and `cd` to it

* Build commands
Expand Down
208 changes: 208 additions & 0 deletions include/kafka/addons/KafkaMetrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
#pragma once

#include "kafka/Project.h"

// https://github.com/Tencent/rapidjson/releases/tag/v1.1.0
#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"

#include <algorithm>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>


namespace KAFKA_API {

/**
* \brief Helps to parse the metrics string with JSON format.
*/
class KafkaMetrics
{
public:
/**
* \brief Initilize with the metrics string.
*/
explicit KafkaMetrics(std::string jsonMetrics);

static const constexpr char* WILDCARD = "*";

using KeysType = std::vector<std::string>;

/**
* \brief The matched keys (for wildcards) and the value.
*/
template<typename ValueType>
using ResultsType = std::vector<std::pair<KeysType, ValueType>>;

/**
* \brief Get integer value(s) for the specified metrics.
* Note: the wildcard ("*") is supported.
*/
ResultsType<std::int64_t> getInt(const KeysType& keys) { return get<std::int64_t>(keys); }

/**
* \brief Get string value(s) for the specified metrics.
* Note: the wildcard ("*") is supported.
*/
ResultsType<std::string> getString(const KeysType& keys) { return get<std::string>(keys); }

static std::string toString(const KafkaMetrics::KeysType& keys);

template<typename ValueType>
static std::string toString(const KafkaMetrics::ResultsType<ValueType>& results);

private:
template<typename ValueType>
ResultsType<ValueType> get(const KeysType& keys);

template<typename ValueType>
void getResults(ResultsType<ValueType>& results,
KeysType& keysForWildcards,
rapidjson::Value::ConstMemberIterator iter,
KeysType::const_iterator keysToParse,
KeysType::const_iterator keysEnd);

template<typename ValueType>
static ValueType getValue(rapidjson::Value::ConstMemberIterator iter);

#if COMPILER_SUPPORTS_CPP_17
std::string _decodeBuf;
#else
std::vector<char> _decodeBuf;
#endif
rapidjson::Document _jsonDoc;
};

inline
KafkaMetrics::KafkaMetrics(std::string jsonMetrics)
#if COMPILER_SUPPORTS_CPP_17
: _decodeBuf(std::move(jsonMetrics))
#else
: _decodeBuf(jsonMetrics.cbegin(), jsonMetrics.cend() + 1)
#endif
{
if (_jsonDoc.ParseInsitu(_decodeBuf.data()).HasParseError())
{
throw std::runtime_error("Failed to parse string with JSON format!");
}
}

template<>
inline std::int64_t
KafkaMetrics::getValue<std::int64_t>(rapidjson::Value::ConstMemberIterator iter)
{
return iter->value.GetInt();
}

template<>
inline std::string
KafkaMetrics::getValue<std::string>(rapidjson::Value::ConstMemberIterator iter)
{
return iter->value.GetString();
}

template<typename ValueType>
inline KafkaMetrics::ResultsType<ValueType>
KafkaMetrics::get(const KeysType& keys)
{
if (keys.empty()) throw std::invalid_argument("Input keys cannot be empty!");
if (keys.front() == WILDCARD) throw std::invalid_argument("The first key cannot be wildcard!");
if (keys.back() == WILDCARD) throw std::invalid_argument("The last key cannot be wildcard!");

ResultsType<ValueType> results;

rapidjson::Value::ConstMemberIterator iter = _jsonDoc.FindMember(keys.front().c_str());
if (iter == _jsonDoc.MemberEnd()) return results;

if (keys.size() == 1)
{
if (std::is_same<ValueType, std::string>::value ? iter->value.IsString() : iter->value.IsInt())
{
results.emplace_back(KeysType{}, getValue<ValueType>(iter));
}

return results;
}

KeysType keysForWildcards;

getResults(results, keysForWildcards, iter, keys.cbegin() + 1, keys.cend());
return results;
}

template<typename ValueType>
inline void
KafkaMetrics::getResults(KafkaMetrics::ResultsType<ValueType>& results,
KeysType& keysForWildcards,
rapidjson::Value::ConstMemberIterator iter,
KeysType::const_iterator keysToParse,
KeysType::const_iterator keysEnd)
{
if (!iter->value.IsObject()) return;

const auto& key = *(keysToParse++);
const bool isTheEnd = (keysToParse == keysEnd);

if (key == WILDCARD)
{
for (rapidjson::Value::ConstMemberIterator subIter = iter->value.MemberBegin(); subIter != iter->value.MemberEnd(); ++subIter)
{
KeysType newKeysForWildcards = keysForWildcards;
newKeysForWildcards.emplace_back(subIter->name.GetString());

getResults(results, newKeysForWildcards, subIter, keysToParse, keysEnd);
}
}
else
{
rapidjson::Value::ConstMemberIterator subIter = iter->value.FindMember(key.c_str());
if (subIter == iter->value.MemberEnd()) return;

if (!isTheEnd)
{
getResults(results, keysForWildcards, subIter, keysToParse, keysEnd);
}
else if (std::is_same<ValueType, std::string>::value ? subIter->value.IsString() : subIter->value.IsInt())
{
results.emplace_back(keysForWildcards, getValue<ValueType>(subIter));
}
}
}

inline std::string
KafkaMetrics::toString(const KafkaMetrics::KeysType& keys)
{
std::string ret;

std::for_each(keys.cbegin(), keys.cend(),
[&ret](const auto& key){ ret.append((ret.empty() ? std::string() : std::string(", ")) + "\"" + key + "\""); });

return ret;
}

template<typename ValueType>
inline std::string
KafkaMetrics::toString(const KafkaMetrics::ResultsType<ValueType>& results)
{
std::ostringstream oss;
bool isTheFirstOne = true;

std::for_each(results.cbegin(), results.cend(),
[&oss, &isTheFirstOne](const auto& result) {
const auto keysString = toString(result.first);

oss << (isTheFirstOne ? (isTheFirstOne = false, "") : ", ")
<< (keysString.empty() ? "" : (std::string("[") + keysString + "]:"));
oss << (std::is_same<ValueType, std::string>::value ? "\"" : "") << result.second << (std::is_same<ValueType, std::string>::value ? "\"" : "");
});

return oss.str();
}

} // end of KAFKA_API

17 changes: 17 additions & 0 deletions tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
project("kafka-unit-test")

#---------------------------
# rapidjson
#---------------------------
if (DEFINED ENV{RAPIDJSON_INCLUDE_DIRS})
set(RAPIDJSON_INCLUDE_DIRS $ENV{RAPIDJSON_INCLUDE_DIRS})
else ()
find_package(rapidjson REQUIRED)
if (NOT RAPIDJSON_INCLUDE_DIRS)
message(FATAL_ERROR "Rapidjson not found!")
endif ()
endif ()

message(STATUS "rapidjson include directory: ${RAPIDJSON_INCLUDE_DIRS}")


# Target
file(GLOB TEST_SRCS *.cc)

include_directories(${PROJECT_NAME} SYSTEM INTERFACE ${RAPIDJSON_INCLUDE_DIRS})

add_executable("${PROJECT_NAME}" ${TEST_SRCS})
target_link_libraries("${PROJECT_NAME}" modern-cpp-kafka-api gtest gtest_main)

Expand Down
Loading

0 comments on commit 12bae97

Please sign in to comment.