Skip to content

Commit

Permalink
add a flag to not check external labels when uploading blocks using b…
Browse files Browse the repository at this point in the history
…ucket rewrite (thanos-io#3840)

Signed-off-by: yeya24 <yb532204897@gmail.com>

add changelog

Signed-off-by: yeya24 <yb532204897@gmail.com>

rename param to checkExternalLabels

Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 authored Mar 29, 2021
1 parent 7e9958d commit 5aa5e41
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 9 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re

- [#3977](https://github.com/thanos-io/thanos/pull/3903) Expose exemplars for `http_request_duration_seconds` histogram if tracing is enabled.
- [#3903](https://github.com/thanos-io/thanos/pull/3903) Store: Returning custom grpc code when reaching series/chunk limits.
- [3919](https://github.com/thanos-io/thanos/pull/3919) Allow to disable automatically setting CORS headers using `--web.disable-cors` flag in each component that exposes an API.
- [#3919](https://github.com/thanos-io/thanos/pull/3919) Allow to disable automatically setting CORS headers using `--web.disable-cors` flag in each component that exposes an API.
- [#3840](https://github.com/thanos-io/thanos/pull/3840) Tools: Added a flag to support rewrite Prometheus TSDB blocks.

### Fixed

Expand Down
11 changes: 9 additions & 2 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
promBlocks := cmd.Flag("prom-blocks", "If specified, we assume the blocks to be uploaded are only used with Prometheus so we don't check external labels in this case.").Default("false").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand Down Expand Up @@ -908,8 +909,14 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}

level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID)
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil {
return errors.Wrap(err, "upload")
if *promBlocks {
if err := block.UploadPromBlock(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil {
return errors.Wrap(err, "upload")
}
} else {
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String()), metadata.HashFunc(*hashFunc)); err != nil {
return errors.Wrap(err, "upload")
}
}
level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID)
}
Expand Down
3 changes: 3 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,9 @@ Flags:
--rewrite.add-change-log If specified, all modifications are written to
new block directory. Disable if latency is to
high.
--prom-blocks If specified, we assume the blocks to be
uploaded are only used with Prometheus so we
don't check external labels in this case.
```

Expand Down
26 changes: 20 additions & 6 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,23 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id
return nil
}

// Upload uploads block from given block dir that ends with block id.
// Upload uploads a TSDB block to the object storage. It verifies basic
// features of Thanos block.
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
return upload(ctx, logger, bkt, bdir, hf, true)
}

// UploadPromBlock uploads a TSDB block to the object storage. It assumes
// the block is used in Prometheus so it doesn't check Thanos external labels.
func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
return upload(ctx, logger, bkt, bdir, hf, false)
}

// upload uploads block from given block dir that ends with block id.
// It makes sure cleanup is done on error to avoid partial block uploads.
// It also verifies basic features of Thanos block.
// TODO(bplotka): Ensure bucket operations have reasonable backoff retries.
// NOTE: Upload updates `meta.Thanos.File` section.
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool) error {
df, err := os.Stat(bdir)
if err != nil {
return err
Expand All @@ -119,20 +130,23 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "read meta")
}

if meta.Thanos.Labels == nil || len(meta.Thanos.Labels) == 0 {
return errors.New("empty external labels are not allowed for Thanos block.")
if checkExternalLabels {
if meta.Thanos.Labels == nil || len(meta.Thanos.Labels) == 0 {
return errors.New("empty external labels are not allowed for Thanos block.")
}
}

metaEncoded := strings.Builder{}
meta.Thanos.Files, err = gatherFileStats(bdir, hf, logger)
if err != nil {
return errors.Wrap(err, "gather meta file stats")
}

metaEncoded := strings.Builder{}
if err := meta.Write(&metaEncoded); err != nil {
return errors.Wrap(err, "encode meta file")
}

// TODO(yeya24): Remove this step.
if err := bkt.Upload(ctx, path.Join(DebugMetas, fmt.Sprintf("%s.json", id)), strings.NewReader(metaEncoded.String())); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload debug meta file"))
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,23 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, "empty external labels are not allowed for Thanos block.", err.Error())
testutil.Equals(t, 4, len(bkt.Objects()))
}
{
// No external labels with UploadPromBlocks.
b2, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, nil, 124, metadata.NoneFunc)
testutil.Ok(t, err)
err = UploadPromBlock(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()), metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Equals(t, 8, len(bkt.Objects()))
testutil.Equals(t, 3736, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
testutil.Equals(t, 525, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
}
}

func TestDelete(t *testing.T) {
Expand Down

0 comments on commit 5aa5e41

Please sign in to comment.