Skip to content

Commit

Permalink
bucket verify: fix to parse all blocks (thanos-io#4879)
Browse files Browse the repository at this point in the history
Only the first block is parsed. This commit fixes this issue to parse all blocks present in the bucket.

Signed-off-by: Aymeric <aymeric.daurelle@cdiscount.com>
  • Loading branch information
Aymeric committed Nov 19, 2021
1 parent 243526d commit 81714a6
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4779](https://github.com/thanos-io/thanos/pull/4779) Examples: Fix the interactive test for MacOS users.
- [#4792](https://github.com/thanos-io/thanos/pull/4792) Store: Fix data race in BucketedBytes pool.
- [#4769](https://github.com/thanos-io/thanos/pull/4769) Query-frontend+api: add "X-Request-ID" field and other fields to start call log.
- [#4879](https://github.com/thanos-io/thanos/pull/4879) bucket verify: fix to parce all blocks from a bucket.

### Changed

Expand Down
129 changes: 72 additions & 57 deletions pkg/verifier/index_issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool

for id, meta := range metas {
if idMatcher != nil && !idMatcher(id) {
return nil
continue
}

tmpdir, err := ioutil.TempDir("", fmt.Sprintf("index-issue-block-%s-", id))
Expand All @@ -51,78 +51,93 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool
}
}()

if err = objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(tmpdir, block.IndexFilename)); err != nil {
return errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename))
}

stats, err := block.GatherIndexHealthStats(ctx.Logger, filepath.Join(tmpdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return errors.Wrapf(err, "gather index issues %s", id)
}

level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id)
if err = stats.AnyErr(); err == nil {
return nil
stats, err := verifyIndex(ctx, id, tmpdir, meta)
if err == nil {
level.Debug(ctx.Logger).Log("msg", "no issue", "id", id)
continue
}

level.Warn(ctx.Logger).Log("msg", "detected issue", "id", id, "err", err)

if !repair {
// Only verify.
return nil
continue
}

if stats.OutOfOrderChunks > stats.DuplicatedChunks {
level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id)
// Verify repaired block before uploading it.
if err = repairIndex(stats, ctx, id, meta, tmpdir); err != nil {
level.Error(ctx.Logger).Log("msg", "could not repair index", "err", err)
}
level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id)
continue
}

if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) {
level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id)
}
level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair)
return nil
}

if meta.Thanos.Downsample.Resolution > 0 {
return errors.New("cannot repair downsampled blocks")
}
func repairIndex(stats block.HealthStats, ctx Context, id ulid.ULID, meta *metadata.Meta, dir string) (err error) {
if stats.OutOfOrderChunks > stats.DuplicatedChunks {
level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id)
}

level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id)
if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(tmpdir, id.String())); err != nil {
return errors.Wrapf(err, "download block %s", id)
}
level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue")

level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue")
resid, err := block.Repair(
ctx.Logger,
tmpdir,
id,
metadata.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,
)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", id)
}
level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid)
if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) {
level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id)
}

// Verify repaired block before uploading it.
if err := block.VerifyIndex(ctx.Logger, filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil {
return errors.Wrapf(err, "repaired block is invalid %s", resid)
}
if meta.Thanos.Downsample.Resolution > 0 {
return errors.Wrapf(err, "cannot repair downsampled blocks", "id", id)
}

level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid)
if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc); err != nil {
return errors.Wrapf(err, "upload of %s failed", resid)
}
level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id)
if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(dir, id.String())); err != nil {
return errors.Wrapf(err, "download block", "id", id)
}
level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue")

level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue")
resid, err := block.Repair(
ctx.Logger,
dir,
id,
metadata.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,
)
if err != nil {
return errors.Wrapf(err, "repair failed for block", "id", id)
}
level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid)

level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue")
if err := BackupAndDeleteDownloaded(ctx, filepath.Join(tmpdir, id.String()), id); err != nil {
return errors.Wrapf(err, "safe deleting old block %s failed", id)
}
level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id)
return nil
if err := block.VerifyIndex(ctx.Logger, filepath.Join(dir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil {
return errors.Wrapf(err, "repaired block is invalid", "resid", resid)
}

level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid)
if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(dir, resid.String()), metadata.NoneFunc); err != nil {
return errors.Wrapf(err, "upload of %s failed", resid)
}

level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue")
if err := BackupAndDeleteDownloaded(ctx, filepath.Join(dir, id.String()), id); err != nil {
return errors.Wrapf(err, "safe deleting old block %s failed", id)
}

level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair)
return nil
}

func verifyIndex(ctx Context, id ulid.ULID, dir string, meta *metadata.Meta) (stats block.HealthStats, err error) {
if err := objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(dir, block.IndexFilename)); err != nil {
return stats, errors.Wrapf(err, "download index file", path.Join(id.String(), block.IndexFilename))
}

stats, err = block.GatherIndexHealthStats(ctx.Logger, filepath.Join(dir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return stats, errors.Wrapf(err, "gather index issues", "id", id)
}

level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id)

return stats, stats.AnyErr()
}

0 comments on commit 81714a6

Please sign in to comment.