Skip to content

Commit

Permalink
Interceptors for threads (start, exit)
Browse files Browse the repository at this point in the history
  • Loading branch information
kenneth-jia committed Oct 12, 2022
1 parent 5ce1f8c commit 7418ab3
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 49 deletions.
6 changes: 1 addition & 5 deletions doc/KafkaConsumerQuickStart.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,11 @@ About `Error`'s `value()`s, there are 2 cases
* How many threads would be created by a KafkaConsumer?
Excluding the user's main thread, if `enable.auto.commit` is `false`, the `KafkaConsumer` would start another (N + 2) threads in the background; otherwise, the `KafkaConsumer` would start (N + 3) background threads. (N means the number of BOOTSTRAP_SERVERS)
1. Each broker (in the list of BOOTSTRAP_SERVERS) would take a separate thread to transmit messages towards a kafka cluster server.
2. Another 3 threads will handle internal operations, consumer group operations, and kinds of timers, etc.
3. To enable the auto commit, one more thread would be created, which keeps polling/processing the offset-commit callback event.
E.g, if a KafkaConsumer was created with property of `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 6 threads in total (including the main thread).
3. To enable the auto events-polling, one more background thread would be created, which keeps polling/processing the offset-commit callback event.
* Which one of these threads will handle the callbacks?
Expand Down
8 changes: 2 additions & 6 deletions doc/KafkaProducerQuickStart.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,15 @@ Larger `QUEUE_BUFFERING_MAX_MESSAGES`/`QUEUE_BUFFERING_MAX_KBYTES` might help to
### How many threads would be created by a KafkaProducer?
Excluding the user's main thread, `KafkaProducer` would start (N + 3) background threads. (N means the number of BOOTSTRAP_SERVERS)
Most of these background threads are started internally by librdkafka.
Here is a brief introduction to what they're used for:
1. Each broker (in the list of BOOTSTRAP_SERVERS) would take a separate thread to transmit messages towards a kafka cluster server.
2. Another 2 background threads would handle internal operations and kinds of timers, etc.
3. One more background thread to keep polling the delivery callback event.
2. Another 2 threads would handle internal operations and kinds of timers, etc.
E.g, if a `KafkaProducer` was created with property of `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 7 threads in total (including the main thread).
3. To enable the auto events-polling, one more background thread would be created, which keeps polling the delivery callback event.
### Which one of these threads will handle the callbacks
Expand Down
5 changes: 4 additions & 1 deletion include/kafka/AdminClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ class AdminClient: public KafkaClient
public:
explicit AdminClient(const Properties& properties)
: KafkaClient(ClientType::AdminClient,
KafkaClient::validateAndReformProperties(properties))
KafkaClient::validateAndReformProperties(properties),
ConfigCallbacksRegister{},
EventsPollingOption::Auto,
Interceptors{})
{
}

Expand Down
31 changes: 31 additions & 0 deletions include/kafka/Interceptors.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <kafka/Project.h>

#include <functional>


namespace KAFKA_API { namespace clients {

/**
 * A bundle of optional user callbacks ("interceptors") for thread lifecycle events.
 *
 * Each callback receives two strings; KafkaClient invokes them as (threadName, threadType).
 * Setters return `*this`, so registrations can be chained:
 *     Interceptors{}.onThreadStart(startCb).onThreadExit(exitCb)
 */
class Interceptors
{
public:
    /// Callback signature for the "thread start" event: (threadName, threadType).
    using ThreadStartCallback = std::function<void(const std::string&, const std::string&)>;
    /// Callback signature for the "thread exit" event: (threadName, threadType).
    using ThreadExitCallback  = std::function<void(const std::string&, const std::string&)>;

    /// Register (or replace) the "thread start" callback.
    Interceptors& onThreadStart(ThreadStartCallback cb) { _threadStartCb = std::move(cb); return *this; }
    /// Register (or replace) the "thread exit" callback.
    Interceptors& onThreadExit(ThreadExitCallback cb)   { _threadExitCb = std::move(cb); return *this; }

    /// Get a copy of the "thread start" callback (an empty std::function if none was registered).
    ThreadStartCallback onThreadStart() const { return _threadStartCb; }
    /// Get a copy of the "thread exit" callback (an empty std::function if none was registered).
    ThreadExitCallback  onThreadExit()  const { return _threadExitCb; }

    /// True when no callable callback is held.
    /// Derived from the callbacks themselves (rather than a separate flag), so registering an
    /// empty std::function / nullptr does not make the set look non-empty.
    bool empty() const { return !_threadStartCb && !_threadExitCb; }

private:
    ThreadStartCallback _threadStartCb;
    ThreadExitCallback  _threadExitCb;
};

} } // end of KAFKA_API::clients

101 changes: 90 additions & 11 deletions include/kafka/KafkaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <kafka/BrokerMetadata.h>
#include <kafka/Error.h>
#include <kafka/Interceptors.h>
#include <kafka/KafkaException.h>
#include <kafka/Log.h>
#include <kafka/Properties.h>
Expand All @@ -20,6 +21,7 @@
#include <mutex>
#include <string>
#include <thread>
#include <vector>


namespace KAFKA_API { namespace clients {
Expand Down Expand Up @@ -168,10 +170,11 @@ class KafkaClient

using ConfigCallbacksRegister = std::function<void(rd_kafka_conf_t*)>;

KafkaClient(ClientType clientType,
const Properties& properties,
const ConfigCallbacksRegister& extraConfigRegister = ConfigCallbacksRegister{},
EventsPollingOption eventsPollingOption = EventsPollingOption::Auto);
KafkaClient(ClientType clientType,
const Properties& properties,
const ConfigCallbacksRegister& extraConfigRegister,
EventsPollingOption eventsPollingOption,
Interceptors interceptors);

rd_kafka_t* getClientHandle() const { return _rk.get(); }

Expand Down Expand Up @@ -221,8 +224,11 @@ class KafkaClient
Logger _logger;
StatsCallback _statsCb;
ErrorCallback _errorCb;
rd_kafka_unique_ptr _rk;

EventsPollingOption _eventsPollingOption;
Interceptors _interceptors;

rd_kafka_unique_ptr _rk;

static std::string getClientTypeString(ClientType type)
{
Expand All @@ -239,6 +245,11 @@ class KafkaClient
// Error callback (for librdkafka)
static void errorCallback(rd_kafka_t* rk, int err, const char* reason, void* opaque);

// Interceptor callback (for librdkafka)
static rd_kafka_resp_err_t configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* conf, void* opaque, char* errStr, std::size_t maxErrStrSize);
static rd_kafka_resp_err_t interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque);
static rd_kafka_resp_err_t interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque);

// Log callback (for class instance)
void onLog(int level, const char* fac, const char* buf) const;

Expand All @@ -248,6 +259,10 @@ class KafkaClient
// Error callback (for class instance)
void onError(const Error& error);

// Interceptor callback (for class instance)
void interceptThreadStart(const std::string& threadName, const std::string& threadType);
void interceptThreadExit(const std::string& threadName, const std::string& threadType);

static const constexpr char* BOOTSTRAP_SERVERS = "bootstrap.servers";
static const constexpr char* CLIENT_ID = "client.id";
static const constexpr char* LOG_LEVEL = "log_level";
Expand Down Expand Up @@ -275,8 +290,9 @@ class KafkaClient
class PollThread
{
public:
explicit PollThread(Pollable& pollable)
: _running(true), _thread(keepPolling, std::ref(_running), std::ref(pollable))
using InterceptorCb = std::function<void()>;
explicit PollThread(const InterceptorCb& entryCb, const InterceptorCb& exitCb, Pollable& pollable)
: _running(true), _thread(keepPolling, std::ref(_running), entryCb, exitCb, std::ref(pollable))
{
}

Expand All @@ -288,12 +304,19 @@ class KafkaClient
}

private:
static void keepPolling(std::atomic_bool& running, Pollable& pollable)
// Thread body: invoke entryCb once, poll `pollable` repeatedly while `running` is set,
// then invoke exitCb once after the loop ends.
static void keepPolling(std::atomic_bool& running,
                        const InterceptorCb& entryCb,
                        const InterceptorCb& exitCb,
                        Pollable& pollable)
{
    entryCb();

    for (;;)
    {
        // Re-check the flag before every poll
        if (!running.load()) break;

        pollable.poll(CALLBACK_POLLING_INTERVAL_MS);
    }

    exitCb();
}

static constexpr int CALLBACK_POLLING_INTERVAL_MS = 10;
Expand All @@ -306,7 +329,10 @@ class KafkaClient
{
_pollable = std::make_unique<KafkaClient::PollableCallback>(pollableCallback);

if (isWithAutoEventsPolling()) _pollThread = std::make_unique<PollThread>(*_pollable);
auto entryCb = [this]() { interceptThreadStart("events-polling", "background"); };
auto exitCb = [this]() { interceptThreadExit("events-polling", "background"); };

if (isWithAutoEventsPolling()) _pollThread = std::make_unique<PollThread>(entryCb, exitCb, *_pollable);
}

void stopBackgroundPollingIfNecessary()
Expand All @@ -331,8 +357,10 @@ inline
KafkaClient::KafkaClient(ClientType clientType,
const Properties& properties,
const ConfigCallbacksRegister& extraConfigRegister,
EventsPollingOption eventsPollingOption)
: _eventsPollingOption(eventsPollingOption)
EventsPollingOption eventsPollingOption,
Interceptors interceptors)
: _eventsPollingOption(eventsPollingOption),
_interceptors(std::move(interceptors))
{
static const std::set<std::string> PRIVATE_PROPERTY_KEYS = { "max.poll.records" };

Expand Down Expand Up @@ -403,6 +431,13 @@ KafkaClient::KafkaClient(ClientType clientType,
// Other Callbacks
if (extraConfigRegister) extraConfigRegister(rk_conf.get());

// Interceptor
if (!_interceptors.empty())
{
Error result{ rd_kafka_conf_interceptor_add_on_new(rk_conf.get(), "on_new", KafkaClient::configInterceptorOnNew, nullptr) };
KAFKA_THROW_IF_WITH_ERROR(result);
}

// Set client handler
_rk.reset(rd_kafka_new((clientType == ClientType::KafkaConsumer ? RD_KAFKA_CONSUMER : RD_KAFKA_PRODUCER),
rk_conf.release(), // rk_conf's ownship would be transferred to rk, after the "rd_kafka_new()" call
Expand Down Expand Up @@ -534,6 +569,50 @@ KafkaClient::errorCallback(rd_kafka_t* rk, int err, const char* reason, void* /*
kafkaClient(rk).onError(error);
}

// Forward a "thread start" event to the user's interceptor, if one was registered.
inline void
KafkaClient::interceptThreadStart(const std::string& threadName, const std::string& threadType)
{
    const auto startCb = _interceptors.onThreadStart();
    if (!startCb) return;   // nothing registered for this event

    startCb(threadName, threadType);
}

// Forward a "thread exit" event to the user's interceptor, if one was registered.
inline void
KafkaClient::interceptThreadExit(const std::string& threadName, const std::string& threadType)
{
    const auto exitCb = _interceptors.onThreadExit();
    if (!exitCb) return;   // nothing registered for this event

    exitCb(threadName, threadType);
}

// "on_new" interceptor (for librdkafka): attaches the thread-start and thread-exit
// interceptors to the client handle `rk`, forwarding `opaque` to both registrations.
// Returns the first non-zero result code, or RD_KAFKA_RESP_ERR_NO_ERROR if both succeed.
inline rd_kafka_resp_err_t
KafkaClient::configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* /*conf*/, void* opaque, char* /*errStr*/, std::size_t /*maxErrStrSize*/)
{
    const auto startStatus = rd_kafka_interceptor_add_on_thread_start(rk, "on_thread_start", KafkaClient::interceptorOnThreadStart, opaque);
    if (startStatus != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        // Do not attempt the second registration once the first one has failed
        return startStatus;
    }

    const auto exitStatus = rd_kafka_interceptor_add_on_thread_exit(rk, "on_thread_exit", KafkaClient::interceptorOnThreadExit, opaque);
    if (exitStatus != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        return exitStatus;
    }

    return RD_KAFKA_RESP_ERR_NO_ERROR;
}

// "on_thread_start" interceptor (for librdkafka): converts the thread type to its string
// label and hands the event to the KafkaClient instance associated with `rk`.
// Always reports RD_KAFKA_RESP_ERR_NO_ERROR.
inline rd_kafka_resp_err_t
KafkaClient::interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /*opaque*/)
{
    const std::string threadTypeLabel = toString(threadType);

    kafkaClient(rk).interceptThreadStart(threadName, threadTypeLabel);

    return RD_KAFKA_RESP_ERR_NO_ERROR;
}

// "on_thread_exit" interceptor (for librdkafka): converts the thread type to its string
// label and hands the event to the KafkaClient instance associated with `rk`.
// Always reports RD_KAFKA_RESP_ERR_NO_ERROR.
inline rd_kafka_resp_err_t
KafkaClient::interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /*opaque*/)
{
    const std::string threadTypeLabel = toString(threadType);

    kafkaClient(rk).interceptThreadExit(threadName, threadTypeLabel);

    return RD_KAFKA_RESP_ERR_NO_ERROR;
}

inline Optional<BrokerMetadata>
KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::milliseconds timeout, bool disableErrorLogging)
{
Expand Down
12 changes: 8 additions & 4 deletions include/kafka/KafkaConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ class KafkaConsumer: public KafkaClient
* - RD_KAFKA_RESP_ERR__INVALID_ARG : Invalid BOOTSTRAP_SERVERS property
* - RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE: Fail to create internal threads
*/
explicit KafkaConsumer(const Properties& properties,
EventsPollingOption eventsPollingOption = EventsPollingOption::Auto);
explicit KafkaConsumer(const Properties& properties,
EventsPollingOption eventsPollingOption = EventsPollingOption::Auto,
const Interceptors& interceptors = Interceptors{});

/**
* The destructor for KafkaConsumer.
Expand Down Expand Up @@ -373,11 +374,14 @@ KafkaConsumer::registerConfigCallbacks(rd_kafka_conf_t* conf)
}

inline
KafkaConsumer::KafkaConsumer(const Properties &properties, EventsPollingOption eventsPollingOption)
KafkaConsumer::KafkaConsumer(const Properties& properties,
EventsPollingOption eventsPollingOption,
const Interceptors& interceptors)
: KafkaClient(ClientType::KafkaConsumer,
validateAndReformProperties(properties),
registerConfigCallbacks,
eventsPollingOption)
eventsPollingOption,
interceptors)
{
// Pick up the "max.poll.records" property
if (auto maxPollRecordsProperty = properties.getProperty(consumer::Config::MAX_POLL_RECORDS))
Expand Down
12 changes: 8 additions & 4 deletions include/kafka/KafkaProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ class KafkaProducer: public KafkaClient
* - RD_KAFKA_RESP_ERR__INVALID_ARG : Invalid BOOTSTRAP_SERVERS property
* - RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE: Fail to create internal threads
*/
explicit KafkaProducer(const Properties& properties,
EventsPollingOption eventsPollingOption = EventsPollingOption::Auto);
explicit KafkaProducer(const Properties& properties,
EventsPollingOption eventsPollingOption = EventsPollingOption::Auto,
const Interceptors& interceptors = Interceptors{});

/**
* The destructor for KafkaProducer.
Expand Down Expand Up @@ -221,11 +222,14 @@ class KafkaProducer: public KafkaClient
};

inline
KafkaProducer::KafkaProducer(const Properties& properties, EventsPollingOption eventsPollingOption)
KafkaProducer::KafkaProducer(const Properties& properties,
EventsPollingOption eventsPollingOption,
const Interceptors& interceptors)
: KafkaClient(ClientType::KafkaProducer,
validateAndReformProperties(properties),
registerConfigCallbacks,
eventsPollingOption)
eventsPollingOption,
interceptors)
{
// Start background polling (if needed)
startBackgroundPollingIfNecessary([this](int timeoutMs){ pollCallbacks(timeoutMs); });
Expand Down
18 changes: 18 additions & 0 deletions include/kafka/RdKafkaHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <librdkafka/rdkafka.h>

#include <cassert>
#include <memory>

namespace KAFKA_API {
Expand Down Expand Up @@ -48,6 +49,23 @@ using rd_kafka_consumer_group_metadata_unique_ptr = std::unique_ptr<rd_kafka_con
inline void RkErrorDeleter(rd_kafka_error_t* p) { rd_kafka_error_destroy(p); }
using rd_kafka_error_shared_ptr = std::shared_ptr<rd_kafka_error_t>;


// Map a librdkafka thread type to a short label: "main", "background" or "broker".
// Any other value trips an assertion (when assertions are enabled) and otherwise yields "NA".
inline std::string toString(rd_kafka_thread_type_t threadType)
{
    if (threadType == RD_KAFKA_THREAD_MAIN)       return "main";
    if (threadType == RD_KAFKA_THREAD_BACKGROUND) return "background";
    if (threadType == RD_KAFKA_THREAD_BROKER)     return "broker";

    // Not one of the thread types handled above
    assert(false);
    return "NA";
}

// Convert from rd_kafka_xxx datatypes
inline TopicPartitionOffsets getTopicPartitionOffsets(const rd_kafka_topic_partition_list_t* rk_tpos)
{
Expand Down
Loading

0 comments on commit 7418ab3

Please sign in to comment.