Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bucket verify: fix to parse all blocks #4879

Merged
merged 1 commit into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4769](https://github.com/thanos-io/thanos/pull/4769) Query-frontend+api: add "X-Request-ID" field and other fields to start call log.
- [#4918](https://github.com/thanos-io/thanos/pull/4918) Tracing: Fixing force tracing with Jaeger.
- [#4928](https://github.com/thanos-io/thanos/pull/4928) Azure: Only create an http client once, to conserve memory.
- [#4879](https://github.com/thanos-io/thanos/pull/4879) Bucket verify: Fixed bug causing wrong number of blocks to be checked.

### Changed

Expand Down
128 changes: 71 additions & 57 deletions pkg/verifier/index_issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool

for id, meta := range metas {
if idMatcher != nil && !idMatcher(id) {
return nil
continue
}

tmpdir, err := ioutil.TempDir("", fmt.Sprintf("index-issue-block-%s-", id))
Expand All @@ -52,78 +52,92 @@ func (IndexKnownIssues) VerifyRepair(ctx Context, idMatcher func(ulid.ULID) bool
}
}()

if err = objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(tmpdir, block.IndexFilename)); err != nil {
return errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename))
}

stats, err := block.GatherIndexHealthStats(ctx.Logger, filepath.Join(tmpdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return errors.Wrapf(err, "gather index issues %s", id)
}

level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id)
if err = stats.AnyErr(); err == nil {
return nil
stats, err := verifyIndex(ctx, id, tmpdir, meta)
if err == nil {
level.Debug(ctx.Logger).Log("msg", "no issue", "id", id)
continue
}

level.Warn(ctx.Logger).Log("msg", "detected issue", "id", id, "err", err)

if !repair {
// Only verify.
return nil
continue
}

if stats.OutOfOrderChunks > stats.DuplicatedChunks {
level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id)
if err = repairIndex(stats, ctx, id, meta, tmpdir); err != nil {
level.Error(ctx.Logger).Log("msg", "could not repair index", "err", err)
continue
}
level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id)
}

if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) {
level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id)
}
level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair)
return nil
}

if meta.Thanos.Downsample.Resolution > 0 {
return errors.New("cannot repair downsampled blocks")
}
func repairIndex(stats block.HealthStats, ctx Context, id ulid.ULID, meta *metadata.Meta, dir string) (err error) {
if stats.OutOfOrderChunks > stats.DuplicatedChunks {
level.Warn(ctx.Logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id)
}

level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id)
if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(tmpdir, id.String())); err != nil {
return errors.Wrapf(err, "download block %s", id)
}
level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue")

level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue")
resid, err := block.Repair(
ctx.Logger,
tmpdir,
id,
metadata.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,
)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", id)
}
level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid)
if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) {
level.Warn(ctx.Logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id)
}

// Verify repaired block before uploading it.
if err := block.VerifyIndex(ctx.Logger, filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil {
return errors.Wrapf(err, "repaired block is invalid %s", resid)
}
if meta.Thanos.Downsample.Resolution > 0 {
return errors.Wrap(err, "cannot repair downsampled blocks")
}

level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid)
if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(tmpdir, resid.String()), metadata.NoneFunc); err != nil {
return errors.Wrapf(err, "upload of %s failed", resid)
}
level.Info(ctx.Logger).Log("msg", "downloading block for repair", "id", id)
if err = block.Download(ctx, ctx.Logger, ctx.Bkt, id, path.Join(dir, id.String())); err != nil {
return errors.Wrapf(err, "download block %s", id)
}
level.Info(ctx.Logger).Log("msg", "downloaded block to be repaired", "id", id, "issue")

level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue")
resid, err := block.Repair(
ctx.Logger,
dir,
id,
metadata.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,
)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", id)
}
level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid)

level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue")
if err := BackupAndDeleteDownloaded(ctx, filepath.Join(tmpdir, id.String()), id); err != nil {
return errors.Wrapf(err, "safe deleting old block %s failed", id)
}
level.Info(ctx.Logger).Log("msg", "all good, continuing", "id", id)
return nil
if err := block.VerifyIndex(ctx.Logger, filepath.Join(dir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil {
return errors.Wrapf(err, "repaired block is invalid %s", resid)
}

level.Info(ctx.Logger).Log("msg", "uploading repaired block", "newID", resid)
if err = block.Upload(ctx, ctx.Logger, ctx.Bkt, filepath.Join(dir, resid.String()), metadata.NoneFunc); err != nil {
return errors.Wrapf(err, "upload of %s failed", resid)
}

level.Info(ctx.Logger).Log("msg", "safe deleting broken block", "id", id, "issue")
if err := BackupAndDeleteDownloaded(ctx, filepath.Join(dir, id.String()), id); err != nil {
return errors.Wrapf(err, "safe deleting old block %s failed", id)
}

level.Info(ctx.Logger).Log("msg", "verified issue", "with-repair", repair)
return nil
}

func verifyIndex(ctx Context, id ulid.ULID, dir string, meta *metadata.Meta) (stats block.HealthStats, err error) {
if err := objstore.DownloadFile(ctx, ctx.Logger, ctx.Bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(dir, block.IndexFilename)); err != nil {
return stats, errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename))
}

stats, err = block.GatherIndexHealthStats(ctx.Logger, filepath.Join(dir, block.IndexFilename), meta.MinTime, meta.MaxTime)
if err != nil {
return stats, errors.Wrapf(err, "gather index issues %s", id)
}

level.Debug(ctx.Logger).Log("stats", fmt.Sprintf("%+v", stats), "id", id)

return stats, stats.AnyErr()
}