Skip to content

Commit

Permalink
Use kgo.ManualPartitioner
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed May 8, 2024
1 parent f2efb15 commit 32c5ba5
Showing 1 changed file with 2 additions and 25 deletions.
27 changes: 2 additions & 25 deletions pkg/storage/ingest/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func (w *Writer) newKafkaWriter(clientID int) (*kgo.Client, error) {
kgo.RequiredAcks(kgo.AllISRAcks()),
kgo.DefaultProduceTopic(w.kafkaCfg.Topic),

// Use a static partitioner because we want to be in control of the partition.
kgo.RecordPartitioner(newKafkaStaticPartitioner()),
// We set the partition field in each record.
kgo.RecordPartitioner(kgo.ManualPartitioner()),

// Set the upper bounds the size of a record batch.
kgo.ProducerBatchMaxBytes(16_000_000),
Expand Down Expand Up @@ -240,26 +240,3 @@ func (w *Writer) newKafkaWriter(clientID int) (*kgo.Client, error) {
)
return kgo.NewClient(opts...)
}

type kafkaStaticPartitioner struct{}

func newKafkaStaticPartitioner() *kafkaStaticPartitioner {
return &kafkaStaticPartitioner{}
}

// ForTopic implements kgo.Partitioner.
func (p *kafkaStaticPartitioner) ForTopic(string) kgo.TopicPartitioner {
return p
}

// RequiresConsistency implements kgo.TopicPartitioner.
func (p *kafkaStaticPartitioner) RequiresConsistency(_ *kgo.Record) bool {
// Never let Kafka client to write the record to another partition
// if the partition is down.
return true
}

// Partition implements kgo.TopicPartitioner.
func (p *kafkaStaticPartitioner) Partition(r *kgo.Record, _ int) int {
return int(r.Partition)
}

0 comments on commit 32c5ba5

Please sign in to comment.