Skip to content

Commit

Permalink
support downsample block
Browse files Browse the repository at this point in the history
  • Loading branch information
shuaizhang committed Jul 15, 2019
1 parent 943930a commit 87e94eb
Show file tree
Hide file tree
Showing 7 changed files with 377 additions and 107 deletions.
9 changes: 8 additions & 1 deletion pkg/compact/dedup/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ func (d *BucketDeduper) Dedup(ctx context.Context) error {
for k, v := range groups {
level.Info(d.logger).Log("msg", "starting to dedup replicas", "group", k)
d.metrics.deduplication.WithLabelValues(d.bkt.Name()).Inc()
if err := d.merger.Merge(ctx, v); err != nil {
if len(v) == 0 {
continue
}
resolution, err := v[0].Resolution()
if err != nil {
return errors.Wrapf(err, "merge replicas: %s", k)
}
if err := d.merger.Merge(ctx, resolution, v); err != nil {
d.metrics.deduplicationFailures.WithLabelValues(d.bkt.Name(), k).Inc()
return errors.Wrapf(err, "merge replicas: %s", k)
}
Expand Down
66 changes: 43 additions & 23 deletions pkg/compact/dedup/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/improbable-eng/thanos/pkg/query"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
)
Expand Down Expand Up @@ -144,14 +143,15 @@ func NewReplicaMerger(logger log.Logger, metrics *DedupMetrics, bkt objstore.Buc
}
}

func (rm *ReplicaMerger) Merge(ctx context.Context, replicas Replicas) error {
// Merge runs the merge process on replicas that share the same resolution.
func (rm *ReplicaMerger) Merge(ctx context.Context, resolution int64, replicas Replicas) error {
groups := rm.plan(ctx, replicas)

for _, group := range groups {
if err := rm.prepare(ctx, group); err != nil {
return errors.Wrapf(err, "prepare phase of group: %s", group)
}
id, err := rm.merge(ctx, group)
id, err := rm.merge(ctx, resolution, group)
if err != nil {
return errors.Wrapf(err, "merge phase of group: %s", group)
}
Expand Down Expand Up @@ -228,7 +228,7 @@ func (rm *ReplicaMerger) download(ctx context.Context, b *metadata.Meta) error {
return nil
}

func (rm *ReplicaMerger) merge(ctx context.Context, group *BlockGroup) (*ulid.ULID, error) {
func (rm *ReplicaMerger) merge(ctx context.Context, resolution int64, group *BlockGroup) (*ulid.ULID, error) {
if len(group.blocks) == 0 {
return nil, nil
}
Expand All @@ -245,7 +245,7 @@ func (rm *ReplicaMerger) merge(ctx context.Context, group *BlockGroup) (*ulid.UL

for _, b := range group.blocks {
blockDir := filepath.Join(rm.dir, b.ULID.String())
reader, err := NewBlockReader(rm.logger, blockDir)
reader, err := NewBlockReader(rm.logger, resolution, blockDir)
if err != nil {
if err := reader.Close(); err != nil {
level.Warn(rm.logger).Log("msg", "failed to close block reader", "err", err)
Expand All @@ -259,7 +259,7 @@ func (rm *ReplicaMerger) merge(ctx context.Context, group *BlockGroup) (*ulid.UL
newMeta := rm.newMeta(baseBlock, newId, group.window)
blockDir := filepath.Join(rm.dir, newMeta.ULID.String())

if err := rm.write(readers, blockDir, newMeta, group.window); err != nil {
if err := rm.write(readers, blockDir, newMeta, group.window, resolution); err != nil {
return nil, err
}
return &newId, nil
Expand Down Expand Up @@ -287,7 +287,7 @@ func (rm *ReplicaMerger) newMeta(baseMeta *metadata.Meta, newId ulid.ULID, tw *T
return &newMeta
}

func (rm *ReplicaMerger) write(readers []*BlockReader, blockDir string, meta *metadata.Meta, tw *TimeWindow) error {
func (rm *ReplicaMerger) write(readers []*BlockReader, blockDir string, meta *metadata.Meta, tw *TimeWindow, resolution int64) error {
symbols, err := rm.getMergedSymbols(readers)
if err != nil {
return err
Expand Down Expand Up @@ -317,11 +317,11 @@ func (rm *ReplicaMerger) write(readers []*BlockReader, blockDir string, meta *me
if err := reader.ir.Series(reader.postings.At(), &lset, &chks); err != nil {
return err
}
buf[i] = NewSampleReader(reader.cr, lset, chks)
buf[i] = NewSampleReader(rm.logger, reader.cr, lset, chks, resolution)
running = true
}

cs, err := rm.getMergedChunkSeries(buf, tw)
cs, err := rm.getMergedChunkSeries(buf, tw, resolution)
if err != nil {
return err
}
Expand Down Expand Up @@ -366,7 +366,7 @@ func (rm *ReplicaMerger) getMergedSymbols(readers []*BlockReader) (map[string]st
return result, nil
}

func (rm *ReplicaMerger) getMergedChunkSeries(readers []*SampleReader, tw *TimeWindow) (*ChunkSeries, error) {
func (rm *ReplicaMerger) getMergedChunkSeries(readers []*SampleReader, tw *TimeWindow, resolution int64) (*ChunkSeries, error) {
buf := make([]*SampleReader, len(readers))
copy(buf, readers)

Expand All @@ -385,38 +385,58 @@ func (rm *ReplicaMerger) getMergedChunkSeries(readers []*SampleReader, tw *TimeW
}

lset := buf[0].lset
samples, err := buf[0].Read(tw)
d0, err := buf[0].Read(tw)
if err != nil {
return nil, err
}
it := query.NewDedupSeriesIterator(NewSampleIterator(nil), NewSampleIterator(samples))
mergedData := d0
for i := 1; i < len(buf); i++ {
if buf[i] == nil {
break
}
if labels.Compare(buf[i].lset, lset) != 0 {
break
}
ss, err := buf[i].Read(tw)
di, err := buf[i].Read(tw)
if err != nil {
return nil, err
}
if len(ss) == 0 {
if len(di) == 0 {
continue
}
it = query.NewDedupSeriesIterator(it, NewSampleIterator(ss))
mergedData = rm.mergeSamples(mergedData, di, resolution)
}

return NewSampleSeries(lset, rm.getMergedSamples(it)).ToChunkSeries()
return NewSampleSeries(lset, mergedData, resolution).ToChunkSeries()
}

func (rm *ReplicaMerger) getMergedSamples(it storage.SeriesIterator) []*Sample {
samples := make([]*Sample, 0)
for it.Next() {
t, v := it.At()
samples = append(samples, NewSample(t, v))
// mergeSamples combines two per-type sample sets, a and b, into one set.
//
// If either input map is empty, the other map is returned as-is. Otherwise
// each selected sample type is merged independently: the two slices for that
// type are fed through query.NewDedupSeriesIterator and every (t, v) pair the
// iterator yields is collected into a new slice.
//
// When res is 0 only RawSample is merged; for any other res the CountSample,
// SumSample, MinSample, MaxSample and CounterSample types are merged instead.
// NOTE(review): presumably res == 0 denotes raw (non-downsampled) blocks —
// confirm against the resolution constants used by the caller.
func (rm *ReplicaMerger) mergeSamples(a, b map[SampleType][]*Sample, res int64) map[SampleType][]*Sample {
result := make(map[SampleType][]*Sample)
// Nothing to merge when one side is empty: hand back the other side unchanged.
if len(a) == 0 {
return b
}
// NOTE(review): the next line looks like a removed line of the old
// getMergedSamples shown by this diff view, not part of mergeSamples — confirm.
return samples
if len(b) == 0 {
return a
}
// merge deduplicates the samples of a single type st from a and b.
// A type missing from either map is looked up as a nil slice.
merge := func(st SampleType) []*Sample {
it := query.NewDedupSeriesIterator(NewSampleIterator(a[st]), NewSampleIterator(b[st]))
samples := make([]*Sample, 0)
for it.Next() {
t, v := it.At()
samples = append(samples, NewSample(t, v))
}
return samples
}
// res == 0: only the RawSample entry is populated in the result.
if res == 0 {
result[RawSample] = merge(RawSample)
return result
}
// Any other res: populate the five aggregate sample types.
result[CountSample] = merge(CountSample)
result[SumSample] = merge(SumSample)
result[MinSample] = merge(MinSample)
result[MaxSample] = merge(MaxSample)
result[CounterSample] = merge(CounterSample)
return result
}

func (rm *ReplicaMerger) upload(ctx context.Context, group *BlockGroup, newId *ulid.ULID) error {
Expand Down Expand Up @@ -483,4 +503,4 @@ func (rm *ReplicaMerger) deleteLocalBlock(id *ulid.ULID) error {
}
level.Debug(rm.logger).Log("msg", "deleted local block", "block", blockDir)
return nil
}
}
2 changes: 1 addition & 1 deletion pkg/compact/dedup/merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,6 @@ func TestReplicaMerger_Merge(t *testing.T) {

merger := NewReplicaMerger(logger, metrics, bkt, dataDir, "replica")

err = merger.Merge(ctx, replicas)
err = merger.Merge(ctx, 0, replicas)
testutil.Ok(t, err)
}
Loading

0 comments on commit 87e94eb

Please sign in to comment.