Skip to content

Commit

Permalink
Improve KafkaRecoverableProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
kenneth-jia committed Jan 3, 2023
1 parent 3111fe8 commit e352a20
Showing 1 changed file with 9 additions and 16 deletions.
25 changes: 9 additions & 16 deletions include/kafka/addons/KafkaRecoverableProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

#include <kafka/Project.h>

#include <kafka/ClientCommon.h>
#include <kafka/KafkaClient.h>
#include <kafka/KafkaProducer.h>
#include <kafka/Types.h>

#include <deque>
#include <atomic>
#include <memory>
#include <mutex>
#include <vector>
#include <thread>


namespace KAFKA_API { namespace clients { namespace producer {
Expand All @@ -20,8 +18,8 @@ class KafkaRecoverableProducer
explicit KafkaRecoverableProducer(const Properties& properties)
: _properties(properties), _running(true)
{
_properties.put("enable.manual.events.poll", "true");
_properties.put("error_cb", [this](const Error& error) { if (error.isFatal()) _fatalError = std::make_unique<Error>(error); });
_properties.put(Config::ENABLE_MANUAL_EVENTS_POLL, "true");
_properties.put(Config::ERROR_CB, [this](const Error& error) { if (error.isFatal()) _fatalError = std::make_unique<Error>(error); });

_producer = createProducer();

Expand Down Expand Up @@ -60,8 +58,8 @@ class KafkaRecoverableProducer
{
const std::lock_guard<std::mutex> lock(_producerMutex);

_logLevel = level;
_producer->setLogLevel(*_logLevel);
_properties.put(Config::LOG_LEVEL, std::to_string(level));
_producer->setLogLevel(level);
}

/**
Expand Down Expand Up @@ -295,16 +293,11 @@ class KafkaRecoverableProducer

std::unique_ptr<KafkaProducer> createProducer()
{
auto producer = std::make_unique<KafkaProducer>(_properties);

if (_logLevel) producer->setLogLevel(*_logLevel);

return producer;
return std::make_unique<KafkaProducer>(_properties);
}

// Configurations for producer
Properties _properties;
Optional<int> _logLevel;
Properties _properties;

std::unique_ptr<Error> _fatalError;

Expand Down

0 comments on commit e352a20

Please sign in to comment.