Skip to content

Commit

Permalink
Implement index.MaybeRecyclePostings
Browse files Browse the repository at this point in the history
This method calls an optional Recycle() method on the postings, which
reuses the allocation by returning it to a pool.

The heuristic I've followed is that Recycle should be called by the method
that performs the Next() check, since that method is the one that knows
the postings can no longer be used.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
  • Loading branch information
colega committed Oct 15, 2024
1 parent 35ec40c commit 793cac1
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 10 deletions.
2 changes: 2 additions & 0 deletions cmd/promtool/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten

chks := []chunks.Meta{}
builder := labels.ScratchBuilder{}
defer index.MaybeRecyclePostings(p)
for p.Next() {
if err = ir.Series(p.At(), &builder, &chks); err != nil {
return err
Expand Down Expand Up @@ -636,6 +637,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
histogramChunkSize := make([]int, 0)
histogramChunkBucketsCount := make([]int, 0)
var builder labels.ScratchBuilder
defer index.MaybeRecyclePostings(postingsr)
for postingsr.Next() {
var chks []chunks.Meta
if err := indexr.Series(postingsr.At(), &builder, &chks); err != nil {
Expand Down
1 change: 1 addition & 0 deletions tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ func (pb *Block) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Mat
if err != nil {
return fmt.Errorf("select series: %w", err)
}
defer index.MaybeRecyclePostings(p)

ir := pb.indexr

Expand Down
1 change: 1 addition & 0 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -1548,6 +1548,7 @@ func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Match
}

var stones []tombstones.Stone
defer index.MaybeRecyclePostings(p)
for p.Next() {
if err := ctx.Err(); err != nil {
return fmt.Errorf("select series: %w", err)
Expand Down
4 changes: 3 additions & 1 deletion tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
series := make([]*memSeries, 0, 128)

// Fetch all the series only once.
defer index.MaybeRecyclePostings(p)
for p.Next() {
s := h.head.series.getByID(chunks.HeadSeriesRef(p.At()))
if s == nil {
Expand Down Expand Up @@ -165,7 +166,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
}

out := make([]storage.SeriesRef, 0, 128)

defer index.MaybeRecyclePostings(p)
for p.Next() {
s := h.head.series.getByID(chunks.HeadSeriesRef(p.At()))
if s == nil {
Expand Down Expand Up @@ -299,6 +300,7 @@ func (h *headIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef,
func (h *headIndexReader) LabelNamesFor(ctx context.Context, series index.Postings) ([]string, error) {
namesMap := make(map[string]struct{})
i := 0
defer index.MaybeRecyclePostings(series)
for series.Next() {
i++
if i%checkContextEveryNIterations == 0 && ctx.Err() != nil {
Expand Down
2 changes: 2 additions & 0 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,7 @@ func (r *Reader) LabelNamesFor(ctx context.Context, postings Postings) ([]string
// Gather offsetsMap the name offsetsMap in the symbol table first
offsetsMap := make(map[uint32]struct{})
i := 0
defer MaybeRecyclePostings(postings)
for postings.Next() {
id := postings.At()
i++
Expand Down Expand Up @@ -1869,6 +1870,7 @@ func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Post
seriesHashCache = r.cacheProvider.SeriesHashCache()
}

defer MaybeRecyclePostings(p)
for p.Next() {
id := p.At()

Expand Down
4 changes: 4 additions & 0 deletions tsdb/index/labelvalues.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func (p *MemPostings) labelValuesFor(postings Postings, name string, includeMatc

// intersect returns whether p1 and p2 have at least one series in common.
func intersect(p1, p2 Postings) bool {
defer MaybeRecyclePostings(p1)
defer MaybeRecyclePostings(p2)
if !p1.Next() || !p2.Next() {
return false
}
Expand Down Expand Up @@ -302,6 +304,8 @@ func intersect(p1, p2 Postings) bool {

// contains returns whether subp is contained in p.
func contains(p, subp Postings) bool {
defer MaybeRecyclePostings(p)
defer MaybeRecyclePostings(subp)
for subp.Next() {
if needle := subp.At(); !p.Seek(needle) || p.At() != needle {
return false
Expand Down
102 changes: 93 additions & 9 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ func (p *MemPostings) labelValues(name string) []string {

// ExpandPostings returns the postings expanded as a slice.
func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
defer MaybeRecyclePostings(p)
for p.Next() {
res = append(res, p.At())
}
Expand Down Expand Up @@ -524,8 +525,20 @@ type intersectPostings struct {
cur storage.SeriesRef
}

var (
	// intersectPostingsPool recycles intersectPostings values to reduce
	// allocations on query paths; instances are returned to the pool via
	// Recycle (see MaybeRecyclePostings).
	intersectPostingsPool = sync.Pool{New: func() any { return new(intersectPostings) }}
	// Compile-time assertion that intersectPostings implements the
	// optional recycler interface.
	_ recycler = new(intersectPostings)
)

// newIntersectPostings returns an iterator over the intersection of its,
// reusing a pooled intersectPostings instance when one is available.
func newIntersectPostings(its ...Postings) *intersectPostings {
	p := intersectPostingsPool.Get().(*intersectPostings)
	p.arr = its
	return p
}

// Recycle zeroes the iterator and returns it to intersectPostingsPool.
// The value must not be used after this call; the same instance may be
// handed out again by newIntersectPostings.
func (it *intersectPostings) Recycle() {
	*it = intersectPostings{}
	intersectPostingsPool.Put(it)
}

func (it *intersectPostings) At() storage.SeriesRef {
Expand Down Expand Up @@ -596,10 +609,23 @@ type mergedPostings struct {
cur storage.SeriesRef
}

var (
	// mergedPostingsPool recycles mergedPostings values to reduce
	// allocations; instances are returned to the pool via Recycle.
	mergedPostingsPool = sync.Pool{New: func() any { return new(mergedPostings) }}
	// Compile-time assertion that mergedPostings implements the optional
	// recycler interface.
	_ recycler = new(mergedPostings)
)

// newMergedPostings returns a pooled iterator that merges the given
// postings through a loser tree. nonEmpty is always true in this
// implementation.
func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) {
	// The sentinel must compare higher than every real value in the tree.
	const maxVal = storage.SeriesRef(math.MaxUint64)
	m = mergedPostingsPool.Get().(*mergedPostings)
	m.p = p
	m.h = loser.New(p, maxVal)
	return m, true
}

// Recycle zeroes the iterator (dropping the postings slice and loser tree)
// and returns it to mergedPostingsPool. The value must not be used after
// this call.
func (it *mergedPostings) Recycle() {
	*it = mergedPostings{}
	mergedPostingsPool.Put(it)
}

func (it *mergedPostings) Next() bool {
Expand Down Expand Up @@ -661,27 +687,43 @@ type removedPostings struct {

initialized bool
fok, rok bool
recycled bool
}

var (
	// removedPostingsPool recycles removedPostings values to reduce
	// allocations; instances are returned to the pool via Recycle.
	removedPostingsPool = sync.Pool{New: func() any { return new(removedPostings) }}
	// Compile-time assertion that removedPostings implements the optional
	// recycler interface.
	_ recycler = new(removedPostings)
)

func newRemovedPostings(full, remove Postings) *removedPostings {
return &removedPostings{
full: full,
remove: remove,
}
it := removedPostingsPool.Get().(*removedPostings)
it.full = full
it.remove = remove
return it
}

// Recycle zeroes the iterator and returns it to removedPostingsPool.
// Note it does not recycle rp.full / rp.remove here: Next already recycles
// those once the iteration is exhausted (guarded by rp.recycled).
func (rp *removedPostings) Recycle() {
	*rp = removedPostings{}
	removedPostingsPool.Put(rp)
}

func (rp *removedPostings) At() storage.SeriesRef {
return rp.cur
}

func (rp *removedPostings) Next() bool {
func (rp *removedPostings) Next() (next bool) {
if !rp.initialized {
rp.fok = rp.full.Next()
rp.rok = rp.remove.Next()
rp.initialized = true
}
for {
if !rp.fok {
if !rp.recycled {
MaybeRecyclePostings(rp.full)
MaybeRecyclePostings(rp.remove)
rp.recycled = true
}
return false
}

Expand Down Expand Up @@ -732,12 +774,24 @@ type ListPostings struct {
cur storage.SeriesRef
}

var (
	// listPostingsPool recycles ListPostings values to reduce allocations;
	// instances are returned to the pool via Recycle. Only the struct is
	// pooled — the backing []storage.SeriesRef slice is dropped on Recycle.
	listPostingsPool = sync.Pool{New: func() any { return new(ListPostings) }}
	// Compile-time assertion that ListPostings implements the optional
	// recycler interface.
	_ recycler = new(ListPostings)
)

// NewListPostings returns a Postings iterator over the given slice of
// series references. The returned value may come from a pool, so callers
// should pass it to MaybeRecyclePostings once iteration is done and must
// not use it afterwards.
func NewListPostings(list []storage.SeriesRef) Postings {
	return newListPostings(list...)
}

// newListPostings wraps list in a pooled *ListPostings iterator.
func newListPostings(list ...storage.SeriesRef) *ListPostings {
	lp := listPostingsPool.Get().(*ListPostings)
	lp.list = list
	return lp
}

// Recycle zeroes the iterator (releasing its reference to the backing
// slice) and returns it to listPostingsPool. The value must not be used
// after this call.
func (it *ListPostings) Recycle() {
	*it = ListPostings{}
	listPostingsPool.Put(it)
}

func (it *ListPostings) At() storage.SeriesRef {
Expand Down Expand Up @@ -785,8 +839,20 @@ type bigEndianPostings struct {
cur uint32
}

var (
	// bigEndianPostingsPool recycles bigEndianPostings values to reduce
	// allocations; instances are returned to the pool via Recycle.
	bigEndianPostingsPool = sync.Pool{New: func() any { return new(bigEndianPostings) }}
	// Compile-time assertion that bigEndianPostings implements the
	// optional recycler interface.
	_ recycler = new(bigEndianPostings)
)

// newBigEndianPostings returns a pooled iterator over the series
// references encoded in list.
func newBigEndianPostings(list []byte) *bigEndianPostings {
	p := bigEndianPostingsPool.Get().(*bigEndianPostings)
	p.list = list
	return p
}

// Recycle zeroes the iterator (releasing its reference to the encoded
// byte slice) and returns it to bigEndianPostingsPool. The value must not
// be used after this call.
func (it *bigEndianPostings) Recycle() {
	*it = bigEndianPostings{}
	bigEndianPostingsPool.Put(it)
}

func (it *bigEndianPostings) At() storage.SeriesRef {
Expand Down Expand Up @@ -852,8 +918,10 @@ func (c *PostingsCloner) Clone() Postings {
// if intersection is non empty, then i is added to the indexes returned.
// Returned indexes are not sorted.
func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int, err error) {
defer MaybeRecyclePostings(p)
h := make(postingsWithIndexHeap, 0, len(candidates))
for idx, it := range candidates {
defer MaybeRecyclePostings(candidates[idx])
switch {
case it.Next():
h = append(h, postingsWithIndex{index: idx, p: it})
Expand Down Expand Up @@ -885,8 +953,10 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int,
// The idea is the need to find postings iterators not fully contained in a set you wish to exclude.
// Returned indexes are not sorted.
func findNonContainedPostings(p Postings, candidates []Postings) (indexes []int, err error) {
defer MaybeRecyclePostings(p)
h := make(postingsWithIndexHeap, 0, len(candidates))
for idx, it := range candidates {
defer MaybeRecyclePostings(candidates[idx])
switch {
case it.Next():
h = append(h, postingsWithIndex{index: idx, p: it})
Expand Down Expand Up @@ -957,6 +1027,7 @@ func (h postingsWithIndexHeap) at() storage.SeriesRef { return h[0].p.At() }
// If Next() returns fails and there's no error reported by Postings.Err(), then root is marked as removed and heap is fixed.
func (h *postingsWithIndexHeap) next() error {
pi := (*h)[0]
// recycled in the caller.
next := pi.p.Next()
if next {
heap.Fix(h, 0)
Expand Down Expand Up @@ -1002,3 +1073,16 @@ func (h *postingsWithIndexHeap) Pop() interface{} {
*h = old[0 : n-1]
return x
}

// MaybeRecyclePostings returns p to its pool when it implements the
// optional recycler interface; otherwise it is a no-op. After this call p
// must not be used again: a pooled instance may be handed out by a later
// constructor, so continued use is a potential data race.
// NOTE(review): callers must ensure each Postings value is recycled at
// most once — a double Recycle would Put the same instance twice.
func MaybeRecyclePostings(p Postings) {
	if r, ok := p.(recycler); ok {
		r.Recycle()
	}
}

// recycler is the optional interface for Postings implementations that can
// be returned to a pool once iteration is finished.
// It is deliberately unexported to avoid creating a public contract on its
// name, but it is named (rather than matched anonymously) so that the
// `_ recycler = new(T)` compile-time assertions can enforce it on selected
// types.
type recycler interface {
	Recycle()
}
1 change: 1 addition & 0 deletions tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
p = hr.SortedPostings(p)

var lastSeq, lastOff int
defer index.MaybeRecyclePostings(p)
for p.Next() {
seriesRef := p.At()
ms := head.series.getByID(chunks.HeadSeriesRef(seriesRef))
Expand Down
9 changes: 9 additions & 0 deletions tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma
if err != nil {
return nil, fmt.Errorf("fetching postings for matchers: %w", err)
}
defer index.MaybeRecyclePostings(p)

// Let's see if expanded postings for matchers have smaller cardinality than label values.
// Since computing label values from series is expensive, we apply a limit on number of expanded
Expand Down Expand Up @@ -457,6 +458,7 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma

// If we haven't reached end of postings, we prepend our expanded postings to "p", and continue.
p = newPrependPostings(expanded, p)
defer index.MaybeRecyclePostings(p)
}

valuesPostings := make([]index.Postings, len(allValues))
Expand All @@ -465,6 +467,7 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma
if err != nil {
return nil, fmt.Errorf("fetching postings for %s=%q: %w", name, value, err)
}
defer index.MaybeRecyclePostings(valuesPostings[i])
}
indexes, err := index.FindIntersectingPostings(p, valuesPostings)
if err != nil {
Expand Down Expand Up @@ -595,6 +598,8 @@ type blockBaseSeriesSet struct {
bufChks []chunks.Meta
builder labels.ScratchBuilder
err error

postingsRecycled bool
}

func (b *blockBaseSeriesSet) Next() bool {
Expand Down Expand Up @@ -675,6 +680,10 @@ func (b *blockBaseSeriesSet) Next() bool {
b.curr.intervals = intervals
return true
}
if !b.postingsRecycled {
index.MaybeRecyclePostings(b.p)
b.postingsRecycled = true
}
return false
}

Expand Down

0 comments on commit 793cac1

Please sign in to comment.