Skip to content

Commit

Permalink
add fetch_min_bytes_max_wait
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov committed Sep 26, 2024
1 parent afde94b commit 6197d4b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
12 changes: 7 additions & 5 deletions pkg/storage/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,12 @@ type KafkaConfig struct {
// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
FallbackClientErrorSampleRate int64 `yaml:"-"`

FetchConcurrency int `yaml:"fetch_concurrency"`
RecordsPerFetch int `yaml:"records_per_fetch"`
UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
IngestionConcurrency int `yaml:"ingestion_concurrency"`
IngestionConcurrencyBatchSize int `yaml:"ingestion_concurrency_batch_size"`
FetchConcurrency int `yaml:"fetch_concurrency"`
FetchMinBytesMaxWait time.Duration `yaml:"fetch_min_bytes_max_wait"`
RecordsPerFetch int `yaml:"records_per_fetch"`
UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"`
IngestionConcurrency int `yaml:"ingestion_concurrency"`
IngestionConcurrencyBatchSize int `yaml:"ingestion_concurrency_batch_size"`
}

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
Expand Down Expand Up @@ -133,6 +134,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")
f.IntVar(&cfg.FetchConcurrency, prefix+".fetch-concurrency", 1, "The number of concurrent fetch requests that the ingester sends to Kafka when catching up during startup.")
f.DurationVar(&cfg.FetchMinBytesMaxWait, prefix+".fetch-min-bytes-max-wait", defaultMinBytesWaitTime, "The maximum time to wait for the minimum number of bytes to be fetched from Kafka before returning the fetched records. This is used to avoid waiting indefinitely for the minimum number of bytes to be fetched.")
f.IntVar(&cfg.RecordsPerFetch, prefix+".records-per-fetch", 128, "The number of records to fetch from Kafka in a single request.")
f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.")
f.IntVar(&cfg.IngestionConcurrency, prefix+".ingestion-concurrency", 0, "The number of concurrent ingestion streams to the TSDB head. 0 to disable.")
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri
consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID),
metrics: newReaderMetrics(partitionID, reg),
consumedOffsetWatcher: newPartitionOffsetWatcher(),
concurrentFetchersMinBytesMaxWaitTime: defaultMinBytesWaitTime,
concurrentFetchersMinBytesMaxWaitTime: kafkaCfg.FetchMinBytesMaxWait,
logger: log.With(logger, "partition", partitionID),
reg: reg,
}
Expand Down

0 comments on commit 6197d4b

Please sign in to comment.