Reduce the number of Kafka clients used by producers #8088
Signed-off-by: Marco Pracucci <marco@pracucci.com>
pkg/storage/ingest/writer.go (outdated)

```diff
@@ -195,7 +202,7 @@ func (w *Writer) newKafkaWriter(partitionID int32) (*kgo.Client, error) {
 	kgo.DefaultProduceTopic(w.kafkaCfg.Topic),

 	// Use a static partitioner because we want to be in control of the partition.
-	kgo.RecordPartitioner(newKafkaStaticPartitioner(int(partitionID))),
+	kgo.RecordPartitioner(newKafkaStaticPartitioner()),
```
kgo has ManualPartitioner(), which now does the same thing as newKafkaStaticPartitioner().
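For reference, a minimal sketch of how kgo's built-in ManualPartitioner is wired up. The topic name, broker address, and record contents below are illustrative assumptions, not Mimir's actual configuration, and the snippet needs a running broker to actually produce:

```go
package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed broker address
		kgo.DefaultProduceTopic("ingest"), // hypothetical topic name
		// ManualPartitioner sends each record to whatever partition is
		// set on its Partition field, so a custom static partitioner
		// is no longer needed.
		kgo.RecordPartitioner(kgo.ManualPartitioner()),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// The caller picks the partition explicitly on the record.
	rec := &kgo.Record{Partition: 3, Value: []byte("payload")}
	client.Produce(context.Background(), rec, nil)
}
```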
You're absolutely right. Done in 32c5ba5.
PR looks good. I think you may be able to remove kafkaStaticPartitioner now.
Done, thanks!
What this PR does

Currently we run 1 Kafka client per partition. This means that if we have 100 partitions, 100 distributors, and a metadata refresh interval of 10s, distributors will collectively issue 100 * 100 / 10 = 1,000 req/s to update cluster metadata. Refreshing cluster metadata is an expensive operation and can put extra pressure on Kafka, so the current approach doesn't scale.

In this PR I propose to run 1 Kafka client per producer by default, but allow increasing it via a new config option. When > 1 clients are configured, the `Writer` will simply shard partitions between clients.

Which issue(s) this PR fixes or relates to
N/A
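The partition-to-client sharding the PR describes can be sketched with a simple modulo mapping. The function name below is illustrative, not Mimir's actual code; it only shows the idea that each partition is deterministically pinned to one client in a fixed-size pool:

```go
package main

import "fmt"

// clientForPartition maps a partition to one client in a pool of
// numClients Kafka clients, so each client handles a stable subset
// of partitions. This is a sketch of the sharding idea, not the
// actual Writer implementation.
func clientForPartition(partitionID int32, numClients int) int {
	return int(partitionID) % numClients
}

func main() {
	// With 2 clients, partitions alternate between client 0 and client 1.
	for _, p := range []int32{0, 1, 2, 3} {
		fmt.Printf("partition %d -> client %d\n", p, clientForPartition(p, 2))
	}
}
```

With a single configured client (the new default), every partition maps to client 0, which is what reduces the metadata-refresh load described above.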
Checklist

- `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`.
- `about-versioning.md` updated with experimental features.