From 8e7cce59f6fd634b81727a0657745afca264964a Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 17 Jan 2024 10:31:22 +0100 Subject: [PATCH 1/2] Ingest storage: configure BrokerMaxReadBytes on Kafka reader Signed-off-by: Marco Pracucci --- pkg/storage/ingest/reader.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index c47d8633c2d..dfd88bb4662 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -265,15 +265,22 @@ func (r *PartitionReader) recordFetchesMetrics(fetches kgo.Fetches) { } func (r *PartitionReader) newKafkaReader(at kgo.Offset) (*kgo.Client, error) { + const fetchMaxBytes = 100_000_000 + opts := append( commonKafkaClientOptions(r.kafkaCfg, r.metrics.kprom, r.logger), kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{ r.kafkaCfg.Topic: {r.partitionID: at}, }), kgo.FetchMinBytes(1), - kgo.FetchMaxBytes(100_000_000), + kgo.FetchMaxBytes(fetchMaxBytes), kgo.FetchMaxWait(5*time.Second), kgo.FetchMaxPartitionBytes(50_000_000), + + // BrokerMaxReadBytes sets the maximum response size that can be read from + // Kafka. This is a safety measure to avoid OOMing on invalid responses. + // Recommendation is to set it 2x FetchMaxBytes. + kgo.BrokerMaxReadBytes(2*fetchMaxBytes), ) client, err := kgo.NewClient(opts...) if err != nil { From b90a229b9818a7fc207a7844f9bd97251bdcb56c Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Wed, 17 Jan 2024 11:45:30 +0100 Subject: [PATCH 2/2] Update pkg/storage/ingest/reader.go Co-authored-by: Dimitar Dimitrov --- pkg/storage/ingest/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index dfd88bb4662..7a1093859f2 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -279,7 +279,7 @@ func (r *PartitionReader) newKafkaReader(at kgo.Offset) (*kgo.Client, error) { // BrokerMaxReadBytes sets the maximum response size that can be read from // Kafka. This is a safety measure to avoid OOMing on invalid responses. - // Recommendation is to set it 2x FetchMaxBytes. + // franz-go recommendation is to set it 2x FetchMaxBytes. kgo.BrokerMaxReadBytes(2*fetchMaxBytes), ) client, err := kgo.NewClient(opts...)