Skip to content

Commit

Permalink
Add validation for Properties
Browse files Browse the repository at this point in the history
  • Loading branch information
kenneth-jia committed Jan 5, 2023
1 parent 30ec31a commit 13bddf3
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 16 deletions.
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a ***header-only*** lib
* Kafka cluster setup

* [Quick Start For Cluster Setup](https://kafka.apache.org/documentation/#quickstart)

* [Cluster Setup Scripts For Test](https://github.com/morganstanley/modern-cpp-kafka/blob/main/scripts/start-local-kafka-cluster.py)

* [Kafka Broker Configuration](doc/KafkaBrokerConfiguration.md)
Expand All @@ -200,12 +200,13 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a ***header-only*** lib
| `KAFKA_BROKER_PIDS` | The broker PIDs for test runner to manipulate | `export KAFKA_BROKER_PIDS=61567,61569,61571` |
| `KAFKA_CLIENT_ADDITIONAL_SETTINGS` | Could be used for addtional configuration for Kafka clients | `export KAFKA_CLIENT_ADDITIONAL_SETTINGS="security.protocol=SASL_PLAINTEXT;sasl.kerberos.service.name=...;sasl.kerberos.keytab=...;sasl.kerberos.principal=..."` |

* The environment variable `KAFKA_BROKER_LIST` is mandatory for integration/robustness test
* The environment variable `KAFKA_BROKER_LIST` is mandatory for integration/robustness test, which requires the Kafka cluster.

* The environment variable `KAFKA_BROKER_PIDS` is mandatory for robustness test, which requires the Kafka cluster and the privilege to stop/resume the brokers.

* The environment variable `KAFKA_BROKER_PIDS` is mandatory for robustness test
| Test Type | `KAFKA_BROKER_LIST` | `KAFKA_BROKER_PIDS` |
| -------------------------------------------------------------------------------------------------- | -------------------- | ------------------- |
| [tests/unit](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/unit) | - | - |
| [tests/integration](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/integration) | Required | - |
| [tests/robustness](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/robustness) | Required | Required |

| Test Type | Requires Kafka Cluster | Requires Privilege to Stop/Resume the Brokers |
| -------------------------------------------------------------------------------------------------- | ------------------------ | --------------------------------------------- |
| [tests/unit](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/unit) | - | - |
| [tests/integration](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/integration) | Y (`KAFKA_BROKER_LIST`) | - |
| [tests/robustness`](https://github.com/morganstanley/modern-cpp-kafka/tree/main/tests/robustness) | Y (`KAFKA_BROKER_LIST`) | Y (`KAFKA_BROKER_PIDS`) |
54 changes: 46 additions & 8 deletions include/kafka/Properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ class Properties
template<class T>
static std::string getString(const std::string& value) { return value; }

const ValueType& validate(const std::string& key) const
{
static const std::vector<std::string> nonStringValueKeys = {
"log_cb", "error_cb", "stats_cb", "oauthbearer_token_refresh_cb", "interceptors"
};

if ((expectedKey.empty() && std::any_of(nonStringValueKeys.cbegin(), nonStringValueKeys.cend(), [key](const auto& k) { return k == key; }))
|| (!expectedKey.empty() && key != expectedKey))
{
throw std::runtime_error("Invalid key/value for configuration: " + key);
}

return *this;
}

template<class T>
struct ObjWrap: public Object
{
Expand All @@ -55,18 +70,35 @@ class Properties

ValueType() = default;

ValueType(const std::string& value) { object = std::make_shared<ObjWrap<std::string>>(value); } // NOLINT
ValueType(const LogCallback& cb) { object = std::make_shared<ObjWrap<LogCallback>>(cb); } // NOLINT
ValueType(const ErrorCallback& cb) { object = std::make_shared<ObjWrap<ErrorCallback>>(cb); } // NOLINT
ValueType(const StatsCallback& cb) { object = std::make_shared<ObjWrap<StatsCallback>>(cb); } // NOLINT
ValueType(const OauthbearerTokenRefreshCallback& cb) { object = std::make_shared<ObjWrap<OauthbearerTokenRefreshCallback>>(cb); } // NOLINT
ValueType(const Interceptors& interceptors) { object = std::make_shared<ObjWrap<Interceptors>>(interceptors); } // NOLINT
ValueType(const std::string& value) // NOLINT
{ object = std::make_shared<ObjWrap<std::string>>(value); }

ValueType(const LogCallback& cb) // NOLINT
: expectedKey("log_cb")
{ object = std::make_shared<ObjWrap<LogCallback>>(cb); }

ValueType(const ErrorCallback& cb) // NOLINT
: expectedKey("error_cb")
{ object = std::make_shared<ObjWrap<ErrorCallback>>(cb); }

ValueType(const StatsCallback& cb) // NOLINT
: expectedKey("stats_cb")
{ object = std::make_shared<ObjWrap<StatsCallback>>(cb); }

ValueType(const OauthbearerTokenRefreshCallback& cb) // NOLINT
: expectedKey("oauthbearer_token_refresh_cb")
{ object = std::make_shared<ObjWrap<OauthbearerTokenRefreshCallback>>(cb); }

ValueType(const Interceptors& interceptors) // NOLINT
: expectedKey("interceptors")
{ object = std::make_shared<ObjWrap<Interceptors>>(interceptors); }

bool operator==(const ValueType& rhs) const { return toString() == rhs.toString(); }

std::string toString() const { return object->toString(); }

private:
std::string expectedKey;
std::shared_ptr<Object> object;
};

Expand All @@ -76,7 +108,13 @@ class Properties

Properties() = default;
Properties(const Properties&) = default;
Properties(PropertiesMap kvMap): _kvMap(std::move(kvMap)) {} // NOLINT
Properties(PropertiesMap kvMap): _kvMap(std::move(kvMap)) // NOLINT
{
for (const auto& kv: _kvMap)
{
kv.second.validate(kv.first);
}
}
virtual ~Properties() = default;

bool operator==(const Properties& rhs) const { return map() == rhs.map(); }
Expand All @@ -88,7 +126,7 @@ class Properties
template <class T>
Properties& put(const std::string& key, const T& value)
{
_kvMap[key] = ValueType(value);
_kvMap[key] = ValueType(value).validate(key);
return *this;
}

Expand Down
64 changes: 64 additions & 0 deletions tests/unit/TestProperties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,67 @@ TEST(Properties, SensitiveProperties)

EXPECT_EQ("sasl.password=*|sasl.username=*|ssl.key.password=*|ssl.key.pem=*|ssl.keystore.password=*|ssl_key=*", props.toString());
}

TEST(Properties, Validation)
{
kafka::Properties props;

props.put("whatever", "somevalue");

// Test with invalid keys
auto tryWithInvalidKey = [&props](auto v)
{
try
{
props.put("invalid_key", v);
return false;
}
catch (const std::runtime_error& e)
{
std::cout << "Exception caught: " << e.what() << std::endl;
}
return true;
};

EXPECT_TRUE(tryWithInvalidKey([](int /*level*/, const char* /*filename*/, int /*lineno*/, const char* msg) { std::cout << msg << std::endl; }));
EXPECT_TRUE(tryWithInvalidKey([](const kafka::Error& err) { std::cerr << err.toString() << std::endl; }));
EXPECT_TRUE(tryWithInvalidKey([](const std::string& stats) { std::cout << stats << std::endl; }));
const kafka::clients::OauthbearerTokenRefreshCallback oauthTokenRefreshCb = [](const std::string&) { return kafka::clients::SaslOauthbearerToken(); };
EXPECT_TRUE(tryWithInvalidKey(oauthTokenRefreshCb));
EXPECT_TRUE(tryWithInvalidKey(kafka::clients::Interceptors{}));

// Test with invalid values
const auto tryWithInvalidValue = [&props](const std::string& key)
{
try
{
props.put(key, "haha");
return false;
}
catch (const std::runtime_error& e)
{
std::cout << "exception caught: " << e.what() << std::endl;
}
return true;
};

EXPECT_TRUE(tryWithInvalidValue(kafka::clients::Config::LOG_CB));
EXPECT_TRUE(tryWithInvalidValue(kafka::clients::Config::ERROR_CB));
EXPECT_TRUE(tryWithInvalidValue(kafka::clients::Config::STATS_CB));
EXPECT_TRUE(tryWithInvalidValue(kafka::clients::Config::OAUTHBEARER_TOKEN_REFRESH_CB));
EXPECT_TRUE(tryWithInvalidValue(kafka::clients::Config::INTERCEPTORS));

// Failure within constructor
try
{
const kafka::Properties properties = {{
{ "interceptorsxx", { kafka::clients::Interceptors{} } },
}};
EXPECT_FALSE(true);
}
catch (const std::runtime_error& e)
{
std::cout << "exception caught: " << e.what() << std::endl;
}
}

0 comments on commit 13bddf3

Please sign in to comment.