Skip to content

Commit

Permalink
Change kafka producer client config (#2030)
Browse files Browse the repository at this point in the history
(cherry picked from commit 44054d4)
  • Loading branch information
jerryfan01234 authored and mergify[bot] committed Aug 7, 2024
1 parent d72b224 commit f50316a
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
5 changes: 3 additions & 2 deletions protocol/indexer/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ type IndexerFlags struct {

// List of default values
const (
DefaultMaxRetries = 3
DefaultMaxRetries = 20
)

// List of CLI flags
const (
FlagKafkaConnStr = "indexer-kafka-conn-str"
FlagKafkaConnStr = "indexer-kafka-conn-str"
// max retry should be set so that max retry * retry backoff > Zookeeper session.timeout + some buffer
FlagKafkaMaxRetry = "indexer-kafka-max-retry"
FlagSendOffchainData = "indexer-send-offchain-data"
MsgSenderInstanceForTest = "msgsender-instance-for-test"
Expand Down
3 changes: 3 additions & 0 deletions protocol/indexer/msgsender/msgsender_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ func NewIndexerMessageSenderKafka(
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.Retry.Max = indexerFlags.MaxRetries
// max retry should be set so that max retry * retry backoff > Zookeeper session.timeout + some buffer
config.Producer.Retry.Backoff = 1000 * time.Millisecond
config.Producer.MaxMessageBytes = 4194304 // 4MB
config.Producer.RequiredAcks = sarama.WaitForAll
// Use the JVM compatible parititoner to match `kafkajs` which is used in the indexer services.
config.Producer.Partitioner = kafkautil.NewJVMCompatiblePartitioner
producer, err := sarama.NewAsyncProducer(indexerFlags.KafkaAddrs, config)
Expand Down

0 comments on commit f50316a

Please sign in to comment.