Improve global logger
kenneth-jia committed Dec 7, 2022
1 parent 0d6b4fa commit 02cb4b6
Showing 6 changed files with 87 additions and 93 deletions.
17 changes: 13 additions & 4 deletions include/kafka/ClientCommon.h
@@ -3,10 +3,6 @@
#include <kafka/Project.h>

#include <kafka/Error.h>
#include <kafka/RdKafkaHelper.h>
#include <kafka/Types.h>

#include <librdkafka/rdkafka.h>

#include <functional>

@@ -28,6 +24,19 @@ namespace KAFKA_API { namespace clients {
*/
using StatsCallback = std::function<void(const std::string&)>;

/**
* SASL OAUTHBEARER token info.
*/
struct SaslOauthbearerToken
{
using KeyValuePairs = std::map<std::string, std::string>;

std::string value;
std::chrono::microseconds mdLifetime{};
std::string mdPrincipalName;
KeyValuePairs extensions;
};

/**
* Callback type for OAUTHBEARER token refresh.
*/
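
Below is a hedged sketch (not part of this commit) of a token-refresh callback filling in the struct above. The callback's signature is not shown in this hunk, so the std::function<SaslOauthbearerToken(const std::string&)> shape and the kafka::clients namespace are assumptions, and the token values are placeholders.

// Hypothetical example -- illustration only.
#include <kafka/ClientCommon.h>

#include <chrono>
#include <string>

// Assumed callback shape: std::function<SaslOauthbearerToken(const std::string& oauthbearerConfig)>
kafka::clients::SaslOauthbearerToken refreshOauthbearerToken(const std::string& /*oauthbearerConfig*/)
{
    kafka::clients::SaslOauthbearerToken token;
    token.value           = "placeholder.jwt.token";      // serialized token value
    token.mdLifetime      = std::chrono::minutes(60);     // implicitly converted to microseconds
    token.mdPrincipalName = "kafka-client";
    token.extensions      = {{"traceId", "42"}};          // optional key/value pairs
    return token;
}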
77 changes: 12 additions & 65 deletions include/kafka/KafkaClient.h
@@ -20,7 +20,6 @@
#include <climits>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
@@ -46,15 +45,6 @@
*/
const std::string& name() const { return _clientName; }

/**
* Set a log callback for kafka clients, which do not have a client specific logging callback configured (see `setLogCallback`).
*/
static void setGlobalLogger(Logger logger = NullLogger)
{
std::call_once(Global<>::initOnce, [](){}); // Then no need to init within KafkaClient constructor
Global<>::logger = std::move(logger);
}

/**
* Set log level for the kafka client (the default value: 5).
*/
@@ -90,12 +80,11 @@
template<class ...Args>
void doLog(int level, const char* filename, int lineno, const char* format, Args... args) const
{
const auto& logger = _logCb ? _logCb : Global<>::logger;
if (level >= 0 && level <= _logLevel && logger)
if (level >= 0 && level <= _logLevel && _logCb)
{
LogBuffer<LOG_BUFFER_SIZE> logBuffer;
logBuffer.print("%s ", name().c_str()).print(format, args...);
logger(level, filename, lineno, logBuffer.c_str());
_logCb(level, filename, lineno, logBuffer.c_str());
}
}

@@ -106,28 +95,6 @@

#define KAFKA_API_DO_LOG(lvl, ...) doLog(lvl, __FILE__, __LINE__, ##__VA_ARGS__)

template<class ...Args>
static void doGlobalLog(int level, const char* filename, int lineno, const char* format, Args... args)
{
if (!Global<>::logger) return;

LogBuffer<LOG_BUFFER_SIZE> logBuffer;
logBuffer.print(format, args...);
Global<>::logger(level, filename, lineno, logBuffer.c_str());
}
static void doGlobalLog(int level, const char* filename, int lineno, const char* msg)
{
doGlobalLog(level, filename, lineno, "%s", msg);
}

/**
* Log for kafka clients, with the callback which `setGlobalLogger` assigned.
*
* E.g,
* KAFKA_API_LOG(Log::Level::Err, "something wrong happened! %s", detailedInfo.c_str());
*/
#define KAFKA_API_LOG(lvl, ...) KafkaClient::doGlobalLog(lvl, __FILE__, __LINE__, ##__VA_ARGS__)

#if COMPILER_SUPPORTS_CPP_17
static constexpr int DEFAULT_METADATA_TIMEOUT_MS = 10000;
#else
@@ -169,14 +136,6 @@
// Buffer size for single line logging
static const constexpr int LOG_BUFFER_SIZE = 1024;

// Global logger
template <typename T = void>
struct Global
{
static Logger logger;
static std::once_flag initOnce;
};

// Validate properties (and fix them if necessary)
static Properties validateAndReformProperties(const Properties& properties);

@@ -197,8 +156,8 @@
std::string _clientName;

std::atomic<int> _logLevel = {Log::Level::Notice};
Logger _logCb;

LogCallback _logCb = DefaultLogger;
StatsCallback _statsCb;
ErrorCallback _errorCb;
OauthbearerTokenRefreshCallback _oauthbearerTokenRefreshCb;
@@ -247,10 +206,6 @@
void interceptThreadStart(const std::string& threadName, const std::string& threadType);
void interceptThreadExit(const std::string& threadName, const std::string& threadType);

static const constexpr char* BOOTSTRAP_SERVERS = "bootstrap.servers";
static const constexpr char* CLIENT_ID = "client.id";
static const constexpr char* LOG_LEVEL = "log_level";

protected:
struct Pollable
{
@@ -331,11 +286,6 @@
std::unique_ptr<PollThread> _pollThread;
};

template <typename T>
Logger KafkaClient::Global<T>::logger;

template <typename T>
std::once_flag KafkaClient::Global<T>::initOnce;

inline
KafkaClient::KafkaClient(ClientType clientType,
@@ -345,23 +295,20 @@
static const std::set<std::string> PRIVATE_PROPERTY_KEYS = { "max.poll.records" };

// Save clientID
if (auto clientId = properties.getProperty(CLIENT_ID))
if (auto clientId = properties.getProperty(Config::CLIENT_ID))
{
_clientId = *clientId;
_clientName = getClientTypeString(clientType) + "[" + _clientId + "]";
}

// Init global logger
std::call_once(Global<>::initOnce, [](){ Global<>::logger = DefaultLogger; });

// Log Callback
if (properties.contains("log_cb"))
{
setLogCallback(properties.get<LogCallback>("log_cb"));
}

// Save LogLevel
if (auto logLevel = properties.getProperty(LOG_LEVEL))
if (auto logLevel = properties.getProperty(Config::LOG_LEVEL))
{
try
{
@@ -480,7 +427,7 @@
KAFKA_THROW_IF_WITH_ERROR(Error(rd_kafka_last_error()));

// Add brokers
auto brokers = properties.getProperty(BOOTSTRAP_SERVERS);
auto brokers = properties.getProperty(Config::BOOTSTRAP_SERVERS);
if (!brokers || rd_kafka_brokers_add(getClientHandle(), brokers->c_str()) == 0)
{
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
@@ -496,22 +443,22 @@
auto newProperties = properties;

// BOOTSTRAP_SERVERS property is mandatory
if (!newProperties.getProperty(BOOTSTRAP_SERVERS))
if (!newProperties.getProperty(Config::BOOTSTRAP_SERVERS))
{
KAFKA_THROW_ERROR(Error(RD_KAFKA_RESP_ERR__INVALID_ARG,\
"Validation failed! With no property [" + std::string(BOOTSTRAP_SERVERS) + "]"));
"Validation failed! With no property [" + std::string(Config::BOOTSTRAP_SERVERS) + "]"));
}

// If no "client.id" configured, generate a random one for user
if (!newProperties.getProperty(CLIENT_ID))
if (!newProperties.getProperty(Config::CLIENT_ID))
{
newProperties.put(CLIENT_ID, utility::getRandomString());
newProperties.put(Config::CLIENT_ID, utility::getRandomString());
}

// If no "log_level" configured, use Log::Level::Notice as default
if (!newProperties.getProperty(LOG_LEVEL))
if (!newProperties.getProperty(Config::LOG_LEVEL))
{
newProperties.put(LOG_LEVEL, std::to_string(static_cast<int>(Log::Level::Notice)));
newProperties.put(Config::LOG_LEVEL, std::to_string(static_cast<int>(Log::Level::Notice)));
}

return newProperties;
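
With these changes each client logs through its own callback (DefaultLogger by default) instead of a shared global logger. A hedged sketch (not part of this commit) of overriding it per client, assuming Config::put accepts a LogCallback the same way the tools below pass kafka::NullLogger for Config::LOG_CB:

// Hypothetical example -- illustration only.
#include <kafka/KafkaConsumer.h>

#include <iostream>

int main()
{
    using namespace kafka::clients;
    using namespace kafka::clients::consumer;

    Config props;
    props.put(Config::BOOTSTRAP_SERVERS, "localhost:9092");
    // Route this client's logs to stderr; omit LOG_CB to keep DefaultLogger,
    // or pass kafka::NullLogger to silence the client entirely.
    props.put(Config::LOG_CB,
              LogCallback([](int level, const char* /*file*/, int /*line*/, const char* msg) {
                  std::cerr << "[client] level=" << level << ": " << msg << std::endl;
              }));

    KafkaConsumer consumer(props);
}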
54 changes: 53 additions & 1 deletion include/kafka/Log.h
@@ -9,6 +9,7 @@
#include <cassert>
#include <functional>
#include <iostream>
#include <mutex>


namespace KAFKA_API {
@@ -36,6 +37,8 @@
}
};


// Log Buffer
template <std::size_t MAX_CAPACITY>
class LogBuffer
{
@@ -72,17 +75,66 @@
char* _wptr;
};

using Logger = std::function<void(int, const char*, int, const char* msg)>;

// Default Logger
inline void DefaultLogger(int level, const char* /*filename*/, int /*lineno*/, const char* msg)
{
std::cout << "[" << utility::getCurrentTime() << "]" << Log::levelString(static_cast<std::size_t>(level)) << " " << msg;
std::cout << std::endl;
}

// Null Logger
inline void NullLogger(int /*level*/, const char* /*filename*/, int /*lineno*/, const char* /*msg*/)
{
}


// Global Logger
template <typename T = void>
struct GlobalLogger
{
static clients::LogCallback logCb;
static std::once_flag initOnce;

static const constexpr int LOG_BUFFER_SIZE = 1024;

template<class ...Args>
static void doLog(int level, const char* filename, int lineno, const char* format, Args... args)
{
if (!GlobalLogger<>::logCb) return;

LogBuffer<LOG_BUFFER_SIZE> logBuffer;
logBuffer.print(format, args...);
GlobalLogger<>::logCb(level, filename, lineno, logBuffer.c_str());
}
};

template <typename T>
clients::LogCallback GlobalLogger<T>::logCb;

template <typename T>
std::once_flag GlobalLogger<T>::initOnce;

/**
 * Set a global log interface for the kafka API (Note: it has no effect on Kafka clients).
*/
inline void setGlobalLogger(clients::LogCallback cb)
{
std::call_once(GlobalLogger<>::initOnce, [](){}); // So there's no need to init within the first KAFKA_API_LOG call.
GlobalLogger<>::logCb = std::move(cb);
}

/**
* Log for kafka API (Note: not for Kafka client instances).
*
 * E.g.,
* KAFKA_API_LOG(Log::Level::Err, "something wrong happened! %s", detailedInfo.c_str());
*/
#define KAFKA_API_LOG(level, ...) do { \
std::call_once(GlobalLogger<>::initOnce, [](){ GlobalLogger<>::logCb = DefaultLogger; }); \
GlobalLogger<>::doLog(level, __FILE__, __LINE__, ##__VA_ARGS__); \
} while (0)


} // end of KAFKA_API
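
A hedged usage sketch (not part of this commit) for the API-level logger added above; the kafka namespace matches how the tools below qualify NullLogger, and the using-directive also lets the KAFKA_API_LOG macro body find GlobalLogger and DefaultLogger from outside the namespace:

// Hypothetical example -- illustration only.
#include <kafka/Log.h>

#include <iostream>

int main()
{
    using namespace kafka;

    // Route API-level (non-client) logs to stderr; pass NullLogger instead to silence them.
    setGlobalLogger([](int level, const char* /*file*/, int /*line*/, const char* msg) {
        std::cerr << "[api] level=" << level << ": " << msg << std::endl;
    });

    KAFKA_API_LOG(Log::Level::Notice, "global logger now writes to %s", "stderr");
}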

15 changes: 0 additions & 15 deletions include/kafka/Types.h
@@ -199,20 +199,5 @@ inline std::string toString(const TopicPartitionOffsets& tpos)
return ret;
}


/**
* SASL OAUTHBEARER token info.
*/
struct SaslOauthbearerToken
{
using KeyValuePairs = std::map<std::string, std::string>;

std::string value;
std::chrono::microseconds mdLifetime{};
std::string mdPrincipalName;
KeyValuePairs extensions;
};


} // end of KAFKA_API

8 changes: 4 additions & 4 deletions tools/KafkaConsoleConsumer.cc
@@ -86,8 +86,7 @@ void RunConsumer(const std::string& topic, const kafka::clients::Config& props)
using namespace kafka::clients;
using namespace kafka::clients::consumer;

// Create a manual-commit consumer
KafkaClient::setGlobalLogger(kafka::Logger());
// Create an auto-commit consumer
KafkaConsumer consumer(props);

// Subscribe to topic
@@ -142,9 +141,8 @@ int main (int argc, char **argv)
// Use Ctrl-C to terminate the program
signal(SIGINT, stopRunning); // NOLINT

// Prepare consumer properties
//
using namespace kafka::clients;
// Prepare consumer properties
Config props;
props.put(Config::BOOTSTRAP_SERVERS, boost::algorithm::join(args->brokerList, ","));
// Get client id
@@ -156,6 +154,8 @@
{
props.put(prop.first, prop.second);
}
// Disable logging
props.put(Config::LOG_CB, kafka::NullLogger);

// Start consumer
try
9 changes: 5 additions & 4 deletions tools/KafkaConsoleProducer.cc
@@ -97,9 +97,10 @@
{
props.put(prop.first, prop.second);
}
// Disable logging
props.put(Config::LOG_CB, kafka::NullLogger);

// Create a sync-send producer
KafkaClient::setGlobalLogger(kafka::Logger());
// Create a producer
KafkaProducer producer(props);

auto startPromptLine = []() { std::cout << "> "; };
@@ -131,9 +132,9 @@
startPromptLine();
}
}
catch (const std::exception& e)
catch (const kafka::KafkaException& e)
{
std::cout << e.what() << std::endl;
std::cerr << "Exception thrown by producer: " << e.what() << std::endl;
return EXIT_FAILURE;
}
