From 0777cc425beeb08b0a95b00e096f20cf2138082f Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 2 Mar 2021 11:48:22 -0500 Subject: [PATCH 01/31] WIP [dbnode] Use series read write ref resolver --- src/dbnode/storage/bootstrap/types.go | 17 ++- src/dbnode/storage/bootstrap/util.go | 19 ++- .../namespace_bootstrap_data_accumulator.go | 12 +- src/dbnode/storage/shard.go | 119 +++++++++++++----- src/dbnode/storage/types.go | 14 +-- 5 files changed, 129 insertions(+), 52 deletions(-) diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index 0a64face66..d6c48a79a8 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -279,12 +279,10 @@ type NamespaceDataAccumulator interface { // CheckoutSeriesResult is the result of a checkout series operation. type CheckoutSeriesResult struct { - // Series is the series ref for the checkout operation. - Series SeriesRef + // Resolver is the series for the checkout operation. + Resolver SeriesRefResolver // Shard is the shard for the series. Shard uint32 - // UniqueIndex is the unique index for the series. - UniqueIndex uint64 } // NamespaceResults is the result of a bootstrap process. @@ -496,3 +494,14 @@ type SeriesRef interface { writeType series.WriteType, ) error } + +// SeriesRefResolver is a series resolver for just in time resolving of +// a series read write ref. +type SeriesRefResolver interface { + // SeriesRef returns the series read write ref. + SeriesRef() (SeriesRef, error) + // ReleaseRef must be called after using the series ref + // to release the reference count to the series so it can + // be expired by the owning shard eventually. + ReleaseRef() error +} diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index 877b059d8d..4dcdea3a30 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -281,15 +281,28 @@ func (a *TestDataAccumulator) checkoutSeriesWithLock( }).AnyTimes() result := CheckoutSeriesResult{ - Shard: shardID, - Series: mockSeries, - UniqueIndex: uint64(len(a.results) + 1), + Shard: shardID, + Resolver: &seriesStaticResolver{series: mockSeries}, } a.results[stringID] = result return result, true, streamErr } +var _ SeriesRefResolver = (*seriesStaticResolver)(nil) + +type seriesStaticResolver struct { + series *series.MockDatabaseSeries +} + +func (r *seriesStaticResolver) SeriesRef() (SeriesRef, error) { + return r.series, nil +} + +func (r *seriesStaticResolver) ReleaseRef() error { + return nil +} + // Release is a no-op on the test accumulator. 
func (a *TestDataAccumulator) Release() {} diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go index 417eed9b9c..939003945f 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go @@ -25,7 +25,6 @@ import ( "sync" "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/x/ident" ) @@ -37,7 +36,7 @@ type namespaceDataAccumulator struct { sync.RWMutex closed bool namespace databaseNamespace - needsRelease []lookup.OnReleaseReadWriteRef + needsRelease []bootstrap.SeriesRefResolver } // NewDatabaseNamespaceDataAccumulator creates a data accumulator for @@ -60,11 +59,10 @@ func (a *namespaceDataAccumulator) CheckoutSeriesWithoutLock( return bootstrap.CheckoutSeriesResult{}, owned, err } - a.needsRelease = append(a.needsRelease, ref.ReleaseReadWriteRef) + a.needsRelease = append(a.needsRelease, ref.Resolver) return bootstrap.CheckoutSeriesResult{ - Series: ref.Series, - Shard: ref.Shard, - UniqueIndex: ref.UniqueIndex, + Resolver: ref.Resolver, + Shard: ref.Shard, }, true, nil } @@ -91,7 +89,7 @@ func (a *namespaceDataAccumulator) Close() error { // Release all refs. for _, elem := range a.needsRelease { - elem.OnReleaseReadWriteRef() + elem.ReleaseRef() } // Memset optimization for reset. diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index af3f34fa10..4963ec3d02 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -37,6 +37,7 @@ import ( "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/index/convert" @@ -1060,46 +1061,108 @@ func (s *dbShard) SeriesReadWriteRef( if entry != nil { // The read/write ref is already incremented. return SeriesReadWriteRef{ - Series: entry, - Shard: s.shard, - UniqueIndex: entry.Index, - ReleaseReadWriteRef: entry, + Resolver: &seriesStaticResolver{ + entry: entry, + }, + Shard: s.shard, }, nil } - // NB(r): Insert synchronously so caller has access to the series - // immediately, otherwise calls to LoadBlock(..) etc on the series itself - // may have no effect if a collision with the same series - // being put in the insert queue may cause a block to be loaded to a - // series which gets discarded. - // TODO(r): Probably can't insert series sync otherwise we stall a ton - // of writes... need a better solution for bootstrapping. - // This is what can cause writes to degrade during bootstrap if - // write lock is super contended. - // Having said that, now that writes are kept in a separate "bootstrap" - // buffer in the series itself to normal writes then merged at end of - // bootstrap it somewhat mitigates some lock contention since the shard - // lock is still contended but at least series writes due to commit log - // bootstrapping do not interrupt normal writes waiting for ability - // to write to an individual series. - entry, err = s.insertSeriesSync(id, newTagsIterArg(tags), insertSyncOptions{ - insertType: insertSyncIncReaderWriterCount, - // NB(bodu): We transparently index in the series ref when - // bootstrapping now instead of when grabbing a ref. 
- hasPendingIndex: false, + result, err := s.insertSeriesAsyncBatched(id, tags, dbShardInsertAsyncOptions{ + // skipRateLimit for true since this method is used by bootstrapping + // and should not be rate limited. + skipRateLimit: true, + entryRefCountIncremented: true, }) if err != nil { return SeriesReadWriteRef{}, err } + // Series will wait for the result to be batched together and inserted. return SeriesReadWriteRef{ - Series: entry, - Shard: s.shard, - UniqueIndex: entry.Index, - ReleaseReadWriteRef: entry, + Resolver: &seriesResolver{ + insertAsyncResult: result, + shard: s, + }, + Shard: s.shard, }, nil } +type seriesStaticResolver struct { + entry *lookup.Entry +} + +func (r *seriesStaticResolver) SeriesRef() (bootstrap.SeriesRef, error) { + return r.entry.Series, nil +} + +func (r *seriesStaticResolver) ReleaseRef() error { + r.entry.OnReleaseReadWriteRef() + return nil +} + +type seriesResolver struct { + sync.RWMutex + + insertAsyncResult insertAsyncResult + shard *dbShard + + resolved bool + resolvedResult error + entry *lookup.Entry +} + +func (r *seriesResolver) resolve() error { + r.RLock() + alreadyResolved := r.resolved + resolvedResult := r.resolvedResult + r.RUnlock() + + if alreadyResolved { + return resolvedResult + } + + r.Lock() + defer r.Unlock() + + if r.resolved { + return r.resolvedResult + } + + r.resolved = true + r.insertAsyncResult.wg.Wait() + + // Retrieve the inserted entry + id := r.insertAsyncResult.copiedID + entry, _, err := r.shard.tryRetrieveWritableSeries(id) + if err != nil { + r.resolvedResult = err + return r.resolvedResult + } + if entry == nil { + r.resolvedResult = fmt.Errorf("could not resolve: %s", id) + return r.resolvedResult + } + + r.entry = entry + return nil +} + +func (r *seriesResolver) SeriesRef() (bootstrap.SeriesRef, error) { + if err := r.resolve(); err != nil { + return nil, err + } + return r.entry.Series, nil +} + +func (r *seriesResolver) ReleaseRef() error { + if err := r.resolve(); err != nil { + return err + } + r.entry.OnReleaseReadWriteRef() + return nil +} + func (s *dbShard) ReadEncoded( ctx context.Context, id ident.ID, diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index c601be82c3..99c3f8c56e 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -42,7 +42,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/limits/permits" "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" - "github.com/m3db/m3/src/dbnode/storage/series/lookup" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/ts/writes" "github.com/m3db/m3/src/dbnode/x/xio" @@ -481,18 +480,13 @@ type databaseNamespace interface { } // SeriesReadWriteRef is a read/write reference for a series, -// must make sure to release +// must make sure to release the read/write reference by calling +// release on the resolver. type SeriesReadWriteRef struct { - // Series reference for read/writing. - Series bootstrap.SeriesRef - // UniqueIndex is the unique index of the series (as applicable). - UniqueIndex uint64 + // Resolver resolves the reference for read/writing. + Resolver bootstrap.SeriesRefResolver // Shard is the shard of the series. Shard uint32 - // ReleaseReadWriteRef must be called after using the series ref - // to release the reference count to the series so it can - // be expired by the owning shard eventually. - ReleaseReadWriteRef lookup.OnReleaseReadWriteRef } // Shard is a time series database shard. 
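For context on how the SeriesRefResolver introduced in this first patch is meant to be consumed, here is a minimal sketch (not taken from the patch itself; the helper name loadOneBlock and the package name are illustrative only) of a bootstrap source resolving a checked-out series and loading a block into it. ReleaseRef is deliberately not called here, since the namespace data accumulator releases all checked-out refs when it is closed at the end of bootstrap.

package example

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/storage/block"
	"github.com/m3db/m3/src/dbnode/storage/bootstrap"
	"github.com/m3db/m3/src/dbnode/storage/series"
)

// loadOneBlock resolves the series ref behind a checkout result and loads a
// single block into it, mirroring what the fs, peers and commitlog sources
// are changed to do in the later patches of this series.
func loadOneBlock(res bootstrap.CheckoutSeriesResult, blk block.DatabaseBlock) error {
	seriesRef, err := res.Resolver.SeriesRef()
	if err != nil {
		return fmt.Errorf("unable to resolve series ref: %w", err)
	}
	return seriesRef.LoadBlock(blk, series.WarmWrite)
}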
From fa01bbb1c778512641ebb8ced0a17e993482ae4e Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 3 Mar 2021 11:14:24 +0200 Subject: [PATCH 02/31] fixing the build and making unit tests pass. --- .../bootstrapper/commitlog/source.go | 22 +++++- .../bootstrap/bootstrapper/fs/source.go | 7 +- .../bootstrap/bootstrapper/peers/source.go | 7 +- ...mespace_bootstrap_data_accumulator_test.go | 68 +++++++++---------- 4 files changed, 66 insertions(+), 38 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 1f78331b70..52a94312f5 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -866,7 +866,12 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( } // Load into series. - if err := ref.Series.LoadBlock(dbBlock, writeType); err != nil { + seriesRef, err := ref.Resolver.SeriesRef() + if err != nil { + return err + } + + if err := seriesRef.LoadBlock(dbBlock, writeType); err != nil { return err } @@ -949,7 +954,20 @@ func (s *commitLogSource) startAccumulateWorker(worker *accumulateWorker) { ) worker.datapointsRead++ - _, _, err := entry.Series.Write(ctx, dp.Timestamp, dp.Value, + ref, err := entry.Resolver.SeriesRef() + if err != nil { + if worker.numErrors == 0 { + s.log.Error("failed to resolve series ref", zap.Error(err)) + } else { + // Always write a debug log, most of these will go nowhere if debug + // logging not enabled however. + s.log.Debug("failed to resolve series ref", zap.Error(err)) + } + worker.numErrors++ + continue + } + + _, _, err = ref.Write(ctx, dp.Timestamp, dp.Value, unit, annotation, series.WriteOptions{ SchemaDesc: namespace.namespaceContext.Schema, BootstrapWrite: true, diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 8ee64153ee..eec8688f2b 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -701,7 +701,12 @@ func (s *fileSystemSource) readNextEntryAndRecordBlock( seg := ts.NewSegment(data, nil, 0, ts.FinalizeHead) seriesBlock.Reset(blockStart, blockSize, seg, nsCtx) - if err := ref.Series.LoadBlock(seriesBlock, series.WarmWrite); err != nil { + + seriesRef, err := ref.Resolver.SeriesRef() + if err != nil { + return fmt.Errorf("unable to resolve seriesRef: %v", err) + } + if err := seriesRef.LoadBlock(seriesBlock, series.WarmWrite); err != nil { return fmt.Errorf("unable to load block: %v", err) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 90ab4ecec1..b8ed276d8c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -449,8 +449,13 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( continue } + seriesRef, err := ref.Resolver.SeriesRef() + if err != nil { + s.log.Error("could not resolve seriesRef", zap.Error(err)) + continue + } for _, block := range entry.Blocks.AllBlocks() { - if err := ref.Series.LoadBlock(block, series.WarmWrite); err != nil { + if err := seriesRef.LoadBlock(block, series.WarmWrite); err != nil { unfulfill(currRange) s.log.Error("could not load series block", zap.Error(err)) } diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go 
b/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go index ab24c1a2e8..a56c15115e 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go @@ -33,18 +33,23 @@ import ( ) var ( - id = ident.StringID("foo") - idErr = ident.StringID("bar") - tagIter ident.TagIterator - uniqueIdx = uint64(10) + id = ident.StringID("foo") + idErr = ident.StringID("bar") + tagIter ident.TagIterator ) -type releaser struct { - calls int +type seriesTestResolver struct { + series bootstrap.SeriesRef + calls int } -func (r *releaser) OnReleaseReadWriteRef() { +func (r *seriesTestResolver) SeriesRef() (bootstrap.SeriesRef, error) { + return r.series, nil +} + +func (r *seriesTestResolver) ReleaseRef() error { r.calls++ + return nil } type checkoutFn func(bootstrap.NamespaceDataAccumulator, uint32, @@ -82,17 +87,14 @@ func testCheckoutSeries(t *testing.T, checkoutFn checkoutFn) { ctrl := xtest.NewController(t) defer ctrl.Finish() var ( - ns = NewMockdatabaseNamespace(ctrl) - series = series.NewMockDatabaseSeries(ctrl) - acc = NewDatabaseNamespaceDataAccumulator(ns) - shardID = uint32(7) - - release = &releaser{} - ref = SeriesReadWriteRef{ - UniqueIndex: uniqueIdx, - Series: series, - ReleaseReadWriteRef: release, - Shard: shardID, + ns = NewMockdatabaseNamespace(ctrl) + series = series.NewMockDatabaseSeries(ctrl) + acc = NewDatabaseNamespaceDataAccumulator(ns) + shardID = uint32(7) + resolver = &seriesTestResolver{series: series} + ref = SeriesReadWriteRef{ + Resolver: resolver, + Shard: shardID, } ) @@ -105,16 +107,17 @@ func testCheckoutSeries(t *testing.T, checkoutFn checkoutFn) { seriesResult, err := checkoutFn(acc, shardID, id, tagIter) require.NoError(t, err) - require.Equal(t, series, seriesResult.Series) - require.Equal(t, uniqueIdx, seriesResult.UniqueIndex) + seriesRef, err := seriesResult.Resolver.SeriesRef() + require.NoError(t, err) + require.Equal(t, series, seriesRef) require.Equal(t, shardID, seriesResult.Shard) cast, ok := acc.(*namespaceDataAccumulator) require.True(t, ok) require.Equal(t, 1, len(cast.needsRelease)) - require.Equal(t, release, cast.needsRelease[0]) + require.Equal(t, resolver, cast.needsRelease[0]) // Ensure it hasn't been released. - require.Equal(t, 0, release.calls) + require.Equal(t, 0, resolver.calls) } func TestAccumulatorRelease(t *testing.T) { @@ -130,16 +133,13 @@ func testAccumulatorRelease(t *testing.T, checkoutFn checkoutFn) { defer ctrl.Finish() var ( - err error - ns = NewMockdatabaseNamespace(ctrl) - acc = NewDatabaseNamespaceDataAccumulator(ns) - shardID = uint32(1337) - - release = &releaser{} - ref = SeriesReadWriteRef{ - UniqueIndex: uniqueIdx, - Series: series.NewMockDatabaseSeries(ctrl), - ReleaseReadWriteRef: release, + err error + ns = NewMockdatabaseNamespace(ctrl) + acc = NewDatabaseNamespaceDataAccumulator(ns) + shardID = uint32(1337) + resolver = &seriesTestResolver{series: series.NewMockDatabaseSeries(ctrl)} + ref = SeriesReadWriteRef{ + Resolver: resolver, } ) @@ -150,12 +150,12 @@ func testAccumulatorRelease(t *testing.T, checkoutFn checkoutFn) { cast, ok := acc.(*namespaceDataAccumulator) require.True(t, ok) require.Equal(t, 1, len(cast.needsRelease)) - require.Equal(t, release, cast.needsRelease[0]) + require.Equal(t, resolver, cast.needsRelease[0]) require.NoError(t, acc.Close()) require.Equal(t, 0, len(cast.needsRelease)) // ensure release has been called. 
- require.Equal(t, 1, release.calls) + require.Equal(t, 1, resolver.calls) // ensure double-close errors. require.Error(t, acc.Close()) } From 1385c2fc36bd39ef22c9fb6494134831d61ef511 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 3 Mar 2021 12:22:47 +0200 Subject: [PATCH 03/31] trying to fix resolve error. --- .../storage/bootstrap/bootstrapper/commitlog/source.go | 2 +- src/dbnode/storage/shard.go | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 52a94312f5..a2bf19ad8d 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -868,7 +868,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( // Load into series. seriesRef, err := ref.Resolver.SeriesRef() if err != nil { - return err + return fmt.Errorf("(commitlog) unable to resolve series ref: %v", err) } if err := seriesRef.LoadBlock(dbBlock, writeType); err != nil { diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 4963ec3d02..fe54cebfad 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1129,8 +1129,8 @@ func (r *seriesResolver) resolve() error { return r.resolvedResult } - r.resolved = true r.insertAsyncResult.wg.Wait() + r.resolved = true // Retrieve the inserted entry id := r.insertAsyncResult.copiedID @@ -1139,11 +1139,16 @@ func (r *seriesResolver) resolve() error { r.resolvedResult = err return r.resolvedResult } + if entry == nil { + if r.insertAsyncResult.entry != nil { + r.entry = r.insertAsyncResult.entry + return nil + } + r.resolvedResult = fmt.Errorf("could not resolve: %s", id) return r.resolvedResult } - r.entry = entry return nil } From 9cc2a222ceb6e569c75a348fdbb1c2de752a4877 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 3 Mar 2021 12:49:14 +0200 Subject: [PATCH 04/31] trying to fix resolve error (2). --- .../storage/bootstrap/bootstrapper/commitlog/source.go | 4 ++-- src/dbnode/storage/shard.go | 5 ----- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index a2bf19ad8d..15031db3f6 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -504,7 +504,7 @@ func (s *commitLogSource) readCommitLog(namespaces bootstrap.Namespaces, span op // Check out the series for writing, no need for concurrency // as commit log bootstrapper does not perform parallel // checking out of series. - series, owned, err := accumulator.CheckoutSeriesWithoutLock( + series, owned, err := accumulator.CheckoutSeriesWithLock( entry.Series.Shard, entry.Series.ID, tagIter) @@ -856,7 +856,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( } // NB(r): No parallelization required to checkout the series. - ref, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) + ref, owned, err := accumulator.CheckoutSeriesWithLock(shard, id, tags) if err != nil { if !owned { // Skip bootstrapping this series if we don't own it. 
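The shard.go hunk that follows drops the fallback to the pending insert's entry, so resolution relies entirely on the lazy lookup performed by seriesResolver.resolve. As a point of reference, a simplified sketch of that resolve-once pattern, using sync.Once in place of the double-checked RWMutex used in the patch; the field names and the lookup stand-in are illustrative only, not the actual shard API.

package example

import (
	"errors"
	"sync"
)

type entry struct{} // placeholder for the shard's lookup entry type

// lazyResolver waits for the asynchronous shard insert to complete, then
// performs the entry lookup exactly once and caches both the result and any
// error for later SeriesRef/ReleaseRef calls.
type lazyResolver struct {
	once   sync.Once
	wait   func()                 // stand-in for the insert wait group's Wait
	lookup func() (*entry, error) // stand-in for the writable series lookup
	entry  *entry
	err    error
}

func (r *lazyResolver) resolve() (*entry, error) {
	r.once.Do(func() {
		r.wait()
		r.entry, r.err = r.lookup()
		if r.err == nil && r.entry == nil {
			r.err = errors.New("could not resolve series entry")
		}
	})
	return r.entry, r.err
}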
diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index fe54cebfad..c445d3b664 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1141,11 +1141,6 @@ func (r *seriesResolver) resolve() error { } if entry == nil { - if r.insertAsyncResult.entry != nil { - r.entry = r.insertAsyncResult.entry - return nil - } - r.resolvedResult = fmt.Errorf("could not resolve: %s", id) return r.resolvedResult } From 8c02b88f63d527dca6adf6cc5bf131f4728238fc Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 3 Mar 2021 13:31:19 +0200 Subject: [PATCH 05/31] trying to fix resolve error (3). --- src/dbnode/storage/shard.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index c445d3b664..c4f815bc64 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1072,7 +1072,7 @@ func (s *dbShard) SeriesReadWriteRef( // skipRateLimit for true since this method is used by bootstrapping // and should not be rate limited. skipRateLimit: true, - entryRefCountIncremented: true, + entryRefCountIncremented: false, // should be false because we weren't able to find entry }) if err != nil { return SeriesReadWriteRef{}, err @@ -1130,21 +1130,22 @@ func (r *seriesResolver) resolve() error { } r.insertAsyncResult.wg.Wait() - r.resolved = true - - // Retrieve the inserted entry id := r.insertAsyncResult.copiedID entry, _, err := r.shard.tryRetrieveWritableSeries(id) + // Retrieve the inserted entry if err != nil { r.resolvedResult = err + r.resolved = true return r.resolvedResult } if entry == nil { r.resolvedResult = fmt.Errorf("could not resolve: %s", id) + r.resolved = true return r.resolvedResult } r.entry = entry + r.resolved = true return nil } From 48fdcf84da50d14695618d116205af710d22076b Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Thu, 4 Mar 2021 11:12:41 +0200 Subject: [PATCH 06/31] add logging to check if series was expired. --- .../bootstrap/bootstrapper/commitlog/source.go | 4 ++-- .../storage/bootstrap/bootstrapper/fs/source.go | 2 +- .../storage/namespace_bootstrap_data_accumulator.go | 2 +- src/dbnode/storage/shard.go | 13 +++++++------ 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 15031db3f6..82d8decc80 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -856,7 +856,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( } // NB(r): No parallelization required to checkout the series. - ref, owned, err := accumulator.CheckoutSeriesWithLock(shard, id, tags) + ref, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) if err != nil { if !owned { // Skip bootstrapping this series if we don't own it. @@ -868,7 +868,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( // Load into series. 
seriesRef, err := ref.Resolver.SeriesRef() if err != nil { - return fmt.Errorf("(commitlog) unable to resolve series ref: %v", err) + return fmt.Errorf("(commitlog) unable to resolve series ref: %w", err) } if err := seriesRef.LoadBlock(dbBlock, writeType); err != nil { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index eec8688f2b..a73827b4d0 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -704,7 +704,7 @@ func (s *fileSystemSource) readNextEntryAndRecordBlock( seriesRef, err := ref.Resolver.SeriesRef() if err != nil { - return fmt.Errorf("unable to resolve seriesRef: %v", err) + return fmt.Errorf("unable to resolve seriesRef: %w", err) } if err := seriesRef.LoadBlock(seriesBlock, series.WarmWrite); err != nil { return fmt.Errorf("unable to load block: %v", err) diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go index 939003945f..e80c051bf1 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go @@ -89,7 +89,7 @@ func (a *namespaceDataAccumulator) Close() error { // Release all refs. for _, elem := range a.needsRelease { - elem.ReleaseRef() + _ = elem.ReleaseRef() } // Memset optimization for reset. diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index c4f815bc64..07820b68d3 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -876,6 +876,8 @@ func (s *dbShard) purgeExpiredSeries(expiredEntries []*lookup.Entry) { series.Close() s.list.Remove(elem) s.lookup.Delete(id) + s.logger.Info("entry expired", + zap.String("seriesId", id.String())) } s.Unlock() } @@ -1072,7 +1074,7 @@ func (s *dbShard) SeriesReadWriteRef( // skipRateLimit for true since this method is used by bootstrapping // and should not be rate limited. skipRateLimit: true, - entryRefCountIncremented: false, // should be false because we weren't able to find entry + entryRefCountIncremented: false, // should be false because we weren't able to find entry. }) if err != nil { return SeriesReadWriteRef{}, err @@ -1114,13 +1116,12 @@ type seriesResolver struct { func (r *seriesResolver) resolve() error { r.RLock() - alreadyResolved := r.resolved - resolvedResult := r.resolvedResult - r.RUnlock() - - if alreadyResolved { + if r.resolved { + resolvedResult := r.resolvedResult + r.RUnlock() return resolvedResult } + r.RUnlock() r.Lock() defer r.Unlock() From 537823a4c15247ca06df3041e4e7c972e8754963 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Thu, 4 Mar 2021 12:39:21 +0200 Subject: [PATCH 07/31] try incrementing reader-writer count for a new entry so it won't be purged. --- src/dbnode/storage/shard.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 07820b68d3..09f3a2c876 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1070,6 +1070,7 @@ func (s *dbShard) SeriesReadWriteRef( }, nil } + s.RLock() result, err := s.insertSeriesAsyncBatched(id, tags, dbShardInsertAsyncOptions{ // skipRateLimit for true since this method is used by bootstrapping // and should not be rate limited. @@ -1077,9 +1078,11 @@ func (s *dbShard) SeriesReadWriteRef( entryRefCountIncremented: false, // should be false because we weren't able to find entry. 
}) if err != nil { + s.RUnlock() return SeriesReadWriteRef{}, err } - + result.entry.IncrementReaderWriterCount() + s.RUnlock() // Series will wait for the result to be batched together and inserted. return SeriesReadWriteRef{ Resolver: &seriesResolver{ From abf5160f54913119ee09a0b1840703eb224f6e9b Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 5 Mar 2021 09:47:48 +0200 Subject: [PATCH 08/31] handle accumulator close errors. optimize commit log snapshot load. --- .../bootstrapper/commitlog/source.go | 110 +++++++++++------- .../namespace_bootstrap_data_accumulator.go | 9 +- 2 files changed, 76 insertions(+), 43 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 82d8decc80..58102f8247 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -33,6 +33,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" + "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/series" @@ -89,8 +90,6 @@ type bootstrapNamespace struct { accumulator bootstrap.NamespaceDataAccumulator } -type seriesMap map[seriesMapKey]*seriesMapEntry - type seriesMapKey struct { fileReadID uint64 uniqueIndex uint64 @@ -119,6 +118,11 @@ type accumulateWorker struct { numErrors int } +type seriesBlock struct { + resolver bootstrap.SeriesRefResolver + block block.DatabaseBlock +} + func newCommitLogSource( opts Options, inspection fs.Inspection, @@ -831,55 +835,23 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( zap.Time("blockStart", blockStart), zap.Int("volume", mostRecentCompleteSnapshot.ID.VolumeIndex)) - for { - id, tags, data, expectedChecksum, err := reader.Read() - if err != nil && err != io.EOF { - return err - } - if err == io.EOF { - break - } - - dbBlock := blocksPool.Get() - dbBlock.Reset(blockStart, blockSize, - ts.NewSegment(data, nil, 0, ts.FinalizeHead), nsCtx) - - // Resetting the block will trigger a checksum calculation, so use - // that instead of calculating it twice. - checksum, err := dbBlock.Checksum() - if err != nil { - return err - } - if checksum != expectedChecksum { - return fmt.Errorf("checksum for series: %s was %d but expected %d", - id, checksum, expectedChecksum) - } - - // NB(r): No parallelization required to checkout the series. - ref, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) - if err != nil { - if !owned { - // Skip bootstrapping this series if we don't own it. - continue - } - return err - } + seriesBlocks, err := s.readSeriesBlocks(reader, shard, accumulator, + blocksPool, blockStart, blockSize, nsCtx) + if err != nil { + return err + } + for _, seriesBlock := range seriesBlocks { // Load into series. - seriesRef, err := ref.Resolver.SeriesRef() + seriesRef, err := seriesBlock.resolver.SeriesRef() if err != nil { return fmt.Errorf("(commitlog) unable to resolve series ref: %w", err) } - if err := seriesRef.LoadBlock(dbBlock, writeType); err != nil { + if err := seriesRef.LoadBlock(seriesBlock.block, writeType); err != nil { return err } - - // Always finalize both ID and tags after loading block. 
- id.Finalize() - tags.Close() } - return nil } @@ -915,6 +887,60 @@ func (s *commitLogSource) mostRecentSnapshotByBlockShard( return mostRecentCompleteSnapshotByBlockShard, nil } +func (s *commitLogSource) readSeriesBlocks( + reader fs.DataFileSetReader, + shard uint32, + accumulator bootstrap.NamespaceDataAccumulator, + blocksPool block.DatabaseBlockPool, + blockStart time.Time, + blockSize time.Duration, + nsCtx namespace.Context, +) ([]seriesBlock, error) { + seriesBlocks := make([]seriesBlock, 0, 1024) + for { + id, tags, data, expectedChecksum, err := reader.Read() + if err != nil && err != io.EOF { + return nil, err + } + if err == io.EOF { + break + } + + dbBlock := blocksPool.Get() + dbBlock.Reset(blockStart, blockSize, + ts.NewSegment(data, nil, 0, ts.FinalizeHead), nsCtx) + + // Resetting the block will trigger a checksum calculation, so use + // that instead of calculating it twice. + checksum, err := dbBlock.Checksum() + if err != nil { + return nil, err + } + if checksum != expectedChecksum { + return nil, fmt.Errorf("checksum for series: %s was %d but expected %d", + id, checksum, expectedChecksum) + } + + res, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) + if err != nil { + if !owned { + // Skip bootstrapping this series if we don't own it. + continue + } + return nil, err + } + + seriesBlocks = append(seriesBlocks, seriesBlock{ + resolver: res.Resolver, + block: dbBlock, + }) + + id.Finalize() + tags.Close() + } + return seriesBlocks, nil +} + // TODO(rartoul): Refactor this to take the SnapshotMetadata files into account to reduce // the number of commitlog files that need to be read. func (s *commitLogSource) readCommitLogFilePredicate(f commitlog.FileFilterInfo) bool { diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go index e80c051bf1..6eb9fb8f75 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go @@ -22,6 +22,7 @@ package storage import ( "errors" + "fmt" "sync" "github.com/m3db/m3/src/dbnode/storage/bootstrap" @@ -88,8 +89,11 @@ func (a *namespaceDataAccumulator) Close() error { a.closed = true // Release all refs. + var errs []error for _, elem := range a.needsRelease { - _ = elem.ReleaseRef() + if err := elem.ReleaseRef(); err != nil { + errs = append(errs, err) + } } // Memset optimization for reset. @@ -98,5 +102,8 @@ func (a *namespaceDataAccumulator) Close() error { } a.needsRelease = a.needsRelease[:0] + if len(errs) > 0 { + return fmt.Errorf("got %d release ref errors", len(errs)) + } return nil } From ef4d08dde9186f9018ff61da2851b6512a2bc96b Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 5 Mar 2021 13:29:45 +0200 Subject: [PATCH 09/31] test commit with warm flush and seriesId cache. --- .../storage/bootstrap/bootstrap_mock.go | 70 +++++++++++++- .../bootstrapper/commitlog/source.go | 92 ++++++++++++++----- src/dbnode/storage/bootstrap/types.go | 8 ++ src/dbnode/storage/bootstrap/util.go | 2 + src/dbnode/storage/series/lookup/entry.go | 3 +- 5 files changed, 151 insertions(+), 24 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index 7ecbc95f65..0f9cd66f9c 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. 
// Source: github.com/m3db/m3/src/dbnode/storage/bootstrap/types.go -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -29,6 +29,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" @@ -902,3 +903,70 @@ func (mr *MockSeriesRefMockRecorder) LoadBlock(block, writeType interface{}) *go mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadBlock", reflect.TypeOf((*MockSeriesRef)(nil).LoadBlock), block, writeType) } + +// WarmFlush mocks base method +func (m *MockSeriesRef) WarmFlush(ctx context.Context, blockStart time.Time, persistFn persist.DataFn, nsCtx namespace.Context) (series.FlushOutcome, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WarmFlush", ctx, blockStart, persistFn, nsCtx) + ret0, _ := ret[0].(series.FlushOutcome) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WarmFlush indicates an expected call of WarmFlush +func (mr *MockSeriesRefMockRecorder) WarmFlush(ctx, blockStart, persistFn, nsCtx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlush", reflect.TypeOf((*MockSeriesRef)(nil).WarmFlush), ctx, blockStart, persistFn, nsCtx) +} + +// MockSeriesRefResolver is a mock of SeriesRefResolver interface +type MockSeriesRefResolver struct { + ctrl *gomock.Controller + recorder *MockSeriesRefResolverMockRecorder +} + +// MockSeriesRefResolverMockRecorder is the mock recorder for MockSeriesRefResolver +type MockSeriesRefResolverMockRecorder struct { + mock *MockSeriesRefResolver +} + +// NewMockSeriesRefResolver creates a new mock instance +func NewMockSeriesRefResolver(ctrl *gomock.Controller) *MockSeriesRefResolver { + mock := &MockSeriesRefResolver{ctrl: ctrl} + mock.recorder = &MockSeriesRefResolverMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockSeriesRefResolver) EXPECT() *MockSeriesRefResolverMockRecorder { + return m.recorder +} + +// SeriesRef mocks base method +func (m *MockSeriesRefResolver) SeriesRef() (SeriesRef, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SeriesRef") + ret0, _ := ret[0].(SeriesRef) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SeriesRef indicates an expected call of SeriesRef +func (mr *MockSeriesRefResolverMockRecorder) SeriesRef() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeriesRef", reflect.TypeOf((*MockSeriesRefResolver)(nil).SeriesRef)) +} + +// ReleaseRef mocks base method +func (m *MockSeriesRefResolver) ReleaseRef() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReleaseRef") + ret0, _ := ret[0].(error) + return ret0 +} + +// ReleaseRef indicates an expected call of ReleaseRef +func (mr *MockSeriesRefResolverMockRecorder) ReleaseRef() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReleaseRef", reflect.TypeOf((*MockSeriesRefResolver)(nil).ReleaseRef)) +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 58102f8247..8a1e829086 100644 --- 
a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -78,6 +78,7 @@ type commitLogSource struct { commitLogResult commitLogResult instrumentation *instrumentation + persistManager persist.Manager } type bootstrapNamespace struct { @@ -118,9 +119,9 @@ type accumulateWorker struct { numErrors int } -type seriesBlock struct { +type seriesBlocks struct { resolver bootstrap.SeriesRefResolver - block block.DatabaseBlock + blocks []block.DatabaseBlock } func newCommitLogSource( @@ -139,6 +140,9 @@ func newCommitLogSource( Logger(). With(zap.String("bootstrapper", "commitlog")) + // todo handle error + persistManager, _ := fs.NewPersistManager(opts.CommitLogOptions().FilesystemOptions()) + return &commitLogSource{ opts: opts, log: log, @@ -149,7 +153,7 @@ func newCommitLogSource( newIteratorFn: commitlog.NewIterator, snapshotFilesFn: fs.SnapshotFiles, newReaderFn: fs.NewReader, - + persistManager: persistManager, metrics: newCommitLogSourceMetrics(scope), instrumentation: newInstrumentation(opts, scope, log), } @@ -236,6 +240,7 @@ func (s *commitLogSource) Read( if err != nil { return bootstrap.NamespaceResults{}, err } + } } instrCtx.bootstrapSnapshotsCompleted() @@ -799,6 +804,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( bytesPool = blOpts.BytesPool() fsOpts = s.opts.CommitLogOptions().FilesystemOptions() nsCtx = namespace.NewContextFrom(ns) + ctx = context.NewBackground() ) // Bootstrap the snapshot file. @@ -841,17 +847,55 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( return err } - for _, seriesBlock := range seriesBlocks { + if len(seriesBlocks) < 1 { + return nil + } + + flushPreparer, err := s.persistManager.StartFlushPersist() + if err != nil { + return err + } + + for _, resolverBlocks := range seriesBlocks { // Load into series. - seriesRef, err := seriesBlock.resolver.SeriesRef() + seriesRef, err := resolverBlocks.resolver.SeriesRef() if err != nil { return fmt.Errorf("(commitlog) unable to resolve series ref: %w", err) } - if err := seriesRef.LoadBlock(seriesBlock.block, writeType); err != nil { - return err + for _, databaseBlock := range resolverBlocks.blocks { + if err := seriesRef.LoadBlock(databaseBlock, writeType); err != nil { + return err + } + } + + if writeType == series.WarmWrite { + prepareData, err := flushPreparer.PrepareData(persist.DataPrepareOptions{ + NamespaceMetadata: ns, + BlockStart: blockStart, + Shard: shard, + VolumeIndex: mostRecentCompleteSnapshot.ID.VolumeIndex, + FileSetType: persist.FileSetSnapshotType, + DeleteIfExists: false, + }) + if err != nil { + return err + } + + if _, err = seriesRef.WarmFlush(ctx, blockStart, prepareData.Persist, nsCtx); err != nil { + return err + } + + // todo close if flush fails as well. 
+ if err = prepareData.Close(); err != nil { + return err + } } } + + if err = flushPreparer.DoneFlush(); err != nil { + return err + } return nil } @@ -895,8 +939,8 @@ func (s *commitLogSource) readSeriesBlocks( blockStart time.Time, blockSize time.Duration, nsCtx namespace.Context, -) ([]seriesBlock, error) { - seriesBlocks := make([]seriesBlock, 0, 1024) +) (map[string]*seriesBlocks, error) { + resultBlocks := map[string]*seriesBlocks{} for { id, tags, data, expectedChecksum, err := reader.Read() if err != nil && err != io.EOF { @@ -921,24 +965,30 @@ func (s *commitLogSource) readSeriesBlocks( id, checksum, expectedChecksum) } - res, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) - if err != nil { - if !owned { - // Skip bootstrapping this series if we don't own it. - continue + idString := id.String() + resolverBlocks, ok := resultBlocks[idString] + if !ok { + res, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) + if err != nil { + if !owned { + // Skip bootstrapping this series if we don't own it. + continue + } + return nil, err } - return nil, err - } - seriesBlocks = append(seriesBlocks, seriesBlock{ - resolver: res.Resolver, - block: dbBlock, - }) + resultBlocks[idString] = &seriesBlocks{ + resolver: res.Resolver, + blocks: []block.DatabaseBlock{dbBlock}, + } + } else { + resolverBlocks.blocks = append(resolverBlocks.blocks, dbBlock) + } id.Finalize() tags.Close() } - return seriesBlocks, nil + return resultBlocks, nil } // TODO(rartoul): Refactor this to take the SnapshotMetadata files into account to reduce diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index d6c48a79a8..e2fac3ab60 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -493,6 +493,14 @@ type SeriesRef interface { block block.DatabaseBlock, writeType series.WriteType, ) error + + // WarmFlush flushes data to disk. + WarmFlush( + ctx context.Context, + blockStart time.Time, + persistFn persist.DataFn, + nsCtx namespace.Context, + ) (series.FlushOutcome, error) } // SeriesRefResolver is a series resolver for just in time resolving of diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index 3724ff24f2..4f99f00669 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -283,6 +283,8 @@ func (a *TestDataAccumulator) checkoutSeriesWithLock( return true, series.WarmWrite, nil }).AnyTimes() + mockSeries.EXPECT().WarmFlush(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + result := CheckoutSeriesResult{ Shard: shardID, Resolver: &seriesStaticResolver{series: mockSeries}, diff --git a/src/dbnode/storage/series/lookup/entry.go b/src/dbnode/storage/series/lookup/entry.go index a1319af9c7..04bff598aa 100644 --- a/src/dbnode/storage/series/lookup/entry.go +++ b/src/dbnode/storage/series/lookup/entry.go @@ -26,7 +26,6 @@ import ( "time" "github.com/m3db/m3/src/dbnode/storage/block" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/ts/writes" @@ -80,7 +79,7 @@ var _ OnReleaseReadWriteRef = &Entry{} var _ index.OnIndexSeries = &Entry{} // ensure Entry satisfies the `bootstrap.SeriesRef` interface. -var _ bootstrap.SeriesRef = &Entry{} +//var _ bootstrap.SeriesRef = &Entry{} // NewEntryOptions supplies options for a new entry. 
type NewEntryOptions struct { From 87979c8b81800f555610e540db09a4c75270f7f1 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 5 Mar 2021 13:54:50 +0200 Subject: [PATCH 10/31] trying to fix warm flush. --- .../bootstrapper/commitlog/source.go | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 8a1e829086..8fb9c4fd09 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -851,9 +851,25 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( return nil } - flushPreparer, err := s.persistManager.StartFlushPersist() - if err != nil { - return err + var flushPreparer persist.FlushPreparer + var prepareData persist.PreparedDataPersist + if writeType == series.WarmWrite { + flushPreparer, err = s.persistManager.StartFlushPersist() + if err != nil { + return err + } + + prepareData, err = flushPreparer.PrepareData(persist.DataPrepareOptions{ + NamespaceMetadata: ns, + BlockStart: blockStart, + Shard: shard, + VolumeIndex: 0, + FileSetType: persist.FileSetSnapshotType, + DeleteIfExists: false, + }) + if err != nil { + return err + } } for _, resolverBlocks := range seriesBlocks { @@ -870,31 +886,21 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( } if writeType == series.WarmWrite { - prepareData, err := flushPreparer.PrepareData(persist.DataPrepareOptions{ - NamespaceMetadata: ns, - BlockStart: blockStart, - Shard: shard, - VolumeIndex: mostRecentCompleteSnapshot.ID.VolumeIndex, - FileSetType: persist.FileSetSnapshotType, - DeleteIfExists: false, - }) - if err != nil { - return err - } - if _, err = seriesRef.WarmFlush(ctx, blockStart, prepareData.Persist, nsCtx); err != nil { return err } - - // todo close if flush fails as well. - if err = prepareData.Close(); err != nil { - return err - } } } - if err = flushPreparer.DoneFlush(); err != nil { - return err + if writeType == series.WarmWrite { + // todo close if flush fails as well. + if err = prepareData.Close(); err != nil { + return err + } + + if err = flushPreparer.DoneFlush(); err != nil { + return err + } } return nil } From b4042195cbd99d2018e7875db30353fc62240d83 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 5 Mar 2021 15:18:41 +0200 Subject: [PATCH 11/31] revert flush changes. 
--- .../storage/bootstrap/bootstrap_mock.go | 16 --- .../bootstrapper/commitlog/source.go | 102 ++++-------------- src/dbnode/storage/bootstrap/types.go | 8 -- src/dbnode/storage/bootstrap/util.go | 2 - src/dbnode/storage/series/lookup/entry.go | 3 +- 5 files changed, 25 insertions(+), 106 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index 0f9cd66f9c..26aa406362 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -29,7 +29,6 @@ import ( "time" "github.com/m3db/m3/src/dbnode/namespace" - "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" @@ -904,21 +903,6 @@ func (mr *MockSeriesRefMockRecorder) LoadBlock(block, writeType interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadBlock", reflect.TypeOf((*MockSeriesRef)(nil).LoadBlock), block, writeType) } -// WarmFlush mocks base method -func (m *MockSeriesRef) WarmFlush(ctx context.Context, blockStart time.Time, persistFn persist.DataFn, nsCtx namespace.Context) (series.FlushOutcome, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WarmFlush", ctx, blockStart, persistFn, nsCtx) - ret0, _ := ret[0].(series.FlushOutcome) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// WarmFlush indicates an expected call of WarmFlush -func (mr *MockSeriesRefMockRecorder) WarmFlush(ctx, blockStart, persistFn, nsCtx interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlush", reflect.TypeOf((*MockSeriesRef)(nil).WarmFlush), ctx, blockStart, persistFn, nsCtx) -} - // MockSeriesRefResolver is a mock of SeriesRefResolver interface type MockSeriesRefResolver struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 8fb9c4fd09..f4c7c9cb99 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -78,7 +78,6 @@ type commitLogSource struct { commitLogResult commitLogResult instrumentation *instrumentation - persistManager persist.Manager } type bootstrapNamespace struct { @@ -119,11 +118,6 @@ type accumulateWorker struct { numErrors int } -type seriesBlocks struct { - resolver bootstrap.SeriesRefResolver - blocks []block.DatabaseBlock -} - func newCommitLogSource( opts Options, inspection fs.Inspection, @@ -140,9 +134,6 @@ func newCommitLogSource( Logger(). 
With(zap.String("bootstrapper", "commitlog")) - // todo handle error - persistManager, _ := fs.NewPersistManager(opts.CommitLogOptions().FilesystemOptions()) - return &commitLogSource{ opts: opts, log: log, @@ -153,7 +144,7 @@ func newCommitLogSource( newIteratorFn: commitlog.NewIterator, snapshotFilesFn: fs.SnapshotFiles, newReaderFn: fs.NewReader, - persistManager: persistManager, + metrics: newCommitLogSourceMetrics(scope), instrumentation: newInstrumentation(opts, scope, log), } @@ -240,7 +231,6 @@ func (s *commitLogSource) Read( if err != nil { return bootstrap.NamespaceResults{}, err } - } } instrCtx.bootstrapSnapshotsCompleted() @@ -788,6 +778,11 @@ func (s *commitLogSource) bootstrapShardSnapshots( return nil } +type seriesBlock struct { + resolver bootstrap.SeriesRefResolver + block block.DatabaseBlock +} + func (s *commitLogSource) bootstrapShardBlockSnapshot( ns namespace.Metadata, accumulator bootstrap.NamespaceDataAccumulator, @@ -804,7 +799,6 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( bytesPool = blOpts.BytesPool() fsOpts = s.opts.CommitLogOptions().FilesystemOptions() nsCtx = namespace.NewContextFrom(ns) - ctx = context.NewBackground() ) // Bootstrap the snapshot file. @@ -847,58 +841,14 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( return err } - if len(seriesBlocks) < 1 { - return nil - } - - var flushPreparer persist.FlushPreparer - var prepareData persist.PreparedDataPersist - if writeType == series.WarmWrite { - flushPreparer, err = s.persistManager.StartFlushPersist() - if err != nil { - return err - } - - prepareData, err = flushPreparer.PrepareData(persist.DataPrepareOptions{ - NamespaceMetadata: ns, - BlockStart: blockStart, - Shard: shard, - VolumeIndex: 0, - FileSetType: persist.FileSetSnapshotType, - DeleteIfExists: false, - }) - if err != nil { - return err - } - } - - for _, resolverBlocks := range seriesBlocks { + for _, seriesBlock := range seriesBlocks { // Load into series. - seriesRef, err := resolverBlocks.resolver.SeriesRef() + seriesRef, err := seriesBlock.resolver.SeriesRef() if err != nil { return fmt.Errorf("(commitlog) unable to resolve series ref: %w", err) } - for _, databaseBlock := range resolverBlocks.blocks { - if err := seriesRef.LoadBlock(databaseBlock, writeType); err != nil { - return err - } - } - - if writeType == series.WarmWrite { - if _, err = seriesRef.WarmFlush(ctx, blockStart, prepareData.Persist, nsCtx); err != nil { - return err - } - } - } - - if writeType == series.WarmWrite { - // todo close if flush fails as well. - if err = prepareData.Close(); err != nil { - return err - } - - if err = flushPreparer.DoneFlush(); err != nil { + if err := seriesRef.LoadBlock(seriesBlock.block, writeType); err != nil { return err } } @@ -945,8 +895,8 @@ func (s *commitLogSource) readSeriesBlocks( blockStart time.Time, blockSize time.Duration, nsCtx namespace.Context, -) (map[string]*seriesBlocks, error) { - resultBlocks := map[string]*seriesBlocks{} +) ([]seriesBlock, error) { + seriesBlocks := make([]seriesBlock, 0, reader.Entries()) for { id, tags, data, expectedChecksum, err := reader.Read() if err != nil && err != io.EOF { @@ -971,30 +921,24 @@ func (s *commitLogSource) readSeriesBlocks( id, checksum, expectedChecksum) } - idString := id.String() - resolverBlocks, ok := resultBlocks[idString] - if !ok { - res, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) - if err != nil { - if !owned { - // Skip bootstrapping this series if we don't own it. 
- continue - } - return nil, err - } - - resultBlocks[idString] = &seriesBlocks{ - resolver: res.Resolver, - blocks: []block.DatabaseBlock{dbBlock}, + res, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) + if err != nil { + if !owned { + // Skip bootstrapping this series if we don't own it. + continue } - } else { - resolverBlocks.blocks = append(resolverBlocks.blocks, dbBlock) + return nil, err } + seriesBlocks = append(seriesBlocks, seriesBlock{ + resolver: res.Resolver, + block: dbBlock, + }) + id.Finalize() tags.Close() } - return resultBlocks, nil + return seriesBlocks, nil } // TODO(rartoul): Refactor this to take the SnapshotMetadata files into account to reduce diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index e2fac3ab60..d6c48a79a8 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -493,14 +493,6 @@ type SeriesRef interface { block block.DatabaseBlock, writeType series.WriteType, ) error - - // WarmFlush flushes data to disk. - WarmFlush( - ctx context.Context, - blockStart time.Time, - persistFn persist.DataFn, - nsCtx namespace.Context, - ) (series.FlushOutcome, error) } // SeriesRefResolver is a series resolver for just in time resolving of diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index 4f99f00669..3724ff24f2 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -283,8 +283,6 @@ func (a *TestDataAccumulator) checkoutSeriesWithLock( return true, series.WarmWrite, nil }).AnyTimes() - mockSeries.EXPECT().WarmFlush(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - result := CheckoutSeriesResult{ Shard: shardID, Resolver: &seriesStaticResolver{series: mockSeries}, diff --git a/src/dbnode/storage/series/lookup/entry.go b/src/dbnode/storage/series/lookup/entry.go index 04bff598aa..a1319af9c7 100644 --- a/src/dbnode/storage/series/lookup/entry.go +++ b/src/dbnode/storage/series/lookup/entry.go @@ -26,6 +26,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/ts/writes" @@ -79,7 +80,7 @@ var _ OnReleaseReadWriteRef = &Entry{} var _ index.OnIndexSeries = &Entry{} // ensure Entry satisfies the `bootstrap.SeriesRef` interface. -//var _ bootstrap.SeriesRef = &Entry{} +var _ bootstrap.SeriesRef = &Entry{} // NewEntryOptions supplies options for a new entry. type NewEntryOptions struct { From acc2f0a5b2a9fb02b0bdb3750ae3dc5540bd9d36 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 5 Mar 2021 15:26:59 +0200 Subject: [PATCH 12/31] use multiErrors. 
--- .../namespace_bootstrap_data_accumulator.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go index 6eb9fb8f75..2767b8e6bf 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go @@ -22,10 +22,10 @@ package storage import ( "errors" - "fmt" "sync" "github.com/m3db/m3/src/dbnode/storage/bootstrap" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" ) @@ -86,24 +86,25 @@ func (a *namespaceDataAccumulator) Close() error { return errAlreadyClosed } - a.closed = true - // Release all refs. - var errs []error + multiError := xerrors.NewMultiError() for _, elem := range a.needsRelease { if err := elem.ReleaseRef(); err != nil { - errs = append(errs, err) + multiError = multiError.Add(err) } } + if !multiError.Empty() { + return multiError.FinalError() + } + + a.closed = true + // Memset optimization for reset. for i := range a.needsRelease { a.needsRelease[i] = nil } a.needsRelease = a.needsRelease[:0] - if len(errs) > 0 { - return fmt.Errorf("got %d release ref errors", len(errs)) - } return nil } From c0877e8ce8ce3775b954c404ddec28a6067b7185 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 8 Mar 2021 12:52:56 +0200 Subject: [PATCH 13/31] fixed index write when using series ref. added UniqueIndex to SeriesRef. --- src/dbnode/storage/bootstrap/types.go | 3 +++ ...mespace_bootstrap_data_accumulator_test.go | 24 ++++++++++++------- src/dbnode/storage/series/lookup/entry.go | 5 ++++ src/dbnode/storage/shard.go | 8 ++----- 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index d6c48a79a8..c520f2441d 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -493,6 +493,9 @@ type SeriesRef interface { block block.DatabaseBlock, writeType series.WriteType, ) error + + // UniqueIndex is the unique index for the series. 
+ UniqueIndex() uint64 } // SeriesRefResolver is a series resolver for just in time resolving of diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go index a56c15115e..f7c6a86c87 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go @@ -39,8 +39,9 @@ var ( ) type seriesTestResolver struct { - series bootstrap.SeriesRef - calls int + series bootstrap.SeriesRef + calls int + uniqueIndex uint64 } func (r *seriesTestResolver) SeriesRef() (bootstrap.SeriesRef, error) { @@ -52,6 +53,10 @@ func (r *seriesTestResolver) ReleaseRef() error { return nil } +func (r *seriesTestResolver) UniqueIndex() uint64 { + return r.uniqueIndex +} + type checkoutFn func(bootstrap.NamespaceDataAccumulator, uint32, ident.ID, ident.TagIterator) (bootstrap.CheckoutSeriesResult, error) @@ -87,17 +92,19 @@ func testCheckoutSeries(t *testing.T, checkoutFn checkoutFn) { ctrl := xtest.NewController(t) defer ctrl.Finish() var ( - ns = NewMockdatabaseNamespace(ctrl) - series = series.NewMockDatabaseSeries(ctrl) - acc = NewDatabaseNamespaceDataAccumulator(ns) - shardID = uint32(7) - resolver = &seriesTestResolver{series: series} - ref = SeriesReadWriteRef{ + ns = NewMockdatabaseNamespace(ctrl) + series = series.NewMockDatabaseSeries(ctrl) + acc = NewDatabaseNamespaceDataAccumulator(ns) + uniqueIdx = uint64(10) + shardID = uint32(7) + resolver = &seriesTestResolver{series: series, uniqueIndex: uniqueIdx} + ref = SeriesReadWriteRef{ Resolver: resolver, Shard: shardID, } ) + series.EXPECT().UniqueIndex().Return(uniqueIdx).AnyTimes() ns.EXPECT().SeriesReadWriteRef(shardID, id, tagIter).Return(ref, true, nil) ns.EXPECT().SeriesReadWriteRef(shardID, idErr, tagIter). Return(SeriesReadWriteRef{}, false, errors.New("err")) @@ -110,6 +117,7 @@ func testCheckoutSeries(t *testing.T, checkoutFn checkoutFn) { seriesRef, err := seriesResult.Resolver.SeriesRef() require.NoError(t, err) require.Equal(t, series, seriesRef) + require.Equal(t, uniqueIdx, seriesRef.UniqueIndex()) require.Equal(t, shardID, seriesResult.Shard) cast, ok := acc.(*namespaceDataAccumulator) diff --git a/src/dbnode/storage/series/lookup/entry.go b/src/dbnode/storage/series/lookup/entry.go index a1319af9c7..a354861ac8 100644 --- a/src/dbnode/storage/series/lookup/entry.go +++ b/src/dbnode/storage/series/lookup/entry.go @@ -237,6 +237,11 @@ func (entry *Entry) LoadBlock( return entry.Series.LoadBlock(block, writeType) } +// UniqueIndex is the unique index for the series. +func (entry *Entry) UniqueIndex() uint64 { + return entry.Series.UniqueIndex() +} + func (entry *Entry) maybeIndex(timestamp time.Time) error { idx := entry.indexWriter if idx == nil { diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 09f3a2c876..3cc714b6f2 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1070,7 +1070,6 @@ func (s *dbShard) SeriesReadWriteRef( }, nil } - s.RLock() result, err := s.insertSeriesAsyncBatched(id, tags, dbShardInsertAsyncOptions{ // skipRateLimit for true since this method is used by bootstrapping // and should not be rate limited. @@ -1078,11 +1077,8 @@ func (s *dbShard) SeriesReadWriteRef( entryRefCountIncremented: false, // should be false because we weren't able to find entry. 
}) if err != nil { - s.RUnlock() return SeriesReadWriteRef{}, err } - result.entry.IncrementReaderWriterCount() - s.RUnlock() // Series will wait for the result to be batched together and inserted. return SeriesReadWriteRef{ Resolver: &seriesResolver{ @@ -1098,7 +1094,7 @@ type seriesStaticResolver struct { } func (r *seriesStaticResolver) SeriesRef() (bootstrap.SeriesRef, error) { - return r.entry.Series, nil + return r.entry, nil } func (r *seriesStaticResolver) ReleaseRef() error { @@ -1157,7 +1153,7 @@ func (r *seriesResolver) SeriesRef() (bootstrap.SeriesRef, error) { if err := r.resolve(); err != nil { return nil, err } - return r.entry.Series, nil + return r.entry, nil } func (r *seriesResolver) ReleaseRef() error { From 80f4e4201464f4c6c1bff38de95b2625a80707f9 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 8 Mar 2021 13:39:54 +0200 Subject: [PATCH 14/31] insert new entry and inc ref count. --- src/dbnode/storage/shard.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 3cc714b6f2..92e1753c93 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1070,20 +1070,32 @@ func (s *dbShard) SeriesReadWriteRef( }, nil } - result, err := s.insertSeriesAsyncBatched(id, tags, dbShardInsertAsyncOptions{ - // skipRateLimit for true since this method is used by bootstrapping - // and should not be rate limited. - skipRateLimit: true, - entryRefCountIncremented: false, // should be false because we weren't able to find entry. - }) + entry, err = s.newShardEntry(id, newTagsIterArg(tags)) if err != nil { return SeriesReadWriteRef{}, err } + entry.IncrementReaderWriterCount() + + wg, err := s.insertQueue.Insert(dbShardInsert{ + entry: entry, + opts: dbShardInsertAsyncOptions{ + // skipRateLimit for true since this method is used by bootstrapping + // and should not be rate limited. + skipRateLimit: true, + entryRefCountIncremented: false, // should be false because otherwise entry won't be inserted. + }, + }) + // Series will wait for the result to be batched together and inserted. return SeriesReadWriteRef{ Resolver: &seriesResolver{ - insertAsyncResult: result, - shard: s, + insertAsyncResult: insertAsyncResult{ + wg: wg, + // Make sure to return the copied ID from the new series. + copiedID: entry.Series.ID(), + entry: entry, + }, + shard: s, }, Shard: s.shard, }, nil From b481a6cb3cbc68e1e10b6b000330e3b5357e4334 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 8 Mar 2021 15:15:25 +0200 Subject: [PATCH 15/31] fixed linter issues. 
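
For context (illustration only, not part of the diff below): the main linter fix swaps direct `err == io.EOF` comparisons for errors.Is, which also matches io.EOF when it arrives wrapped. A minimal, self-contained sketch of that loop shape, using strings.Reader as a stand-in for the fileset reader:

    package main

    import (
        "errors"
        "fmt"
        "io"
        "strings"
    )

    func main() {
        r := strings.NewReader("abc")
        buf := make([]byte, 2)
        for {
            _, err := r.Read(buf)
            // errors.Is matches io.EOF even if a reader wraps it.
            if err != nil && !errors.Is(err, io.EOF) {
                fmt.Println("read failed:", err)
                return
            }
            if errors.Is(err, io.EOF) {
                break
            }
        }
        fmt.Println("done")
    }
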
--- src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go | 4 ++-- src/dbnode/storage/shard.go | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index f4c7c9cb99..4b08be7d12 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -899,10 +899,10 @@ func (s *commitLogSource) readSeriesBlocks( seriesBlocks := make([]seriesBlock, 0, reader.Entries()) for { id, tags, data, expectedChecksum, err := reader.Read() - if err != nil && err != io.EOF { + if err != nil && !errors.Is(err, io.EOF) { return nil, err } - if err == io.EOF { + if errors.Is(err, io.EOF) { break } diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 92e1753c93..b4a5b99f93 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1085,6 +1085,9 @@ func (s *dbShard) SeriesReadWriteRef( entryRefCountIncremented: false, // should be false because otherwise entry won't be inserted. }, }) + if err != nil { + return SeriesReadWriteRef{}, err + } // Series will wait for the result to be batched together and inserted. return SeriesReadWriteRef{ From 6078828dca948e32682d826d1718b52239611c26 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 8 Mar 2021 16:08:28 +0200 Subject: [PATCH 16/31] small cleanup. removed unnecessary resolvers. --- .../storage/bootstrap/bootstrap_mock.go | 14 +++++ src/dbnode/storage/namespace.go | 10 ++-- .../namespace_bootstrap_data_accumulator.go | 8 +-- ...mespace_bootstrap_data_accumulator_test.go | 31 ++++------ src/dbnode/storage/series/lookup/entry.go | 16 ++++++ src/dbnode/storage/shard.go | 56 ++++++------------- src/dbnode/storage/storage_mock.go | 28 +++++----- src/dbnode/storage/types.go | 14 ++--- 8 files changed, 87 insertions(+), 90 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index 26aa406362..6fa7f4ea0b 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -903,6 +903,20 @@ func (mr *MockSeriesRefMockRecorder) LoadBlock(block, writeType interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadBlock", reflect.TypeOf((*MockSeriesRef)(nil).LoadBlock), block, writeType) } +// UniqueIndex mocks base method +func (m *MockSeriesRef) UniqueIndex() uint64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UniqueIndex") + ret0, _ := ret[0].(uint64) + return ret0 +} + +// UniqueIndex indicates an expected call of UniqueIndex +func (mr *MockSeriesRefMockRecorder) UniqueIndex() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UniqueIndex", reflect.TypeOf((*MockSeriesRef)(nil).UniqueIndex)) +} + // MockSeriesRefResolver is a mock of SeriesRefResolver interface type MockSeriesRefResolver struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index d63aacb52f..6173a40985 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -741,19 +741,19 @@ func (n *dbNamespace) WritePendingIndexInserts( return n.reverseIndex.WritePending(pending) } -func (n *dbNamespace) SeriesReadWriteRef( +func (n *dbNamespace) SeriesRefResolver( shardID uint32, id ident.ID, tags ident.TagIterator, -) (SeriesReadWriteRef, bool, error) { +) 
(bootstrap.SeriesRefResolver, bool, error) { n.RLock() shard, owned, err := n.shardAtWithRLock(shardID) n.RUnlock() if err != nil { - return SeriesReadWriteRef{}, owned, err + return nil, owned, err } - res, err := shard.SeriesReadWriteRef(id, tags) - return res, true, err + resolver, err := shard.SeriesRefResolver(id, tags) + return resolver, true, err } func (n *dbNamespace) QueryIDs( diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go index 2767b8e6bf..ad2e5b546b 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go @@ -55,15 +55,15 @@ func (a *namespaceDataAccumulator) CheckoutSeriesWithoutLock( id ident.ID, tags ident.TagIterator, ) (bootstrap.CheckoutSeriesResult, bool, error) { - ref, owned, err := a.namespace.SeriesReadWriteRef(shardID, id, tags) + resolver, owned, err := a.namespace.SeriesRefResolver(shardID, id, tags) if err != nil { return bootstrap.CheckoutSeriesResult{}, owned, err } - a.needsRelease = append(a.needsRelease, ref.Resolver) + a.needsRelease = append(a.needsRelease, resolver) return bootstrap.CheckoutSeriesResult{ - Resolver: ref.Resolver, - Shard: ref.Shard, + Resolver: resolver, + Shard: shardID, }, true, nil } diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go index f7c6a86c87..7cd34793ac 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go @@ -92,22 +92,18 @@ func testCheckoutSeries(t *testing.T, checkoutFn checkoutFn) { ctrl := xtest.NewController(t) defer ctrl.Finish() var ( - ns = NewMockdatabaseNamespace(ctrl) - series = series.NewMockDatabaseSeries(ctrl) - acc = NewDatabaseNamespaceDataAccumulator(ns) - uniqueIdx = uint64(10) - shardID = uint32(7) - resolver = &seriesTestResolver{series: series, uniqueIndex: uniqueIdx} - ref = SeriesReadWriteRef{ - Resolver: resolver, - Shard: shardID, - } + ns = NewMockdatabaseNamespace(ctrl) + mockSeries = series.NewMockDatabaseSeries(ctrl) + acc = NewDatabaseNamespaceDataAccumulator(ns) + uniqueIdx = uint64(10) + shardID = uint32(7) + resolver = &seriesTestResolver{series: mockSeries, uniqueIndex: uniqueIdx} ) - series.EXPECT().UniqueIndex().Return(uniqueIdx).AnyTimes() - ns.EXPECT().SeriesReadWriteRef(shardID, id, tagIter).Return(ref, true, nil) - ns.EXPECT().SeriesReadWriteRef(shardID, idErr, tagIter). - Return(SeriesReadWriteRef{}, false, errors.New("err")) + mockSeries.EXPECT().UniqueIndex().Return(uniqueIdx).AnyTimes() + ns.EXPECT().SeriesRefResolver(shardID, id, tagIter).Return(resolver, true, nil) + ns.EXPECT().SeriesRefResolver(shardID, idErr, tagIter). 
+ Return(nil, false, errors.New("err")) _, err := checkoutFn(acc, shardID, idErr, tagIter) require.Error(t, err) @@ -116,7 +112,7 @@ func testCheckoutSeries(t *testing.T, checkoutFn checkoutFn) { require.NoError(t, err) seriesRef, err := seriesResult.Resolver.SeriesRef() require.NoError(t, err) - require.Equal(t, series, seriesRef) + require.Equal(t, mockSeries, seriesRef) require.Equal(t, uniqueIdx, seriesRef.UniqueIndex()) require.Equal(t, shardID, seriesResult.Shard) @@ -146,12 +142,9 @@ func testAccumulatorRelease(t *testing.T, checkoutFn checkoutFn) { acc = NewDatabaseNamespaceDataAccumulator(ns) shardID = uint32(1337) resolver = &seriesTestResolver{series: series.NewMockDatabaseSeries(ctrl)} - ref = SeriesReadWriteRef{ - Resolver: resolver, - } ) - ns.EXPECT().SeriesReadWriteRef(shardID, id, tagIter).Return(ref, true, nil) + ns.EXPECT().SeriesRefResolver(shardID, id, tagIter).Return(resolver, true, nil) _, err = checkoutFn(acc, shardID, id, tagIter) require.NoError(t, err) diff --git a/src/dbnode/storage/series/lookup/entry.go b/src/dbnode/storage/series/lookup/entry.go index a354861ac8..d8ad688a62 100644 --- a/src/dbnode/storage/series/lookup/entry.go +++ b/src/dbnode/storage/series/lookup/entry.go @@ -82,6 +82,9 @@ var _ index.OnIndexSeries = &Entry{} // ensure Entry satisfies the `bootstrap.SeriesRef` interface. var _ bootstrap.SeriesRef = &Entry{} +// // ensure Entry satisfies the `bootstrap.SeriesRefResolver` interface. +var _ bootstrap.SeriesRefResolver = &Entry{} + // NewEntryOptions supplies options for a new entry. type NewEntryOptions struct { Series series.DatabaseSeries @@ -262,6 +265,19 @@ func (entry *Entry) maybeIndex(timestamp time.Time) error { return idx.WritePending(entry.pendingIndexBatchSizeOne) } +// SeriesRef returns the series read write ref. +func (entry *Entry) SeriesRef() (bootstrap.SeriesRef, error) { + return entry, nil +} + +// ReleaseRef must be called after using the series ref +// to release the reference count to the series so it can +// be expired by the owning shard eventually. +func (entry *Entry) ReleaseRef() error { + entry.OnReleaseReadWriteRef() + return nil +} + // entryIndexState is used to capture the state of indexing for a single shard // entry. It's used to prevent redundant indexing operations. // NB(prateek): We need this amount of state because in the worst case, as we can have 3 active blocks being diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index b4a5b99f93..cd67f83359 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1050,32 +1050,26 @@ func (s *dbShard) writeAndIndex( }, nil } -func (s *dbShard) SeriesReadWriteRef( +func (s *dbShard) SeriesRefResolver( id ident.ID, tags ident.TagIterator, -) (SeriesReadWriteRef, error) { +) (bootstrap.SeriesRefResolver, error) { // Try retrieve existing series. entry, _, err := s.tryRetrieveWritableSeries(id) if err != nil { - return SeriesReadWriteRef{}, err + return nil, err } if entry != nil { // The read/write ref is already incremented. 
- return SeriesReadWriteRef{ - Resolver: &seriesStaticResolver{ - entry: entry, - }, - Shard: s.shard, - }, nil + return entry, nil } entry, err = s.newShardEntry(id, newTagsIterArg(tags)) if err != nil { - return SeriesReadWriteRef{}, err + return nil, err } entry.IncrementReaderWriterCount() - wg, err := s.insertQueue.Insert(dbShardInsert{ entry: entry, opts: dbShardInsertAsyncOptions{ @@ -1086,42 +1080,24 @@ func (s *dbShard) SeriesReadWriteRef( }, }) if err != nil { - return SeriesReadWriteRef{}, err + return nil, err } // Series will wait for the result to be batched together and inserted. - return SeriesReadWriteRef{ - Resolver: &seriesResolver{ - insertAsyncResult: insertAsyncResult{ - wg: wg, - // Make sure to return the copied ID from the new series. - copiedID: entry.Series.ID(), - entry: entry, - }, - shard: s, - }, - Shard: s.shard, + return &seriesResolver{ + wg: wg, + // Make sure to return the copied ID from the new series. + copiedID: entry.Series.ID(), + shard: s, }, nil } -type seriesStaticResolver struct { - entry *lookup.Entry -} - -func (r *seriesStaticResolver) SeriesRef() (bootstrap.SeriesRef, error) { - return r.entry, nil -} - -func (r *seriesStaticResolver) ReleaseRef() error { - r.entry.OnReleaseReadWriteRef() - return nil -} - type seriesResolver struct { sync.RWMutex - insertAsyncResult insertAsyncResult - shard *dbShard + wg *sync.WaitGroup + copiedID ident.ID + shard *dbShard resolved bool resolvedResult error @@ -1144,8 +1120,8 @@ func (r *seriesResolver) resolve() error { return r.resolvedResult } - r.insertAsyncResult.wg.Wait() - id := r.insertAsyncResult.copiedID + r.wg.Wait() + id := r.copiedID entry, _, err := r.shard.tryRetrieveWritableSeries(id) // Retrieve the inserted entry if err != nil { diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 854ea4b572..4052e4f7cf 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1809,20 +1809,20 @@ func (mr *MockdatabaseNamespaceMockRecorder) FlushState(shardID, blockStart inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushState", reflect.TypeOf((*MockdatabaseNamespace)(nil).FlushState), shardID, blockStart) } -// SeriesReadWriteRef mocks base method -func (m *MockdatabaseNamespace) SeriesReadWriteRef(shardID uint32, id ident.ID, tags ident.TagIterator) (SeriesReadWriteRef, bool, error) { +// SeriesRefResolver mocks base method +func (m *MockdatabaseNamespace) SeriesRefResolver(shardID uint32, id ident.ID, tags ident.TagIterator) (bootstrap.SeriesRefResolver, bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SeriesReadWriteRef", shardID, id, tags) - ret0, _ := ret[0].(SeriesReadWriteRef) + ret := m.ctrl.Call(m, "SeriesRefResolver", shardID, id, tags) + ret0, _ := ret[0].(bootstrap.SeriesRefResolver) ret1, _ := ret[1].(bool) ret2, _ := ret[2].(error) return ret0, ret1, ret2 } -// SeriesReadWriteRef indicates an expected call of SeriesReadWriteRef -func (mr *MockdatabaseNamespaceMockRecorder) SeriesReadWriteRef(shardID, id, tags interface{}) *gomock.Call { +// SeriesRefResolver indicates an expected call of SeriesRefResolver +func (mr *MockdatabaseNamespaceMockRecorder) SeriesRefResolver(shardID, id, tags interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeriesReadWriteRef", reflect.TypeOf((*MockdatabaseNamespace)(nil).SeriesReadWriteRef), shardID, id, tags) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeriesRefResolver", 
reflect.TypeOf((*MockdatabaseNamespace)(nil).SeriesRefResolver), shardID, id, tags) } // WritePendingIndexInserts mocks base method @@ -2345,19 +2345,19 @@ func (mr *MockdatabaseShardMockRecorder) Repair(ctx, nsCtx, nsMeta, tr, repairer return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Repair", reflect.TypeOf((*MockdatabaseShard)(nil).Repair), ctx, nsCtx, nsMeta, tr, repairer) } -// SeriesReadWriteRef mocks base method -func (m *MockdatabaseShard) SeriesReadWriteRef(id ident.ID, tags ident.TagIterator) (SeriesReadWriteRef, error) { +// SeriesRefResolver mocks base method +func (m *MockdatabaseShard) SeriesRefResolver(id ident.ID, tags ident.TagIterator) (bootstrap.SeriesRefResolver, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SeriesReadWriteRef", id, tags) - ret0, _ := ret[0].(SeriesReadWriteRef) + ret := m.ctrl.Call(m, "SeriesRefResolver", id, tags) + ret0, _ := ret[0].(bootstrap.SeriesRefResolver) ret1, _ := ret[1].(error) return ret0, ret1 } -// SeriesReadWriteRef indicates an expected call of SeriesReadWriteRef -func (mr *MockdatabaseShardMockRecorder) SeriesReadWriteRef(id, tags interface{}) *gomock.Call { +// SeriesRefResolver indicates an expected call of SeriesRefResolver +func (mr *MockdatabaseShardMockRecorder) SeriesRefResolver(id, tags interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeriesReadWriteRef", reflect.TypeOf((*MockdatabaseShard)(nil).SeriesReadWriteRef), id, tags) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeriesRefResolver", reflect.TypeOf((*MockdatabaseShard)(nil).SeriesRefResolver), id, tags) } // DocRef mocks base method diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 99c3f8c56e..e7143e156e 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -459,14 +459,14 @@ type databaseNamespace interface { // FlushState returns the flush state for the specified shard and block start. FlushState(shardID uint32, blockStart time.Time) (fileOpState, error) - // SeriesReadWriteRef returns a read/write ref to a series, callers + // SeriesRefResolver returns a series ref resolver, callers // must make sure to call the release callback once finished // with the reference. - SeriesReadWriteRef( + SeriesRefResolver( shardID uint32, id ident.ID, tags ident.TagIterator, - ) (result SeriesReadWriteRef, owned bool, err error) + ) (result bootstrap.SeriesRefResolver, owned bool, err error) // WritePendingIndexInserts will write any pending index inserts. WritePendingIndexInserts(pending []writes.PendingIndexInsert) error @@ -485,8 +485,6 @@ type databaseNamespace interface { type SeriesReadWriteRef struct { // Resolver resolves the reference for read/writing. Resolver bootstrap.SeriesRefResolver - // Shard is the shard of the series. - Shard uint32 } // Shard is a time series database shard. @@ -651,13 +649,13 @@ type databaseShard interface { repairer databaseShardRepairer, ) (repair.MetadataComparisonResult, error) - // SeriesReadWriteRef returns a read/write ref to a series, callers + // SeriesRefResolver returns a series ref resolver, callers // must make sure to call the release callback once finished // with the reference. - SeriesReadWriteRef( + SeriesRefResolver( id ident.ID, tags ident.TagIterator, - ) (SeriesReadWriteRef, error) + ) (bootstrap.SeriesRefResolver, error) // DocRef returns the doc if already present in a shard series. 
DocRef(id ident.ID) (doc.Metadata, bool, error) From 2f5f2c829e883b5ea14cccdc25bba10be5e0d38a Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Tue, 9 Mar 2021 11:13:49 +0200 Subject: [PATCH 17/31] optimized series ref resolver usage. --- .../bootstrapper/commitlog/source.go | 285 ++++++++++-------- .../bootstrap/bootstrapper/peers/source.go | 58 ++-- 2 files changed, 190 insertions(+), 153 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 4b08be7d12..17cd8d4341 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -51,35 +51,14 @@ import ( ) const ( - workerChannelSize = 256 + workerChannelSize = 256 + readSeriesBlocksWorkerChannelSize = 512 ) type newIteratorFn func(opts commitlog.IteratorOpts) ( iter commitlog.Iterator, corruptFiles []commitlog.ErrorWithPath, err error) type snapshotFilesFn func(filePathPrefix string, namespace ident.ID, shard uint32) (fs.FileSetFilesSlice, error) -type commitLogSource struct { - opts Options - log *zap.Logger - nowFn func() time.Time - - // Filesystem inspection capture before node was started. - inspection fs.Inspection - - newIteratorFn newIteratorFn - snapshotFilesFn snapshotFilesFn - newReaderFn fs.NewReaderFn - - metrics commitLogSourceMetrics - // Cache the results of reading the commit log between passes. The commit log is not sharded by time range, so the - // entire log needs to be read irrespective of the configured time ranges for the pass. The commit log only needs - // to be read once (during the first pass) and the results can be subsequently cached and returned on future passes. - // Since the bootstrapper is single threaded this does not need to be guarded with a mutex. - commitLogResult commitLogResult - - instrumentation *instrumentation -} - type bootstrapNamespace struct { namespaceID []byte bootstrapping bool @@ -118,6 +97,135 @@ type accumulateWorker struct { numErrors int } +type seriesBlock struct { + resolver bootstrap.SeriesRefResolver + block block.DatabaseBlock +} + +type readSeriesBlocksWorker struct { + dataCh chan seriesBlock + reader fs.DataFileSetReader + shard uint32 + accumulator bootstrap.NamespaceDataAccumulator + blocksPool block.DatabaseBlockPool + blockStart time.Time + blockSize time.Duration + nsCtx namespace.Context +} + +func (w *readSeriesBlocksWorker) readSeriesBlocks() error { + defer close(w.dataCh) + for { + id, tags, data, expectedChecksum, err := w.reader.Read() + if err != nil && !errors.Is(err, io.EOF) { + return err + } + if errors.Is(err, io.EOF) { + break + } + + dbBlock := w.blocksPool.Get() + dbBlock.Reset(w.blockStart, w.blockSize, + ts.NewSegment(data, nil, 0, ts.FinalizeHead), w.nsCtx) + + // Resetting the block will trigger a checksum calculation, so use + // that instead of calculating it twice. + checksum, err := dbBlock.Checksum() + if err != nil { + return err + } + if checksum != expectedChecksum { + return fmt.Errorf("checksum for series: %s was %d but expected %d", + id, checksum, expectedChecksum) + } + + res, owned, err := w.accumulator.CheckoutSeriesWithoutLock(w.shard, id, tags) + if err != nil { + if !owned { + // Skip bootstrapping this series if we don't own it. 
+ continue + } + return err + } + + w.dataCh <- seriesBlock{ + resolver: res.Resolver, + block: dbBlock, + } + + id.Finalize() + tags.Close() + } + return nil +} + +type readNamespaceResult struct { + namespace bootstrap.Namespace + dataAndIndexShardRanges result.ShardTimeRanges +} + +type commitLogResult struct { + shouldReturnUnfulfilled bool + // ensures we only read the commit log once + read bool +} + +type commitLogSourceMetrics struct { + corruptCommitlogFile tally.Counter + bootstrapping tally.Gauge + commitLogEntriesRead tally.Counter +} + +func newCommitLogSourceMetrics(scope tally.Scope) commitLogSourceMetrics { + return commitLogSourceMetrics{ + corruptCommitlogFile: scope.SubScope("commitlog").Counter("corrupt"), + bootstrapping: scope.SubScope("status").Gauge("bootstrapping"), + commitLogEntriesRead: scope.SubScope("commitlog").Counter("entries-read"), + } +} + +type gaugeLoopCloserFn func() + +func (m commitLogSourceMetrics) emitBootstrapping() gaugeLoopCloserFn { + doneCh := make(chan struct{}) + go func() { + for { + select { + case <-doneCh: + m.bootstrapping.Update(0) + return + default: + m.bootstrapping.Update(1) + time.Sleep(time.Second) + } + } + }() + + return func() { close(doneCh) } +} + +type commitLogSource struct { + opts Options + log *zap.Logger + nowFn func() time.Time + + // Filesystem inspection capture before node was started. + inspection fs.Inspection + + newIteratorFn newIteratorFn + snapshotFilesFn snapshotFilesFn + newReaderFn fs.NewReaderFn + + metrics commitLogSourceMetrics + // Cache the results of reading the commit log between passes. The commit log is not sharded by time range, so the + // entire log needs to be read irrespective of the configured time ranges for the pass. The commit log only needs + // to be read once (during the first pass) and the results can be subsequently cached and returned on future passes. + // Since the bootstrapper is single threaded this does not need to be guarded with a mutex. + commitLogResult commitLogResult + + instrumentation *instrumentation +} + func newCommitLogSource( opts Options, inspection fs.Inspection, @@ -168,11 +276,6 @@ func (s *commitLogSource) AvailableIndex( return s.availability(ns, shardsTimeRanges, runOpts) } -type readNamespaceResult struct { - namespace bootstrap.Namespace - dataAndIndexShardRanges result.ShardTimeRanges -} - // Read will read all commitlog files on disk, as well as as the latest snapshot for // each shard/block combination (if it exists) and merge them. // TODO(rartoul): Make this take the SnapshotMetadata files into account to reduce the @@ -276,12 +379,6 @@ func (s *commitLogSource) Read( return bootstrapResult, nil } -type commitLogResult struct { - shouldReturnUnfulfilled bool - // ensures we only read the commit log once - read bool -} - func (s *commitLogSource) readCommitLog(namespaces bootstrap.Namespaces, span opentracing.Span) (commitLogResult, error) { // Setup the series accumulator pipeline. var ( @@ -503,7 +600,7 @@ func (s *commitLogSource) readCommitLog(namespaces bootstrap.Namespaces, span op // Check out the series for writing, no need for concurrency // as commit log bootstrapper does not perform parallel // checking out of series. 
- series, owned, err := accumulator.CheckoutSeriesWithLock( + series, owned, err := accumulator.CheckoutSeriesWithoutLock( entry.Series.Shard, entry.Series.ID, tagIter) @@ -778,11 +875,6 @@ func (s *commitLogSource) bootstrapShardSnapshots( return nil } -type seriesBlock struct { - resolver bootstrap.SeriesRefResolver - block block.DatabaseBlock -} - func (s *commitLogSource) bootstrapShardBlockSnapshot( ns namespace.Metadata, accumulator bootstrap.NamespaceDataAccumulator, @@ -835,13 +927,25 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( zap.Time("blockStart", blockStart), zap.Int("volume", mostRecentCompleteSnapshot.ID.VolumeIndex)) - seriesBlocks, err := s.readSeriesBlocks(reader, shard, accumulator, - blocksPool, blockStart, blockSize, nsCtx) - if err != nil { - return err + var workerErr error + worker := &readSeriesBlocksWorker{ + dataCh: make(chan seriesBlock, readSeriesBlocksWorkerChannelSize), + reader: reader, + shard: shard, + accumulator: accumulator, + blocksPool: blocksPool, + blockStart: blockStart, + blockSize: blockSize, + nsCtx: nsCtx, } - for _, seriesBlock := range seriesBlocks { + go func() { + if workerErr = worker.readSeriesBlocks(); workerErr != nil { + s.log.Error("series read blocks error", zap.Error(workerErr)) + } + }() + + for seriesBlock := range worker.dataCh { // Load into series. seriesRef, err := seriesBlock.resolver.SeriesRef() if err != nil { @@ -852,7 +956,8 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( return err } } - return nil + + return workerErr } func (s *commitLogSource) mostRecentSnapshotByBlockShard( @@ -887,60 +992,6 @@ func (s *commitLogSource) mostRecentSnapshotByBlockShard( return mostRecentCompleteSnapshotByBlockShard, nil } -func (s *commitLogSource) readSeriesBlocks( - reader fs.DataFileSetReader, - shard uint32, - accumulator bootstrap.NamespaceDataAccumulator, - blocksPool block.DatabaseBlockPool, - blockStart time.Time, - blockSize time.Duration, - nsCtx namespace.Context, -) ([]seriesBlock, error) { - seriesBlocks := make([]seriesBlock, 0, reader.Entries()) - for { - id, tags, data, expectedChecksum, err := reader.Read() - if err != nil && !errors.Is(err, io.EOF) { - return nil, err - } - if errors.Is(err, io.EOF) { - break - } - - dbBlock := blocksPool.Get() - dbBlock.Reset(blockStart, blockSize, - ts.NewSegment(data, nil, 0, ts.FinalizeHead), nsCtx) - - // Resetting the block will trigger a checksum calculation, so use - // that instead of calculating it twice. - checksum, err := dbBlock.Checksum() - if err != nil { - return nil, err - } - if checksum != expectedChecksum { - return nil, fmt.Errorf("checksum for series: %s was %d but expected %d", - id, checksum, expectedChecksum) - } - - res, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags) - if err != nil { - if !owned { - // Skip bootstrapping this series if we don't own it. - continue - } - return nil, err - } - - seriesBlocks = append(seriesBlocks, seriesBlock{ - resolver: res.Resolver, - block: dbBlock, - }) - - id.Finalize() - tags.Close() - } - return seriesBlocks, nil -} - // TODO(rartoul): Refactor this to take the SnapshotMetadata files into account to reduce // the number of commitlog files that need to be read. 
func (s *commitLogSource) readCommitLogFilePredicate(f commitlog.FileFilterInfo) bool { @@ -1155,37 +1206,3 @@ func (s *commitLogSource) shardsReplicated( majorityReplicas := initialTopologyState.MajorityReplicas return majorityReplicas > 1 } - -type commitLogSourceMetrics struct { - corruptCommitlogFile tally.Counter - bootstrapping tally.Gauge - commitLogEntriesRead tally.Counter -} - -func newCommitLogSourceMetrics(scope tally.Scope) commitLogSourceMetrics { - return commitLogSourceMetrics{ - corruptCommitlogFile: scope.SubScope("commitlog").Counter("corrupt"), - bootstrapping: scope.SubScope("status").Gauge("bootstrapping"), - commitLogEntriesRead: scope.SubScope("commitlog").Counter("entries-read"), - } -} - -type gaugeLoopCloserFn func() - -func (m commitLogSourceMetrics) emitBootstrapping() gaugeLoopCloserFn { - doneCh := make(chan struct{}) - go func() { - for { - select { - case <-doneCh: - m.bootstrapping.Update(0) - return - default: - m.bootstrapping.Update(1) - time.Sleep(time.Second) - } - } - }() - - return func() { close(doneCh) } -} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index b8ed276d8c..b44b2305c0 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -36,6 +36,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" @@ -57,6 +58,8 @@ import ( var errNamespaceNotFound = errors.New("namespace not found") +const readSeriesBlocksWorkerChannelSize = 512 + type peersSource struct { opts Options newPersistManager func() (persist.Manager, error) @@ -381,6 +384,11 @@ func (s *peersSource) runPersistenceQueueWorkerLoop( } } +type seriesBlocks struct { + resolver bootstrap.SeriesRefResolver + block block.DatabaseSeriesBlocks +} + // fetchBootstrapBlocksFromPeers loops through all the provided ranges for a given shard and // fetches all the bootstrap blocks from the appropriate peers. // Persistence enabled case: Immediately add the results to the bootstrap result @@ -433,37 +441,49 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( continue } - // If not waiting to flush, add straight away to bootstrap result. - for _, elem := range shardResult.AllSeries().Iter() { - entry := elem.Value() - tagsIter.Reset(entry.Tags) - ref, owned, err := accumulator.CheckoutSeriesWithLock(shard, entry.ID, tagsIter) - if err != nil { - if !owned { - // Only if we own this shard do we care consider this an - // error in bootstrapping. + dataCh := make(chan seriesBlocks, readSeriesBlocksWorkerChannelSize) + go func() { + defer close(dataCh) + for _, elem := range shardResult.AllSeries().Iter() { + entry := elem.Value() + tagsIter.Reset(entry.Tags) + ref, owned, err := accumulator.CheckoutSeriesWithLock(shard, entry.ID, tagsIter) + if err != nil { + if !owned { + // Only if we own this shard do we care consider this an + // error in bootstrapping. 
+ continue + } + unfulfill(currRange) + s.log.Error("could not checkout series", zap.Error(err)) continue } - unfulfill(currRange) - s.log.Error("could not checkout series", zap.Error(err)) - continue + + dataCh <- seriesBlocks{ + resolver: ref.Resolver, + block: entry.Blocks, + } + + // Safe to finalize these IDs and Tags, shard result no longer used. + entry.ID.Finalize() + entry.Tags.Finalize() } + }() - seriesRef, err := ref.Resolver.SeriesRef() + for seriesBlocks := range dataCh { + seriesRef, err := seriesBlocks.resolver.SeriesRef() if err != nil { s.log.Error("could not resolve seriesRef", zap.Error(err)) + unfulfill(currRange) continue } - for _, block := range entry.Blocks.AllBlocks() { - if err := seriesRef.LoadBlock(block, series.WarmWrite); err != nil { + + for _, bl := range seriesBlocks.block.AllBlocks() { + if err := seriesRef.LoadBlock(bl, series.WarmWrite); err != nil { unfulfill(currRange) s.log.Error("could not load series block", zap.Error(err)) } } - - // Safe to finalize these IDs and Tags, shard result no longer used. - entry.ID.Finalize() - entry.Tags.Finalize() } } } From 05bec35f765b4340ec71d1acaaddded04705e3c2 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Tue, 9 Mar 2021 12:02:47 +0200 Subject: [PATCH 18/31] added unit test. --- src/dbnode/storage/shard_test.go | 54 ++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 6faef2fc86..a0dab77d34 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -1925,6 +1925,60 @@ func TestOpenStreamingReader(t *testing.T) { require.NoError(t, err) } +func TestSeriesRefResolver(t *testing.T) { + ctrl := xtest.NewController(t) + shard := testDatabaseShard(t, DefaultTestOptions()) + ctx := context.NewBackground() + defer func() { + ctrl.Finish() + _ = shard.Close() + ctx.Close() + }() + + seriesID := ident.StringID("foo+bar=baz") + seriesTags := ident.NewTags(ident.Tag{ + Name: ident.StringID("bar"), + Value: ident.StringID("baz"), + }) + + iter := ident.NewMockTagIterator(ctrl) + // Ensure duplicate called but no close, etc + iter.EXPECT(). + Duplicate(). + Times(1). + Return(ident.NewTagsIterator(seriesTags)) + + now := time.Now() + + resolver, err := shard.SeriesRefResolver(seriesID, iter) + require.NoError(t, err) + seriesRef, err := resolver.SeriesRef() + require.NoError(t, err) + write, writeType, err := seriesRef.Write(ctx, now, 1.0, xtime.Second, + []byte("annotation1"), series.WriteOptions{}) + require.NoError(t, err) + require.Equal(t, series.WarmWrite, writeType) + require.True(t, write) + + // should return already inserted entry as series. + resolverEntry, err := shard.SeriesRefResolver(seriesID, iter) + require.NoError(t, err) + require.IsType(t, &lookup.Entry{}, resolverEntry) + refEntry, err := resolverEntry.SeriesRef() + require.NoError(t, err) + require.Equal(t, seriesRef, refEntry) + + databaseBlock := block.NewMockDatabaseBlock(ctrl) + databaseBlock.EXPECT().StartTime().Return(now).AnyTimes() + err = seriesRef.LoadBlock(databaseBlock, series.ColdWrite) + require.NoError(t, err) + + err = resolver.ReleaseRef() + require.NoError(t, err) + err = resolverEntry.ReleaseRef() + require.NoError(t, err) +} + func getMockReader( ctrl *gomock.Controller, t *testing.T, From 629efb90b35b4c95643d682e2b05e2978c859640 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Tue, 9 Mar 2021 15:37:05 +0200 Subject: [PATCH 19/31] fixing possible race. 
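
Note on the ordering this change relies on (toy sketch only; the real producer is the snapshot reader and the consumer is the block loader in the diff): the worker error must be assigned before the data channel is closed, because the close is the synchronization point that makes the write visible once the consumer's range loop ends.

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        dataCh := make(chan int, 4)
        var workerErr error

        go func() {
            for i := 0; i < 3; i++ {
                dataCh <- i
            }
            workerErr = errors.New("simulated read failure")
            // Close only after workerErr is set; the range below finishes
            // after this close, so reading workerErr afterwards is race-free.
            close(dataCh)
        }()

        for v := range dataCh {
            fmt.Println("got", v)
        }
        fmt.Println("worker error:", workerErr)
    }
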
--- .../bootstrapper/commitlog/source.go | 21 +++++++++++++------ src/dbnode/storage/bootstrap/types.go | 2 +- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 17cd8d4341..c878a5c02f 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -114,7 +114,6 @@ type readSeriesBlocksWorker struct { } func (w *readSeriesBlocksWorker) readSeriesBlocks() error { - defer close(w.dataCh) for { id, tags, data, expectedChecksum, err := w.reader.Read() if err != nil && !errors.Is(err, io.EOF) { @@ -940,12 +939,23 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( } go func() { - if workerErr = worker.readSeriesBlocks(); workerErr != nil { - s.log.Error("series read blocks error", zap.Error(workerErr)) + if err := worker.readSeriesBlocks(); err != nil { + s.log.Error("series read blocks error", zap.Error(err)) + workerErr = err } + close(worker.dataCh) }() - for seriesBlock := range worker.dataCh { + if err := s.loadBlocks(worker.dataCh, writeType); err != nil { + close(worker.dataCh) + return err + } + + return workerErr +} + +func (s *commitLogSource) loadBlocks(dataCh <-chan seriesBlock, writeType series.WriteType) error { + for seriesBlock := range dataCh { // Load into series. seriesRef, err := seriesBlock.resolver.SeriesRef() if err != nil { @@ -956,8 +966,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( return err } } - - return workerErr + return nil } func (s *commitLogSource) mostRecentSnapshotByBlockShard( diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index c520f2441d..30dd871ae4 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -279,7 +279,7 @@ type NamespaceDataAccumulator interface { // CheckoutSeriesResult is the result of a checkout series operation. type CheckoutSeriesResult struct { - // Resolver is the series for the checkout operation. + // Resolver is the series read write ref resolver. Resolver SeriesRefResolver // Shard is the shard for the series. Shard uint32 From 28caa333e261f4e0b9c6b363e803518f0f9dcf5b Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Tue, 9 Mar 2021 15:52:20 +0200 Subject: [PATCH 20/31] simplified error handling in commit log bs. 
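
Sketch of the errgroup pattern adopted here (toy channel of ints rather than the real seriesBlock values): the producer goroutine closes the channel via defer and returns its error, the caller drains the channel, and errs.Wait() surfaces whichever error occurred, replacing the hand-rolled workerErr bookkeeping.

    package main

    import (
        "context"
        "errors"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        errs, _ := errgroup.WithContext(context.Background())
        dataCh := make(chan int, 8)

        errs.Go(func() error {
            defer close(dataCh)
            for i := 0; i < 5; i++ {
                dataCh <- i
            }
            return errors.New("simulated reader failure")
        })

        // Drain everything that was produced, then collect the error.
        for v := range dataCh {
            fmt.Println("loading block", v)
        }

        if err := errs.Wait(); err != nil {
            fmt.Println("bootstrap error:", err)
        }
    }
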
--- .../bootstrap/bootstrapper/commitlog/source.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index c878a5c02f..4277fa1a3d 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" @@ -114,6 +116,7 @@ type readSeriesBlocksWorker struct { } func (w *readSeriesBlocksWorker) readSeriesBlocks() error { + defer close(w.dataCh) for { id, tags, data, expectedChecksum, err := w.reader.Read() if err != nil && !errors.Is(err, io.EOF) { @@ -926,7 +929,6 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( zap.Time("blockStart", blockStart), zap.Int("volume", mostRecentCompleteSnapshot.ID.VolumeIndex)) - var workerErr error worker := &readSeriesBlocksWorker{ dataCh: make(chan seriesBlock, readSeriesBlocksWorkerChannelSize), reader: reader, @@ -938,20 +940,15 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( nsCtx: nsCtx, } - go func() { - if err := worker.readSeriesBlocks(); err != nil { - s.log.Error("series read blocks error", zap.Error(err)) - workerErr = err - } - close(worker.dataCh) - }() - + ctx := context.NewBackground() + errs, _ := errgroup.WithContext(ctx.GoContext()) + errs.Go(worker.readSeriesBlocks) if err := s.loadBlocks(worker.dataCh, writeType); err != nil { close(worker.dataCh) return err } - return workerErr + return errs.Wait() } func (s *commitLogSource) loadBlocks(dataCh <-chan seriesBlock, writeType series.WriteType) error { From 79343ed9701b7f41df668276343df1e790a0ba21 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 10 Mar 2021 10:07:05 +0200 Subject: [PATCH 21/31] use cancellable context to cancel read series goroutine if load blocks returns an error. 
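
Illustrative sketch of the cancellation idea (hypothetical produce helper, not the bootstrapper code): the consumer cancels the shared context when it gives up, and the producer notices on its next iteration instead of running to completion for nobody. The diff uses a buffered channel plus a non-blocking Done check with a default case; the combined select below is a close variant of the same pattern.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func produce(ctx context.Context, out chan<- int) error {
        defer close(out)
        for i := 0; ; i++ {
            select {
            case <-ctx.Done():
                return nil // consumer gave up; stop reading
            case out <- i:
            }
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        out := make(chan int, 2)
        go func() { _ = produce(ctx, out) }()

        for v := range out {
            if v == 3 {
                cancel() // simulate a load error on the consumer side
                break
            }
            fmt.Println("loaded", v)
        }
        time.Sleep(10 * time.Millisecond) // let the producer observe cancellation
    }
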
--- .../bootstrapper/commitlog/source.go | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 4277fa1a3d..45188d45d0 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -22,14 +22,13 @@ package commitlog import ( "bytes" + "context" "errors" "fmt" "io" "sync" "time" - "golang.org/x/sync/errgroup" - "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" @@ -42,7 +41,7 @@ import ( "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/checked" - "github.com/m3db/m3/src/x/context" + xcontext "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" @@ -50,6 +49,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) const ( @@ -115,7 +115,7 @@ type readSeriesBlocksWorker struct { nsCtx namespace.Context } -func (w *readSeriesBlocksWorker) readSeriesBlocks() error { +func (w *readSeriesBlocksWorker) readSeriesBlocks(ctx context.Context) error { defer close(w.dataCh) for { id, tags, data, expectedChecksum, err := w.reader.Read() @@ -157,6 +157,13 @@ func (w *readSeriesBlocksWorker) readSeriesBlocks() error { id.Finalize() tags.Close() + + select { + case <-ctx.Done(): + return nil + default: + // do not block. + } } return nil } @@ -283,7 +290,7 @@ func (s *commitLogSource) AvailableIndex( // TODO(rartoul): Make this take the SnapshotMetadata files into account to reduce the // number of commitlogs / snapshots that we need to read. func (s *commitLogSource) Read( - ctx context.Context, + ctx xcontext.Context, namespaces bootstrap.Namespaces, cache bootstrap.Cache, ) (bootstrap.NamespaceResults, error) { @@ -940,11 +947,13 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( nsCtx: nsCtx, } - ctx := context.NewBackground() - errs, _ := errgroup.WithContext(ctx.GoContext()) - errs.Go(worker.readSeriesBlocks) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errs, _ := errgroup.WithContext(ctx) + errs.Go(func() error { + return worker.readSeriesBlocks(ctx) + }) if err := s.loadBlocks(worker.dataCh, writeType); err != nil { - close(worker.dataCh) return err } @@ -1024,7 +1033,7 @@ func (s *commitLogSource) readCommitLogFilePredicate(f commitlog.FileFilterInfo) } func (s *commitLogSource) startAccumulateWorker(worker *accumulateWorker) { - ctx := context.NewBackground() + ctx := xcontext.NewBackground() defer ctx.Close() for input := range worker.inputCh { From b26825fa803a3e4ea5cee5eb9a7d5a8445289f4d Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 10 Mar 2021 10:52:33 +0200 Subject: [PATCH 22/31] load blocks concurrently. simplify code. 
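
Sketch of the fan-out introduced here (toy ints in place of series blocks, numWorkers standing in for the accumulate concurrency option): one reader goroutine feeds the channel, several workers drain it, and errgroup.Wait returns once the channel is closed and every worker has exited.

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        const numWorkers = 4
        errs, _ := errgroup.WithContext(context.Background())
        dataCh := make(chan int, 16)

        errs.Go(func() error {
            defer close(dataCh)
            for i := 0; i < 100; i++ {
                dataCh <- i
            }
            return nil
        })

        var loaded int64
        for i := 0; i < numWorkers; i++ {
            errs.Go(func() error {
                for range dataCh {
                    atomic.AddInt64(&loaded, 1) // stand-in for LoadBlock
                }
                return nil
            })
        }

        if err := errs.Wait(); err != nil {
            fmt.Println("error:", err)
            return
        }
        fmt.Println("blocks loaded:", loaded)
    }
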
--- .../bootstrap/bootstrapper/commitlog/source.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 45188d45d0..bc9fd129aa 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -900,6 +900,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( bytesPool = blOpts.BytesPool() fsOpts = s.opts.CommitLogOptions().FilesystemOptions() nsCtx = namespace.NewContextFrom(ns) + numWorkers = s.opts.AccumulateConcurrency() ) // Bootstrap the snapshot file. @@ -947,14 +948,15 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( nsCtx: nsCtx, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - errs, _ := errgroup.WithContext(ctx) + errs, ctx := errgroup.WithContext(context.Background()) errs.Go(func() error { return worker.readSeriesBlocks(ctx) }) - if err := s.loadBlocks(worker.dataCh, writeType); err != nil { - return err + + for i := 0; i < numWorkers; i++ { + errs.Go(func() error { + return s.loadBlocks(worker.dataCh, writeType) + }) } return errs.Wait() From 51ed40f323b62b4e2bca6d29efdf2c55cd786569 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 10 Mar 2021 13:04:02 +0200 Subject: [PATCH 23/31] changes after review. --- .../bootstrap/bootstrapper/peers/source.go | 6 ++-- src/dbnode/storage/bootstrap/util.go | 2 +- .../namespace_bootstrap_data_accumulator.go | 4 +-- ...mespace_bootstrap_data_accumulator_test.go | 14 +++++----- src/dbnode/storage/series/lookup/entry.go | 20 +------------ src/dbnode/storage/shard.go | 28 +++++++++---------- src/dbnode/storage/shard_test.go | 1 - src/dbnode/storage/types.go | 8 ------ 8 files changed, 26 insertions(+), 57 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index b44b2305c0..c57fdf2d11 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -386,7 +386,7 @@ func (s *peersSource) runPersistenceQueueWorkerLoop( type seriesBlocks struct { resolver bootstrap.SeriesRefResolver - block block.DatabaseSeriesBlocks + blocks block.DatabaseSeriesBlocks } // fetchBootstrapBlocksFromPeers loops through all the provided ranges for a given shard and @@ -461,7 +461,7 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( dataCh <- seriesBlocks{ resolver: ref.Resolver, - block: entry.Blocks, + blocks: entry.Blocks, } // Safe to finalize these IDs and Tags, shard result no longer used. 
@@ -478,7 +478,7 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( continue } - for _, bl := range seriesBlocks.block.AllBlocks() { + for _, bl := range seriesBlocks.blocks.AllBlocks() { if err := seriesRef.LoadBlock(bl, series.WarmWrite); err != nil { unfulfill(currRange) s.log.Error("could not load series block", zap.Error(err)) diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index 3724ff24f2..c1081f1b6c 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -295,7 +295,7 @@ func (a *TestDataAccumulator) checkoutSeriesWithLock( var _ SeriesRefResolver = (*seriesStaticResolver)(nil) type seriesStaticResolver struct { - series *series.MockDatabaseSeries + series SeriesRef } func (r *seriesStaticResolver) SeriesRef() (SeriesRef, error) { diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go index ad2e5b546b..1aad327ef2 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator.go @@ -89,9 +89,7 @@ func (a *namespaceDataAccumulator) Close() error { // Release all refs. multiError := xerrors.NewMultiError() for _, elem := range a.needsRelease { - if err := elem.ReleaseRef(); err != nil { - multiError = multiError.Add(err) - } + multiError = multiError.Add(elem.ReleaseRef()) } if !multiError.Empty() { diff --git a/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go b/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go index 7cd34793ac..16fa354e52 100644 --- a/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go +++ b/src/dbnode/storage/namespace_bootstrap_data_accumulator_test.go @@ -39,9 +39,9 @@ var ( ) type seriesTestResolver struct { - series bootstrap.SeriesRef - calls int - uniqueIndex uint64 + series bootstrap.SeriesRef + releaseCalls int + uniqueIndex uint64 } func (r *seriesTestResolver) SeriesRef() (bootstrap.SeriesRef, error) { @@ -49,7 +49,7 @@ func (r *seriesTestResolver) SeriesRef() (bootstrap.SeriesRef, error) { } func (r *seriesTestResolver) ReleaseRef() error { - r.calls++ + r.releaseCalls++ return nil } @@ -121,7 +121,7 @@ func testCheckoutSeries(t *testing.T, checkoutFn checkoutFn) { require.Equal(t, 1, len(cast.needsRelease)) require.Equal(t, resolver, cast.needsRelease[0]) // Ensure it hasn't been released. - require.Equal(t, 0, resolver.calls) + require.Zero(t, resolver.releaseCalls) } func TestAccumulatorRelease(t *testing.T) { @@ -154,9 +154,9 @@ func testAccumulatorRelease(t *testing.T, checkoutFn checkoutFn) { require.Equal(t, resolver, cast.needsRelease[0]) require.NoError(t, acc.Close()) - require.Equal(t, 0, len(cast.needsRelease)) + require.Zero(t, len(cast.needsRelease)) // ensure release has been called. - require.Equal(t, 1, resolver.calls) + require.Equal(t, 1, resolver.releaseCalls) // ensure double-close errors. require.Error(t, acc.Close()) } diff --git a/src/dbnode/storage/series/lookup/entry.go b/src/dbnode/storage/series/lookup/entry.go index d8ad688a62..6e256c8c97 100644 --- a/src/dbnode/storage/series/lookup/entry.go +++ b/src/dbnode/storage/series/lookup/entry.go @@ -67,15 +67,6 @@ type Entry struct { pendingIndexBatchSizeOne []writes.PendingIndexInsert } -// OnReleaseReadWriteRef is a callback that can release -// a strongly held series read/write ref. 
-type OnReleaseReadWriteRef interface { - OnReleaseReadWriteRef() -} - -// ensure Entry satifies the `OnReleaseReadWriteRef` interface. -var _ OnReleaseReadWriteRef = &Entry{} - // ensure Entry satisfies the `index.OnIndexSeries` interface. var _ index.OnIndexSeries = &Entry{} @@ -125,15 +116,6 @@ func (entry *Entry) DecrementReaderWriterCount() { atomic.AddInt32(&entry.curReadWriters, -1) } -// OnReleaseReadWriteRef decrements a read/write ref, it's named -// differently to decouple the concrete task needed when a ref -// is released and the intent to release the ref (simpler for -// caller readability/reasoning). -func (entry *Entry) OnReleaseReadWriteRef() { - // All we do when we release a read/write ref is decrement. - entry.DecrementReaderWriterCount() -} - // IndexedForBlockStart returns a bool to indicate if the Entry has been successfully // indexed for the given index blockstart. func (entry *Entry) IndexedForBlockStart(indexBlockStart xtime.UnixNano) bool { @@ -274,7 +256,7 @@ func (entry *Entry) SeriesRef() (bootstrap.SeriesRef, error) { // to release the reference count to the series so it can // be expired by the owning shard eventually. func (entry *Entry) ReleaseRef() error { - entry.OnReleaseReadWriteRef() + entry.DecrementReaderWriterCount() return nil } diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 9b58e7f337..25208e04f3 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1086,7 +1086,7 @@ func (s *dbShard) SeriesRefResolver( // Series will wait for the result to be batched together and inserted. return &seriesResolver{ wg: wg, - // Make sure to return the copied ID from the new series. + // ID was already copied in newShardEntry so we can set it here safely. copiedID: entry.Series.ID(), shard: s, }, nil @@ -1099,15 +1099,15 @@ type seriesResolver struct { copiedID ident.ID shard *dbShard - resolved bool - resolvedResult error - entry *lookup.Entry + resolved bool + resolvedErr error + entry *lookup.Entry } func (r *seriesResolver) resolve() error { r.RLock() if r.resolved { - resolvedResult := r.resolvedResult + resolvedResult := r.resolvedErr r.RUnlock() return resolvedResult } @@ -1116,27 +1116,26 @@ func (r *seriesResolver) resolve() error { r.Lock() defer r.Unlock() + // fast path: if we already resolved the result, just return it. if r.resolved { - return r.resolvedResult + return r.resolvedErr } r.wg.Wait() id := r.copiedID entry, _, err := r.shard.tryRetrieveWritableSeries(id) + r.resolved = true // Retrieve the inserted entry if err != nil { - r.resolvedResult = err - r.resolved = true - return r.resolvedResult + r.resolvedErr = err + return r.resolvedErr } if entry == nil { - r.resolvedResult = fmt.Errorf("could not resolve: %s", id) - r.resolved = true - return r.resolvedResult + r.resolvedErr = fmt.Errorf("could not resolve: %s", id) + return r.resolvedErr } r.entry = entry - r.resolved = true return nil } @@ -1151,8 +1150,7 @@ func (r *seriesResolver) ReleaseRef() error { if err := r.resolve(); err != nil { return err } - r.entry.OnReleaseReadWriteRef() - return nil + return r.entry.ReleaseRef() } func (s *dbShard) ReadEncoded( diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index a0dab77d34..c8d89d4ed2 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -1945,7 +1945,6 @@ func TestSeriesRefResolver(t *testing.T) { // Ensure duplicate called but no close, etc iter.EXPECT(). Duplicate(). - Times(1). 
Return(ident.NewTagsIterator(seriesTags)) now := time.Now() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index e7143e156e..eca40f3f59 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -479,14 +479,6 @@ type databaseNamespace interface { ) (int64, error) } -// SeriesReadWriteRef is a read/write reference for a series, -// must make sure to release the read/write reference by calling -// release on the resolver. -type SeriesReadWriteRef struct { - // Resolver resolves the reference for read/writing. - Resolver bootstrap.SeriesRefResolver -} - // Shard is a time series database shard. type Shard interface { // ID returns the ID of the shard. From be6604b23b06a40f28734e87aeb69c1567f09f7f Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 10 Mar 2021 13:06:41 +0200 Subject: [PATCH 24/31] removed previously added logging line for testing. --- src/dbnode/storage/shard.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 25208e04f3..bd5e19c601 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -876,8 +876,6 @@ func (s *dbShard) purgeExpiredSeries(expiredEntries []*lookup.Entry) { series.Close() s.list.Remove(elem) s.lookup.Delete(id) - s.logger.Info("entry expired", - zap.String("seriesId", id.String())) } s.Unlock() } From baeca8d5df35885d634d83a20804c1ca0fb2d880 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 10 Mar 2021 13:45:49 +0200 Subject: [PATCH 25/31] updated mocks. --- src/dbnode/generated/mocks/generate.go | 2 +- .../storage/series/lookup/lookup_mock.go | 39 +------------------ 2 files changed, 3 insertions(+), 38 deletions(-) diff --git a/src/dbnode/generated/mocks/generate.go b/src/dbnode/generated/mocks/generate.go index de23a9efbf..0c8ad0f3f5 100644 --- a/src/dbnode/generated/mocks/generate.go +++ b/src/dbnode/generated/mocks/generate.go @@ -24,7 +24,7 @@ //go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go" //go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest" //go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go" -//go:generate sh -c "mockgen -package=lookup $PACKAGE/src/dbnode/storage/series/lookup OnReleaseReadWriteRef,IndexWriter | genclean -pkg $PACKAGE/src/dbnode/storage/series/lookup -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/lookup/lookup_mock.go" +//go:generate sh -c "mockgen -package=lookup $PACKAGE/src/dbnode/storage/series/lookup IndexWriter | genclean -pkg $PACKAGE/src/dbnode/storage/series/lookup -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/lookup/lookup_mock.go" // mockgen rules for generating mocks for unexported interfaces (file mode) //go:generate sh -c "mockgen -package=encoding -destination=$GOPATH/src/$PACKAGE/src/dbnode/encoding/encoding_mock.go -source=$GOPATH/src/$PACKAGE/src/dbnode/encoding/types.go" diff --git a/src/dbnode/storage/series/lookup/lookup_mock.go b/src/dbnode/storage/series/lookup/lookup_mock.go index be219af435..90d8711441 100644 --- a/src/dbnode/storage/series/lookup/lookup_mock.go +++ b/src/dbnode/storage/series/lookup/lookup_mock.go @@ -1,7 
+1,7 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/m3db/m3/src/dbnode/storage/series/lookup (interfaces: OnReleaseReadWriteRef,IndexWriter) +// Source: github.com/m3db/m3/src/dbnode/storage/series/lookup (interfaces: IndexWriter) -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -34,41 +34,6 @@ import ( "github.com/golang/mock/gomock" ) -// MockOnReleaseReadWriteRef is a mock of OnReleaseReadWriteRef interface -type MockOnReleaseReadWriteRef struct { - ctrl *gomock.Controller - recorder *MockOnReleaseReadWriteRefMockRecorder -} - -// MockOnReleaseReadWriteRefMockRecorder is the mock recorder for MockOnReleaseReadWriteRef -type MockOnReleaseReadWriteRefMockRecorder struct { - mock *MockOnReleaseReadWriteRef -} - -// NewMockOnReleaseReadWriteRef creates a new mock instance -func NewMockOnReleaseReadWriteRef(ctrl *gomock.Controller) *MockOnReleaseReadWriteRef { - mock := &MockOnReleaseReadWriteRef{ctrl: ctrl} - mock.recorder = &MockOnReleaseReadWriteRefMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockOnReleaseReadWriteRef) EXPECT() *MockOnReleaseReadWriteRefMockRecorder { - return m.recorder -} - -// OnReleaseReadWriteRef mocks base method -func (m *MockOnReleaseReadWriteRef) OnReleaseReadWriteRef() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "OnReleaseReadWriteRef") -} - -// OnReleaseReadWriteRef indicates an expected call of OnReleaseReadWriteRef -func (mr *MockOnReleaseReadWriteRefMockRecorder) OnReleaseReadWriteRef() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnReleaseReadWriteRef", reflect.TypeOf((*MockOnReleaseReadWriteRef)(nil).OnReleaseReadWriteRef)) -} - // MockIndexWriter is a mock of IndexWriter interface type MockIndexWriter struct { ctrl *gomock.Controller From d3c69cf30c8468680992309df0445263af02e183 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 10 Mar 2021 13:58:49 +0200 Subject: [PATCH 26/31] updated some code comments. --- src/dbnode/storage/shard.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index bd5e19c601..c631cf9d67 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1067,14 +1067,17 @@ func (s *dbShard) SeriesRefResolver( if err != nil { return nil, err } + // increment ref count to avoid expiration of the new entry just after adding it to the queue. entry.IncrementReaderWriterCount() wg, err := s.insertQueue.Insert(dbShardInsert{ entry: entry, opts: dbShardInsertAsyncOptions{ // skipRateLimit for true since this method is used by bootstrapping // and should not be rate limited. - skipRateLimit: true, - entryRefCountIncremented: false, // should be false because otherwise entry won't be inserted. + skipRateLimit: true, + // should be false despite entry.IncrementReaderWriterCount() + // because otherwise entry will be skipped during background insert loop. + entryRefCountIncremented: false, }, }) if err != nil { From bfc972a1f7d60b3439cf73a465676edb70c39c7c Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 10 Mar 2021 17:05:10 +0200 Subject: [PATCH 27/31] renamed entryRefCountIncremented option to releaseEntryRef. 
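
A toy illustration of the hand-off the new name is meant to describe (the entry and insert types here are simplified stand-ins, not the real shard types): the caller always increments the read/write ref before enqueueing, and releaseEntryRef records which side is responsible for the matching decrement, either the insert loop or the holder of the resolver.

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    type entry struct{ readWriters int32 }

    func (e *entry) inc() { atomic.AddInt32(&e.readWriters, 1) }
    func (e *entry) dec() { atomic.AddInt32(&e.readWriters, -1) }

    type insert struct {
        entry           *entry
        releaseEntryRef bool // true: the insert loop releases; false: the caller does
    }

    func processInsert(in insert) {
        // ... index / register the entry ...
        if in.releaseEntryRef {
            in.entry.dec()
        }
    }

    func main() {
        e := &entry{}

        // Bootstrapping path: keep the ref until the resolver is released.
        e.inc()
        processInsert(insert{entry: e, releaseEntryRef: false})
        fmt.Println("refs after insert (caller still holds one):", atomic.LoadInt32(&e.readWriters))
        e.dec() // ReleaseRef() equivalent
        fmt.Println("refs after release:", atomic.LoadInt32(&e.readWriters))
    }
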
--- src/dbnode/storage/shard.go | 18 +++++++++--------- src/dbnode/storage/shard_insert_queue.go | 8 ++++---- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index c631cf9d67..0502d1270a 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1075,9 +1075,9 @@ func (s *dbShard) SeriesRefResolver( // skipRateLimit for true since this method is used by bootstrapping // and should not be rate limited. skipRateLimit: true, - // should be false despite entry.IncrementReaderWriterCount() - // because otherwise entry will be skipped during background insert loop. - entryRefCountIncremented: false, + // do not release entry ref during async write, because entry ref will be released when + // ReleaseRef() is called on bootstrap.SeriesRefResolver. + releaseEntryRef: false, }, }) if err != nil { @@ -1376,7 +1376,7 @@ func (s *dbShard) insertSeriesForIndexingAsyncBatched( }, // indicate we already have inc'd the entry's ref count, so we can correctly // handle the ref counting semantics in `insertSeriesBatch`. - entryRefCountIncremented: true, + releaseEntryRef: true, }, }) @@ -1541,7 +1541,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { // we don't need to inc the entry ref count if we already have a ref on the entry. check if // that's the case. - if inserts[i].opts.entryRefCountIncremented { + if inserts[i].opts.releaseEntryRef { // don't need to inc a ref on the entry, we were given as writable entry as input. continue } @@ -1560,7 +1560,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { // visible before we release the lookup write lock. inserts[i].entry.IncrementReaderWriterCount() // also indicate that we have a ref count on this entry for this operation. - inserts[i].opts.entryRefCountIncremented = true + inserts[i].opts.releaseEntryRef = true } if err == nil { @@ -1599,7 +1599,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { for i := range inserts { var ( entry = inserts[i].entry - releaseEntryRef = inserts[i].opts.entryRefCountIncremented + releaseEntryRef = inserts[i].opts.releaseEntryRef err error ) @@ -1636,7 +1636,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { if inserts[i].opts.hasPendingIndexing { pendingIndex := inserts[i].opts.pendingIndex // increment the ref on the entry, as the original one was transferred to the - // this method (insertSeriesBatch) via `entryRefCountIncremented` mechanism. + // this method (insertSeriesBatch) via `releaseEntryRef` mechanism. 
entry.OnIndexPrepare() writeBatchEntry := index.WriteBatchEntry{ @@ -1655,7 +1655,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { // Entries in the shard insert queue are either of: // - new entries - // - existing entries that we've taken a ref on (marked as entryRefCountIncremented) + // - existing entries that we've taken a ref on (marked as releaseEntryRef) if releaseEntryRef { entry.DecrementReaderWriterCount() } diff --git a/src/dbnode/storage/shard_insert_queue.go b/src/dbnode/storage/shard_insert_queue.go index 794b9848a3..b7a4b17971 100644 --- a/src/dbnode/storage/shard_insert_queue.go +++ b/src/dbnode/storage/shard_insert_queue.go @@ -350,12 +350,12 @@ type dbShardInsertAsyncOptions struct { hasPendingRetrievedBlock bool hasPendingIndexing bool - // NB(prateek): `entryRefCountIncremented` indicates if the + // NB(prateek): `releaseEntryRef` indicates if the // entry provided along with the dbShardInsertAsyncOptions - // already has it's ref count incremented. It's used to - // correctly manage the lifecycle of the entry across the + // already has it's ref count incremented and it will be decremented after insert. + // It's used to correctly manage the lifecycle of the entry across the // shard -> shard Queue -> shard boundaries. - entryRefCountIncremented bool + releaseEntryRef bool } type dbShardPendingWrite struct { From 15b82e14ced36d732a9fb6fee3f0feb403852f48 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 15 Mar 2021 11:53:00 +0200 Subject: [PATCH 28/31] extracted series resolver into separate file. --- src/dbnode/storage/series/lookup/entry.go | 2 +- src/dbnode/storage/series_resolver.go | 107 +++++++++++++++++ src/dbnode/storage/series_resolver_test.go | 130 +++++++++++++++++++++ src/dbnode/storage/shard.go | 73 ++---------- 4 files changed, 245 insertions(+), 67 deletions(-) create mode 100644 src/dbnode/storage/series_resolver.go create mode 100644 src/dbnode/storage/series_resolver_test.go diff --git a/src/dbnode/storage/series/lookup/entry.go b/src/dbnode/storage/series/lookup/entry.go index 6e256c8c97..e668a82af3 100644 --- a/src/dbnode/storage/series/lookup/entry.go +++ b/src/dbnode/storage/series/lookup/entry.go @@ -73,7 +73,7 @@ var _ index.OnIndexSeries = &Entry{} // ensure Entry satisfies the `bootstrap.SeriesRef` interface. var _ bootstrap.SeriesRef = &Entry{} -// // ensure Entry satisfies the `bootstrap.SeriesRefResolver` interface. +// ensure Entry satisfies the `bootstrap.SeriesRefResolver` interface. var _ bootstrap.SeriesRefResolver = &Entry{} // NewEntryOptions supplies options for a new entry. diff --git a/src/dbnode/storage/series_resolver.go b/src/dbnode/storage/series_resolver.go new file mode 100644 index 0000000000..e04bc76af4 --- /dev/null +++ b/src/dbnode/storage/series_resolver.go @@ -0,0 +1,107 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "fmt" + "sync" + + "github.com/m3db/m3/src/dbnode/storage/bootstrap" + "github.com/m3db/m3/src/dbnode/storage/series/lookup" + "github.com/m3db/m3/src/x/ident" +) + +// RetrieveWritableSeriesFn represents the function to retrieve series entry. +type RetrieveWritableSeriesFn func(id ident.ID) (*lookup.Entry, error) + +type seriesResolver struct { + sync.RWMutex + + wg *sync.WaitGroup + copiedID ident.ID + retrieveWritableSeriesFn RetrieveWritableSeriesFn + + resolved bool + resolvedErr error + entry *lookup.Entry +} + +// NewSeriesResolver creates new series ref resolver. +func NewSeriesResolver( + wg *sync.WaitGroup, + copiedID ident.ID, + retrieveWritableSeriesFn RetrieveWritableSeriesFn, +) bootstrap.SeriesRefResolver { + return &seriesResolver{ + wg: wg, + copiedID: copiedID, + retrieveWritableSeriesFn: retrieveWritableSeriesFn, + } +} + +func (r *seriesResolver) resolve() error { + r.RLock() + if r.resolved { + resolvedResult := r.resolvedErr + r.RUnlock() + return resolvedResult + } + r.RUnlock() + + r.Lock() + defer r.Unlock() + + // fast path: if we already resolved the result, just return it. + if r.resolved { + return r.resolvedErr + } + + r.wg.Wait() + id := r.copiedID + entry, err := r.retrieveWritableSeriesFn(id) + r.resolved = true + // Retrieve the inserted entry + if err != nil { + r.resolvedErr = err + return r.resolvedErr + } + + if entry == nil { + r.resolvedErr = fmt.Errorf("could not resolve: %s", id) + return r.resolvedErr + } + r.entry = entry + return nil +} + +func (r *seriesResolver) SeriesRef() (bootstrap.SeriesRef, error) { + if err := r.resolve(); err != nil { + return nil, err + } + return r.entry, nil +} + +func (r *seriesResolver) ReleaseRef() error { + if err := r.resolve(); err != nil { + return err + } + return r.entry.ReleaseRef() +} diff --git a/src/dbnode/storage/series_resolver_test.go b/src/dbnode/storage/series_resolver_test.go new file mode 100644 index 0000000000..b55e6581f6 --- /dev/null +++ b/src/dbnode/storage/series_resolver_test.go @@ -0,0 +1,130 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/dbnode/storage/series/lookup" + "github.com/m3db/m3/src/x/ident" +) + +func TestResolveError(t *testing.T) { + wg := sync.WaitGroup{} + id := ident.StringID("foo") + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { + return nil, fmt.Errorf("unable to resolve series") + }) + _, err := sut.SeriesRef() + require.Error(t, err) +} + +func TestResolveNilEntry(t *testing.T) { + wg := sync.WaitGroup{} + id := ident.StringID("foo") + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { + return nil, nil + }) + _, err := sut.SeriesRef() + require.Error(t, err) +} + +func TestResolve(t *testing.T) { + wg := sync.WaitGroup{} + id := ident.StringID("foo") + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { + return lookup.NewEntry(lookup.NewEntryOptions{ + Index: 11, + }), nil + }) + seriesRef, err := sut.SeriesRef() + require.NoError(t, err) + require.IsType(t, &lookup.Entry{}, seriesRef) + entry := seriesRef.(*lookup.Entry) + require.Equal(t, uint64(11), entry.Index) +} + +func TestSecondResolveWontWait(t *testing.T) { + wg := sync.WaitGroup{} + id := ident.StringID("foo") + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { + return lookup.NewEntry(lookup.NewEntryOptions{ + Index: 11, + }), nil + }) + seriesRef, err := sut.SeriesRef() + require.NoError(t, err) + require.IsType(t, &lookup.Entry{}, seriesRef) + entry := seriesRef.(*lookup.Entry) + require.Equal(t, uint64(11), entry.Index) + + wg.Add(1) + seriesRef2, err := sut.SeriesRef() + require.NoError(t, err) + require.IsType(t, &lookup.Entry{}, seriesRef2) + entry2 := seriesRef2.(*lookup.Entry) + require.Equal(t, entry, entry2) +} + +func TestReleaseRef(t *testing.T) { + wg := sync.WaitGroup{} + id := ident.StringID("foo") + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { + entry := lookup.NewEntry(lookup.NewEntryOptions{}) + entry.IncrementReaderWriterCount() + return entry, nil + }) + seriesRef, err := sut.SeriesRef() + require.NoError(t, err) + require.IsType(t, &lookup.Entry{}, seriesRef) + + entry := seriesRef.(*lookup.Entry) + require.Equal(t, int32(1), entry.ReaderWriterCount()) + err = sut.ReleaseRef() + require.NoError(t, err) + require.Zero(t, entry.ReaderWriterCount()) +} + +func TestReleaseRefError(t *testing.T) { + wg := sync.WaitGroup{} + id := ident.StringID("foo") + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { + return nil, fmt.Errorf("unable to resolve series") + }) + err := sut.ReleaseRef() + require.Error(t, err) +} + +func TestReleaseRefWithoutSeriesRef(t *testing.T) { + wg := sync.WaitGroup{} + id := ident.StringID("foo") + sut := NewSeriesResolver(&wg, id, func(id ident.ID) (*lookup.Entry, error) { + entry := lookup.NewEntry(lookup.NewEntryOptions{}) + entry.IncrementReaderWriterCount() + return entry, nil + }) + err := sut.ReleaseRef() + require.NoError(t, err) +} diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 0502d1270a..e33beaeb4e 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1085,73 +1085,14 @@ 
func (s *dbShard) SeriesRefResolver( } // Series will wait for the result to be batched together and inserted. - return &seriesResolver{ - wg: wg, + return NewSeriesResolver( + wg, // ID was already copied in newShardEntry so we can set it here safely. - copiedID: entry.Series.ID(), - shard: s, - }, nil -} - -type seriesResolver struct { - sync.RWMutex - - wg *sync.WaitGroup - copiedID ident.ID - shard *dbShard - - resolved bool - resolvedErr error - entry *lookup.Entry -} - -func (r *seriesResolver) resolve() error { - r.RLock() - if r.resolved { - resolvedResult := r.resolvedErr - r.RUnlock() - return resolvedResult - } - r.RUnlock() - - r.Lock() - defer r.Unlock() - - // fast path: if we already resolved the result, just return it. - if r.resolved { - return r.resolvedErr - } - - r.wg.Wait() - id := r.copiedID - entry, _, err := r.shard.tryRetrieveWritableSeries(id) - r.resolved = true - // Retrieve the inserted entry - if err != nil { - r.resolvedErr = err - return r.resolvedErr - } - - if entry == nil { - r.resolvedErr = fmt.Errorf("could not resolve: %s", id) - return r.resolvedErr - } - r.entry = entry - return nil -} - -func (r *seriesResolver) SeriesRef() (bootstrap.SeriesRef, error) { - if err := r.resolve(); err != nil { - return nil, err - } - return r.entry, nil -} - -func (r *seriesResolver) ReleaseRef() error { - if err := r.resolve(); err != nil { - return err - } - return r.entry.ReleaseRef() + entry.Series.ID(), + func(id ident.ID) (*lookup.Entry, error) { + entry, _, err := s.tryRetrieveWritableSeries(id) + return entry, err + }), nil } func (s *dbShard) ReadEncoded( From 49561add75b272badd07f4955c7c4de62308ff46 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 15 Mar 2021 12:03:39 +0200 Subject: [PATCH 29/31] check if context was not cancelled once in 1024 iterations instead of every time. --- .../bootstrap/bootstrapper/commitlog/source.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index bc9fd129aa..ee4e10f3c5 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -117,6 +117,7 @@ type readSeriesBlocksWorker struct { func (w *readSeriesBlocksWorker) readSeriesBlocks(ctx context.Context) error { defer close(w.dataCh) + numSeriesRead := 0 for { id, tags, data, expectedChecksum, err := w.reader.Read() if err != nil && !errors.Is(err, io.EOF) { @@ -125,6 +126,7 @@ func (w *readSeriesBlocksWorker) readSeriesBlocks(ctx context.Context) error { if errors.Is(err, io.EOF) { break } + numSeriesRead++ dbBlock := w.blocksPool.Get() dbBlock.Reset(w.blockStart, w.blockSize, @@ -158,11 +160,14 @@ func (w *readSeriesBlocksWorker) readSeriesBlocks(ctx context.Context) error { id.Finalize() tags.Close() - select { - case <-ctx.Done(): - return nil - default: - // do not block. + // check if context was not cancelled on a regular basis. + if numSeriesRead%1024 == 0 { + select { + case <-ctx.Done(): + return nil + default: + // do not block. + } } } return nil From 2d2fc761c067e613c5a173d8072e8e3c508a7271 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 15 Mar 2021 13:52:07 +0200 Subject: [PATCH 30/31] fixed comment. 
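For context, the check this comment documents (added in the previous commit) boils
down to the pattern below. This is a standalone sketch with a simplified read loop;
drainSeries and the channel of IDs are illustrative stand-ins for the commit log
reader, not the actual code. The point is that the ctx.Done() select is evaluated
only once every 1024 series, so cancellation is still noticed promptly while the
check stays off the per-series hot path.

package main

import (
	"context"
	"fmt"
)

// drainSeries mimics the reader loop: instead of polling ctx.Done() for
// every series, it checks once every 1024 series.
func drainSeries(ctx context.Context, ids <-chan string) int {
	numSeriesRead := 0
	for range ids {
		numSeriesRead++
		// ... decode the series block and hand it off here ...

		if numSeriesRead%1024 == 0 {
			select {
			case <-ctx.Done():
				// stop early; cancellation is treated as a clean stop.
				return numSeriesRead
			default:
				// do not block.
			}
		}
	}
	return numSeriesRead
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ids := make(chan string, 4096)
	for i := 0; i < 4096; i++ {
		ids <- fmt.Sprintf("series-%d", i)
	}
	close(ids)

	fmt.Println("series read:", drainSeries(ctx, ids))
}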
--- src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
index ee4e10f3c5..7c239344d3 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
@@ -160,7 +160,7 @@ func (w *readSeriesBlocksWorker) readSeriesBlocks(ctx context.Context) error {
id.Finalize()
tags.Close()
- // check if context was not cancelled on a regular basis.
+ // periodically check whether the context has been canceled so the read loop can exit early.
if numSeriesRead%1024 == 0 {
select {
case <-ctx.Done():
From 49f97536ecb03ab532cf158607eb8829488bc21a Mon Sep 17 00:00:00 2001
From: Linas Naginionis
Date: Mon, 15 Mar 2021 15:05:21 +0200
Subject: [PATCH 31/31] inline retrieveWritableSeries function into series resolver.
--- src/dbnode/storage/series_resolver.go | 8 ++++----
src/dbnode/storage/shard.go | 14 ++++++++------
2 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/src/dbnode/storage/series_resolver.go b/src/dbnode/storage/series_resolver.go
index e04bc76af4..8e3b2798c5 100644
--- a/src/dbnode/storage/series_resolver.go
+++ b/src/dbnode/storage/series_resolver.go
@@ -29,15 +29,15 @@ import (
"github.com/m3db/m3/src/x/ident"
)
-// RetrieveWritableSeriesFn represents the function to retrieve series entry.
-type RetrieveWritableSeriesFn func(id ident.ID) (*lookup.Entry, error)
+// retrieveWritableSeriesFn represents the function to retrieve series entry.
+type retrieveWritableSeriesFn func(id ident.ID) (*lookup.Entry, error)
type seriesResolver struct {
sync.RWMutex
wg *sync.WaitGroup
copiedID ident.ID
- retrieveWritableSeriesFn RetrieveWritableSeriesFn
+ retrieveWritableSeriesFn retrieveWritableSeriesFn
resolved bool
resolvedErr error
@@ -48,7 +48,7 @@ type seriesResolver struct {
func NewSeriesResolver(
wg *sync.WaitGroup,
copiedID ident.ID,
- retrieveWritableSeriesFn RetrieveWritableSeriesFn,
+ retrieveWritableSeriesFn retrieveWritableSeriesFn,
) bootstrap.SeriesRefResolver {
return &seriesResolver{
wg: wg,
diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go
index e33beaeb4e..30e863f6dc 100644
--- a/src/dbnode/storage/shard.go
+++ b/src/dbnode/storage/shard.go
@@ -1053,7 +1053,7 @@ func (s *dbShard) SeriesRefResolver(
tags ident.TagIterator,
) (bootstrap.SeriesRefResolver, error) {
// Try retrieve existing series.
- entry, _, err := s.tryRetrieveWritableSeries(id)
+ entry, err := s.retrieveWritableSeries(id)
if err != nil {
return nil, err
}
@@ -1089,10 +1089,7 @@ func (s *dbShard) SeriesRefResolver(
wg,
// ID was already copied in newShardEntry so we can set it here safely.
entry.Series.ID(), - func(id ident.ID) (*lookup.Entry, error) { - entry, _, err := s.tryRetrieveWritableSeries(id) - return entry, err - }), nil + s.retrieveWritableSeries), nil } func (s *dbShard) ReadEncoded( @@ -1162,7 +1159,7 @@ func (s *dbShard) lookupEntryWithLock(id ident.ID) (*lookup.Entry, *list.Element func (s *dbShard) writableSeries(id ident.ID, tags ident.TagIterator) (*lookup.Entry, error) { for { - entry, _, err := s.tryRetrieveWritableSeries(id) + entry, err := s.retrieveWritableSeries(id) if entry != nil { return entry, nil } @@ -1206,6 +1203,11 @@ func (s *dbShard) tryRetrieveWritableSeries(id ident.ID) ( return nil, opts, nil } +func (s *dbShard) retrieveWritableSeries(id ident.ID) (*lookup.Entry, error) { + entry, _, err := s.tryRetrieveWritableSeries(id) + return entry, err +} + func (s *dbShard) newShardEntry( id ident.ID, tagsArgOpts tagsArgOptions,