Skip to content

Commit

Permalink
Reset frameBytesLeft when cutting a frame in streamChunkedReadRespons…
Browse files Browse the repository at this point in the history
…es (#4423)

* Reset frameBytesLeft when cutting a frame in streamChunkedReadResponses
* Update CHANGELOG.md
* Rename frameBytesLeft -> frameBytesRemaining
* Update pkg/querier/remote_read_test.go

Signed-off-by: Justin Lei <justin.lei@grafana.com>
Co-authored-by: Tyler Reid <tyler.reid@grafana.com>
  • Loading branch information
leizor and Tyler Reid authored Mar 9, 2023
1 parent 44aad50 commit 74c6d77
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Querying with using `{__mimir_storage__="ephemeral"}` selector no longer works.
* [BUGFIX] Ingester: conversion of global limits `max-series-per-user`, `max-series-per-metric`, `max-metadata-per-user` and `max-metadata-per-metric` into corresponding local limits now takes into account the number of ingesters in each zone. #4238
* [BUGFIX] Ingester: track `cortex_ingester_memory_series` metric consistently with `cortex_ingester_memory_series_created_total` and `cortex_ingester_memory_series_removed_total`. #4312
* [BUGFIX] Querier: fixed a bug which was incorrectly matching series with regular expression label matchers with begin/end anchors in the middle of the regular expression. #4340
* [BUGFIX] Querier: Streaming remote read will now continue to return multiple chunks per frame after the first frame. #4423

### Mixin

Expand Down
19 changes: 12 additions & 7 deletions pkg/querier/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,7 @@ func streamChunkedReadResponses(stream io.Writer, ss storage.ChunkSeriesSet, que
iter = series.Iterator(iter)
lbls = mimirpb.FromLabelsToLabelAdapters(series.Labels())

frameBytesLeft := maxBytesInFrame
for _, lbl := range lbls {
frameBytesLeft -= lbl.Size()
}

frameBytesRemaining := initializedFrameBytesRemaining(maxBytesInFrame, lbls)
isNext := iter.Next()

for isNext {
Expand All @@ -263,11 +259,11 @@ func streamChunkedReadResponses(stream io.Writer, ss storage.ChunkSeriesSet, que
Type: client.StreamChunk_Encoding(chk.Chunk.Encoding()),
Data: chk.Chunk.Bytes(),
})
frameBytesLeft -= chks[len(chks)-1].Size()
frameBytesRemaining -= chks[len(chks)-1].Size()

// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
isNext = iter.Next()
if frameBytesLeft > 0 && isNext {
if frameBytesRemaining > 0 && isNext {
continue
}

Expand All @@ -288,10 +284,19 @@ func streamChunkedReadResponses(stream io.Writer, ss storage.ChunkSeriesSet, que
return errors.Wrap(err, "write to stream")
}
chks = chks[:0]
frameBytesRemaining = initializedFrameBytesRemaining(maxBytesInFrame, lbls)
}
if err := iter.Err(); err != nil {
return err
}
}
return ss.Err()
}

func initializedFrameBytesRemaining(maxBytesInFrame int, lbls []mimirpb.LabelAdapter) int {
frameBytesLeft := maxBytesInFrame
for _, lbl := range lbls {
frameBytesLeft -= lbl.Size()
}
return frameBytesLeft
}
41 changes: 31 additions & 10 deletions pkg/querier/remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ func TestStreamedRemoteRead(t *testing.T) {
},
},
},
"with 241 samples, we expect 1 frame with 2 chunks, and 1 frame with 1 chunk due to frame limit": {
samples: getNSamples(241),
"with 481 samples, we expect 2 frames with 2 chunks, and 1 frame with 1 chunk due to frame limit": {
samples: getNSamples(481),
expectedResults: []*client.StreamReadResponse{
{
ChunkedSeries: []*client.StreamChunkedSeries{
Expand All @@ -197,18 +197,17 @@ func TestStreamedRemoteRead(t *testing.T) {
MinTimeMs: 0,
MaxTimeMs: 119,
Type: client.XOR,
Data: getIndexedXORChunk(0, 241),
Data: getIndexedXORChunk(0, 481),
},
{
MinTimeMs: 120,
MaxTimeMs: 239,
Type: client.XOR,
Data: getIndexedXORChunk(1, 241),
Data: getIndexedXORChunk(1, 481),
},
},
},
},
QueryIndex: 0,
},
{
ChunkedSeries: []*client.StreamChunkedSeries{
Expand All @@ -217,14 +216,34 @@ func TestStreamedRemoteRead(t *testing.T) {
Chunks: []client.StreamChunk{
{
MinTimeMs: 240,
MaxTimeMs: 240,
MaxTimeMs: 359,
Type: client.XOR,
Data: getIndexedXORChunk(2, 241),
Data: getIndexedXORChunk(2, 481),
},
{
MinTimeMs: 360,
MaxTimeMs: 479,
Type: client.XOR,
Data: getIndexedXORChunk(3, 481),
},
},
},
},
},
{
ChunkedSeries: []*client.StreamChunkedSeries{
{
Labels: []mimirpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Chunks: []client.StreamChunk{
{
MinTimeMs: 480,
MaxTimeMs: 480,
Type: client.XOR,
Data: getIndexedXORChunk(4, 481),
},
},
},
},
QueryIndex: 0,
},
},
},
Expand All @@ -243,8 +262,9 @@ func TestStreamedRemoteRead(t *testing.T) {
}, nil
},
}
// Labelset has 10 bytes. Full frame in test data has roughly 160 bytes. This allows us to have at max 2 frames in this test.
maxBytesInFrame := 10 + 160*2
// The labelset for this test has 10 bytes and a full chunk is roughly 165 bytes; for this test we want a
// frame to contain at most 2 chunks.
maxBytesInFrame := 10 + 165*2

handler := remoteReadHandler(q, maxBytesInFrame, log.NewNopLogger())

Expand Down Expand Up @@ -283,6 +303,7 @@ func TestStreamedRemoteRead(t *testing.T) {
require.Equal(t, tc.expectedResults[i], &res)
i++
}
require.Len(t, tc.expectedResults, i)
})
}
}
Expand Down

0 comments on commit 74c6d77

Please sign in to comment.