-
Notifications
You must be signed in to change notification settings - Fork 86
/
TestTransaction.cc
78 lines (59 loc) · 3.71 KB
/
TestTransaction.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include "../utils/TestUtility.h"
#include "kafka/KafkaConsumer.h"
#include "kafka/KafkaProducer.h"
#include "gtest/gtest.h"
#include <chrono>
// Transactional-producer scenario: records are sent while the brokers are paused, with a
// message timeout (3000 ms) shorter than the pause (10 s). The commit attempt is wrapped in
// a try/catch that checks the reported error code and then aborts the transaction.
// Afterwards a "read_uncommitted" consumer is expected to find no records in the topic.
TEST(Transaction, DeliveryFailure)
{
    using KafkaProducer  = kafka::clients::producer::KafkaProducer;
    using ProducerRecord = kafka::clients::producer::ProducerRecord;
    using RecordMetadata = kafka::clients::producer::RecordMetadata;
    using KafkaConsumer  = kafka::clients::consumer::KafkaConsumer;

    // Begins a console line with the "[<current time>] " prefix and hands the stream back,
    // so the caller can append the rest of the message.
    const auto stamped = []() -> std::ostream& {
        return std::cout << "[" << kafka::utility::getCurrentTime() << "] ";
    };

    const kafka::Topic topic        = kafka::utility::getRandomString();
    const std::string  txnId        = kafka::utility::getRandomString();
    const std::string  payload      = "message to sent";
    const std::size_t  messageCount = 10;

    KafkaTestUtility::CreateKafkaTopic(topic, 5, 3);

    // Producer side: everything lives in its own scope, so the producer is gone before
    // the consumer below is created.
    {
        ProducerRecord outgoing(topic, kafka::NullKey, kafka::Value(payload.c_str(), payload.size()));

        auto producerProps = KafkaTestUtility::GetKafkaClientCommonConfig()
                                 .put(kafka::clients::producer::ProducerConfig::MESSAGE_TIMEOUT_MS, "3000") // short timeout, so the delivery is expected to fail
                                 .put(kafka::clients::producer::ProducerConfig::TRANSACTIONAL_ID, txnId)
                                 .put(kafka::clients::Config::ERROR_CB, KafkaTestUtility::DumpError);

        KafkaProducer txnProducer(producerProps);
        stamped() << "Producer created." << std::endl;

        txnProducer.initTransactions();
        stamped() << "Producer initialized the transaction." << std::endl;

        txnProducer.beginTransaction();
        stamped() << "Producer began the transaction." << std::endl;

        // Brokers get paused before anything is sent; the returned handle is kept alive
        // until the end of this scope.
        auto brokerPause = KafkaTestUtility::PauseBrokersForAWhile(std::chrono::seconds(10));

        // Queue the same record `messageCount` times; each delivery result is only logged.
        for (std::size_t pending = messageCount; pending != 0; --pending)
        {
            txnProducer.send(outgoing,
                             [](const RecordMetadata& metadata, const kafka::Error& error) {
                                 std::cout << "[" << kafka::utility::getCurrentTime() << "] Producer got the delivery result: " << error.message()
                                           << ", with metadata: " << metadata.toString() << std::endl;
                             });
            stamped() << "Producer async-sent the message: " << outgoing.toString() << std::endl;
        }

        // The commit is expected to fail; on failure verify the error code, then abort.
        try
        {
            stamped() << "Producer will commit the transaction" << outgoing.toString() << std::endl;
            txnProducer.commitTransaction();
        }
        catch (const kafka::KafkaException& e)
        {
            stamped() << "Exception caught: " << e.what() << std::endl;
            EXPECT_EQ(RD_KAFKA_RESP_ERR__INCONSISTENT, e.error().value());

            stamped() << "Producer will abort the transaction" << outgoing.toString() << std::endl;
            txnProducer.abortTransaction();
        }
    }

    // Consumer side: read from the earliest offset, including uncommitted records,
    // and expect to receive nothing.
    {
        auto consumerProps = KafkaTestUtility::GetKafkaClientCommonConfig()
                                 .put(kafka::clients::consumer::ConsumerConfig::AUTO_OFFSET_RESET, "earliest")
                                 .put(kafka::clients::consumer::ConsumerConfig::ISOLATION_LEVEL, "read_uncommitted");

        KafkaConsumer consumer(consumerProps);
        consumer.subscribe({topic});

        auto received = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer, std::chrono::seconds(1));
        EXPECT_EQ(0, received.size());
    }
}