Parallelize compactor block downloading and verification step #4435

Closed · wants to merge 3 commits
49 changes: 29 additions & 20 deletions pkg/compact/compact.go
@@ -724,40 +724,49 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp

 	// Once we have a plan we need to download the actual data.
 	begin := time.Now()

 	toCompactDirs := make([]string, 0, len(toCompact))

+	var eg errgroup.Group
 	for _, meta := range toCompact {
+		meta := meta
 		bdir := filepath.Join(dir, meta.ULID.String())
+		toCompactDirs = append(toCompactDirs, bdir)
Member commented:

This is nice; I just wonder, do we always have enough resources (network, memory, CPU) for it not to fail or end up slower overall?

@yeya24 (Contributor, Author) replied on Jul 13, 2021:

Right now it spawns one goroutine per block to do the work. We can use workers to limit the concurrency if resources are a concern.
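
For illustration only (not code from this PR): a minimal sketch of the worker-style limiting mentioned above, using a buffered-channel semaphore around errgroup. The names maxConcurrency and doDownloadAndVerify are hypothetical placeholders, not identifiers from the Thanos codebase.

```go
// Illustration only — not code from this PR. A sketch of the "use workers to
// limit the concurrency" idea: a buffered-channel semaphore caps how many
// download/verify goroutines run at once. maxConcurrency and
// doDownloadAndVerify are hypothetical placeholders.
package example

import (
	"context"

	"golang.org/x/sync/errgroup"
)

func downloadAndVerifyAll(ctx context.Context, blockIDs []string, maxConcurrency int) error {
	var eg errgroup.Group
	sem := make(chan struct{}, maxConcurrency)

	for _, id := range blockIDs {
		id := id          // capture the loop variable for the closure (pre-Go 1.22 semantics)
		sem <- struct{}{} // block until a worker slot is free

		eg.Go(func() error {
			defer func() { <-sem }() // release the slot when this block is done
			return doDownloadAndVerify(ctx, id)
		})
	}

	// Wait returns the first non-nil error from any of the goroutines.
	return eg.Wait()
}

// doDownloadAndVerify stands in for block.Download plus the index health checks.
func doDownloadAndVerify(ctx context.Context, id string) error {
	return nil
}
```

More recent releases of golang.org/x/sync/errgroup also provide (*Group).SetLimit, which bounds in-flight goroutines without a hand-rolled semaphore.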

 		for _, s := range meta.Compaction.Sources {
 			if _, ok := uniqueSources[s]; ok {
 				return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact))
 			}
 			uniqueSources[s] = struct{}{}
 		}

-		if err := block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir); err != nil {
-			return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", meta.ULID))
-		}
+		eg.Go(func() error {
+			if err := block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir); err != nil {
+				return retry(errors.Wrapf(err, "download block %s", meta.ULID))
+			}

-		// Ensure all input blocks are valid.
-		stats, err := block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
-		if err != nil {
-			return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir)
-		}
+			// Ensure all input blocks are valid.
+			stats, err := block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
+			if err != nil {
+				return errors.Wrapf(err, "gather index issues for block %s", bdir)
+			}

-		if err := stats.CriticalErr(); err != nil {
-			return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
-		}
+			if err := stats.CriticalErr(); err != nil {
+				return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
+			}

-		if err := stats.Issue347OutsideChunksErr(); err != nil {
-			return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
-		}
+			if err := stats.Issue347OutsideChunksErr(); err != nil {
+				return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
+			}

-		if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
-			return false, ulid.ULID{}, errors.Wrapf(err,
-				"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
-		}
-		toCompactDirs = append(toCompactDirs, bdir)
+			if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
+				return errors.Wrapf(err, "block id %s, try running with --debug.accept-malformed-index", meta.ULID)
+			}
+
+			return nil
+		})
 	}

+	if err := eg.Wait(); err != nil {
+		return false, ulid.ULID{}, err
+	}
 	level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin))
