From 74c72c83124aa65fac6ee4122a3e1fefa55d40b9 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 3 Jul 2024 19:20:12 +0200 Subject: [PATCH] Add -ingest-storage.kafka.target-consumer-lag-at-startup support (#8579) * Add -ingest-storage.kafka.target-consumer-lag-at-startup support Signed-off-by: Marco Pracucci * Updated doc Signed-off-by: Marco Pracucci * Renamed loggerWithCurrentLag() to loggerWithCurrentLagIfSet() Signed-off-by: Marco Pracucci * Clarify comment and doc Signed-off-by: Marco Pracucci * Fix CHANGELOG Signed-off-by: Marco Pracucci * Apply suggestions from code review Co-authored-by: Taylor C <41653732+tacole02@users.noreply.github.com> --------- Signed-off-by: Marco Pracucci Co-authored-by: Taylor C <41653732+tacole02@users.noreply.github.com> --- CHANGELOG.md | 2 +- cmd/mimir/config-descriptor.json | 12 +- cmd/mimir/help-all.txt.tmpl | 4 +- cmd/mimir/help.txt.tmpl | 4 +- .../configuration-parameters/index.md | 15 +- .../mimir/manage/mimir-runbooks/_index.md | 4 +- pkg/storage/ingest/config.go | 21 +- pkg/storage/ingest/config_test.go | 30 +++ pkg/storage/ingest/reader.go | 120 ++++++++++-- pkg/storage/ingest/reader_test.go | 185 +++++++++++++++--- 10 files changed, 344 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4f3f1d1d85..ed119c387a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ * [CHANGE] Querier: return only samples within the queried start/end time range when executing a remote read request using "SAMPLES" mode. Previously, samples outside of the range could have been returned. Samples outside of the queried time range may still be returned when executing a remote read request using "STREAMED_XOR_CHUNKS" mode. #8463 * [CHANGE] Store-gateway: enabled `-blocks-storage.bucket-store.max-concurrent-queue-timeout` by default with a timeout of 5 seconds. #8496 * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 -* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 +* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. 
* New configuration options: diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index d0d1d7bf05f..056999ebe27 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6578,11 +6578,21 @@ "fieldFlag": "ingest-storage.kafka.consume-from-timestamp-at-startup", "fieldType": "int" }, + { + "kind": "field", + "name": "target_consumer_lag_at_startup", + "required": false, + "desc": "The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup.", + "fieldValue": null, + "fieldDefaultValue": 2000000000, + "fieldFlag": "ingest-storage.kafka.target-consumer-lag-at-startup", + "fieldType": "duration" + }, { "kind": "field", "name": "max_consumer_lag_at_startup", "required": false, - "desc": "The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup.", + "desc": "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup.", "fieldValue": null, "fieldDefaultValue": 15000000000, "fieldFlag": "ingest-storage.kafka.max-consumer-lag-at-startup", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 9a57205158d..36853739f8d 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1332,9 +1332,11 @@ Usage of ./cmd/mimir/mimir: -ingest-storage.kafka.last-produced-offset-retry-timeout duration How long to retry a failed request to get the last produced offset. (default 10s) -ingest-storage.kafka.max-consumer-lag-at-startup duration - The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) + The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) -ingest-storage.kafka.producer-max-record-size-bytes int The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616) + -ingest-storage.kafka.target-consumer-lag-at-startup duration + The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s) -ingest-storage.kafka.topic string The Kafka topic name. 
-ingest-storage.kafka.wait-strong-read-consistency-timeout duration diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index fea0cb61f09..c38af96352a 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -414,9 +414,11 @@ Usage of ./cmd/mimir/mimir: -ingest-storage.kafka.last-produced-offset-retry-timeout duration How long to retry a failed request to get the last produced offset. (default 10s) -ingest-storage.kafka.max-consumer-lag-at-startup duration - The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) + The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) -ingest-storage.kafka.producer-max-record-size-bytes int The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616) + -ingest-storage.kafka.target-consumer-lag-at-startup duration + The best-effort maximum lag a consumer tries to achieve at startup. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 2s) -ingest-storage.kafka.topic string The Kafka topic name. -ingest-storage.kafka.wait-strong-read-consistency-timeout duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index fb7dc93f074..5f9c4efd3ee 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3763,10 +3763,19 @@ kafka: # CLI flag: -ingest-storage.kafka.consume-from-timestamp-at-startup [consume_from_timestamp_at_startup: | default = 0] - # The maximum tolerated lag before a consumer is considered to have caught up + # The best-effort maximum lag a consumer tries to achieve at startup. Set both + # -ingest-storage.kafka.target-consumer-lag-at-startup and + # -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting + # for maximum consumer lag being honored at startup. + # CLI flag: -ingest-storage.kafka.target-consumer-lag-at-startup + [target_consumer_lag_at_startup: | default = 2s] + + # The guaranteed maximum lag before a consumer is considered to have caught up # reading from a partition at startup, becomes ACTIVE in the hash ring and - # passes the readiness check. Set 0 to disable waiting for maximum consumer - # lag being honored at startup. + # passes the readiness check. Set both + # -ingest-storage.kafka.target-consumer-lag-at-startup and + # -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting + # for maximum consumer lag being honored at startup. 
# CLI flag: -ingest-storage.kafka.max-consumer-lag-at-startup [max_consumer_lag_at_startup: <duration> | default = 15s] diff --git a/docs/sources/mimir/manage/mimir-runbooks/_index.md b/docs/sources/mimir/manage/mimir-runbooks/_index.md index f14a1fd973e..7372015f202 100644 --- a/docs/sources/mimir/manage/mimir-runbooks/_index.md +++ b/docs/sources/mimir/manage/mimir-runbooks/_index.md @@ -1388,7 +1388,9 @@ This alert fires when "receive delay" reported by ingester during "starting" pha How it **works**: -- When ingester is starting, it needs to fetch and process records from Kafka until preconfigured consumption lag is honored. The maximum tolerated lag before an ingester is considered to have caught up reading from a partition at startup can be configured via `-ingest-storage.kafka.max-consumer-lag-at-startup`. +- When an ingester starts, it needs to fetch and process records from Kafka until a preconfigured consumption lag is honored. There are two configuration options that control the lag before an ingester is considered to have caught up reading from a partition at startup: + - `-ingest-storage.kafka.max-consumer-lag-at-startup`: this is the guaranteed maximum lag before an ingester is considered to have caught up. The ingester doesn't become ACTIVE in the hash ring and doesn't pass the readiness check until the measured lag is below this setting. + - `-ingest-storage.kafka.target-consumer-lag-at-startup`: this is the desired maximum lag that an ingester aims to achieve at startup. This setting is best-effort. The ingester is granted a "grace period" to get the measured lag below this setting. However, the ingester still starts if the target lag hasn't been reached within this "grace period", as long as the max lag is honored. The "grace period" is equal to the configured `-ingest-storage.kafka.max-consumer-lag-at-startup`. - Each record has a timestamp when it was sent to Kafka by the distributor. When ingester reads the record, it computes "receive delay" as a difference between current time (when record was read) and time when record was sent to Kafka. This receive delay is reported in the metric `cortex_ingest_storage_reader_receive_delay_seconds`. You can see receive delay on `Mimir / Writes` dashboard, in section "Ingester (ingest storage – end-to-end latency)". - Under normal conditions when ingester is processing records faster than records are appearing, receive delay should be decreasing, until `-ingest-storage.kafka.max-consumer-lag-at-startup` is honored. - When ingester is starting, and observed "receive delay" is increasing, alert is raised.
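The interplay between the two settings described in the runbook above boils down to a simple rule: the max lag is a hard gate, while the target lag is only pursued for a grace period equal to the configured max lag. The following Go snippet is an illustrative sketch only, not Mimir code; the `canBecomeActive` function and its signature are invented for this example and assume the lag has already been measured.

```go
// Illustrative sketch of the startup rule described in the runbook above.
// The function name and signature are hypothetical, not part of Mimir.
package main

import (
	"fmt"
	"time"
)

// canBecomeActive reports whether an ingester may become ACTIVE in the hash ring,
// given the currently measured consumer lag, the two configured thresholds, and
// how long the ingester has been trying to reach the target lag.
func canBecomeActive(currLag, targetLag, maxLag, elapsed time.Duration) bool {
	if targetLag == 0 && maxLag == 0 {
		// Waiting for consumer lag at startup is disabled.
		return true
	}
	if currLag <= targetLag {
		// The best-effort target lag has been reached.
		return true
	}
	// The grace period to reach the target lag is equal to the configured max lag.
	// Once it has elapsed, honoring the (guaranteed) max lag alone is enough.
	return elapsed >= maxLag && currLag <= maxLag
}

func main() {
	target, max := 2*time.Second, 15*time.Second

	fmt.Println(canBecomeActive(1*time.Second, target, max, 0))              // true: below target lag
	fmt.Println(canBecomeActive(5*time.Second, target, max, 3*time.Second))  // false: still within the grace period
	fmt.Println(canBecomeActive(5*time.Second, target, max, 20*time.Second)) // true: grace period over, max lag honored
}
```

With the default values (2s target, 15s max), an ingester that can't get below 2 seconds of lag still becomes ACTIVE once its lag is below 15 seconds and the grace period has passed.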
diff --git a/pkg/storage/ingest/config.go b/pkg/storage/ingest/config.go index de5484aa50d..e96726cc3cd 100644 --- a/pkg/storage/ingest/config.go +++ b/pkg/storage/ingest/config.go @@ -17,6 +17,10 @@ const ( consumeFromStart = "start" consumeFromEnd = "end" consumeFromTimestamp = "timestamp" + + kafkaConfigFlagPrefix = "ingest-storage.kafka" + targetConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".target-consumer-lag-at-startup" + maxConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".max-consumer-lag-at-startup" ) var ( @@ -25,6 +29,8 @@ var ( ErrInvalidWriteClients = errors.New("the configured number of write clients is invalid (must be greater than 0)") ErrInvalidConsumePosition = errors.New("the configured consume position is invalid") ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit) + ErrInconsistentConsumerLagAtStartup = fmt.Errorf("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0") + ErrInvalidMaxConsumerLagAtStartup = fmt.Errorf("the configured max consumer lag at startup must be greater than or equal to the configured target consumer lag") consumeFromPositionOptions = []string{consumeFromLastOffset, consumeFromStart, consumeFromEnd, consumeFromTimestamp} ) @@ -38,7 +44,7 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "ingest-storage.enabled", false, "True to enable the ingestion via object storage.") - cfg.KafkaConfig.RegisterFlagsWithPrefix("ingest-storage.kafka", f) + cfg.KafkaConfig.RegisterFlagsWithPrefix(kafkaConfigFlagPrefix, f) cfg.Migration.RegisterFlagsWithPrefix("ingest-storage.migration", f) } @@ -73,6 +79,7 @@ type KafkaConfig struct { ConsumeFromPositionAtStartup string `yaml:"consume_from_position_at_startup"` ConsumeFromTimestampAtStartup int64 `yaml:"consume_from_timestamp_at_startup"` + TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"` MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"` AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` @@ -106,7 +113,11 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) f.StringVar(&cfg.ConsumeFromPositionAtStartup, prefix+".consume-from-position-at-startup", consumeFromLastOffset, fmt.Sprintf("From which position to start consuming the partition at startup. Supported options: %s.", strings.Join(consumeFromPositionOptions, ", "))) f.Int64Var(&cfg.ConsumeFromTimestampAtStartup, prefix+".consume-from-timestamp-at-startup", 0, fmt.Sprintf("Milliseconds timestamp after which the consumption of the partition starts at startup. Only applies when consume-from-position-at-startup is %s", consumeFromTimestamp)) - f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check.
Set 0 to disable waiting for maximum consumer lag being honored at startup.") + + howToDisableConsumerLagAtStartup := fmt.Sprintf("Set both -%s and -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", targetConsumerLagAtStartupFlag, maxConsumerLagAtStartupFlag) + f.DurationVar(&cfg.TargetConsumerLagAtStartup, targetConsumerLagAtStartupFlag, 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+howToDisableConsumerLagAtStartup) + f.DurationVar(&cfg.MaxConsumerLagAtStartup, maxConsumerLagAtStartupFlag, 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+howToDisableConsumerLagAtStartup) + f.BoolVar(&cfg.AutoCreateTopicEnabled, prefix+".auto-create-topic-enabled", true, "Enable auto-creation of Kafka topic if it doesn't exist.") f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 0, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.") @@ -141,6 +152,12 @@ func (cfg *KafkaConfig) Validate() error { if cfg.ProducerMaxRecordSizeBytes < minProducerRecordDataBytesLimit || cfg.ProducerMaxRecordSizeBytes > maxProducerRecordDataBytesLimit { return ErrInvalidProducerMaxRecordSizeBytes } + if (cfg.TargetConsumerLagAtStartup != 0) != (cfg.MaxConsumerLagAtStartup != 0) { + return ErrInconsistentConsumerLagAtStartup + } + if cfg.MaxConsumerLagAtStartup < cfg.TargetConsumerLagAtStartup { + return ErrInvalidMaxConsumerLagAtStartup + } return nil } diff --git a/pkg/storage/ingest/config_test.go b/pkg/storage/ingest/config_test.go index 9e23fb7b0af..375179c37e3 100644 --- a/pkg/storage/ingest/config_test.go +++ b/pkg/storage/ingest/config_test.go @@ -95,6 +95,36 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: ErrInvalidProducerMaxRecordSizeBytes, }, + "should fail if target consumer lag is enabled but max consumer lag is not": { + setup: func(cfg *Config) { + cfg.Enabled = true + cfg.KafkaConfig.Address = "localhost" + cfg.KafkaConfig.Topic = "test" + cfg.KafkaConfig.TargetConsumerLagAtStartup = 2 * time.Second + cfg.KafkaConfig.MaxConsumerLagAtStartup = 0 + }, + expectedErr: ErrInconsistentConsumerLagAtStartup, + }, + "should fail if max consumer lag is enabled but target consumer lag is not": { + setup: func(cfg *Config) { + cfg.Enabled = true + cfg.KafkaConfig.Address = "localhost" + cfg.KafkaConfig.Topic = "test" + cfg.KafkaConfig.TargetConsumerLagAtStartup = 0 + cfg.KafkaConfig.MaxConsumerLagAtStartup = 2 * time.Second + }, + expectedErr: ErrInconsistentConsumerLagAtStartup, + }, + "should fail if target consumer lag is > max consumer lag": { + setup: func(cfg *Config) { + cfg.Enabled = true + cfg.KafkaConfig.Address = "localhost" + cfg.KafkaConfig.Topic = "test" + cfg.KafkaConfig.TargetConsumerLagAtStartup = 2 * time.Second + cfg.KafkaConfig.MaxConsumerLagAtStartup = 1 * time.Second + }, + expectedErr: ErrInvalidMaxConsumerLagAtStartup, + }, } for testName, 
testData := range tests { diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 6019bed27ef..e3e95d79924 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -37,6 +37,7 @@ const ( var ( errWaitStrongReadConsistencyTimeoutExceeded = errors.Wrap(context.DeadlineExceeded, "wait strong read consistency timeout exceeded") + errWaitTargetLagDeadlineExceeded = errors.Wrap(context.DeadlineExceeded, "target lag deadline exceeded") ) type record struct { @@ -136,9 +137,9 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { } // Enforce the max consumer lag (if enabled). - if maxLag := r.kafkaCfg.MaxConsumerLagAtStartup; maxLag > 0 { + if targetLag, maxLag := r.kafkaCfg.TargetConsumerLagAtStartup, r.kafkaCfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 { if startOffset != kafkaOffsetEnd { - if err := r.processNextFetchesUntilMaxLagHonored(ctx, maxLag); err != nil { + if err := r.processNextFetchesUntilTargetOrMaxLagHonored(ctx, targetLag, maxLag); err != nil { return err } } else { @@ -191,20 +192,80 @@ func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver r.notifyLastConsumedOffset(fetches) } -func (r *PartitionReader) processNextFetchesUntilMaxLagHonored(ctx context.Context, maxLag time.Duration) error { - level.Info(r.logger).Log("msg", "partition reader is starting to consume partition until max consumer lag is honored", "max_lag", maxLag) +// processNextFetchesUntilTargetOrMaxLagHonored processes records from Kafka until at least the maxLag is honored. +// This function makes a best effort to get the lag below targetLag, but it's not guaranteed that it will be +// reached once this function successfully returns (only maxLag is guaranteed). +func (r *PartitionReader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Context, targetLag, maxLag time.Duration) error { + logger := log.With(r.logger, "target_lag", targetLag, "max_lag", maxLag) + level.Info(logger).Log("msg", "partition reader is starting to consume partition until target and max consumer lag is honored") + + attempts := []func() (currLag time.Duration, _ error){ + // First process fetches until at least the max lag is honored. + func() (time.Duration, error) { + return r.processNextFetchesUntilLagHonored(ctx, maxLag, logger) + }, + + // If the target lag hasn't been reached with the first attempt (which stops once at least the max lag + // is honored) then we try to reach the (lower) target lag within a fixed time (best-effort). + // The timeout is equal to the max lag. This is done because we expect at least a 2x replay speed + // from Kafka (which means at most it takes 1s to ingest 2s of data): assuming new data is continuously + // written to the partition, we give the reader maxLag time to replay the backlog + ingest the new data + // written in the meanwhile. + func() (time.Duration, error) { + timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded) + defer cancel() + + return r.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger) + }, + + // If the target lag hasn't been reached with the previous attempt then we'll move on. However, + // we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored.
+ func() (time.Duration, error) { + return r.processNextFetchesUntilLagHonored(ctx, maxLag, logger) + }, + } + + var currLag time.Duration + for _, attempt := range attempts { + var err error + + currLag, err = attempt() + if errors.Is(err, errWaitTargetLagDeadlineExceeded) { + continue + } + if err != nil { + return err + } + if currLag <= targetLag { + level.Info(logger).Log( + "msg", "partition reader consumed partition and current lag is lower than configured target consumer lag", + "last_consumed_offset", r.consumedOffsetWatcher.LastConsumedOffset(), + "current_lag", currLag, + ) + return nil + } + } + level.Warn(logger).Log( + "msg", "partition reader consumed partition and current lag is lower than configured max consumer lag but higher than target consumer lag", + "last_consumed_offset", r.consumedOffsetWatcher.LastConsumedOffset(), + "current_lag", currLag, + ) + return nil +} + +func (r *PartitionReader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger) (currLag time.Duration, _ error) { boff := backoff.New(ctx, backoff.Config{ - MinBackoff: 250 * time.Millisecond, - MaxBackoff: 2 * time.Second, - MaxRetries: 0, // retry forever + MinBackoff: 100 * time.Millisecond, + MaxBackoff: time.Second, + MaxRetries: 0, // Retry forever (unless context is canceled / deadline exceeded). }) for boff.Ongoing() { // Send a direct request to the Kafka backend to fetch the partition start offset. partitionStartOffset, err := r.offsetReader.FetchPartitionStartOffset(ctx) if err != nil { - level.Warn(r.logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err) + level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err) boff.Wait() continue } @@ -212,25 +273,25 @@ func (r *PartitionReader) processNextFetchesUntilMaxLagHonored(ctx context.Conte // Send a direct request to the Kafka backend to fetch the last produced offset. // We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further // latency. + lastProducedOffsetRequestedAt := time.Now() lastProducedOffset, err := r.offsetReader.FetchLastProducedOffset(ctx) if err != nil { - level.Warn(r.logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err) + level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err) boff.Wait() continue } - lastProducedOffsetFetchedAt := time.Now() - - // Ensure there're some records to consume. For example, if the partition has been inactive for a long + // Ensure there are some records to consume. For example, if the partition has been inactive for a long // time and all its records have been deleted, the partition start offset may be > 0 but there are no // records to actually consume. if partitionStartOffset > lastProducedOffset { - level.Info(r.logger).Log("msg", "partition reader found no records to consume because partition is empty", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) - return nil + level.Info(logger).Log("msg", "partition reader found no records to consume because partition is empty", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) + return 0, nil } - // This message is NOT expected to be logged with a very high rate. 
- level.Info(r.logger).Log("msg", "partition reader is consuming records to honor max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) + // This message is NOT expected to be logged with a very high rate. In this log we display the last measured + // lag. If we don't have it (lag is zero value), then it will not be logged. + level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) for boff.Ongoing() { // Continue reading until we reached the desired offset. @@ -243,18 +304,27 @@ func (r *PartitionReader) processNextFetchesUntilMaxLagHonored(ctx context.Conte } if boff.Err() != nil { - return boff.Err() + // TODO should be moved to dskit's backoff + if ctx.Err() != nil { + return 0, context.Cause(ctx) + } + + return 0, boff.Err() } // If it took less than the max desired lag to replay the partition // then we can stop here, otherwise we'll have to redo it. - if currLag := time.Since(lastProducedOffsetFetchedAt); currLag <= maxLag { - level.Info(r.logger).Log("msg", "partition reader consumed partition and current lag is less than configured max consumer lag", "last_consumed_offset", r.consumedOffsetWatcher.LastConsumedOffset(), "current_lag", currLag, "max_lag", maxLag) - return nil + if currLag = time.Since(lastProducedOffsetRequestedAt); currLag <= maxLag { + return currLag, nil } } - return boff.Err() + // TODO should be moved to dskit's backoff + if ctx.Err() != nil { + return 0, context.Cause(ctx) + } + + return 0, boff.Err() } func filterOutErrFetches(fetches kgo.Fetches) kgo.Fetches { @@ -279,6 +349,14 @@ func isErrFetch(fetch kgo.Fetch) bool { return false } +func loggerWithCurrentLagIfSet(logger log.Logger, currLag time.Duration) log.Logger { + if currLag <= 0 { + return logger + } + + return log.With(logger, "current_lag", currLag) +} + func (r *PartitionReader) logFetchErrors(fetches kgo.Fetches) { mErr := multierror.New() fetches.EachError(func(topic string, partition int32, err error) { diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 6401f15c3da..183fc532425 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/services" "github.com/grafana/dskit/test" "github.com/prometheus/client_golang/prometheus" @@ -351,7 +352,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { ) // Create and start the reader. We expect the reader to start even if partition is empty. 
- reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second), withRegistry(reg)) require.NoError(t, services.StartAndAwaitRunning(ctx, reader)) require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -363,7 +364,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { `), "cortex_ingest_storage_reader_last_consumed_offset")) }) - t.Run("should immediately switch to Running state if configured max lag is 0", func(t *testing.T) { + t.Run("should immediately switch to Running state if configured target / max lag is 0", func(t *testing.T) { t.Parallel() var ( @@ -386,7 +387,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { t.Log("produced 2 records") // Create and start the reader. We expect the reader to start even if Fetch is failing. - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(0), withRegistry(reg)) + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(0, 0), withRegistry(reg)) require.NoError(t, services.StartAndAwaitRunning(ctx, reader)) require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -398,7 +399,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { `), "cortex_ingest_storage_reader_last_consumed_offset")) }) - t.Run("should consume partition from start if last committed offset is missing and wait until max lag is honored", func(t *testing.T) { + t.Run("should consume partition from start if last committed offset is missing and wait until target lag is honored", func(t *testing.T) { t.Parallel() var ( @@ -432,7 +433,11 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { // Create and start the reader. reg := prometheus.NewPedanticRegistry() - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -456,6 +461,9 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { return reader.State() }) + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + assert.Equal(t, int64(2), consumedRecordsCount.Load()) // We expect the last consumed offset to be tracked in a metric. @@ -468,7 +476,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) }) - t.Run("should consume partition from start if last committed offset is missing and wait until max lag is honored and retry if a failure occurs when fetching last produced offset", func(t *testing.T) { + t.Run("should consume partition from start if last committed offset is missing and wait until target lag is honored and retry if a failure occurs when fetching last produced offset", func(t *testing.T) { t.Parallel() var ( @@ -502,7 +510,11 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { // Create and start the reader. 
reg := prometheus.NewPedanticRegistry() - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -526,6 +538,9 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { return reader.State() }) + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + assert.Equal(t, int64(2), consumedRecordsCount.Load()) // We expect the last consumed offset to be tracked in a metric. @@ -538,7 +553,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) }) - t.Run("should consume partition from end if position=end, and skip honoring max lag", func(t *testing.T) { + t.Run("should consume partition from end if position=end, and skip honoring target / max lag", func(t *testing.T) { t.Parallel() var ( @@ -578,7 +593,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { t.Log("produced 2 records before starting the reader") // Create and start the reader. - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromEnd), withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromEnd), withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second), withRegistry(reg)) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -620,7 +635,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) }) - t.Run("should consume partition from start if position=start, and wait until max lag is honored", func(t *testing.T) { + t.Run("should consume partition from start if position=start, and wait until target lag is honored", func(t *testing.T) { t.Parallel() var ( @@ -670,7 +685,12 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { // Create and start the reader. reg := prometheus.NewPedanticRegistry() - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromStart), withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withConsumeFromPositionAtStartup(consumeFromStart), + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -693,6 +713,9 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { return reader.State() }) + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + // We expect the reader to have consumed the partition from start. 
test.Poll(t, time.Second, []string{"record-1", "record-2"}, func() interface{} { consumedRecordsMx.Lock() @@ -712,7 +735,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { } }) - t.Run("should consume partition from the timestamp if position=timestamp, and wait until max lag is honored", func(t *testing.T) { + t.Run("should consume partition from the timestamp if position=timestamp, and wait until target lag is honored", func(t *testing.T) { t.Parallel() var ( @@ -764,7 +787,12 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { consumedRecordsMx.Unlock() reg := prometheus.NewPedanticRegistry() - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromTimestampAtStartup(consumeFromTs.UnixMilli()), withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withConsumeFromTimestampAtStartup(consumeFromTs.UnixMilli()), + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -787,6 +815,9 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { return reader.State() }) + // We expect the reader to have switched to running because target consumer lag has been honored. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured target consumer lag") + // We expect the reader to have consumed the partition from the third record. test.Poll(t, time.Second, []string{"record-3", "record-4"}, func() interface{} { consumedRecordsMx.Lock() @@ -805,7 +836,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) }) - t.Run("should consume partition from last committed offset if position=last-offset, and wait until max lag is honored", func(t *testing.T) { + t.Run("should consume partition from last committed offset if position=last-offset, and wait until target lag is honored", func(t *testing.T) { t.Parallel() var ( @@ -854,7 +885,12 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { // Create and start the reader. reg := prometheus.NewPedanticRegistry() - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromLastOffset), withMaxConsumerLagAtStartup(time.Second), withRegistry(reg)) + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withConsumeFromPositionAtStartup(consumeFromLastOffset), + withTargetAndMaxConsumerLagAtStartup(time.Second, 2*time.Second), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) require.NoError(t, reader.StartAsync(ctx)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) @@ -897,6 +933,99 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { } }) + t.Run("should consume partition from last committed offset if position=last-offset, and wait until max lag is honored if can't honor target lag", func(t *testing.T) { + t.Parallel() + + var ( + cluster, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName) + writeClient = newKafkaProduceClient(t, clusterAddr) + nextRecordID = atomic.NewInt32(0) + targetLag = 500 * time.Millisecond + maxLag = 2 * time.Second + ) + + // Wait until all goroutines used in this test have done. 
+ testRoutines := sync.WaitGroup{} + t.Cleanup(testRoutines.Wait) + + // Create a channel to signal goroutines once the test has done. + testDone := make(chan struct{}) + t.Cleanup(func() { + close(testDone) + }) + + consumer := consumerFunc(func(_ context.Context, _ []record) error { + return nil + }) + + cluster.ControlKey(int16(kmsg.ListOffsets), func(kreq kmsg.Request) (kmsg.Response, error, bool) { + cluster.KeepControl() + + // Slow down each ListOffsets request to take longer than the target lag. + req := kreq.(*kmsg.ListOffsetsRequest) + if len(req.Topics) > 0 && len(req.Topics[0].Partitions) > 0 && req.Topics[0].Partitions[0].Timestamp == kafkaOffsetEnd { + cluster.SleepControl(func() { + testRoutines.Add(1) + defer testRoutines.Done() + + delay := time.Duration(float64(targetLag) * 1.1) + t.Logf("artificially slowing down OffsetFetch request by %s", delay.String()) + + select { + case <-testDone: + case <-time.After(delay): + } + }) + } + + return nil, nil, false + }) + + // Produce a record. + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte(fmt.Sprintf("record-%d", nextRecordID.Inc()))) + t.Log("produced 1 record") + + // Continue to produce records at a high pace, so that we simulate the case there are always new + // records to fetch. + testRoutines.Add(1) + go func() { + defer testRoutines.Done() + + for { + select { + case <-testDone: + return + + case <-time.After(targetLag / 2): + produceRecord(ctx, t, writeClient, topicName, partitionID, []byte(fmt.Sprintf("record-%d", nextRecordID.Inc()))) + t.Log("produced 1 record") + } + } + }() + + // Create and start the reader. + reg := prometheus.NewPedanticRegistry() + logs := &concurrency.SyncBuffer{} + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, + withConsumeFromPositionAtStartup(consumeFromLastOffset), + withTargetAndMaxConsumerLagAtStartup(targetLag, maxLag), + withRegistry(reg), + withLogger(log.NewLogfmtLogger(logs))) + require.NoError(t, reader.StartAsync(ctx)) + t.Cleanup(func() { + require.NoError(t, services.StopAndAwaitTerminated(ctx, reader)) + }) + + // We expect the reader to catch up, and then switch to Running state. + test.Poll(t, maxLag*5, services.Running, func() interface{} { + return reader.State() + }) + + // We expect the reader to have switched to running because max consumer lag has been honored + // but target lag has not. + assert.Contains(t, logs.String(), "partition reader consumed partition and current lag is lower than configured max consumer lag but higher than target consumer lag") + }) + t.Run("should not wait indefinitely if context is cancelled while fetching last produced offset", func(t *testing.T) { t.Parallel() @@ -915,7 +1044,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { }) // Create and start the reader. - reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second)) + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second)) readerCtx, cancelReaderCtx := context.WithCancel(ctx) require.NoError(t, reader.StartAsync(readerCtx)) @@ -958,7 +1087,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { t.Log("produced 2 records") // Create and start the reader. 
- reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second)) + reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second)) readerCtx, cancelReaderCtx := context.WithCancel(ctx) require.NoError(t, reader.StartAsync(readerCtx)) @@ -1033,7 +1162,8 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) { reg := prometheus.NewPedanticRegistry() reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromPosition), - withMaxConsumerLagAtStartup(time.Second), + withConsumeFromTimestampAtStartup(time.Now().UnixMilli()), // For the test where position=timestamp. + withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second), withRegistry(reg)) require.NoError(t, services.StartAndAwaitRunning(ctx, reader)) @@ -1072,7 +1202,7 @@ func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) { var ( cluster, clusterAddr = testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, partitionID+1, topicName) consumer = consumerFunc(func(context.Context, []record) error { return nil }) - reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second)) + reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second)) ) cluster.ControlKey(int16(kmsg.OffsetFetch), func(request kmsg.Request) (kmsg.Response, error, bool) { @@ -1103,7 +1233,7 @@ func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) { var ( cluster, clusterAddr = testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, partitionID+1, topicName) consumer = consumerFunc(func(context.Context, []record) error { return nil }) - reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second)) + reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second)) ) cluster.ControlKey(int16(kmsg.OffsetFetch), func(request kmsg.Request) (kmsg.Response, error, bool) { @@ -1144,7 +1274,7 @@ func TestPartitionReader_fetchLastCommittedOffset(t *testing.T) { var ( cluster, clusterAddr = testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, partitionID+1, topicName) consumer = consumerFunc(func(context.Context, []record) error { return nil }) - reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second)) + reader = createReader(t, clusterAddr, topicName, partitionID, consumer, withTargetAndMaxConsumerLagAtStartup(time.Second, time.Second)) ) cluster.ControlKey(int16(kmsg.OffsetFetch), func(request kmsg.Request) (kmsg.Response, error, bool) { @@ -1402,8 +1532,9 @@ func withLastProducedOffsetPollInterval(i time.Duration) func(cfg *readerTestCfg } } -func withMaxConsumerLagAtStartup(maxLag time.Duration) func(cfg *readerTestCfg) { +func withTargetAndMaxConsumerLagAtStartup(targetLag, maxLag time.Duration) func(cfg *readerTestCfg) { return func(cfg *readerTestCfg) { + cfg.kafka.TargetConsumerLagAtStartup = targetLag cfg.kafka.MaxConsumerLagAtStartup = maxLag } } @@ -1433,6 +1564,12 @@ func withRegistry(reg *prometheus.Registry) func(cfg *readerTestCfg) { } } +func withLogger(logger log.Logger) func(cfg *readerTestCfg) { + return func(cfg *readerTestCfg) { + cfg.logger = logger + } +} + func defaultReaderTestConfig(t *testing.T, addr string, topicName string, 
partitionID int32, consumer recordConsumer) *readerTestCfg { return &readerTestCfg{ registry: prometheus.NewPedanticRegistry(), @@ -1448,6 +1585,10 @@ func createReader(t *testing.T, addr string, topicName string, partitionID int32 for _, o := range opts { o(cfg) } + + // Ensure the config is valid. + require.NoError(t, cfg.kafka.Validate()) + reader, err := newPartitionReader(cfg.kafka, cfg.partitionID, "test-group", cfg.consumer, cfg.logger, cfg.registry) require.NoError(t, err)
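The reader changes earlier in this patch use context.WithTimeoutCause to tag the best-effort attempt's timeout with errWaitTargetLagDeadlineExceeded, so the attempts loop can tell "ran out of time chasing the target lag" apart from a real failure. The snippet below is a self-contained sketch of that standard Go pattern (Go 1.21+), not Mimir code; doWork and the sentinel error are invented for illustration.

```go
// Sketch of the context.WithTimeoutCause pattern used by the partition reader.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTargetLagDeadlineExceeded plays the role of errWaitTargetLagDeadlineExceeded:
// a sentinel attached as the cancellation cause of the best-effort attempt.
var errTargetLagDeadlineExceeded = errors.New("target lag deadline exceeded")

// doWork simulates a catch-up attempt that takes longer than the allowed time.
func doWork(ctx context.Context) error {
	select {
	case <-time.After(time.Second):
		return nil
	case <-ctx.Done():
		// context.Cause returns the custom cause instead of a bare context.DeadlineExceeded.
		return context.Cause(ctx)
	}
}

func main() {
	// Give the best-effort attempt a bounded amount of time, tagging the timeout
	// with a sentinel error so the caller can tell it apart from other failures.
	ctx, cancel := context.WithTimeoutCause(context.Background(), 100*time.Millisecond, errTargetLagDeadlineExceeded)
	defer cancel()

	err := doWork(ctx)

	// The caller can now decide to move on to the next attempt rather than fail startup.
	fmt.Println(errors.Is(err, errTargetLagDeadlineExceeded)) // true
}
```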