Skip to content

Commit

Permalink
Avoid un-symbolizing labels if not needed.
Browse files Browse the repository at this point in the history
Similar optimization to what we did on Thanos: thanos-io/thanos#3531

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Dec 4, 2020
1 parent 8b64b70 commit a6f11c1
Show file tree
Hide file tree
Showing 12 changed files with 291 additions and 176 deletions.
6 changes: 5 additions & 1 deletion storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,18 @@ type LabelQuerier interface {
Close() error
}

// DiscardSamplesFunc is a special token to be placed in a hint's Func field.
// There is no function with this name; the token marks lookups that do not
// need samples, so implementations may skip fetching chunk data entirely.
const DiscardSamplesFunc = "series"

// SelectHints specifies hints passed for data selections.
// This is used only as an option for implementation to use.
type SelectHints struct {
Start int64 // Start time in milliseconds for this select.
End int64 // End time in milliseconds for this select.

Step int64 // Query step size in milliseconds.
Func string // String representation of surrounding function or aggregation.
Func string // String representation of surrounding function or aggregation or "series" if samples can be skipped.

Grouping []string // List of label names used in aggregation.
By bool // Indicate whether it is without or by.
Expand Down
21 changes: 9 additions & 12 deletions tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ type IndexReader interface {
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error
//Series(id uint64, s *index.SymbolizedLabels, chks *[]chunks.Meta, skipChunks bool, selectMint, selectMaxt int64) (ok bool, err error)

// Series returns a SeriesSelector through which series labels and chunk
// metas are resolved by series reference, optionally skipping chunk
// resolution when only labels are needed.
Series() index.SeriesSelector

// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames() ([]string, error)
Expand Down Expand Up @@ -437,11 +441,8 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
return r.ir.SortedPostings(p)
}

func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
if err := r.ir.Series(ref, lset, chks); err != nil {
return errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
return nil
func (r blockIndexReader) Series() index.SeriesSelector {
return r.ir.Series()
}

func (r blockIndexReader) LabelNames() ([]string, error) {
Expand Down Expand Up @@ -487,17 +488,13 @@ func (pb *Block) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
return errors.Wrap(err, "select series")
}

ir := pb.indexr
series := pb.indexr.Series()

// Choose only valid postings which have chunks in the time-range.
stones := tombstones.NewMemTombstones()

var lset labels.Labels
var chks []chunks.Meta

Outer:
for p.Next() {
err := ir.Series(p.At(), &lset, &chks)
_, chks, err := series.Select(p.At(), false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion tsdb/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1))
sets = append(sets, newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, false, meta.MinTime, meta.MaxTime-1))
syms := indexr.Symbols()
if i == 0 {
symbols = syms
Expand Down
7 changes: 2 additions & 5 deletions tsdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1566,15 +1566,12 @@ func TestChunkAtBlockBoundary(t *testing.T) {
p, err := r.Postings(k, v)
require.NoError(t, err)

var (
lset labels.Labels
chks []chunks.Meta
)
s := r.Series()

chunkCount := 0

for p.Next() {
err = r.Series(p.At(), &lset, &chks)
_, chks, err := s.Select(p.At(), false)
require.NoError(t, err)
for _, c := range chks {
require.True(t, meta.MinTime <= c.MinTime && c.MaxTime <= meta.MaxTime,
Expand Down
72 changes: 55 additions & 17 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,41 +1672,79 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
return index.NewListPostings(ep)
}

// Series returns a series selector bound to this head index reader.
func (h *headIndexReader) Series() index.SeriesSelector {
	sel := &headSeriesSelector{r: h}
	return sel
}

// headSeriesSelector resolves series (labels and chunk metas) from the
// in-memory head block through its index reader.
type headSeriesSelector struct {
	r *headIndexReader

	// Scratch buffers reused across Select calls to avoid per-call
	// allocations; values returned by Select alias these buffers and are
	// valid only until the next call.
	bufLset labels.Labels
	bufChks []chunks.Meta
}

// Series returns the series for the given reference.
func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
s := h.head.series.getByID(ref)
func (h *headSeriesSelector) Select(ref uint64, skipChunks bool, dranges ...tombstones.Interval) (labels.Labels, []chunks.Meta, error) {
s := h.r.head.series.getByID(ref)

if s == nil {
h.head.metrics.seriesNotFound.Inc()
return storage.ErrNotFound
h.r.head.metrics.seriesNotFound.Inc()
return nil, nil, storage.ErrNotFound
}
*lbls = append((*lbls)[:0], s.lset...)
h.bufLset = append(h.bufLset[:0], s.lset...)

s.Lock()
defer s.Unlock()

*chks = (*chks)[:0]
h.bufChks = (h.bufChks)[:0]

mintScope := dranges[0].Maxt
if mintScope < h.r.mint {
mintScope = h.r.mint
}
maxtScope := dranges[len(dranges)-1].Mint
if maxtScope > h.r.maxt {
maxtScope = h.r.maxt
}

// We are assuming mmapped chunk are sorted by min time.
for i, c := range s.mmappedChunks {
// Do not expose chunks that are outside of the specified range.
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue
if c.minTime > maxtScope {
break
}

if c.maxTime >= mintScope && !(tombstones.Interval{Mint: c.minTime, Maxt: c.maxTime}).IsSubrange(dranges) {
// Found a full or partial chunk.
if skipChunks {
// We are not interested in chunks and we know there is at least one, that's enough to return series.
return h.bufLset, nil, nil
}

h.bufChks = append(h.bufChks, chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
})
}
*chks = append(*chks, chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
})
}
if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) {
*chks = append(*chks, chunks.Meta{

if s.headChunk != nil && !(tombstones.Interval{Mint: s.headChunk.minTime, Maxt: s.headChunk.maxTime}).IsSubrange(dranges) {
if skipChunks {
return h.bufLset, nil, nil
}

h.bufChks = append(h.bufChks, chunks.Meta{
MinTime: s.headChunk.minTime,
MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to).
Ref: packChunkID(s.ref, uint64(s.chunkID(len(s.mmappedChunks)))),
})
}

return nil
if len(h.bufChks) == 0 {
return nil, nil, index.ErrNoChunkMatched
}
return h.bufLset, h.bufChks, nil
}

func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
Expand Down
34 changes: 14 additions & 20 deletions tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,29 +1041,26 @@ func TestGCChunkAccess(t *testing.T) {
require.False(t, chunkCreated, "chunks was created")

idx := h.indexRange(0, 1500)
var (
lset labels.Labels
chunks []chunks.Meta
)
require.NoError(t, idx.Series(1, &lset, &chunks))
lset, chks, err := idx.Series().Select(1, true)
require.NoError(t, err)

require.Equal(t, labels.Labels{{
Name: "a", Value: "1",
}}, lset)
require.Equal(t, 2, len(chunks))
require.Equal(t, 2, len(chks))

cr, err := h.chunksRange(0, 1500, nil)
require.NoError(t, err)
_, err = cr.Chunk(chunks[0].Ref)
_, err = cr.Chunk(chks[0].Ref)
require.NoError(t, err)
_, err = cr.Chunk(chunks[1].Ref)
_, err = cr.Chunk(chks[1].Ref)
require.NoError(t, err)

require.NoError(t, h.Truncate(1500)) // Remove a chunk.

_, err = cr.Chunk(chunks[0].Ref)
_, err = cr.Chunk(chks[0].Ref)
require.Equal(t, storage.ErrNotFound, err)
_, err = cr.Chunk(chunks[1].Ref)
_, err = cr.Chunk(chks[1].Ref)
require.NoError(t, err)
}

Expand Down Expand Up @@ -1095,31 +1092,28 @@ func TestGCSeriesAccess(t *testing.T) {
require.False(t, chunkCreated, "chunks was created")

idx := h.indexRange(0, 2000)
var (
lset labels.Labels
chunks []chunks.Meta
)
require.NoError(t, idx.Series(1, &lset, &chunks))
lset, chks, err := idx.Series().Select(1, false)
require.NoError(t, err)

require.Equal(t, labels.Labels{{
Name: "a", Value: "1",
}}, lset)
require.Equal(t, 2, len(chunks))
require.Equal(t, 2, len(chks))

cr, err := h.chunksRange(0, 2000, nil)
require.NoError(t, err)
_, err = cr.Chunk(chunks[0].Ref)
_, err = cr.Chunk(chks[0].Ref)
require.NoError(t, err)
_, err = cr.Chunk(chunks[1].Ref)
_, err = cr.Chunk(chks[1].Ref)
require.NoError(t, err)

require.NoError(t, h.Truncate(2000)) // Remove the series.

require.Equal(t, (*memSeries)(nil), h.series.getByID(1))

_, err = cr.Chunk(chunks[0].Ref)
_, err = cr.Chunk(chks[0].Ref)
require.Equal(t, storage.ErrNotFound, err)
_, err = cr.Chunk(chunks[1].Ref)
_, err = cr.Chunk(chks[1].Ref)
require.Equal(t, storage.ErrNotFound, err)
}

Expand Down
Loading

0 comments on commit a6f11c1

Please sign in to comment.