Skip to content

Commit

Permalink
Add a generic chunk pool for batch.NewChunkMergeIterator
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
  • Loading branch information
codesome committed May 31, 2023
1 parent 1826aea commit 4e7bc2e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
25 changes: 23 additions & 2 deletions pkg/querier/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/zeropool"

"github.com/grafana/mimir/pkg/storage/chunk"
)
Expand Down Expand Up @@ -55,17 +56,37 @@ type iterator interface {
Err() error
}

var genericChunkSlicePool zeropool.Pool[[]GenericChunk]

func getGenericChunkSlice(n int) []GenericChunk {
sn := genericChunkSlicePool.Get()
if sn == nil || cap(sn) < n {
// Even if we get a slice from the pool, we do not put it back here
// to not grow the pool size more than required. This effectively
// replaces a smaller slice with a bigger slice in the pool.
return make([]GenericChunk, n)
}
return sn[:n]
}

func putGenericChunkSlice(sn []GenericChunk) {
genericChunkSlicePool.Put(sn)
}

// NewChunkMergeIterator returns a chunkenc.Iterator that merges Mimir chunks together.
func NewChunkMergeIterator(it chunkenc.Iterator, chunks []chunk.Chunk, _, _ model.Time) chunkenc.Iterator {
converted := make([]GenericChunk, len(chunks))
converted := getGenericChunkSlice(len(chunks))
for i, c := range chunks {
converted[i] = NewGenericChunk(int64(c.From), int64(c.Through), c.Data.NewIterator)
}

return NewGenericChunkMergeIterator(it, converted)
i := NewGenericChunkMergeIterator(it, converted)
putGenericChunkSlice(converted)
return i
}

// NewGenericChunkMergeIterator returns a chunkenc.Iterator that merges generic chunks together.
// This function must not hold a reference to the 'chunks' slice.
func NewGenericChunkMergeIterator(it chunkenc.Iterator, chunks []GenericChunk) chunkenc.Iterator {
var iter *mergeIterator

Expand Down
2 changes: 2 additions & 0 deletions pkg/querier/batch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type mergeIterator struct {
currErr error
}

// newMergeIterator returns an iterator that merges generic chunks in batches.
// This functions must not hold a reference to the `cs` slice.
func newMergeIterator(it iterator, cs []GenericChunk) *mergeIterator {
c, ok := it.(*mergeIterator)
if ok {
Expand Down

0 comments on commit 4e7bc2e

Please sign in to comment.