Skip to content

Commit

Permalink
Added deletion modifier + tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Nov 12, 2020
1 parent f4bccbb commit 5590442
Show file tree
Hide file tree
Showing 23 changed files with 1,257 additions and 456 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3277](https://github.com/thanos-io/thanos/pull/3277) Thanos Query: Introduce dynamic lookback interval. This allows queries with large step to make use of downsampled data.
- [#3409](https://github.com/thanos-io/thanos/pull/3409) Compactor: Added support for no-compact-mark.json which excludes the block from compaction.
- [#3245](https://github.com/thanos-io/thanos/pull/3245) Query Frontend: Add `query-frontend.org-id-header` flag to specify HTTP header(s) to populate slow query log (e.g. X-Grafana-User).
- [#3415](https://github.com/thanos-io/thanos/pull/3415) Tools: Added `thanos tools bucket mark` command that allows to mark given block for deletion or for no-compact
- [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command allowing to delete series from given block.

### Fixed

Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ github.com/prometheus/client_golang/prometheus.{DefaultGatherer,DefBuckets,NewUn
github.com/prometheus/client_golang/prometheus.{NewCounter,NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,\
NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}=github.com/prometheus/client_golang/prometheus/promauto.{NewCounter,\
NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec},\
github.com/prometheus/prometheus/tsdb/errors=github.com/thanos-io/thanos/pkg/errutil,\
sync/atomic=go.uber.org/atomic" ./...
@$(FAILLINT) -paths "fmt.{Print,Println,Sprint}" -ignore-tests ./...
@echo ">> linting all of the Go files GOGC=${GOGC}"
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func runCompact(
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml)
relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func main() {
}

if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "Tracing will be disabled")
tracer = client.NoopTracer()
} else {
tracer, closer, err = client.NewTracer(ctx, logger, metrics, confContentYaml)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func runStore(
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml)
relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}
Expand Down
129 changes: 111 additions & 18 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package main

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
Expand All @@ -25,11 +28,14 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
v1 "github.com/thanos-io/thanos/pkg/api/blocks"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/compactv2"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extkingpin"
Expand All @@ -47,6 +53,7 @@ import (
"github.com/thanos-io/thanos/pkg/verifier"
"golang.org/x/text/language"
"golang.org/x/text/message"
"gopkg.in/yaml.v3"
)

const extpromPrefix = "thanos_bucket_"
Expand Down Expand Up @@ -509,7 +516,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml)
relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}
Expand Down Expand Up @@ -765,15 +772,20 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P
}

func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series. Once rewritten, the old block is marked for deletion."+
cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series"+
"Resulted block has modified stats in meta.json. Additionally compaction.sources are altered to not confuse readers of meta.json."+
"Instead thanos.rewrite section is added with useful info like old sources and deletion requests. "+
"NOTE: It's recommended to turn off compactor while doing this operation. If the compactor is running and touching exactly same block that"+
"is being rewritten, the resulted rewritten block might only cause overlap (mitigated by marking overlapping block manually for deletion)"+
"and the data you wanted to rewrite could already part of bigger block.\n\n"+
"Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla Prometheus)"+
"WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first (you can use objstore.config-backup flags for this command)")
"After rewrite, it's caller responsibility to delete or mark source block for deletion to avoid overlaps."+
"WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first.")
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings()
objStoreBackupConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "-backup", false, "Used for backup-ing block before rewrite if you choose so (only use in non-dry run mode).")
tmpDir := cmd.Flag("tmp.dir", "Working directory for temporary files").Default(filepath.Join(os.TempDir(), "thanos-rewrite")).String()
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
Expand All @@ -785,6 +797,16 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

deletionsYaml, err := toDelete.Content()
if err != nil {
return err
}

var deletions []metadata.DeletionRequest
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
return err
}

var ids []ulid.ULID
for _, id := range *blockIDs {
u, err := ulid.Parse(id)
Expand All @@ -794,25 +816,96 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
ids = append(ids, u)
}

var backupBkt objstore.InstrumentedBucket
if !*dryRun {
confContentYaml, err := objStoreBackupConfig.Content()
if err != nil {
return err
}

backupBkt, err = client.NewBucket(logger, confContentYaml, reg, component.Cleanup.String())
if err != nil {
return err
}
if err := os.RemoveAll(*tmpDir); err != nil {
return err
}
if err := os.MkdirAll(*tmpDir, os.ModePerm); err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
chunkPool := chunkenc.NewPool()
changeLog := compactv2.NewChangeLog(ioutil.Discard)
for _, id := range ids {
// Delete series from block & repair.
// Delete series from block & modify.
level.Info(logger).Log("msg", "downloading block", "source", id)
if err := block.Download(ctx, logger, bkt, id, filepath.Join(*tmpDir, id.String())); err != nil {
return errors.Wrapf(err, "download %v", id)
}

meta, err := metadata.Read(filepath.Join(*tmpDir, id.String()))
if err != nil {
return errors.Wrapf(err, "read meta of %v", id)
}
b, err := tsdb.OpenBlock(logger, filepath.Join(*tmpDir, id.String()), chunkPool)
if err != nil {
return errors.Wrapf(err, "open block %v", id)
}

p := compactv2.NewProgressLogger(logger, int(b.Meta().Stats.NumSeries))
newID := ulid.MustNew(ulid.Now(), rand.Reader)
meta.ULID = newID
meta.Thanos.Rewrites = append(meta.Thanos.Rewrites, metadata.Rewrite{
Sources: meta.Compaction.Sources,
DeletionsApplied: deletions,
})
meta.Compaction.Sources = []ulid.ULID{newID}
meta.Thanos.Source = metadata.BucketRewriteSource

if err := os.MkdirAll(filepath.Join(*tmpDir, newID.String()), os.ModePerm); err != nil {
return err
}

if *provideChangeLog {
f, err := os.OpenFile(filepath.Join(*tmpDir, newID.String(), "change.log"), os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return err
}
defer runutil.CloseWithLogOnErr(logger, f, "close changelog")

changeLog = compactv2.NewChangeLog(f)
level.Info(logger).Log("msg", "changelog will be available", "file", filepath.Join(*tmpDir, newID.String(), "change.log"))
}

d, err := block.NewDiskWriter(ctx, logger, filepath.Join(*tmpDir, newID.String()))
if err != nil {
return err
}

var comp *compactv2.Compactor
if *dryRun {
comp = compactv2.NewDryRun(*tmpDir, logger, changeLog, chunkPool)
} else {
comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool)
}

level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil {
return errors.Wrapf(err, "writing series from %v to %v", id, newID)
}

if *dryRun {
level.Info(logger).Log("msg", "dry run finished. Changes should be printed to stderr")
return nil
}

level.Info(logger).Log("msg", "wrote new block after modifications; flushing", "source", id, "new", newID)
meta.Stats, err = d.Flush()
if err != nil {
return errors.Wrap(err, "flush")
}
if err := meta.WriteToDir(logger, filepath.Join(*tmpDir, newID.String())); err != nil {
return err
}

level.Info(logger).Log("msg", "uploading new block", "source", id, "new", newID)
if err := block.Upload(ctx, logger, bkt, filepath.Join(*tmpDir, newID.String())); err != nil {
return errors.Wrap(err, "upload")
}
level.Info(logger).Log("msg", "uploaded", "source", id, "new", newID)
}
level.Info(logger).Log("msg", "marking for deletion done", "IDs", strings.Join(*blockIDs, ","))
level.Info(logger).Log("msg", "rewrite done", "IDs", strings.Join(*blockIDs, ","))
return nil
}, func(err error) {
cancel()
Expand Down
126 changes: 126 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,23 @@ Subcommands:
is currently running compacting same block, this operation would be
potentially a noop.
tools bucket rewrite --id=ID [<flags>]
Rewrite chosen blocks in the bucket, while deleting or modifying
seriesResulted block has modified stats in meta.json. Additionally
compaction.sources are altered to not confuse readers of meta.json.Instead
thanos.rewrite section is added with useful info like old sources and
deletion requestsNOTE: It's recommended to turn off compactor while doing
this operation. If the compactor is running and touching exactly same block
thatis being rewritten, the resulted rewritten block might only cause
overlap (mitigated by marking overlapping block manually for deletion)and
the data you wanted to rewrite could already part of bigger block.
Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla
Prometheus)After rewrite, it's caller responsibility to delete or mark
source block for deletion to avoid overlaps.WARNING: This procedure is
*IRREVERSIBLE* after certain time (delete delay), so do backup your blocks
first.
tools rules-check --rules=RULES
Check if the rule files are valid or not.
Expand Down Expand Up @@ -154,6 +171,23 @@ Subcommands:
is currently running compacting same block, this operation would be
potentially a noop.
tools bucket rewrite --id=ID [<flags>]
Rewrite chosen blocks in the bucket, while deleting or modifying
seriesResulted block has modified stats in meta.json. Additionally
compaction.sources are altered to not confuse readers of meta.json.Instead
thanos.rewrite section is added with useful info like old sources and
deletion requestsNOTE: It's recommended to turn off compactor while doing
this operation. If the compactor is running and touching exactly same block
thatis being rewritten, the resulted rewritten block might only cause
overlap (mitigated by marking overlapping block manually for deletion)and
the data you wanted to rewrite could already part of bigger block.
Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla
Prometheus)After rewrite, it's caller responsibility to delete or mark
source block for deletion to avoid overlaps.WARNING: This procedure is
*IRREVERSIBLE* after certain time (delete delay), so do backup your blocks
first.
```

Expand Down Expand Up @@ -619,6 +653,98 @@ Flags:
```

### Bucket Rewrite

`tools bucket rewrite` reewrites chosen blocks in the bucket, while deleting or modifying series.

For example we can remove all non counters from the block you have on your disk (e.g in Prometheus dir):

```bash
thanos tools bucket rewrite --no-dry-run \
--id 01DN3SK96XDAEKRB1AN30AAW6E \
--objstore.config "
type: FILESYSTEM
config:
directory: <local dir>
" \
--rewrite.to-delete-config "
- matchers: \"{__name__!~\\\".*total\\\"}\"
"
```

By default, rewrite also produces `change.log` in the tmp local dir. Look for log message like:

```
ts=2020-11-09T00:40:13.703322181Z caller=level.go:63 level=info msg="changelog will be available" file=/tmp/thanos-rewrite/01EPN74E401ZD2SQXS4SRY6DZX/change.log`
```

[embedmd]:# (flags/tools_bucket_rewrite.txt $)
```$
usage: thanos tools bucket rewrite --id=ID [<flags>]
Rewrite chosen blocks in the bucket, while deleting or modifying seriesResulted
block has modified stats in meta.json. Additionally compaction.sources are
altered to not confuse readers of meta.json.Instead thanos.rewrite section is
added with useful info like old sources and deletion requestsNOTE: It's
recommended to turn off compactor while doing this operation. If the compactor
is running and touching exactly same block thatis being rewritten, the resulted
rewritten block might only cause overlap (mitigated by marking overlapping block
manually for deletion)and the data you wanted to rewrite could already part of
bigger block.
Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla
Prometheus)After rewrite, it's caller responsibility to delete or mark source
block for deletion to avoid overlaps.WARNING: This procedure is *IRREVERSIBLE*
after certain time (delete delay), so do backup your blocks first.
Flags:
-h, --help Show context-sensitive help (also try
--help-long and --help-man).
--version Show application version.
--log.level=info Log filtering level.
--log.format=logfmt Log format to use. Possible options: logfmt or
json.
--tracing.config-file=<file-path>
Path to YAML file with tracing configuration.
See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--tracing.config=<content>
Alternative to 'tracing.config-file' flag (lower
priority). Content of YAML file with tracing
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--objstore.config-file=<file-path>
Path to YAML file that contains object store
configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--objstore.config=<content>
Alternative to 'objstore.config-file' flag
(lower priority). Content of YAML file that
contains object store configuration. See format
details:
https://thanos.io/tip/thanos/storage.md/#configuration
--id=ID ... ID (ULID) of the blocks for rewrite (repeated
flag).
--tmp.dir="/tmp/thanos-rewrite"
Working directory for temporary files
--dry-run Prints the series changes instead of doing them.
Defaults to true, for user to double check. (:
Pass --no-dry-run to skip this.
--rewrite.to-delete-config-file=<file-path>
Path to YAML file that contains
[]metadata.DeletionRequest that will be applied
to blocks
--rewrite.to-delete-config=<content>
Alternative to 'rewrite.to-delete-config-file'
flag (lower priority). Content of YAML file that
contains []metadata.DeletionRequest that will be
applied to blocks
--rewrite.add-change-log If specified, all modifications are written to
new block directory. Disable if latency is to
high.
```

## Rules-check

The `tools rules-check` subcommand contains tools for validation of Prometheus rules.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ replace (
// Update to v1.1.1 to make sure windows CI pass.
github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
github.com/prometheus/prometheus => ../prometheus
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20201108220916-6ba28869528e
github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
google.golang.org/grpc => google.golang.org/grpc v1.29.1

Expand Down
Loading

0 comments on commit 5590442

Please sign in to comment.