store the first raw value of a chunk during downsampling (#1709)
* store the first raw value of a chunk during downsampling

As discussed in #1568, storing only the last raw value
of a chunk will lose a counter reset when:
a) the reset occurs at a chunk boundary, and
b) the last raw value of the earlier chunk is less than
the first aggregated value of the later chunk.

This commit stores the first raw value of a chunk during
the initial raw aggregation, and retains it during
subsequent aggregations. This is similar to the existing
handling for the last raw value of a chunk.

With this change, when CounterSeriesIterator iterates over
a chunk boundary, it will see the last raw value of the
earlier chunk, then the first raw value of the later chunk,
and then the first aggregated value of the later chunk. The
first raw value will always be less than or equal to the
first aggregated value, so the only difference in
CounterSeriesIterator's output will be the possible detection
of a reset and an extra sample after the chunk boundary.
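To make the failure mode concrete, here is a small Go sketch (an editor's illustration with hypothetical values, not code from this commit) of how a rate-style increase calculation misses a reset when only the earlier chunk's last raw value and the later chunk's first aggregated value are visible:

package main

import "fmt"

// increase sums counter deltas over successive samples, treating any
// decrease as a counter reset (the counter restarted near zero, so the
// full new value counts as increase).
func increase(vs []float64) (total float64) {
	for i := 1; i < len(vs); i++ {
		if vs[i] >= vs[i-1] {
			total += vs[i] - vs[i-1]
		} else {
			total += vs[i]
		}
	}
	return total
}

func main() {
	// The earlier chunk ends at raw value 5; the counter then resets and
	// climbs to 10 by the later chunk's first aggregation window.
	fmt.Println(increase([]float64{5, 10}))    // 5: reset missed, increase undercounted
	fmt.Println(increase([]float64{5, 1, 10})) // 10: the first raw value (1) exposes the reset
}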

Fixes: #1568

Signed-off-by: Alfred Landrum <alfred@leakybucket.org>

* changelog for #1709

Signed-off-by: Alfred Landrum <alfred@leakybucket.org>

* adjust existing downsampling tests

Signed-off-by: Alfred Landrum <alfred@leakybucket.org>

* add counter aggregation comments to CounterSeriesIterator

Signed-off-by: Alfred Landrum <alfred@leakybucket.org>
alfred-landrum authored and bwplotka committed Nov 9, 2019
1 parent 1acdf7c commit 3debaeb
Showing 3 changed files with 169 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 - [#1656](https://github.com/thanos-io/thanos/pull/1656) Thanos Store now starts metric and status probe HTTP server earlier in its start-up sequence. `/-/healthy` endpoint now starts to respond with success earlier. `/metrics` endpoint starts serving metrics earlier as well. Make sure to point your readiness probes to the `/-/ready` endpoint rather than `/metrics`.
 - [#1669](https://github.com/thanos-io/thanos/pull/1669) Fixed store sharding. Now it does not load excluded meta.jsons and load/fetch index-cache.json files.
 - [#1670](https://github.com/thanos-io/thanos/pull/1670) Fixed un-ordered blocks upload. Sidecar now uploads the oldest blocks first.
+- [#1709](https://github.com/thanos-io/thanos/pull/1709) Thanos Store now retains the first raw value of a chunk during downsampling to avoid losing some counter resets that occur on an aggregation boundary.
 
 ### Changed
 
77 changes: 48 additions & 29 deletions pkg/compact/downsample/downsample.go
@@ -289,10 +289,6 @@ func (b *aggrChunkBuilder) add(t int64, aggr *aggregator) {
 	b.added++
 }
 
-func (b *aggrChunkBuilder) finalizeChunk(lastT int64, trueSample float64) {
-	b.apps[AggrCounter].Append(lastT, trueSample)
-}
-
 func (b *aggrChunkBuilder) encode() chunks.Meta {
 	return chunks.Meta{
 		MinTime: b.mint,
@@ -306,14 +302,17 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta {
 	if len(data) == 0 {
 		return nil
 	}
-	var (
-		mint, maxt = data[0].t, data[len(data)-1].t
-		// We assume a raw resolution of 1 minute. In practice it will often be lower
-		// but this is sufficient for our heuristic to produce well-sized chunks.
-		numChunks = targetChunkCount(mint, maxt, 1*60*1000, resolution, len(data))
-		chks      = make([]chunks.Meta, 0, numChunks)
-		batchSize = (len(data) / numChunks) + 1
-	)
+
+	mint, maxt := data[0].t, data[len(data)-1].t
+	// We assume a raw resolution of 1 minute. In practice it will often be lower
+	// but this is sufficient for our heuristic to produce well-sized chunks.
+	numChunks := targetChunkCount(mint, maxt, 1*60*1000, resolution, len(data))
+	return downsampleRawLoop(data, resolution, numChunks)
+}
+
+func downsampleRawLoop(data []sample, resolution int64, numChunks int) []chunks.Meta {
+	batchSize := (len(data) / numChunks) + 1
+	chks := make([]chunks.Meta, 0, numChunks)
 
 	for len(data) > 0 {
 		j := batchSize
@@ -327,14 +326,18 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta {
 		for ; j < len(data) && data[j].t <= curW; j++ {
 		}
 
-		ab := newAggrChunkBuilder()
 		batch := data[:j]
 		data = data[j:]
 
+		ab := newAggrChunkBuilder()
+
+		// Encode first raw value; see CounterSeriesIterator.
+		ab.apps[AggrCounter].Append(batch[0].t, batch[0].v)
+
 		lastT := downsampleBatch(batch, resolution, ab.add)
 
-		// InjectThanosMeta the chunk's counter aggregate with the last true sample.
-		ab.finalizeChunk(lastT, batch[len(batch)-1].v)
+		// Encode last raw value; see CounterSeriesIterator.
+		ab.apps[AggrCounter].Append(lastT, batch[len(batch)-1].v)
 
 		chks = append(chks, ab.encode())
 	}
@@ -379,18 +382,21 @@ func downsampleBatch(data []sample, resolution int64, add func(int64, *aggregato
 
 // downsampleAggr downsamples a sequence of aggregation chunks to the given resolution.
 func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes int64) ([]chunks.Meta, error) {
-	// We downsample aggregates only along chunk boundaries. This is required for counters
-	// to be downsampled correctly since a chunks' last counter value is the true last value
-	// of the original series. We need to preserve it even across multiple aggregation iterations.
 	var numSamples int
 	for _, c := range chks {
 		numSamples += c.NumSamples()
 	}
-	var (
-		numChunks = targetChunkCount(mint, maxt, inRes, outRes, numSamples)
-		res       = make([]chunks.Meta, 0, numChunks)
-		batchSize = len(chks) / numChunks
-	)
+	numChunks := targetChunkCount(mint, maxt, inRes, outRes, numSamples)
+	return downsampleAggrLoop(chks, buf, outRes, numChunks)
+}
+
+func downsampleAggrLoop(chks []*AggrChunk, buf *[]sample, resolution int64, numChunks int) ([]chunks.Meta, error) {
+	// We downsample aggregates only along chunk boundaries. This is required
+	// for counters to be downsampled correctly since a chunk's first and last
+	// counter values are the true values of the original series. We need
+	// to preserve them even across multiple aggregation iterations.
+	res := make([]chunks.Meta, 0, numChunks)
+	batchSize := len(chks) / numChunks
 
 	for len(chks) > 0 {
 		j := batchSize
@@ -400,12 +406,13 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes
 		part := chks[:j]
 		chks = chks[j:]
 
-		chk, err := downsampleAggrBatch(part, buf, outRes)
+		chk, err := downsampleAggrBatch(part, buf, resolution)
 		if err != nil {
 			return nil, err
 		}
 		res = append(res, chk)
 	}
+
 	return res, nil
 }

@@ -512,6 +519,9 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
 	ab.chunks[AggrCounter] = chunkenc.NewXORChunk()
 	ab.apps[AggrCounter], _ = ab.chunks[AggrCounter].Appender()
 
+	// Retain first raw value; see CounterSeriesIterator.
+	ab.apps[AggrCounter].Append((*buf)[0].t, (*buf)[0].v)
+
 	lastT := downsampleBatch(*buf, resolution, func(t int64, a *aggregator) {
 		if t < mint {
 			mint = t
@@ -520,6 +530,8 @@
 		}
 		ab.apps[AggrCounter].Append(t, a.counter)
 	})
+
+	// Retain last raw value; see CounterSeriesIterator.
 	ab.apps[AggrCounter].Append(lastT, it.lastV)
 
 	ab.mint = mint
@@ -532,11 +544,18 @@ type sample struct {
 	v float64
 }
 
-// CounterSeriesIterator iterates over an ordered sequence of chunks and treats decreasing
-// values as counter reset.
-// Additionally, it can deal with downsampled counter chunks, which set the last value of a chunk
-// to the original last value. The last value can be detected by checking whether the timestamp
-// did not increase w.r.t to the previous sample.
+// CounterSeriesIterator generates monotonically increasing values by iterating
+// over an ordered sequence of chunks, which should be raw or aggregated chunks
+// of counter values. The generated samples can be used by PromQL functions
+// like 'rate' that calculate differences between counter values.
+//
+// Counter aggregation chunks must have the first and last values from their
+// original raw series: the first raw value should be the first value encoded
+// in the chunk, and the last raw value is encoded by the duplication of the
+// previous sample's timestamp. As iteration occurs between chunks, the
+// comparison between the last raw value of the earlier chunk and the first raw
+// value of the later chunk ensures that counter resets between chunks are
+// recognized and that the correct value delta is calculated.
 type CounterSeriesIterator struct {
 	chks []chunkenc.Iterator
 	i    int // Current chunk.
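The convention the new CounterSeriesIterator comment describes can be condensed into a short standalone sketch (an editor's approximation of the behavior described above, not the actual Thanos implementation). It reproduces the rawCounterIterate expectations in the test file below:

package main

import "fmt"

type sample struct {
	t int64
	v float64
}

// iterate approximates CounterSeriesIterator's handling of downsampled
// counter samples: a sample repeating the previous timestamp carries the
// chunk's true last raw value (kept for reset detection, never re-emitted),
// and a value drop across increasing timestamps counts as a counter reset.
func iterate(in []sample) (out []sample) {
	var lastT int64
	var lastV, total float64
	for i, s := range in {
		switch {
		case i == 0:
			lastT, lastV, total = s.t, s.v, s.v
		case s.t == lastT:
			lastV = s.v // duplicated timestamp: remember the true last raw value
			continue
		case s.v >= lastV:
			total += s.v - lastV // normal counter increase
			lastT, lastV = s.t, s.v
		default:
			total += s.v // counter reset: the counter restarted from ~0
			lastT, lastV = s.t, s.v
		}
		out = append(out, sample{s.t, total})
	}
	return out
}

func main() {
	// First-raw / aggregate / last-raw triplets from two downsampled chunks.
	in := []sample{{10, 1}, {30, 5}, {30, 5}, {50, 1}, {70, 10}, {70, 10}}
	fmt.Println(iterate(in)) // [{10 1} {30 5} {50 6} {70 15}]
}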
122 changes: 120 additions & 2 deletions pkg/compact/downsample/downsample_test.go
@@ -23,6 +23,124 @@
 	"github.com/thanos-io/thanos/pkg/testutil"
 )
 
+func TestDownsampleCounterBoundaryReset(t *testing.T) {
+
+	toAggrChunks := func(t *testing.T, cm []chunks.Meta) (res []*AggrChunk) {
+		for i := range cm {
+			achk, ok := cm[i].Chunk.(*AggrChunk)
+			testutil.Assert(t, ok, "expected *AggrChunk")
+			res = append(res, achk)
+		}
+		return
+	}
+
+	counterSamples := func(t *testing.T, achks []*AggrChunk) (res []sample) {
+		for _, achk := range achks {
+			chk, err := achk.Get(AggrCounter)
+			testutil.Ok(t, err)
+
+			iter := chk.Iterator(nil)
+			for iter.Next() {
+				t, v := iter.At()
+				res = append(res, sample{t, v})
+			}
+		}
+		return
+	}
+
+	counterIterate := func(t *testing.T, achks []*AggrChunk) (res []sample) {
+		var iters []chunkenc.Iterator
+		for _, achk := range achks {
+			chk, err := achk.Get(AggrCounter)
+			testutil.Ok(t, err)
+			iters = append(iters, chk.Iterator(nil))
+		}
+
+		citer := NewCounterSeriesIterator(iters...)
+		for citer.Next() {
+			t, v := citer.At()
+			res = append(res, sample{t: t, v: v})
+		}
+		return
+	}
+
+	type test struct {
+		raw                   []sample
+		rawAggrResolution     int64
+		expectedRawAggrChunks int
+		rawCounterSamples     []sample
+		rawCounterIterate     []sample
+		aggrAggrResolution    int64
+		aggrChunks            int
+		aggrCounterSamples    []sample
+		aggrCounterIterate    []sample
+	}
+
+	tests := []test{
+		{
+			// In this test case, counter resets occur at the
+			// boundaries between the t=49,t=99 and t=99,t=149
+			// windows, and the values in the t=49, t=99, and
+			// t=149 windows are high enough that the resets
+			// will only be accounted for if the first raw value
+			// of a chunk is maintained during aggregation.
+			// See #1568 for more details.
+			raw: []sample{
+				{t: 10, v: 1}, {t: 20, v: 3}, {t: 30, v: 5},
+				{t: 50, v: 1}, {t: 60, v: 8}, {t: 70, v: 10},
+				{t: 120, v: 1}, {t: 130, v: 18}, {t: 140, v: 20},
+				{t: 160, v: 21}, {t: 170, v: 38}, {t: 180, v: 40},
+			},
+			rawAggrResolution:     50,
+			expectedRawAggrChunks: 4,
+			rawCounterSamples: []sample{
+				{t: 10, v: 1}, {t: 30, v: 5}, {t: 30, v: 5},
+				{t: 50, v: 1}, {t: 70, v: 10}, {t: 70, v: 10},
+				{t: 120, v: 1}, {t: 140, v: 20}, {t: 140, v: 20},
+				{t: 160, v: 21}, {t: 180, v: 40}, {t: 180, v: 40},
+			},
+			rawCounterIterate: []sample{
+				{t: 10, v: 1}, {t: 30, v: 5},
+				{t: 50, v: 6}, {t: 70, v: 15},
+				{t: 120, v: 16}, {t: 140, v: 35},
+				{t: 160, v: 36}, {t: 180, v: 55},
+			},
+			aggrAggrResolution: 2 * 50,
+			aggrChunks:         2,
+			aggrCounterSamples: []sample{
+				{t: 10, v: 1}, {t: 70, v: 15}, {t: 70, v: 10},
+				{t: 120, v: 1}, {t: 180, v: 40}, {t: 180, v: 40},
+			},
+			aggrCounterIterate: []sample{
+				{t: 10, v: 1}, {t: 70, v: 15},
+				{t: 120, v: 16}, {t: 180, v: 55},
+			},
+		},
+	}
+
+	doTest := func(t *testing.T, test *test) {
+		// Asking for more chunks than raw samples ensures that downsampleRawLoop
+		// will create chunks with samples from a single window.
+		cm := downsampleRawLoop(test.raw, test.rawAggrResolution, len(test.raw)+1)
+		testutil.Equals(t, test.expectedRawAggrChunks, len(cm))
+
+		rawAggrChunks := toAggrChunks(t, cm)
+		testutil.Equals(t, test.rawCounterSamples, counterSamples(t, rawAggrChunks))
+		testutil.Equals(t, test.rawCounterIterate, counterIterate(t, rawAggrChunks))
+
+		var buf []sample
+		acm, err := downsampleAggrLoop(rawAggrChunks, &buf, test.aggrAggrResolution, test.aggrChunks)
+		testutil.Ok(t, err)
+		testutil.Equals(t, test.aggrChunks, len(acm))
+
+		aggrAggrChunks := toAggrChunks(t, acm)
+		testutil.Equals(t, test.aggrCounterSamples, counterSamples(t, aggrAggrChunks))
+		testutil.Equals(t, test.aggrCounterIterate, counterIterate(t, aggrAggrChunks))
+	}
+
+	doTest(t, &tests[0])
+}
+
 func TestExpandChunkIterator(t *testing.T) {
 	// Validate that expanding the chunk iterator filters out-of-order samples
 	// and staleness markers.
@@ -56,7 +174,7 @@ func TestDownsampleRaw(t *testing.T) {
 				AggrSum:     {{99, 7}, {199, 17}, {250, 1}},
 				AggrMin:     {{99, 1}, {199, 2}, {250, 1}},
 				AggrMax:     {{99, 3}, {199, 10}, {250, 1}},
-				AggrCounter: {{99, 4}, {199, 13}, {250, 14}, {250, 1}},
+				AggrCounter: {{20, 1}, {99, 4}, {199, 13}, {250, 14}, {250, 1}},
 			},
 		},
 	}
@@ -93,7 +211,7 @@ func TestDownsampleAggr(t *testing.T) {
 				AggrSum:     {{499, 29}, {999, 100}},
 				AggrMin:     {{499, -3}, {999, 0}},
 				AggrMax:     {{499, 10}, {999, 100}},
-				AggrCounter: {{499, 210}, {999, 320}, {1299, 430}, {1299, 110}},
+				AggrCounter: {{99, 100}, {499, 210}, {999, 320}, {1299, 430}, {1299, 110}},
 			},
 		},
 	}
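As a worked check of the aggregated-level expectations (an editor's derivation using the same rules as the sketch after the downsample.go diff, based only on the test data above):

// Walking aggrCounterSamples through the reset-handling rules:
//   {10, 1}   -> emit (10, 1); last raw value = 1
//   {70, 15}  -> 15 >= 1: emit (70, 1 + 14 = 15)
//   {70, 10}  -> duplicated timestamp: true last raw value = 10, no output
//   {120, 1}  -> 1 < 10: counter reset, emit (120, 15 + 1 = 16)
//   {180, 40} -> 40 >= 1: emit (180, 16 + 39 = 55)
//   {180, 40} -> duplicated timestamp, no output
// This matches aggrCounterIterate: {10,1} {70,15} {120,16} {180,55}.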
