From fa9473cf4562b3f76bcb69d9de1274988e073271 Mon Sep 17 00:00:00 2001 From: Aymeric Date: Fri, 19 Nov 2021 11:20:43 +0100 Subject: [PATCH] bucket verify: fix to parse all blocks (#4879) Only the first block is parsed. This commit fixes this issue to parse all blocks present in the bucket. Signed-off-by: Aymeric --- CHANGELOG.md | 1 + pkg/verifier/index_issue.go | 128 ++++++++++++++++++++---------------- 2 files changed, 72 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d1e4648ee..1242b3331b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4769](https://github.com/thanos-io/thanos/pull/4769) Query-frontend+api: add "X-Request-ID" field and other fields to start call log. - [#4918](https://github.com/thanos-io/thanos/pull/4918) Tracing: Fixing force tracing with Jaeger. - [#4928](https://github.com/thanos-io/thanos/pull/4928) Azure: Only create an http client once, to conserve memory. +- [#4879](https://github.com/thanos-io/thanos/pull/4879) Bucket verify: Fixed bug causing wrong number of blocks to be checked. ### Changed diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go index a3f1b3e838..61e84293c1 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -39,7 +39,7 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool for id, meta := range metas { if idMatcher != nil && !idMatcher(id) { - return nil + continue } tmpdir, err := ioutil.TempDir("", fmt.Sprintf("index-issue-block-%s-", id)) @@ -52,78 +52,92 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool } }() - if err = objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(tmpdir, block.IndexFilename)); err != nil { - return errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename)) - } - - stats, err := block.GatherIndexHealthStats(ctx.Logger, filepath.Join(tmpdir, block.IndexFilename), meta.MinTime, meta.MaxTime) - if err != nil { - return errors.Wrapf(err, "gather index issues %s", id) - } - - level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id) - if err = stats.AnyErr(); err == nil { - return nil + stats, err := verifyIndex(ctx, id, tmpdir, meta) + if err == nil { + level.Debug(ctx.Logger).Log("msg", "no issue", "id", id) + continue } level.Warn(ctx.Logger).Log("msg", "detected issue", "id", id, "err", err) if !repair { // Only verify. - return nil + continue } - if stats.OutOfOrderChunks > stats.DuplicatedChunks { - level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id) + if err = repairIndex(stats, ctx, id, meta, tmpdir); err != nil { + level.Error(ctx.Logger).Log("msg", "could not repair index", "err", err) + continue } + level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id) + } - if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) { - level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id) - } + level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair) + return nil +} - if meta.Thanos.Downsample.Resolution > 0 { - return errors.New("cannot repair downsampled blocks") - } +func repairIndex(stats block.HealthStats, ctx Context, id ulid.ULID, meta *metadata.Meta, dir string) (err error) { + if stats.OutOfOrderChunks > stats.DuplicatedChunks { + level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id) + } - level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id) - if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(tmpdir, id.String())); err != nil { - return errors.Wrapf(err, "download block %s", id) - } - level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue") - - level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue") - resid, err := block.Repair( - ctx.Logger, - tmpdir, - id, - metadata.BucketRepairSource, - block.IgnoreCompleteOutsideChunk, - block.IgnoreDuplicateOutsideChunk, - block.IgnoreIssue347OutsideChunk, - ) - if err != nil { - return errors.Wrapf(err, "repair failed for block %s", id) - } - level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid) + if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) { + level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id) + } - // Verify repaired block before uploading it. - if err := block.VerifyIndex(ctx.Logger, filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { - return errors.Wrapf(err, "repaired block is invalid %s", resid) - } + if meta.Thanos.Downsample.Resolution > 0 { + return errors.Wrap(err, "cannot repair downsampled blocks") + } - level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid) - if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc); err != nil { - return errors.Wrapf(err, "upload of %s failed", resid) - } + level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id) + if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(dir, id.String())); err != nil { + return errors.Wrapf(err, "download block %s", id) + } + level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue") + + level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue") + resid, err := block.Repair( + ctx.Logger, + dir, + id, + metadata.BucketRepairSource, + block.IgnoreCompleteOutsideChunk, + block.IgnoreDuplicateOutsideChunk, + block.IgnoreIssue347OutsideChunk, + ) + if err != nil { + return errors.Wrapf(err, "repair failed for block %s", id) + } + level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid) - level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue") - if err := BackupAndDeleteDownloaded(ctx, filepath.Join(tmpdir, id.String()), id); err != nil { - return errors.Wrapf(err, "safe deleting old block %s failed", id) - } - level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id) - return nil + if err := block.VerifyIndex(ctx.Logger, filepath.Join(dir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { + return errors.Wrapf(err, "repaired block is invalid %s", resid) + } + + level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid) + if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(dir, resid.String()), metadata.NoneFunc); err != nil { + return errors.Wrapf(err, "upload of %s failed", resid) + } + + level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue") + if err := BackupAndDeleteDownloaded(ctx, filepath.Join(dir, id.String()), id); err != nil { + return errors.Wrapf(err, "safe deleting old block %s failed", id) } - level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair) return nil } + +func verifyIndex(ctx Context, id ulid.ULID, dir string, meta *metadata.Meta) (stats block.HealthStats, err error) { + if err := objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(dir, block.IndexFilename)); err != nil { + return stats, errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename)) + } + + stats, err = block.GatherIndexHealthStats(ctx.Logger, filepath.Join(dir, block.IndexFilename), meta.MinTime, meta.MaxTime) + if err != nil { + return stats, errors.Wrapf(err, "gather index issues %s", id) + } + + level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id) + + return stats, stats.AnyErr() +}