Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Nov 12, 2020
1 parent a5c2470 commit 4a5af18
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 18 deletions.
10 changes: 6 additions & 4 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
Default("0000-01-01T00:00:00Z"))
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
ids := cmd.Flag("id", "Block to be replicated to the destination bucket. If this is specified, then only "+
"IDs will be used to match blocks and other matchers will be ignored. It only runs once. Repeated field").Strings()
ids := cmd.Flag("id", "Block to be replicated to the destination bucket. IDs will be used to match blocks and other matchers will be ignored. When specified, this command will be run only once after successful replication. Repeated field").Strings()

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
matchers, err := replicate.ParseFlagMatchers(*matcherStrs)
Expand All @@ -461,9 +460,12 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
resolutionLevels = append(resolutionLevels, compact.ResolutionLevel(lvl.Milliseconds()))
}

blockIDs := make([]ulid.ULID, 0, len(*ids))
for _, id := range *ids {
if _, err := ulid.Parse(id); err != nil {
if bid, err := ulid.Parse(id); err != nil {
return errors.Wrap(err, "invalid ULID found in --id flag")
} else {
blockIDs = append(blockIDs, bid)
}
}

Expand All @@ -482,7 +484,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
*singleRun,
minTime,
maxTime,
*ids,
blockIDs,
)
})
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/compactv2/modifiers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package compactv2

import (
"github.com/prometheus/prometheus/tsdb/tombstones"
"reflect"
"testing"
)

func TestIntersection(t *testing.T) {
type args struct {
i tombstones.Interval
dranges tombstones.Intervals
}
tests := []struct {
name string
args args
want tombstones.Intervals
}{
{
name: "test",
args: args{i: tombstones.Interval{Maxt: 10, Mint: 0},
dranges: tombstones.Intervals{{Maxt: 12, Mint: 5}}},
want: tombstones.Intervals{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := Intersection(tt.args.i, tt.args.dranges); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Intersection() = %v, want %v", got, tt.want)
}
})
}
}
2 changes: 1 addition & 1 deletion pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func RunReplicate(
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
minTime, maxTime *thanosmodel.TimeOrDurationValue,
blockIDs []string,
blockIDs []ulid.ULID,
) error {
logger = log.With(logger, "component", "replicate")

Expand Down
19 changes: 8 additions & 11 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type BlockFilter struct {
labelSelector labels.Selector
resolutionLevels map[compact.ResolutionLevel]struct{}
compactionLevels map[int]struct{}
blockIDs map[string]struct{}
blockIDs []ulid.ULID
}

// NewBlockFilter returns block filter.
Expand All @@ -41,7 +41,7 @@ func NewBlockFilter(
labelSelector labels.Selector,
resolutionLevels []compact.ResolutionLevel,
compactionLevels []int,
blockIDs []string,
blockIDs []ulid.ULID,
) *BlockFilter {
allowedResolutions := make(map[compact.ResolutionLevel]struct{})
for _, resolutionLevel := range resolutionLevels {
Expand All @@ -51,16 +51,13 @@ func NewBlockFilter(
for _, compactionLevel := range compactionLevels {
allowedCompactions[compactionLevel] = struct{}{}
}
blockSet := make(map[string]struct{})
for _, id := range blockIDs {
blockSet[id] = struct{}{}
}

return &BlockFilter{
labelSelector: labelSelector,
logger: logger,
resolutionLevels: allowedResolutions,
compactionLevels: allowedCompactions,
blockIDs: blockSet,
blockIDs: blockIDs,
}
}

Expand All @@ -73,11 +70,11 @@ func (bf *BlockFilter) Filter(b *metadata.Meta) bool {

// If required block IDs are set, we only match required blocks and ignore others.
if len(bf.blockIDs) > 0 {
id := b.ULID.String()
if _, ok := bf.blockIDs[id]; ok {
return true
for _, id := range bf.blockIDs {
if b.ULID == id {
return true
}
}
level.Debug(bf.logger).Log("msg", "filtering block", "reason", "block ID doesn't match", "block_uuid", id)
return false
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/replicate/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestReplicationSchemeAll(t *testing.T) {
var cases = []struct {
name string
selector labels.Selector
blockIDs []string
blockIDs []ulid.ULID
prepare func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket)
assert func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket)
}{
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestReplicationSchemeAll(t *testing.T) {
},
{
name: "BlockIDs",
blockIDs: []string{testBlockID.String()},
blockIDs: []ulid.ULID{testBlockID},
prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
meta := testMeta(testBlockID)

Expand Down

0 comments on commit 4a5af18

Please sign in to comment.