tools: Added thanos bucket tool rewrite (for now: allowing block series deletions).

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
bwplotka committed Nov 12, 2020
1 parent 69a045a commit f4bccbb
Showing 8 changed files with 994 additions and 9 deletions.
63 changes: 61 additions & 2 deletions cmd/thanos/tools_bucket.go
@@ -74,6 +74,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketDownsample(cmd, objStoreConfig)
registerBucketCleanup(cmd, objStoreConfig)
registerBucketMarkBlock(cmd, objStoreConfig)
registerBucketRewrite(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
@@ -717,13 +718,14 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().Strings()
marker := cmd.Flag("marker", "Marker to be put.").Required().Enum(metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename)
details := cmd.Flag("details", "Human readable details to be put into marker.").Required().String()

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

- bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Cleanup.String())
+ bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Mark.String())
if err != nil {
return err
}
@@ -732,7 +734,7 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
for _, id := range *blockIDs {
u, err := ulid.Parse(id)
if err != nil {
return errors.Errorf("id is not a valid block ULID, got: %v", id)
return errors.Errorf("block.id is not a valid UUID, got: %v", id)
}
ids = append(ids, u)
}
@@ -761,3 +763,60 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
return nil
})
}

func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series. Once rewritten, the old block is marked for deletion. "+
"NOTE: It's recommended to turn off the compactor while doing this operation. If the compactor is running and touching exactly the same block that "+
"is being rewritten, the resulting rewritten block might only cause overlap (mitigated by marking the overlapping block manually for deletion) "+
"and the data you wanted to rewrite could already be part of a bigger block.\n\n"+
"Use FILESYSTEM type of bucket to rewrite the block on disk (suitable for vanilla Prometheus). "+
"WARNING: This procedure is *IRREVERSIBLE* after a certain time (delete delay), so back up your blocks first (you can use objstore.config-backup flags for this command).")
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings()
objStoreBackupConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "-backup", false, "Used for backing up blocks before rewrite if you choose so (only used in non-dry-run mode).")
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for the user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String())
if err != nil {
return err
}

var ids []ulid.ULID
for _, id := range *blockIDs {
u, err := ulid.Parse(id)
if err != nil {
return errors.Errorf("id is not a valid block ULID, got: %v", id)
}
ids = append(ids, u)
}

var backupBkt objstore.InstrumentedBucket
if !*dryRun {
confContentYaml, err := objStoreBackupConfig.Content()
if err != nil {
return err
}

backupBkt, err = client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String())
if err != nil {
return err
}
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
g.Add(func() error {
for _, id := range ids {
// Delete series from block & repair.
}
level.Info(logger).Log("msg", "marking for deletion done", "IDs", strings.Join(*blockIDs, ","))
return nil
}, func(err error) {
cancel()
})
return nil
})
}
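
The per-block loop body above is left as a TODO comment in this commit. For orientation, here is a minimal, hypothetical sketch of a series-deleting rewrite pass built on the Reader/SeriesWriter interfaces this commit adds in pkg/block/writer.go; rewriteBlock, matchesAll, and the drop matchers are illustrative assumptions, not the committed implementation:

// Hypothetical sketch, not the committed implementation: copy every series
// that does NOT match the deletion matchers from a source block into a Writer.
package rewriteexample

import (
	"github.com/pkg/errors"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/index"

	"github.com/thanos-io/thanos/pkg/block"
)

func rewriteBlock(b block.Reader, w block.Writer, drop []*labels.Matcher) error {
	indexr, err := b.Index()
	if err != nil {
		return errors.Wrap(err, "open index reader")
	}
	defer indexr.Close()

	chunkr, err := b.Chunks()
	if err != nil {
		return errors.Wrap(err, "open chunk reader")
	}
	defer chunkr.Close()

	// The index writer wants all symbols first, in sorted order; copying the
	// source block's symbol table (a superset of what we keep) satisfies that.
	symbols := indexr.Symbols()
	for symbols.Next() {
		if err := w.AddSymbol(symbols.At()); err != nil {
			return err
		}
	}
	if err := symbols.Err(); err != nil {
		return err
	}

	k, v := index.AllPostingsKey()
	all, err := indexr.Postings(k, v)
	if err != nil {
		return err
	}
	all = indexr.SortedPostings(all)

	ref := uint64(0)
	for all.Next() {
		// Declared inside the loop for the same aliasing reason as in Repair.
		var lset labels.Labels
		var chks []chunks.Meta
		if err := indexr.Series(all.At(), &lset, &chks); err != nil {
			return err
		}
		if matchesAll(lset, drop) {
			continue // Series selected for deletion: leave it out of the new block.
		}
		// Load chunk bodies so WriteChunks can persist them and assign fresh refs.
		for i := range chks {
			if chks[i].Chunk, err = chunkr.Chunk(chks[i].Ref); err != nil {
				return err
			}
		}
		if err := w.WriteChunks(chks...); err != nil {
			return err
		}
		if err := w.AddSeries(ref, lset, chks...); err != nil {
			return err
		}
		ref++
	}
	return all.Err()
}

// matchesAll reports whether lset satisfies every matcher (PromQL selector semantics).
func matchesAll(lset labels.Labels, ms []*labels.Matcher) bool {
	for _, m := range ms {
		if !m.Matches(lset.Get(m.Name)) {
			return false
		}
	}
	return true
}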
2 changes: 1 addition & 1 deletion go.mod
@@ -77,7 +77,7 @@ replace (
// Update to v1.1.1 to make sure windows CI pass.
github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
- github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20201029103703-63be30dceed9
+ github.com/prometheus/prometheus => ../prometheus
github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
google.golang.org/grpc => google.golang.org/grpc v1.29.1

12 changes: 6 additions & 6 deletions pkg/block/index.go
@@ -282,15 +282,15 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceType

chunkw, err := chunks.NewWriter(filepath.Join(resdir, ChunksDirname))
if err != nil {
return resid, errors.Wrap(err, "open chunk writer")
return resid, errors.Wrap(err, "open chunk seriesWriter")
}
defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk writer")
defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk seriesWriter")

indexw, err := index.NewWriter(context.TODO(), filepath.Join(resdir, IndexFilename))
if err != nil {
return resid, errors.Wrap(err, "open index writer")
return resid, errors.Wrap(err, "open index seriesWriter")
}
defer runutil.CloseWithErrCapture(&err, indexw, "repair index writer")
defer runutil.CloseWithErrCapture(&err, indexw, "repair index seriesWriter")

// TODO(fabxc): adapt so we properly handle the version once we update to an upstream
// that has multiple.
@@ -435,9 +435,9 @@ func rewrite(
series = []seriesRepair{}
)

- var lset labels.Labels
- var chks []chunks.Meta
for all.Next() {
+ var lset labels.Labels
+ var chks []chunks.Meta
id := all.At()

if err := indexr.Series(id, &lset, &chks); err != nil {
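
Moving the lset/chks declarations inside the loop is a correctness fix, not a style change: indexr.Series appends into the caller's buffers, and the loop stores each result in the series slice, so a shared buffer would leave every stored entry aliasing the last series read. The same pitfall in miniature (illustrative Go, not Thanos code):

package main

import "fmt"

func main() {
	var buf []int
	var saved [][]int
	for i := 0; i < 3; i++ {
		buf = buf[:0]              // reuse capacity: same backing array
		buf = append(buf, i)       // overwrites the previous value in place
		saved = append(saved, buf) // every entry aliases buf's backing array
	}
	fmt.Println(saved) // [[2] [2] [2]], not [[0] [1] [2]]
}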
176 changes: 176 additions & 0 deletions pkg/block/writer.go
@@ -0,0 +1,176 @@
package block

import (
"context"
"io"
"os"
"path/filepath"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
)

// Reader is like tsdb.BlockReader but without tombstones and size methods.
type Reader interface {
// Index returns an IndexReader over the block's data.
Index() (tsdb.IndexReader, error)

// Chunks returns a ChunkReader over the block's data.
Chunks() (tsdb.ChunkReader, error)

Meta() tsdb.BlockMeta
}
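
Assuming *tsdb.Block's method set still matches these signatures in the pinned Prometheus version, a compile-time assertion (not included in this commit) would document the intent:

// Not part of this commit: assert that *tsdb.Block implements Reader.
var _ Reader = (*tsdb.Block)(nil)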

// SeriesWriter is an interface for writing series into one or multiple blocks.
// Statistics have to be counted by the implementation.
type SeriesWriter interface {
tsdb.IndexWriter
tsdb.ChunkWriter
}

// Writer is an interface for creating block(s).
type Writer interface {
SeriesWriter

Flush() (tsdb.BlockStats, error)
}

type DiskWriter struct {
statsGatheringSeriesWriter

bTmp, bDir string
logger log.Logger
closers []io.Closer
}

const tmpForCreationBlockDirSuffix = ".tmp-for-creation"

// NewDiskWriter allows writing a single TSDB block to disk and returns statistics.
func NewDiskWriter(ctx context.Context, logger log.Logger, bDir string) (_ *DiskWriter, err error) {
bTmp := bDir + tmpForCreationBlockDirSuffix

d := &DiskWriter{
bTmp: bTmp,
bDir: bDir,
logger: logger,
}
defer func() {
if err != nil {
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(d.closers)).Err()
if err := os.RemoveAll(bTmp); err != nil {
level.Error(logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
}
}
}()

if err = os.RemoveAll(bTmp); err != nil {
return nil, err
}
if err = os.MkdirAll(bTmp, 0777); err != nil {
return nil, err
}

chunkw, err := chunks.NewWriter(filepath.Join(bTmp, ChunksDirname))
if err != nil {
return nil, errors.Wrap(err, "open chunk writer")
}
d.closers = append(d.closers, chunkw)

// TODO(bwplotka): Setup instrumentedChunkWriter if we want to upstream this code.

indexw, err := index.NewWriter(ctx, filepath.Join(bTmp, IndexFilename))
if err != nil {
return nil, errors.Wrap(err, "open index writer")
}
d.closers = append(d.closers, indexw)
d.statsGatheringSeriesWriter = statsGatheringSeriesWriter{iw: indexw, cw: chunkw}
return d, nil
}

func (d *DiskWriter) Flush() (_ tsdb.BlockStats, err error) {
defer func() {
if err != nil {
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(d.closers)).Err()
if err := os.RemoveAll(d.bTmp); err != nil {
level.Error(d.logger).Log("msg", "failed to remove tmp folder after block(s) write", "err", err.Error())
}
}
}()
df, err := fileutil.OpenDir(d.bTmp)
if err != nil {
return tsdb.BlockStats{}, errors.Wrap(err, "open temporary block dir")
}
defer func() {
if df != nil {
err = tsdb_errors.NewMulti(err, df.Close()).Err()
}
}()

if err := df.Sync(); err != nil {
return tsdb.BlockStats{}, errors.Wrap(err, "sync temporary dir file")
}

// Close temp dir before rename block dir (for windows platform).
if err = df.Close(); err != nil {
return tsdb.BlockStats{}, errors.Wrap(err, "close temporary dir")
}
df = nil

if err := tsdb_errors.CloseAll(d.closers); err != nil {
d.closers = nil
return tsdb.BlockStats{}, err
}
d.closers = nil

// Block successfully written, make it visible in destination dir by moving it from tmp one.
if err := fileutil.Replace(d.bTmp, d.bDir); err != nil {
return tsdb.BlockStats{}, errors.Wrap(err, "rename block dir")
}
return d.stats, nil
}
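
A minimal usage sketch for DiskWriter (not in the commit; the function name and arguments are placeholders, and it additionally imports github.com/prometheus/prometheus/tsdb/chunkenc). It writes one series with one chunk and flushes; note that Flush only returns stats, so the block's meta.json still has to be written separately:

// Hypothetical usage of DiskWriter, within package block.
func writeOneSeriesBlock(ctx context.Context, logger log.Logger, bDir string) (tsdb.BlockStats, error) {
	w, err := NewDiskWriter(ctx, logger, bDir)
	if err != nil {
		return tsdb.BlockStats{}, err
	}
	// Symbols must be added first, in sorted order ("__name__" < "up").
	for _, s := range []string{"__name__", "up"} {
		if err := w.AddSymbol(s); err != nil {
			return tsdb.BlockStats{}, err
		}
	}
	// One XOR chunk holding a single sample at t=1000ms.
	c := chunkenc.NewXORChunk()
	app, err := c.Appender()
	if err != nil {
		return tsdb.BlockStats{}, err
	}
	app.Append(1000, 42)

	chks := []chunks.Meta{{Chunk: c, MinTime: 1000, MaxTime: 1000}}
	// WriteChunks persists the chunk data and fills in chks[0].Ref.
	if err := w.WriteChunks(chks...); err != nil {
		return tsdb.BlockStats{}, err
	}
	if err := w.AddSeries(0, labels.FromStrings("__name__", "up"), chks...); err != nil {
		return tsdb.BlockStats{}, err
	}
	// Flush closes the writers and moves the tmp dir into place; the block's
	// meta.json still has to be produced separately from the returned stats.
	return w.Flush()
}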

type statsGatheringSeriesWriter struct {
iw tsdb.IndexWriter
cw tsdb.ChunkWriter

stats tsdb.BlockStats
symbols int64
}

func (s *statsGatheringSeriesWriter) AddSymbol(sym string) error {
if err := s.iw.AddSymbol(sym); err != nil {
return err
}
s.symbols++
return nil
}

func (s *statsGatheringSeriesWriter) AddSeries(ref uint64, l labels.Labels, chks ...chunks.Meta) error {
if err := s.iw.AddSeries(ref, l, chks...); err != nil {
return err
}
s.stats.NumSeries++
return nil
}

func (s *statsGatheringSeriesWriter) WriteChunks(chks ...chunks.Meta) error {
if err := s.cw.WriteChunks(chks...); err != nil {
return err
}
s.stats.NumChunks += uint64(len(chks))
for _, chk := range chks {
s.stats.NumSamples += uint64(chk.Chunk.NumSamples())
}
return nil
}

func (s statsGatheringSeriesWriter) Close() error {
return tsdb_errors.NewMulti(s.iw.Close(), s.cw.Close()).Err()
}
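
As a sanity check (not in the commit), the embedding means *DiskWriter should satisfy the Writer interface defined above:

// Not part of this commit: compile-time check that *DiskWriter is a Writer.
var _ Writer = (*DiskWriter)(nil)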
