diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index c878a5c02f..4277fa1a3d 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" @@ -114,6 +116,7 @@ type readSeriesBlocksWorker struct { } func (w *readSeriesBlocksWorker) readSeriesBlocks() error { + defer close(w.dataCh) for { id, tags, data, expectedChecksum, err := w.reader.Read() if err != nil && !errors.Is(err, io.EOF) { @@ -926,7 +929,6 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( zap.Time("blockStart", blockStart), zap.Int("volume", mostRecentCompleteSnapshot.ID.VolumeIndex)) - var workerErr error worker := &readSeriesBlocksWorker{ dataCh: make(chan seriesBlock, readSeriesBlocksWorkerChannelSize), reader: reader, @@ -938,20 +940,15 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( nsCtx: nsCtx, } - go func() { - if err := worker.readSeriesBlocks(); err != nil { - s.log.Error("series read blocks error", zap.Error(err)) - workerErr = err - } - close(worker.dataCh) - }() - + ctx := context.NewBackground() + errs, _ := errgroup.WithContext(ctx.GoContext()) + errs.Go(worker.readSeriesBlocks) if err := s.loadBlocks(worker.dataCh, writeType); err != nil { close(worker.dataCh) return err } - return workerErr + return errs.Wait() } func (s *commitLogSource) loadBlocks(dataCh <-chan seriesBlock, writeType series.WriteType) error {