Skip to content

Commit

Permalink
Modify default transactional timeout for KafkaProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
kenneth-jia committed Sep 19, 2022
1 parent 66536d6 commit 7cb90cc
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/kafka_api_bazel_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
env:
KAFKA_SRC_LINK: https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
CPU_CORE_NUM: 2
LIBRDKAFKA_TAG: v1.9.0
LIBRDKAFKA_TAG: v1.9.2

jobs:
kafka-api-bazel-build:
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/kafka_api_ci_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
env:
KAFKA_SRC_LINK: https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
CPU_CORE_NUM: 2
LIBRDKAFKA_TAG: v1.9.0
LIBRDKAFKA_TAG: v1.9.2
BUILD_SUB_DIR: builds/sub-build

jobs:
Expand Down Expand Up @@ -310,7 +310,8 @@ jobs:
$Env:GTEST_ROOT='C:\VCPKG\INSTALLED\x86-windows\'
$Env:BOOST_ROOT='C:\VCPKG\INSTALLED\x86-windows\'
$Env:LIBRDKAFKA_ROOT='C:\VCPKG\INSTALLED\x86-windows\'
$Env:LIBRDKAFKA_INCLUDE_DIR='C:\VCPKG\INSTALLED\x86-windows\include\'
$Env:LIBRDKAFKA_LIBRARY_DIR='C:\VCPKG\INSTALLED\x86-windows\lib\'
$Env:RAPIDJSON_INCLUDE_DIRS='C:\VCPKG\INSTALLED\x86-windows\include\'
cmake -B ./ -A Win32 -S ../.. "-DCMAKE_TOOLCHAIN_FILE=C:/vcpkg/scripts/buildsystems/vcpkg.cmake"
Expand Down
15 changes: 9 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,27 @@ if (NOT BUILD_OPTION_DOC_ONLY)
#---------------------------
# librdkafka library
#---------------------------
if (DEFINED ENV{LIBRDKAFKA_ROOT})
set(LIBRDKAFKA_INCLUDE_DIR $ENV{LIBRDKAFKA_ROOT}/include)
set(LIBRDKAFKA_LIBRARY_DIR $ENV{LIBRDKAFKA_ROOT}/lib)
if (DEFINED ENV{LIBRDKAFKA_INCLUDE_DIR})
set(LIBRDKAFKA_INCLUDE_DIR $ENV{LIBRDKAFKA_INCLUDE_DIR})
else ()
set(LIBRDKAFKA_INCLUDE_DIR /usr/local/include)
endif ()
if (DEFINED ENV{LIBRDKAFKA_LIBRARY_DIR})
set(LIBRDKAFKA_LIBRARY_DIR $ENV{LIBRDKAFKA_LIBRARY_DIR})
else ()
set(LIBRDKAFKA_LIBRARY_DIR /usr/local/lib)
endif ()

if (EXISTS "${LIBRDKAFKA_INCLUDE_DIR}/librdkafka/rdkafka.h")
message(STATUS "librdkafka include directory: ${LIBRDKAFKA_INCLUDE_DIR}")
else ()
message(FATAL_ERROR "Could not find headers: librdkafka!")
message(FATAL_ERROR "Could not find headers for librdkafka!")
endif ()

if (EXISTS "${LIBRDKAFKA_LIBRARY_DIR}/librdkafka.a" OR EXISTS "${LIBRDKAFKA_LIBRARY_DIR}/librdkafka.so" OR EXISTS "${LIBRDKAFKA_LIBRARY_DIR}/rdkafka.lib" )
message(STATUS "librdkafka library directory: ${LIBRDKAFKA_LIBRARY_DIR}")
else ()
message(FATAL_ERROR "Could not find library: librdkafka!")
message(FATAL_ERROR "Could not find library for librdkafka!")
endif ()


Expand All @@ -68,7 +71,7 @@ if (NOT BUILD_OPTION_DOC_ONLY)
if (PTHREAD_LIB)
message(STATUS "pthread library: ${PTHREAD_LIB}")
else ()
message(FATAL_ERROR "Could not find library: pthread!")
message(FATAL_ERROR "Could not find library for pthread!")
endif ()
endif ()

Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

The [Modern C++ Kafka API](http://opensource.morganstanley.com/modern-cpp-kafka/doxygen/annotated.html) is a layer of C++ wrapper based on [librdkafka](https://github.com/edenhill/librdkafka) (the C part), with high quality, but more friendly to users.

- By now, [modern-cpp-kafka](https://github.com/morganstanley/modern-cpp-kafka) is compatible with [librdkafka v1.9.0](https://github.com/edenhill/librdkafka/releases/tag/v1.9.0).
- By now, [modern-cpp-kafka](https://github.com/morganstanley/modern-cpp-kafka) is compatible with [librdkafka v1.9.2](https://github.com/edenhill/librdkafka/releases/tag/v1.9.2).

```
KAFKA is a registered trademark of The Apache Software Foundation and
Expand Down Expand Up @@ -65,7 +65,9 @@ Eventually, we worked out the ***modern-cpp-kafka***, -- a header-only library t

* Specify library locations with environment variables

* `LIBRDKAFKA_ROOT` -- ***librdkafka*** headers and libraries
* `LIBRDKAFKA_INCLUDE_DIR` -- ***librdkafka*** headers

* `LIBRDKAFKA_LIBRARY_DIR` -- ***librdkafka*** libraries

* `GTEST_ROOT` -- ***googletest*** headers and libraries

Expand Down
12 changes: 2 additions & 10 deletions include/kafka/KafkaProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class KafkaProducer: public KafkaClient
/**
* Needs to be called before any other methods when the transactional.id is set in the configuration.
*/
void initTransactions(std::chrono::milliseconds timeout = std::chrono::milliseconds(KafkaProducer::DEFAULT_INIT_TRANSACTIONS_TIMEOUT_MS));
void initTransactions(std::chrono::milliseconds timeout = std::chrono::milliseconds::max());

/**
* Should be called before the start of each new transaction.
Expand All @@ -151,7 +151,7 @@ class KafkaProducer: public KafkaClient
/**
* Commit the ongoing transaction.
*/
void commitTransaction(std::chrono::milliseconds timeout = std::chrono::milliseconds(KafkaProducer::DEFAULT_COMMIT_TRANSACTION_TIMEOUT_MS));
void commitTransaction(std::chrono::milliseconds timeout = std::chrono::milliseconds::max());

/**
* Abort the ongoing transaction.
Expand All @@ -166,14 +166,6 @@ class KafkaProducer: public KafkaClient
const consumer::ConsumerGroupMetadata& groupMetadata,
std::chrono::milliseconds timeout);

#if COMPILER_SUPPORTS_CPP_17
static constexpr int DEFAULT_INIT_TRANSACTIONS_TIMEOUT_MS = 10000;
static constexpr int DEFAULT_COMMIT_TRANSACTION_TIMEOUT_MS = 10000;
#else
enum { DEFAULT_INIT_TRANSACTIONS_TIMEOUT_MS = 10000 };
enum { DEFAULT_COMMIT_TRANSACTION_TIMEOUT_MS = 10000 };
#endif

private:
void pollCallbacks(int timeoutMs)
{
Expand Down
4 changes: 2 additions & 2 deletions include/kafka/addons/KafkaRecoverableProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class KafkaRecoverableProducer
/**
* Needs to be called before any other methods when the transactional.id is set in the configuration.
*/
void initTransactions(std::chrono::milliseconds timeout = std::chrono::milliseconds(KafkaProducer::DEFAULT_INIT_TRANSACTIONS_TIMEOUT_MS))
void initTransactions(std::chrono::milliseconds timeout = std::chrono::milliseconds::max())
{
std::lock_guard<std::mutex> lock(_producerMutex);

Expand All @@ -267,7 +267,7 @@ class KafkaRecoverableProducer
/**
* Commit the ongoing transaction.
*/
void commitTransaction(std::chrono::milliseconds timeout = std::chrono::milliseconds(KafkaProducer::DEFAULT_COMMIT_TRANSACTION_TIMEOUT_MS))
void commitTransaction(std::chrono::milliseconds timeout = std::chrono::milliseconds::max())
{
std::lock_guard<std::mutex> lock(_producerMutex);

Expand Down

0 comments on commit 7cb90cc

Please sign in to comment.