diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 17cd8d4341..c878a5c02f 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -114,7 +114,6 @@ type readSeriesBlocksWorker struct { } func (w *readSeriesBlocksWorker) readSeriesBlocks() error { - defer close(w.dataCh) for { id, tags, data, expectedChecksum, err := w.reader.Read() if err != nil && !errors.Is(err, io.EOF) { @@ -940,12 +939,23 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( } go func() { - if workerErr = worker.readSeriesBlocks(); workerErr != nil { - s.log.Error("series read blocks error", zap.Error(workerErr)) + if err := worker.readSeriesBlocks(); err != nil { + s.log.Error("series read blocks error", zap.Error(err)) + workerErr = err } + close(worker.dataCh) }() - for seriesBlock := range worker.dataCh { + if err := s.loadBlocks(worker.dataCh, writeType); err != nil { + close(worker.dataCh) + return err + } + + return workerErr +} + +func (s *commitLogSource) loadBlocks(dataCh <-chan seriesBlock, writeType series.WriteType) error { + for seriesBlock := range dataCh { // Load into series. seriesRef, err := seriesBlock.resolver.SeriesRef() if err != nil { @@ -956,8 +966,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( return err } } - - return workerErr + return nil } func (s *commitLogSource) mostRecentSnapshotByBlockShard( diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index c520f2441d..30dd871ae4 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -279,7 +279,7 @@ type NamespaceDataAccumulator interface { // CheckoutSeriesResult is the result of a checkout series operation. type CheckoutSeriesResult struct { - // Resolver is the series for the checkout operation. + // Resolver is the series read write ref resolver. Resolver SeriesRefResolver // Shard is the shard for the series. Shard uint32