Skip to content

Commit

Permalink
Refactor Properties
Browse files Browse the repository at this point in the history
  • Loading branch information
kenneth-jia committed Dec 7, 2022
1 parent b39fd40 commit 0d6b4fa
Show file tree
Hide file tree
Showing 31 changed files with 896 additions and 888 deletions.
18 changes: 9 additions & 9 deletions examples/kafka_async_producer_copy_payload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@

int main(int argc, char **argv)
{
using namespace kafka;
using namespace kafka::clients;
using namespace kafka::clients::producer;

if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
exit(argc == 1 ? 0 : 1); // NOLINT
}

const std::string brokers = argv[1];
const kafka::Topic topic = argv[2];
const Topic topic = argv[2];

try {

// Create configuration object
const kafka::Properties props ({
{"bootstrap.servers", brokers},
{"enable.idempotence", "true"},
const Properties props ({
{"bootstrap.servers", {brokers}},
{"enable.idempotence", {"true" }},
});

// Create a producer instance
Expand All @@ -32,13 +34,11 @@ int main(int argc, char **argv)

for (std::string line; std::getline(std::cin, line);) {
// The ProducerRecord doesn't own `line`, it is just a thin wrapper
auto record = producer::ProducerRecord(topic,
kafka::NullKey,
kafka::Value(line.c_str(), line.size()));
auto record = ProducerRecord(topic, NullKey, Value(line.c_str(), line.size()));
// Send the message
producer.send(record,
// The delivery report handler
[](const producer::RecordMetadata& metadata, const kafka::Error& error) {
[](const RecordMetadata& metadata, const Error& error) {
if (!error) {
std::cout << "% Message delivered: " << metadata.toString() << std::endl;
} else {
Expand All @@ -53,7 +53,7 @@ int main(int argc, char **argv)

// producer.close(); // No explicit close is needed, RAII will take care of it

} catch (const kafka::KafkaException& e) {
} catch (const KafkaException& e) {
std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
}
}
Expand Down
20 changes: 11 additions & 9 deletions examples/kafka_async_producer_not_copy_payload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@

int main(int argc, char **argv)
{
using namespace kafka;
using namespace kafka::clients;
using namespace kafka::clients::producer;

if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
exit(argc == 1 ? 0 : 1); // NOLINT
}

const std::string brokers = argv[1];
const kafka::Topic topic = argv[2];
const Topic topic = argv[2];

try {

// Create configuration object
const kafka::Properties props ({
{"bootstrap.servers", brokers},
{"enable.idempotence", "true"},
const Properties props ({
{"bootstrap.servers", {brokers}},
{"enable.idempotence", {"true" }},
});

// Create a producer instance
Expand All @@ -34,17 +36,17 @@ int main(int argc, char **argv)
std::getline(std::cin, *line);
line = std::make_shared<std::string>()) {
// The ProducerRecord doesn't own `line`, it is just a thin wrapper
auto record = producer::ProducerRecord(topic,
kafka::NullKey,
kafka::Value(line->c_str(), line->size()));
auto record = ProducerRecord(topic,
NullKey,
Value(line->c_str(), line->size()));

// Send the message
producer.send(record,
// The delivery report handler
// Note: Here we capture the shared_pointer of `line`,
// which holds the content for `record.value()`.
// It makes sure the memory block is valid until the lambda finishes.
[line](const producer::RecordMetadata& metadata, const kafka::Error& error) {
[line](const RecordMetadata& metadata, const Error& error) {
if (!error) {
std::cout << "% Message delivered: " << metadata.toString() << std::endl;
} else {
Expand All @@ -57,7 +59,7 @@ int main(int argc, char **argv)

// producer.close(); // No explicit close is needed, RAII will take care of it

} catch (const kafka::KafkaException& e) {
} catch (const KafkaException& e) {
std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
}
}
Expand Down
18 changes: 11 additions & 7 deletions examples/kafka_auto_commit_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,28 @@

int main(int argc, char **argv)
{
using namespace kafka;
using namespace kafka::clients;
using namespace kafka::clients::consumer;

if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
exit(argc == 1 ? 0 : 1); // NOLINT
}

const std::string brokers = argv[1];
const kafka::Topic topic = argv[2];
const Topic topic = argv[2];

try {

// Create configuration object
const kafka::Properties props ({
{"bootstrap.servers", brokers},
{"enable.auto.commit", "true"}
const Properties props ({
{"bootstrap.servers", {brokers}},
{"enable.auto.commit", {"true" }}
});

// Create a consumer instance
kafka::clients::KafkaConsumer consumer(props);
KafkaConsumer consumer(props);

// Subscribe to topics
consumer.subscribe({topic});
Expand All @@ -41,7 +45,7 @@ int main(int argc, char **argv)
std::cout << " Partition: " << record.partition() << std::endl;
std::cout << " Offset : " << record.offset() << std::endl;
std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
std::cout << " Headers : " << toString(record.headers()) << std::endl;
std::cout << " Key [" << record.key().toString() << "]" << std::endl;
std::cout << " Value [" << record.value().toString() << "]" << std::endl;
} else {
Expand All @@ -52,7 +56,7 @@ int main(int argc, char **argv)

// consumer.close(); // No explicit close is needed, RAII will take care of it

} catch (const kafka::KafkaException& e) {
} catch (const KafkaException& e) {
std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
}
}
Expand Down
18 changes: 11 additions & 7 deletions examples/kafka_manual_commit_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,27 @@

int main(int argc, char **argv)
{
using namespace kafka;
using namespace kafka::clients;
using namespace kafka::clients::consumer;

if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
exit(argc == 1 ? 0 : 1); // NOLINT
}

const std::string brokers = argv[1];
const kafka::Topic topic = argv[2];
const Topic topic = argv[2];

try {

// Create configuration object
const kafka::Properties props ({
{"bootstrap.servers", brokers},
const Properties props ({
{"bootstrap.servers", {brokers}},
});

// Create a consumer instance
kafka::clients::KafkaConsumer consumer(props);
KafkaConsumer consumer(props);

// Subscribe to topics
consumer.subscribe({topic});
Expand All @@ -47,7 +51,7 @@ int main(int argc, char **argv)
std::cout << " Partition: " << record.partition() << std::endl;
std::cout << " Offset : " << record.offset() << std::endl;
std::cout << " Timestamp: " << record.timestamp().toString() << std::endl;
std::cout << " Headers : " << kafka::toString(record.headers()) << std::endl;
std::cout << " Headers : " << toString(record.headers()) << std::endl;
std::cout << " Key [" << record.key().toString() << "]" << std::endl;
std::cout << " Value [" << record.value().toString() << "]" << std::endl;

Expand All @@ -61,7 +65,7 @@ int main(int argc, char **argv)
auto now = std::chrono::steady_clock::now();
if (now - lastTimeCommitted > std::chrono::seconds(1)) {
// Commit offsets for messages polled
std::cout << "% syncCommit offsets: " << kafka::utility::getCurrentTime() << std::endl;
std::cout << "% syncCommit offsets: " << utility::getCurrentTime() << std::endl;
consumer.commitSync(); // or commitAsync()

lastTimeCommitted = now;
Expand All @@ -72,7 +76,7 @@ int main(int argc, char **argv)

// consumer.close(); // No explicit close is needed, RAII will take care of it

} catch (const kafka::KafkaException& e) {
} catch (const KafkaException& e) {
std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
}
}
Expand Down
22 changes: 12 additions & 10 deletions examples/kafka_sync_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@

int main(int argc, char **argv)
{
using namespace kafka;
using namespace kafka::clients;
using namespace kafka::clients::producer;

if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";
exit(argc == 1 ? 0 : 1); // NOLINT
}

const std::string brokers = argv[1];
const kafka::Topic topic = argv[2];
const Topic topic = argv[2];

try {

// Create configuration object
const kafka::Properties props({
{"bootstrap.servers", brokers},
{"enable.idempotence", "true"},
const Properties props({
{"bootstrap.servers", {brokers}},
{"enable.idempotence", {"true" }},
});

// Create a producer instance.
Expand All @@ -31,15 +33,15 @@ int main(int argc, char **argv)

for (std::string line; std::getline(std::cin, line);) {
// The ProducerRecord doesn't own `line`, it is just a thin wrapper
auto record = producer::ProducerRecord(topic,
kafka::NullKey,
kafka::Value(line.c_str(), line.size()));
auto record = ProducerRecord(topic,
NullKey,
Value(line.c_str(), line.size()));

// Send the message.
try {
const producer::RecordMetadata metadata = producer.syncSend(record);
const RecordMetadata metadata = producer.syncSend(record);
std::cout << "% Message delivered: " << metadata.toString() << std::endl;
} catch (const kafka::KafkaException& e) {
} catch (const KafkaException& e) {
std::cerr << "% Message delivery failed: " << e.error().message() << std::endl;
}

Expand All @@ -48,7 +50,7 @@ int main(int argc, char **argv)

// producer.close(); // No explicit close is needed, RAII will take care of it

} catch (const kafka::KafkaException& e) {
} catch (const KafkaException& e) {
std::cerr << "% Unexpected exception caught: " << e.what() << std::endl;
}
}
Expand Down
18 changes: 9 additions & 9 deletions include/kafka/AdminClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <vector>


namespace KAFKA_API { namespace clients {
namespace KAFKA_API { namespace clients { namespace admin {

/**
* The administrative client for Kafka, which supports managing and inspecting topics, etc.
Expand All @@ -27,11 +27,7 @@ class AdminClient: public KafkaClient
{
public:
explicit AdminClient(const Properties& properties)
: KafkaClient(ClientType::AdminClient,
KafkaClient::validateAndReformProperties(properties),
ConfigCallbacksRegister{},
EventsPollingOption::Auto,
Interceptors{})
: KafkaClient(ClientType::AdminClient, KafkaClient::validateAndReformProperties(properties))
{
}

Expand Down Expand Up @@ -148,10 +144,14 @@ AdminClient::createTopics(const Topics& topics,

for (const auto& conf: topicConfig.map())
{
const rd_kafka_resp_err_t err = rd_kafka_NewTopic_set_config(rkNewTopics.back().get(), conf.first.c_str(), conf.second.c_str());
const auto& k = conf.first;
const auto& v = topicConfig.getProperty(k);
if (!v) continue;

const rd_kafka_resp_err_t err = rd_kafka_NewTopic_set_config(rkNewTopics.back().get(), k.c_str(), v->c_str());
if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
{
const std::string errMsg = "Invalid config[" + conf.first + "=" + conf.second + "]";
const std::string errMsg = "Invalid config[" + k + "=" + *v + "]";
KAFKA_API_DO_LOG(Log::Level::Err, errMsg.c_str());
return admin::CreateTopicsResult(Error{RD_KAFKA_RESP_ERR__INVALID_ARG, errMsg});
}
Expand Down Expand Up @@ -344,5 +344,5 @@ AdminClient::deleteRecords(const TopicPartitionOffsets& topicPartitionOffsets,
return admin::DeleteRecordsResult(combineErrors(errors));
}

} } // end of KAFKA_API::clients
} } } // end of KAFKA_API::clients::admin

Loading

0 comments on commit 0d6b4fa

Please sign in to comment.