Skip to content

Commit

Permalink
Allow to configure Kafka OffsetCommit interval (#8135)
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored May 14, 2024
1 parent faf2b84 commit 9c7b085
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 38 deletions.
4 changes: 3 additions & 1 deletion pkg/storage/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ type KafkaConfig struct {
WriteTimeout time.Duration `yaml:"write_timeout"`
WriteClients int `yaml:"write_clients"`

ConsumerGroup string `yaml:"consumer_group"`
ConsumerGroup string `yaml:"consumer_group"`
ConsumerGroupOffsetCommitInterval time.Duration `yaml:"consumer_group_offset_commit_interval"`

LastProducedOffsetPollInterval time.Duration `yaml:"last_produced_offset_poll_interval"`
LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"`
Expand All @@ -90,6 +91,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
f.IntVar(&cfg.WriteClients, prefix+".write-clients", 1, "The number of Kafka clients used by producers. When the configured number of clients is greater than 1, partitions are sharded among Kafka clients. An higher number of clients may provide higher write throughput at the cost of additional Metadata requests pressure to Kafka.")

f.StringVar(&cfg.ConsumerGroup, prefix+".consumer-group", "", "The consumer group used by the consumer to track the last consumed offset. The consumer group must be different for each ingester. If the configured consumer group contains the '<partition>' placeholder, it will be replaced with the actual partition ID owned by the ingester. When empty (recommended), Mimir will use the ingester instance ID to guarantee uniqueness.")
f.DurationVar(&cfg.ConsumerGroupOffsetCommitInterval, prefix+".consumer-group-offset-commit-interval", time.Second, "How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left.")

f.DurationVar(&cfg.LastProducedOffsetPollInterval, prefix+".last-produced-offset-poll-interval", time.Second, "How frequently to poll the last produced offset, used to enforce strong read consistency.")
f.DurationVar(&cfg.LastProducedOffsetRetryTimeout, prefix+".last-produced-offset-retry-timeout", 10*time.Second, "How long to retry a failed request to get the last produced offset.")
Expand Down
30 changes: 13 additions & 17 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ type PartitionReader struct {
consumer recordConsumer
metrics readerMetrics

committer *partitionCommitter
commitInterval time.Duration
committer *partitionCommitter

// consumedOffsetWatcher is used to wait until a given offset has been consumed.
// This gets initialised with -1 which means nothing has been consumed from the partition yet.
Expand All @@ -83,7 +82,6 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri
consumer: consumer,
consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID),
metrics: newReaderMetrics(partitionID, reg),
commitInterval: time.Second,
consumedOffsetWatcher: newPartitionOffsetWatcher(),
logger: log.With(logger, "partition", partitionID),
reg: reg,
Expand Down Expand Up @@ -119,7 +117,7 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
if err != nil {
return errors.Wrap(err, "creating kafka reader client")
}
r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.commitInterval, r.logger, r.reg)
r.committer = newPartitionCommitter(r.kafkaCfg, kadm.NewClient(r.client), r.partitionID, r.consumerGroup, r.logger, r.reg)

r.offsetReader = newPartitionOffsetReader(r.client, r.kafkaCfg.Topic, r.partitionID, r.kafkaCfg.LastProducedOffsetPollInterval, r.reg, r.logger)

Expand Down Expand Up @@ -576,10 +574,9 @@ func (r *PartitionReader) WaitReadConsistency(ctx context.Context) (returnErr er
type partitionCommitter struct {
services.Service

kafkaCfg KafkaConfig
commitInterval time.Duration
partitionID int32
consumerGroup string
kafkaCfg KafkaConfig
partitionID int32
consumerGroup string

toCommit *atomic.Int64
admClient *kadm.Client
Expand All @@ -593,15 +590,14 @@ type partitionCommitter struct {
lastCommittedOffset prometheus.Gauge
}

func newPartitionCommitter(kafkaCfg KafkaConfig, admClient *kadm.Client, partitionID int32, consumerGroup string, commitInterval time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
func newPartitionCommitter(kafkaCfg KafkaConfig, admClient *kadm.Client, partitionID int32, consumerGroup string, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
c := &partitionCommitter{
logger: logger,
kafkaCfg: kafkaCfg,
partitionID: partitionID,
consumerGroup: consumerGroup,
toCommit: atomic.NewInt64(-1),
admClient: admClient,
commitInterval: commitInterval,
logger: logger,
kafkaCfg: kafkaCfg,
partitionID: partitionID,
consumerGroup: consumerGroup,
toCommit: atomic.NewInt64(-1),
admClient: admClient,

commitRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_offset_commit_requests_total",
Expand Down Expand Up @@ -641,7 +637,7 @@ func (r *partitionCommitter) enqueueOffset(o int64) {
}

func (r *partitionCommitter) run(ctx context.Context) error {
commitTicker := time.NewTicker(r.commitInterval)
commitTicker := time.NewTicker(r.kafkaCfg.ConsumerGroupOffsetCommitInterval)
defer commitTicker.Stop()

previousOffset := r.toCommit.Load()
Expand Down
36 changes: 16 additions & 20 deletions pkg/storage/ingest/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,8 +1190,7 @@ func TestPartitionCommitter(t *testing.T) {
adm := kadm.NewClient(client)
reg := prometheus.NewPedanticRegistry()

interval := time.Second
committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, interval, logger, reg)
committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, logger, reg)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), committer))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), committer))
Expand All @@ -1209,7 +1208,7 @@ func TestPartitionCommitter(t *testing.T) {
commitRequestsShouldFail.Store(false)

// Now we expect the commit to succeed once the committer triggers the commit at the next interval.
test.Poll(t, 10*interval, nil, func() interface{} {
test.Poll(t, 10*cfg.ConsumerGroupOffsetCommitInterval, nil, func() interface{} {
return promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_last_committed_offset The last consumed offset successfully committed by the partition reader. Set to -1 if not offset has been committed yet.
# TYPE cortex_ingest_storage_reader_last_committed_offset gauge
Expand All @@ -1231,7 +1230,7 @@ func TestPartitionCommitter(t *testing.T) {
// Since we haven't enqueued any other offset and the last enqueued one has been successfully committed,
// we expect the committer to not issue any other request in the future.
expectedRequestsCount := commitRequestsCount.Load()
time.Sleep(3 * interval)
time.Sleep(3 * cfg.ConsumerGroupOffsetCommitInterval)
assert.Equal(t, expectedRequestsCount, commitRequestsCount.Load())
})
}
Expand All @@ -1257,7 +1256,7 @@ func TestPartitionCommitter_commit(t *testing.T) {

adm := kadm.NewClient(client)
reg := prometheus.NewPedanticRegistry()
committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, time.Second, log.NewNopLogger(), reg)
committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, log.NewNopLogger(), reg)

require.NoError(t, committer.commit(context.Background(), 123))

Expand Down Expand Up @@ -1297,7 +1296,7 @@ func TestPartitionCommitter_commit(t *testing.T) {

adm := kadm.NewClient(client)
reg := prometheus.NewPedanticRegistry()
committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, time.Second, log.NewNopLogger(), reg)
committer := newPartitionCommitter(cfg, adm, partitionID, consumerGroup, log.NewNopLogger(), reg)

require.Error(t, committer.commit(context.Background(), 123))

Expand Down Expand Up @@ -1342,19 +1341,18 @@ func produceRecord(ctx context.Context, t *testing.T, writeClient *kgo.Client, t
}

type readerTestCfg struct {
kafka KafkaConfig
partitionID int32
consumer recordConsumer
registry *prometheus.Registry
logger log.Logger
commitInterval time.Duration
kafka KafkaConfig
partitionID int32
consumer recordConsumer
registry *prometheus.Registry
logger log.Logger
}

// readerTestCfgOtp is an option function that receives a *readerTestCfg and
// may modify it in place; helpers such as withCommitInterval return functions
// of this shape.
type readerTestCfgOtp func(cfg *readerTestCfg)

func withCommitInterval(i time.Duration) func(cfg *readerTestCfg) {
return func(cfg *readerTestCfg) {
cfg.commitInterval = i
cfg.kafka.ConsumerGroupOffsetCommitInterval = i
}
}

Expand Down Expand Up @@ -1391,12 +1389,11 @@ func withRegistry(reg *prometheus.Registry) func(cfg *readerTestCfg) {

func defaultReaderTestConfig(t *testing.T, addr string, topicName string, partitionID int32, consumer recordConsumer) *readerTestCfg {
return &readerTestCfg{
registry: prometheus.NewPedanticRegistry(),
logger: testutil.NewLogger(t),
kafka: createTestKafkaConfig(addr, topicName),
partitionID: partitionID,
consumer: consumer,
commitInterval: 10 * time.Second,
registry: prometheus.NewPedanticRegistry(),
logger: testutil.NewLogger(t),
kafka: createTestKafkaConfig(addr, topicName),
partitionID: partitionID,
consumer: consumer,
}
}

Expand All @@ -1407,7 +1404,6 @@ func createReader(t *testing.T, addr string, topicName string, partitionID int32
}
reader, err := newPartitionReader(cfg.kafka, cfg.partitionID, "test-group", cfg.consumer, cfg.logger, cfg.registry)
require.NoError(t, err)
reader.commitInterval = cfg.commitInterval

return reader
}
Expand Down

0 comments on commit 9c7b085

Please sign in to comment.