Skip to content

Commit

Permalink
Remove KafkaConsumer::poll(std::chrono::milliseconds timeout, std::ve…
Browse files Browse the repository at this point in the history
…ctor<consumer::ConsumerRecord>& output)
  • Loading branch information
kenneth-jia committed Apr 19, 2023
1 parent c5b8ff0 commit 8a23891
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 36 deletions.
2 changes: 2 additions & 0 deletions .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ Checks: "*,\
-cppcoreguidelines-macro-usage,\
-cppcoreguidelines-avoid-magic-numbers,\
-cppcoreguidelines-avoid-non-const-global-variables,\
-cppcoreguidelines-avoid-const-or-ref-data-members,\
-cppcoreguidelines-avoid-do-while,\
-cppcoreguidelines-pro-type-vararg,\
-cppcoreguidelines-pro-bounds-array-to-pointer-decay,\
-cppcoreguidelines-pro-bounds-pointer-arithmetic,\
Expand Down
43 changes: 7 additions & 36 deletions include/kafka/KafkaConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,6 @@ class KafkaConsumer: public KafkaClient
*/
std::vector<consumer::ConsumerRecord> poll(std::chrono::milliseconds timeout);

/**
* Fetch data for the topics or partitions specified using one of the subscribe/assign APIs.
* Returns the number of polled records (which have been saved into parameter `output`).
* Note: 1) The result could be fetched through ConsumerRecord (with member function `error`).
* 2) Make sure the `ConsumerRecord` be destructed before the `KafkaConsumer.close()`.
* Throws KafkaException with errors:
* - RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: Unknow partition
*/
std::size_t poll(std::chrono::milliseconds timeout, std::vector<consumer::ConsumerRecord>& output);

/**
* Suspend fetching from the requested partitions. Future calls to poll() will not return any records from these partitions until they have been resumed using resume().
* Note: 1) After pausing, the application still need to call `poll()` at regular intervals.
Expand Down Expand Up @@ -320,8 +310,6 @@ class KafkaConsumer: public KafkaClient
// Register Callbacks for rd_kafka_conf_t
static void registerConfigCallbacks(rd_kafka_conf_t* conf);

void pollMessages(int timeoutMs, std::vector<consumer::ConsumerRecord>& output);

enum class PauseOrResumeOperation { Pause, Resume };
void pauseOrResumePartitions(const TopicPartitions& topicPartitions, PauseOrResumeOperation op);

Expand Down Expand Up @@ -820,45 +808,28 @@ KafkaConsumer::storeOffsetsIfNecessary(const std::vector<consumer::ConsumerRecor
}
}

// Fetch messages (internally used)
inline void
KafkaConsumer::pollMessages(int timeoutMs, std::vector<consumer::ConsumerRecord>& output)
// Fetch messages
inline std::vector<consumer::ConsumerRecord>
KafkaConsumer::poll(std::chrono::milliseconds timeout)
{
// Commit the offsets for these messages which had been polled last time (for "enable.auto.commit=true" case)
commitStoredOffsetsIfNecessary(CommitType::Async);

// Poll messages with librdkafka's API
std::vector<rd_kafka_message_t*> msgPtrArray(_maxPollRecords);
auto msgReceived = rd_kafka_consume_batch_queue(_rk_queue.get(), timeoutMs, msgPtrArray.data(), _maxPollRecords);
auto msgReceived = rd_kafka_consume_batch_queue(_rk_queue.get(), convertMsDurationToInt(timeout), msgPtrArray.data(), _maxPollRecords);
if (msgReceived < 0)
{
KAFKA_THROW_ERROR(Error(rd_kafka_last_error()));
}

// Wrap messages with ConsumerRecord
output.clear();
output.reserve(static_cast<std::size_t>(msgReceived));
std::for_each(msgPtrArray.begin(), msgPtrArray.begin() + msgReceived, [&output](rd_kafka_message_t* rkMsg) { output.emplace_back(rkMsg); });
std::vector<consumer::ConsumerRecord> records(msgPtrArray.begin(), msgPtrArray.begin() + msgReceived);

// Store the offsets for all these polled messages (for "enable.auto.commit=true" case)
storeOffsetsIfNecessary(output);
}

// Fetch messages (return via return value)
inline std::vector<consumer::ConsumerRecord>
KafkaConsumer::poll(std::chrono::milliseconds timeout)
{
std::vector<consumer::ConsumerRecord> result;
poll(timeout, result);
return result;
}
storeOffsetsIfNecessary(records);

// Fetch messages (return via input parameter)
inline std::size_t
KafkaConsumer::poll(std::chrono::milliseconds timeout, std::vector<consumer::ConsumerRecord>& output)
{
pollMessages(convertMsDurationToInt(timeout), output);
return output.size();
return records;
}

inline void
Expand Down

0 comments on commit 8a23891

Please sign in to comment.