From 32c5ba5ab12f931e2d171d9afee4a070398739e6 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 8 May 2024 15:03:10 +0200 Subject: [PATCH] Use kgo.ManualPartitioner Signed-off-by: Marco Pracucci --- pkg/storage/ingest/writer.go | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/pkg/storage/ingest/writer.go b/pkg/storage/ingest/writer.go index 78d1648cbaf..f3b6b292d14 100644 --- a/pkg/storage/ingest/writer.go +++ b/pkg/storage/ingest/writer.go @@ -201,8 +201,8 @@ func (w *Writer) newKafkaWriter(clientID int) (*kgo.Client, error) { kgo.RequiredAcks(kgo.AllISRAcks()), kgo.DefaultProduceTopic(w.kafkaCfg.Topic), - // Use a static partitioner because we want to be in control of the partition. - kgo.RecordPartitioner(newKafkaStaticPartitioner()), + // We set the partition field in each record. + kgo.RecordPartitioner(kgo.ManualPartitioner()), // Set the upper bounds the size of a record batch. kgo.ProducerBatchMaxBytes(16_000_000), @@ -240,26 +240,3 @@ func (w *Writer) newKafkaWriter(clientID int) (*kgo.Client, error) { ) return kgo.NewClient(opts...) } - -type kafkaStaticPartitioner struct{} - -func newKafkaStaticPartitioner() *kafkaStaticPartitioner { - return &kafkaStaticPartitioner{} -} - -// ForTopic implements kgo.Partitioner. -func (p *kafkaStaticPartitioner) ForTopic(string) kgo.TopicPartitioner { - return p -} - -// RequiresConsistency implements kgo.TopicPartitioner. -func (p *kafkaStaticPartitioner) RequiresConsistency(_ *kgo.Record) bool { - // Never let Kafka client to write the record to another partition - // if the partition is down. - return true -} - -// Partition implements kgo.TopicPartitioner. -func (p *kafkaStaticPartitioner) Partition(r *kgo.Record, _ int) int { - return int(r.Partition) -}