Skip to content

Commit

Permalink
bucket verify: fix to parse all blocks (thanos-io#4879)
Browse files Browse the repository at this point in the history
Only the first block is parsed. This commit fixes this issue to parse all blocks present in the bucket.

Signed-off-by: Aymeric <aymeric.daurelle@cdiscount.com>
  • Loading branch information
Aymeric committed Dec 13, 2021
1 parent c7a44e2 commit e3503a5
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4769](https://github.com/thanos-io/thanos/pull/4769) Query-frontend+api: add "X-Request-ID" field and other fields to start call log.
- [#4918](https://github.com/thanos-io/thanos/pull/4918) Tracing: Fixing force tracing with Jaeger.
- [#4928](https://github.com/thanos-io/thanos/pull/4928) Azure: Only create an http client once, to conserve memory.
- [#4879](https://github.com/thanos-io/thanos/pull/4879) bucket verify: fix to parce all blocks from a bucket.

### Changed

Expand Down
130 changes: 73 additions & 57 deletions pkg/verifier/index_issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool

for id, meta := range metas {
if idMatcher != nil && !idMatcher(id) {
return nil
continue
}

tmpdir, err := ioutil.TempDir("", fmt.Sprintf("index-issue-block-%s-", id))
Expand All @@ -52,78 +52,94 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool
}
}()

if err = objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(tmpdir, block.IndexFilename)); err != nil {
return errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename))
}

stats, err := block.GatherIndexHealthStats(ctx.Logger, filepath.Join(tmpdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return errors.Wrapf(err, "gather index issues %s", id)
}

level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id)
if err = stats.AnyErr(); err == nil {
return nil
stats, err := verifyIndex(ctx, id, tmpdir, meta)
if err == nil {
level.Debug(ctx.Logger).Log("msg", "no issue", "id", id)
continue
}

level.Warn(ctx.Logger).Log("msg", "detected issue", "id", id, "err", err)

if !repair {
// Only verify.
return nil
continue
}

if stats.OutOfOrderChunks > stats.DuplicatedChunks {
level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id)
// Verify repaired block before uploading it.
if err = repairIndex(stats, ctx, id, meta, tmpdir); err != nil {
level.Error(ctx.Logger).Log("msg", "could not repair index", "err", err)
continue
}
level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id)
continue
}

if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) {
level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id)
}
level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair)
return nil
}

if meta.Thanos.Downsample.Resolution > 0 {
return errors.New("cannot repair downsampled blocks")
}
func repairIndex(stats block.HealthStats, ctx Context, id ulid.ULID, meta *metadata.Meta, dir string) (err error) {
if stats.OutOfOrderChunks > stats.DuplicatedChunks {
level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id)
}

level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id)
if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(tmpdir, id.String())); err != nil {
return errors.Wrapf(err, "download block %s", id)
}
level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue")

level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue")
resid, err := block.Repair(
ctx.Logger,
tmpdir,
id,
metadata.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,
)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", id)
}
level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid)
if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) {
level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id)
}

// Verify repaired block before uploading it.
if err := block.VerifyIndex(ctx.Logger, filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil {
return errors.Wrapf(err, "repaired block is invalid %s", resid)
}
if meta.Thanos.Downsample.Resolution > 0 {
return errors.Wrap(err, "cannot repair downsampled blocks")
}

level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid)
if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc); err != nil {
return errors.Wrapf(err, "upload of %s failed", resid)
}
level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id)
if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(dir, id.String())); err != nil {
return errors.Wrapf(err, "download block %s", id)
}
level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue")

level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue")
resid, err := block.Repair(
ctx.Logger,
dir,
id,
metadata.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,
)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", id)
}
level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid)

level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue")
if err := BackupAndDeleteDownloaded(ctx, filepath.Join(tmpdir, id.String()), id); err != nil {
return errors.Wrapf(err, "safe deleting old block %s failed", id)
}
level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id)
return nil
if err := block.VerifyIndex(ctx.Logger, filepath.Join(dir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil {
return errors.Wrapf(err, "repaired block is invalid %s", resid)
}

level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid)
if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(dir, resid.String()), metadata.NoneFunc); err != nil {
return errors.Wrapf(err, "upload of %s failed", resid)
}

level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue")
if err := BackupAndDeleteDownloaded(ctx, filepath.Join(dir, id.String()), id); err != nil {
return errors.Wrapf(err, "safe deleting old block %s failed", id)
}

level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair)
return nil
}

func verifyIndex(ctx Context, id ulid.ULID, dir string, meta *metadata.Meta) (stats block.HealthStats, err error) {
if err := objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(dir, block.IndexFilename)); err != nil {
return stats, errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename))
}

stats, err = block.GatherIndexHealthStats(ctx.Logger, filepath.Join(dir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return stats, errors.Wrapf(err, "gather index issues %s", id)
}

level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id)

return stats, stats.AnyErr()
}

0 comments on commit e3503a5

Please sign in to comment.