From 5dba46ba030e4294f7a7a19b69ce479a7643802a Mon Sep 17 00:00:00 2001 From: dixingxing Date: Sat, 15 Feb 2020 22:15:20 +0800 Subject: [PATCH] drainer: add KafkaClientID in DBConfig (#902) Improve configuration for kafka syncer: Add kafka-client-id in drainer.toml to config kafka client.id property. --- cmd/drainer/drainer.toml | 1 + drainer/sync/kafka.go | 4 ++++ drainer/sync/util.go | 1 + tests/binlog/drainer.toml | 1 + 4 files changed, 7 insertions(+) diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index bcc42c402..b52512233 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -137,6 +137,7 @@ port = 3306 # kafka-addrs = "127.0.0.1:9092" # kafka-version = "0.8.2.0" # kafka-max-messages = 1024 +# kafka-client-id = "tidb_binlog" # # # the topic name drainer will push msg, the default name is _obinlog diff --git a/drainer/sync/kafka.go b/drainer/sync/kafka.go index 434f7a589..1746591e8 100644 --- a/drainer/sync/kafka.go +++ b/drainer/sync/kafka.go @@ -77,6 +77,10 @@ func NewKafka(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*Kafka return nil, errors.Trace(err) } + if len(cfg.KafkaClientID) > 0 { + config.ClientID = cfg.KafkaClientID + } + config.Producer.Flush.MaxMessages = cfg.KafkaMaxMessages // maintain minimal set that has been necessary so far diff --git a/drainer/sync/util.go b/drainer/sync/util.go index cfbbc233f..8afcb7842 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -34,6 +34,7 @@ type DBConfig struct { KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"` KafkaVersion string `toml:"kafka-version" json:"kafka-version"` KafkaMaxMessages int `toml:"kafka-max-messages" json:"kafka-max-messages"` + KafkaClientID string `toml:"kafka-client-id" json:"kafka-client-id"` TopicName string `toml:"topic-name" json:"topic-name"` // get it from pd ClusterID uint64 `toml:"-" json:"-"` diff --git a/tests/binlog/drainer.toml b/tests/binlog/drainer.toml index 5c751470d..89842a186 100644 --- a/tests/binlog/drainer.toml +++ b/tests/binlog/drainer.toml @@ -75,6 +75,7 @@ port = 3306 # kafka-addrs = "127.0.0.1:9092" # kafka-version = "0.8.2.0" # kafka-max-messages = 1024 +# kafka-client-id = "tidb_binlog" # # # the topic name drainer will push msg, the default name is _obinlog