diff --git a/README.md b/README.md index 70ab044d..eab5bd6f 100644 --- a/README.md +++ b/README.md @@ -187,7 +187,7 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a ***header-only*** lib * Kafka cluster setup * [Quick Start For Cluster Setup](https://kafka.apache.org/documentation/#quickstart) - + * [Cluster Setup Scripts For Test](https://github.com/morganstanley/modern-cpp-kafka/blob/main/scripts/start-local-kafka-cluster.py) * [Kafka Broker Configuration](doc/KafkaBrokerConfiguration.md) @@ -200,12 +200,13 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a ***header-only*** lib | `KAFKA_BROKER_PIDS` | The broker PIDs for test runner to manipulate | `export KAFKA_BROKER_PIDS=61567,61569,61571` | | `KAFKA_CLIENT_ADDITIONAL_SETTINGS` | Could be used for addtional configuration for Kafka clients | `export KAFKA_CLIENT_ADDITIONAL_SETTINGS="security.protocol=SASL_PLAINTEXT;sasl.kerberos.service.name=...;sasl.kerberos.keytab=...;sasl.kerberos.principal=..."` | - * The environment variable `KAFKA_BROKER_LIST` is mandatory for integration/robustness test + * The environment variable `KAFKA_BROKER_LIST` is mandatory for integration/robustness test, which requires the Kafka cluster. + + * The environment variable `KAFKA_BROKER_PIDS` is mandatory for robustness test, which requires the Kafka cluster and the privilege to stop/resume the brokers. - * The environment variable `KAFKA_BROKER_PIDS` is mandatory for robustness test + | Test Type | `KAFKA_BROKER_LIST` | `KAFKA_BROKER_PIDS` | + | -------------------------------------------------------------------------------------------------- | -------------------- | ------------------- | + | [tests/unit](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/unit) | - | - | + | [tests/integration](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/integration) | Required | - | + | [tests/robustness](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/robustness) | Required | Required | - | Test Type | Requires Kafka Cluster | Requires Privilege to Stop/Resume the Brokers | - | -------------------------------------------------------------------------------------------------- | ------------------------ | --------------------------------------------- | - | [tests/unit](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/unit) | - | - | - | [tests/integration](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/integration) | Y (`KAFKA_BROKER_LIST`) | - | - | [tests/robustness`](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/robustness) | Y (`KAFKA_BROKER_LIST`) | Y (`KAFKA_BROKER_PIDS`) | diff --git a/include/kafka/Properties.h b/include/kafka/Properties.h index dcc2a9e4..e2920150 100644 --- a/include/kafka/Properties.h +++ b/include/kafka/Properties.h @@ -42,6 +42,21 @@ class Properties template static std::string getString(const std::string& value) { return value; } + const ValueType& validate(const std::string& key) const + { + static const std::vector nonStringValueKeys = { + "log_cb", "error_cb", "stats_cb", "oauthbearer_token_refresh_cb", "interceptors" + }; + + if ((expectedKey.empty() && std::any_of(nonStringValueKeys.cbegin(), nonStringValueKeys.cend(), [key](const auto& k) { return k == key; })) + || (!expectedKey.empty() && key != expectedKey)) + { + throw std::runtime_error("Invalid key/value for configuration: " + key); + } + + return *this; + } + template struct ObjWrap: public Object { @@ -55,18 +70,35 @@ class Properties ValueType() = default; - ValueType(const std::string& value) { object = std::make_shared>(value); } // NOLINT - ValueType(const LogCallback& cb) { object = std::make_shared>(cb); } // NOLINT - ValueType(const ErrorCallback& cb) { object = std::make_shared>(cb); } // NOLINT - ValueType(const StatsCallback& cb) { object = std::make_shared>(cb); } // NOLINT - ValueType(const OauthbearerTokenRefreshCallback& cb) { object = std::make_shared>(cb); } // NOLINT - ValueType(const Interceptors& interceptors) { object = std::make_shared>(interceptors); } // NOLINT + ValueType(const std::string& value) // NOLINT + { object = std::make_shared>(value); } + + ValueType(const LogCallback& cb) // NOLINT + : expectedKey("log_cb") + { object = std::make_shared>(cb); } + + ValueType(const ErrorCallback& cb) // NOLINT + : expectedKey("error_cb") + { object = std::make_shared>(cb); } + + ValueType(const StatsCallback& cb) // NOLINT + : expectedKey("stats_cb") + { object = std::make_shared>(cb); } + + ValueType(const OauthbearerTokenRefreshCallback& cb) // NOLINT + : expectedKey("oauthbearer_token_refresh_cb") + { object = std::make_shared>(cb); } + + ValueType(const Interceptors& interceptors) // NOLINT + : expectedKey("interceptors") + { object = std::make_shared>(interceptors); } bool operator==(const ValueType& rhs) const { return toString() == rhs.toString(); } std::string toString() const { return object->toString(); } private: + std::string expectedKey; std::shared_ptr object; }; @@ -76,7 +108,13 @@ class Properties Properties() = default; Properties(const Properties&) = default; - Properties(PropertiesMap kvMap): _kvMap(std::move(kvMap)) {} // NOLINT + Properties(PropertiesMap kvMap): _kvMap(std::move(kvMap)) // NOLINT + { + for (const auto& kv: _kvMap) + { + kv.second.validate(kv.first); + } + } virtual ~Properties() = default; bool operator==(const Properties& rhs) const { return map() == rhs.map(); } @@ -88,7 +126,7 @@ class Properties template Properties& put(const std::string& key, const T& value) { - _kvMap[key] = ValueType(value); + _kvMap[key] = ValueType(value).validate(key); return *this; } diff --git a/tests/unit/TestProperties.cc b/tests/unit/TestProperties.cc index 1ed4853f..f4ea6f3d 100644 --- a/tests/unit/TestProperties.cc +++ b/tests/unit/TestProperties.cc @@ -126,3 +126,67 @@ TEST(Properties, SensitiveProperties) EXPECT_EQ("sasl.password=*|sasl.username=*|ssl.key.password=*|ssl.key.pem=*|ssl.keystore.password=*|ssl_key=*", props.toString()); } + +TEST(Properties, Validation) +{ + kafka::Properties props; + + props.put("whatever", "somevalue"); + + // Test with invalid keys + auto tryWithInvalidKey = [&props](auto v) + { + try + { + props.put("invalid_key", v); + return false; + } + catch (const std::runtime_error& e) + { + std::cout << "Exception caught: " << e.what() << std::endl; + } + return true; + }; + + EXPECT_TRUE(tryWithInvalidKey([](int /*level*/, const char* /*filename*/, int /*lineno*/, const char* msg) { std::cout << msg << std::endl; })); + EXPECT_TRUE(tryWithInvalidKey([](const kafka::Error& err) { std::cerr << err.toString() << std::endl; })); + EXPECT_TRUE(tryWithInvalidKey([](const std::string& stats) { std::cout << stats << std::endl; })); + const kafka::clients::OauthbearerTokenRefreshCallback oauthTokenRefreshCb = [](const std::string&) { return kafka::clients::SaslOauthbearerToken(); }; + EXPECT_TRUE(tryWithInvalidKey(oauthTokenRefreshCb)); + EXPECT_TRUE(tryWithInvalidKey(kafka::clients::Interceptors{})); + + // Test with invalid values + const auto tryWithInvalidValue = [&props](const std::string& key) + { + try + { + props.put(key, "haha"); + return false; + } + catch (const std::runtime_error& e) + { + std::cout << "exception caught: " << e.what() << std::endl; + } + return true; + }; + + EXPECT_TRUE(tryWithInvalidValue(kafka::clients::Config::LOG_CB)); + EXPECT_TRUE(tryWithInvalidValue(kafka::clients::Config::ERROR_CB)); + EXPECT_TRUE(tryWithInvalidValue(kafka::clients::Config::STATS_CB)); + EXPECT_TRUE(tryWithInvalidValue(kafka::clients::Config::OAUTHBEARER_TOKEN_REFRESH_CB)); + EXPECT_TRUE(tryWithInvalidValue(kafka::clients::Config::INTERCEPTORS)); + + // Failure within constructor + try + { + const kafka::Properties properties = {{ + { "interceptorsxx", { kafka::clients::Interceptors{} } }, + }}; + EXPECT_FALSE(true); + } + catch (const std::runtime_error& e) + { + std::cout << "exception caught: " << e.what() << std::endl; + } +} +