diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d3d0145cdd..781af417b09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,7 @@ Querying with using `{__mimir_storage__="ephemeral"}` selector no longer works. * [BUGFIX] Ingester: conversion of global limits `max-series-per-user`, `max-series-per-metric`, `max-metadata-per-user` and `max-metadata-per-metric` into corresponding local limits now takes into account the number of ingesters in each zone. #4238 * [BUGFIX] Ingester: track `cortex_ingester_memory_series` metric consistently with `cortex_ingester_memory_series_created_total` and `cortex_ingester_memory_series_removed_total`. #4312 * [BUGFIX] Querier: fixed a bug which was incorrectly matching series with regular expression label matchers with begin/end anchors in the middle of the regular expression. #4340 +* [BUGFIX] Querier: Streaming remote read will now continue to return multiple chunks per frame after the first frame. #4423 ### Mixin diff --git a/pkg/querier/remote_read.go b/pkg/querier/remote_read.go index 653f44bfd4c..11207b95fbe 100644 --- a/pkg/querier/remote_read.go +++ b/pkg/querier/remote_read.go @@ -242,11 +242,7 @@ func streamChunkedReadResponses(stream io.Writer, ss storage.ChunkSeriesSet, que iter = series.Iterator(iter) lbls = mimirpb.FromLabelsToLabelAdapters(series.Labels()) - frameBytesLeft := maxBytesInFrame - for _, lbl := range lbls { - frameBytesLeft -= lbl.Size() - } - + frameBytesRemaining := initializedFrameBytesRemaining(maxBytesInFrame, lbls) isNext := iter.Next() for isNext { @@ -263,11 +259,11 @@ func streamChunkedReadResponses(stream io.Writer, ss storage.ChunkSeriesSet, que Type: client.StreamChunk_Encoding(chk.Chunk.Encoding()), Data: chk.Chunk.Bytes(), }) - frameBytesLeft -= chks[len(chks)-1].Size() + frameBytesRemaining -= chks[len(chks)-1].Size() // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. isNext = iter.Next() - if frameBytesLeft > 0 && isNext { + if frameBytesRemaining > 0 && isNext { continue } @@ -288,6 +284,7 @@ func streamChunkedReadResponses(stream io.Writer, ss storage.ChunkSeriesSet, que return errors.Wrap(err, "write to stream") } chks = chks[:0] + frameBytesRemaining = initializedFrameBytesRemaining(maxBytesInFrame, lbls) } if err := iter.Err(); err != nil { return err @@ -295,3 +292,11 @@ func streamChunkedReadResponses(stream io.Writer, ss storage.ChunkSeriesSet, que } return ss.Err() } + +func initializedFrameBytesRemaining(maxBytesInFrame int, lbls []mimirpb.LabelAdapter) int { + frameBytesLeft := maxBytesInFrame + for _, lbl := range lbls { + frameBytesLeft -= lbl.Size() + } + return frameBytesLeft +} diff --git a/pkg/querier/remote_read_test.go b/pkg/querier/remote_read_test.go index d34def82f3b..b49c78fddff 100644 --- a/pkg/querier/remote_read_test.go +++ b/pkg/querier/remote_read_test.go @@ -185,8 +185,8 @@ func TestStreamedRemoteRead(t *testing.T) { }, }, }, - "with 241 samples, we expect 1 frame with 2 chunks, and 1 frame with 1 chunk due to frame limit": { - samples: getNSamples(241), + "with 481 samples, we expect 2 frames with 2 chunks, and 1 frame with 1 chunk due to frame limit": { + samples: getNSamples(481), expectedResults: []*client.StreamReadResponse{ { ChunkedSeries: []*client.StreamChunkedSeries{ @@ -197,18 +197,17 @@ func TestStreamedRemoteRead(t *testing.T) { MinTimeMs: 0, MaxTimeMs: 119, Type: client.XOR, - Data: getIndexedXORChunk(0, 241), + Data: getIndexedXORChunk(0, 481), }, { MinTimeMs: 120, MaxTimeMs: 239, Type: client.XOR, - Data: getIndexedXORChunk(1, 241), + Data: getIndexedXORChunk(1, 481), }, }, }, }, - QueryIndex: 0, }, { ChunkedSeries: []*client.StreamChunkedSeries{ @@ -217,14 +216,34 @@ func TestStreamedRemoteRead(t *testing.T) { Chunks: []client.StreamChunk{ { MinTimeMs: 240, - MaxTimeMs: 240, + MaxTimeMs: 359, Type: client.XOR, - Data: getIndexedXORChunk(2, 241), + Data: getIndexedXORChunk(2, 481), + }, + { + MinTimeMs: 360, + MaxTimeMs: 479, + Type: client.XOR, + Data: getIndexedXORChunk(3, 481), + }, + }, + }, + }, + }, + { + ChunkedSeries: []*client.StreamChunkedSeries{ + { + Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar"}}, + Chunks: []client.StreamChunk{ + { + MinTimeMs: 480, + MaxTimeMs: 480, + Type: client.XOR, + Data: getIndexedXORChunk(4, 481), }, }, }, }, - QueryIndex: 0, }, }, }, @@ -243,8 +262,9 @@ func TestStreamedRemoteRead(t *testing.T) { }, nil }, } - // Labelset has 10 bytes. Full frame in test data has roughly 160 bytes. This allows us to have at max 2 frames in this test. - maxBytesInFrame := 10 + 160*2 + // The labelset for this test has 10 bytes and a full chunk is roughly 165 bytes; for this test we want a + // frame to contain at most 2 chunks. + maxBytesInFrame := 10 + 165*2 handler := remoteReadHandler(q, maxBytesInFrame, log.NewNopLogger()) @@ -283,6 +303,7 @@ func TestStreamedRemoteRead(t *testing.T) { require.Equal(t, tc.expectedResults[i], &res) i++ } + require.Len(t, tc.expectedResults, i) }) } }