[dbnode] Skip out of retention index segments during bootstrap. (#2992)
notbdu authored Dec 9, 2020
1 parent 0a24298 commit 066e956
Showing 4 changed files with 65 additions and 33 deletions.
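In short: the index claims manager's out-of-retention error becomes exported (ErrOutOfRetentionClaim), and both the filesystem and peers bootstrappers now match it with errors.Is when persisting index segments, skipping the segment and bumping a persist-index-blocks-out-of-retention counter instead of emitting an invariant violation. A minimal sketch of that pattern, where persistIndexSegment and the skips counter are hypothetical stand-ins and only fs.ErrOutOfRetentionClaim comes from this commit:

package main

import (
    "errors"
    "fmt"
    "time"

    "github.com/m3db/m3/src/dbnode/persist/fs"
)

var skips int // stands in for the new tally counter

// persistIndexSegment is a hypothetical stand-in for the claim/persist path;
// here it always reports a wrapped out-of-retention claim.
func persistIndexSegment(blockStart time.Time) error {
    return fmt.Errorf("claiming volume index for %v: %w", blockStart, fs.ErrOutOfRetentionClaim)
}

func buildIndexSegment(blockStart time.Time) error {
    err := persistIndexSegment(blockStart)
    if errors.Is(err, fs.ErrOutOfRetentionClaim) {
        // Bail early: the block fell out of retention between computing the
        // bootstrap ranges and persisting the segment, so skip and count it.
        skips++
        return nil
    }
    return err
}

func main() {
    _ = buildIndexSegment(time.Now().Add(-48 * time.Hour))
    fmt.Println("skipped out-of-retention segments:", skips)
}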
6 changes: 3 additions & 3 deletions src/dbnode/persist/fs/index_claims_manager.go
@@ -40,9 +40,9 @@ var (
 	// errMustUseSingleClaimsManager returned when a second claims manager
 	// created, since this is a violation of expected behavior.
 	errMustUseSingleClaimsManager = errors.New("not using single global claims manager")
-	// errOutOfRetentionClaim returned when reserving a claim that is
+	// ErrOutOfRetentionClaim returned when reserving a claim that is
 	// out of retention.
-	errOutOfRetentionClaim = errors.New("out of retention index volume claim")
+	ErrOutOfRetentionClaim = errors.New("out of retention index volume claim")
 
 	globalIndexClaimsManagers uint64
 )
@@ -110,7 +110,7 @@ func (i *indexClaimsManager) ClaimNextIndexFileSetVolumeIndex(
 
 	// Reject out of retention claims.
 	if blockStart.Before(earliestBlockStart) {
-		return 0, errOutOfRetentionClaim
+		return 0, ErrOutOfRetentionClaim
 	}
 
 	volumeIndexClaimsByBlockStart, ok := i.volumeIndexClaims[md.ID().String()]
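For context, a claim is rejected when the requested block start falls before the earliest block still inside retention. A rough sketch of that check; in the real manager earliestBlockStart is derived from the namespace's retention options, so the inline arithmetic below is an assumption for illustration:

package sketch // illustration only, not part of m3

import "time"

// claimAllowed mirrors the blockStart.Before(earliestBlockStart) rejection in
// ClaimNextIndexFileSetVolumeIndex; the retention-edge computation here is an
// assumed simplification.
func claimAllowed(now, blockStart time.Time, retentionPeriod, blockSize time.Duration) bool {
    earliestBlockStart := now.Add(-retentionPeriod).Truncate(blockSize)
    return !blockStart.Before(earliestBlockStart)
}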
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/index_claims_manager_test.go
@@ -129,7 +129,7 @@ func TestIndexClaimsManagerOutOfRetention(t *testing.T) {
 		md,
 		blockStart,
 	)
-	require.Equal(t, errOutOfRetentionClaim, err)
+	require.Equal(t, ErrOutOfRetentionClaim, err)
 
 	// Verify that the out of retention entry has been deleted as well.
 	_, ok = mgr.volumeIndexClaims[md.ID().String()][xtime.ToUnixNano(blockStart)]
40 changes: 26 additions & 14 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
@@ -21,6 +21,7 @@
 package fs
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"time"
@@ -83,8 +84,9 @@ type fileSystemSource struct {
 }
 
 type fileSystemSourceMetrics struct {
-	persistedIndexBlocksRead  tally.Counter
-	persistedIndexBlocksWrite tally.Counter
+	persistedIndexBlocksRead           tally.Counter
+	persistedIndexBlocksWrite          tally.Counter
+	persistedIndexBlocksOutOfRetention tally.Counter
 }
 
 func newFileSystemSource(opts Options) (bootstrap.Source, error) {
@@ -105,8 +107,9 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) {
 		idPool:      opts.IdentifierPool(),
 		newReaderFn: fs.NewReader,
 		metrics: fileSystemSourceMetrics{
-			persistedIndexBlocksRead:  scope.Counter("persist-index-blocks-read"),
-			persistedIndexBlocksWrite: scope.Counter("persist-index-blocks-write"),
+			persistedIndexBlocksRead:           scope.Counter("persist-index-blocks-read"),
+			persistedIndexBlocksWrite:          scope.Counter("persist-index-blocks-write"),
+			persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"),
 		},
 	}
 	s.newReaderPoolOpts.Alloc = s.newReader
@@ -408,6 +411,17 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
 	requestedRanges := timeWindowReaders.Ranges
 	remainingRanges := requestedRanges.Copy()
 	shardReaders := timeWindowReaders.Readers
+	defer func() {
+		// Return readers to pool.
+		for _, shardReaders := range shardReaders {
+			for _, r := range shardReaders.Readers {
+				if err := r.Close(); err == nil {
+					readerPool.Put(r)
+				}
+			}
+		}
+	}()
+
 	for shard, shardReaders := range shardReaders {
 		shard := uint32(shard)
 		readers := shardReaders.Readers
@@ -590,7 +604,14 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
 			blockStart,
 			blockEnd,
 		)
-		if err != nil {
+		if errors.Is(err, fs.ErrOutOfRetentionClaim) {
+			// Bail early if the index segment is already out of retention.
+			// This can happen when the edge of requested ranges at time of data bootstrap
+			// is now out of retention.
+			s.log.Debug("skipping out of retention index segment", buildIndexLogFields...)
+			s.metrics.persistedIndexBlocksOutOfRetention.Inc(1)
+			return
+		} else if err != nil {
 			instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) {
 				l.Error("persist fs index bootstrap failed",
 					zap.Error(err),
@@ -637,15 +658,6 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
 		runResult.Unlock()
 	}
 
-	// Return readers to pool.
-	for _, shardReaders := range shardReaders {
-		for _, r := range shardReaders.Readers {
-			if err := r.Close(); err == nil {
-				readerPool.Put(r)
-			}
-		}
-	}
-
 	s.markRunResultErrorsAndUnfulfilled(runResult, requestedRanges,
 		remainingRanges, timesWithErrors)
 }
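The reader-return loop above moved into a defer; with the new early return for out-of-retention segments, that is what keeps readers flowing back to the pool on every exit path. A small self-contained sketch of the pattern, with a plain sync.Pool and io.ReadCloser standing in for the bootstrapper's reader pool and shard readers:

package main

import (
    "fmt"
    "io"
    "strings"
    "sync"
)

// processReaders is a hypothetical stand-in for loadShardReadersDataIntoShardResult:
// the deferred loop runs on every return, including early bail-outs, so closed
// readers always make it back to the pool.
func processReaders(readerPool *sync.Pool, readers []io.ReadCloser) error {
    defer func() {
        for _, r := range readers {
            if err := r.Close(); err == nil {
                readerPool.Put(r)
            }
        }
    }()

    for _, r := range readers {
        if _, err := io.ReadAll(r); err != nil {
            return err // early return: deferred cleanup still runs
        }
    }
    return nil
}

func main() {
    pool := &sync.Pool{}
    readers := []io.ReadCloser{
        io.NopCloser(strings.NewReader("segment-a")),
        io.NopCloser(strings.NewReader("segment-b")),
    }
    fmt.Println(processReaders(pool, readers))
}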
50 changes: 35 additions & 15 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
@@ -21,11 +21,17 @@
 package peers
 
 import (
+	"errors"
 	"fmt"
 	"io"
 	"sync"
 	"time"
 
+	"github.com/opentracing/opentracing-go"
+	"github.com/uber-go/tally"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+
 	"github.com/m3db/m3/src/cluster/shard"
 	"github.com/m3db/m3/src/dbnode/client"
 	"github.com/m3db/m3/src/dbnode/namespace"
@@ -50,17 +56,18 @@
 	xresource "github.com/m3db/m3/src/x/resource"
 	xsync "github.com/m3db/m3/src/x/sync"
 	xtime "github.com/m3db/m3/src/x/time"
-
-	"github.com/opentracing/opentracing-go"
-	"go.uber.org/zap"
-	"go.uber.org/zap/zapcore"
 )
 
 type peersSource struct {
 	opts              Options
 	log               *zap.Logger
 	newPersistManager func() (persist.Manager, error)
 	nowFn             clock.NowFn
+	metrics           peersSourceMetrics
+}
+
+type peersSourceMetrics struct {
+	persistedIndexBlocksOutOfRetention tally.Counter
 }
 
 type persistenceFlush struct {
@@ -76,13 +83,18 @@ func newPeersSource(opts Options) (bootstrap.Source, error) {
 	}
 
 	iopts := opts.ResultOptions().InstrumentOptions()
+	scope := iopts.MetricsScope().SubScope("peers-bootstrapper")
+	iopts = iopts.SetMetricsScope(scope)
 	return &peersSource{
 		opts: opts,
 		log:  iopts.Logger().With(zap.String("bootstrapper", "peers")),
 		newPersistManager: func() (persist.Manager, error) {
 			return fs.NewPersistManager(opts.FilesystemOptions())
 		},
 		nowFn: opts.ResultOptions().ClockOptions().NowFn(),
+		metrics: peersSourceMetrics{
+			persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"),
+		},
 	}, nil
 }
 
@@ -824,7 +836,17 @@ func (s *peersSource) processReaders(
 		timesWithErrors []time.Time
 		totalEntries    int
 	)
-	defer docsPool.Put(batch)
+	defer func() {
+		docsPool.Put(batch)
+		// Return readers to pool.
+		for _, shardReaders := range timeWindowReaders.Readers {
+			for _, r := range shardReaders.Readers {
+				if err := r.Close(); err == nil {
+					readerPool.Put(r)
+				}
+			}
+		}
+	}()
 
 	requestedRanges := timeWindowReaders.Ranges
 	remainingRanges := requestedRanges.Copy()
@@ -934,7 +956,14 @@ func (s *peersSource) processReaders(
 			blockStart,
 			blockEnd,
 		)
-		if err != nil {
+		if errors.Is(err, fs.ErrOutOfRetentionClaim) {
+			// Bail early if the index segment is already out of retention.
+			// This can happen when the edge of requested ranges at time of data bootstrap
+			// is now out of retention.
+			s.log.Debug("skipping out of retention index segment", buildIndexLogFields...)
+			s.metrics.persistedIndexBlocksOutOfRetention.Inc(1)
+			return remainingRanges, timesWithErrors
+		} else if err != nil {
 			instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) {
 				l.Error("persist fs index bootstrap failed",
 					zap.Stringer("namespace", ns.ID()),
@@ -978,15 +1007,6 @@
 	r.IndexResults()[xtime.ToUnixNano(blockStart)].SetBlock(idxpersist.DefaultIndexVolumeType, result.NewIndexBlock(segments, newFulfilled))
 	resultLock.Unlock()
 
-	// Return readers to pool.
-	for _, shardReaders := range timeWindowReaders.Readers {
-		for _, r := range shardReaders.Readers {
-			if err := r.Close(); err == nil {
-				readerPool.Put(r)
-			}
-		}
-	}
-
 	return remainingRanges, timesWithErrors
 }
 
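The peers source previously emitted no metrics of its own, hence the new peers-bootstrapper sub-scope around the counter. A hedged sketch of exercising such a counter against tally's test scope; the test-scope usage is an assumption for illustration, and only the scope and counter names come from this commit:

package main

import (
    "fmt"

    "github.com/uber-go/tally"
)

func main() {
    // Build a counter the same way newPeersSource does, but on a test scope.
    testScope := tally.NewTestScope("", nil)
    scope := testScope.SubScope("peers-bootstrapper")
    counter := scope.Counter("persist-index-blocks-out-of-retention")

    counter.Inc(1) // what the bootstrapper does when it skips an out-of-retention segment

    for _, snap := range testScope.Snapshot().Counters() {
        fmt.Println(snap.Name(), snap.Value())
    }
}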
