diff --git a/docs/m3db/architecture/engine.md b/docs/m3db/architecture/engine.md index b34b59596b..9f5cca0725 100644 --- a/docs/m3db/architecture/engine.md +++ b/docs/m3db/architecture/engine.md @@ -181,7 +181,7 @@ The ticking process runs continously in the background and is responsible for a #### Merging all encoders -M3TSZ is designed for compressing time series data in which each datapoint has a timestamp that is larger than the last encoded datapoint. For monitoring workloads this works very well because every subsequent datapoint is almost always larger than the previous one. However, real world systems are messy and occassionally out of order writes will be received. When this happens, M3DB will allocate a new encoder for the out of order datapoints. The multiple encoders need to be merged before flushing the data to disk, but to prevent huge memory spikes during the flushing process we continuously merge out of order encoders in the background. +M3TSZ is designed for compressing time series data in which each datapoint has a timestamp that is larger than the last encoded datapoint. For monitoring workloads this works very well because every subsequent datapoint is almost always chronologically after the previous one. However, real world systems are messy and occasionally out of order writes will be received. When this happens, M3DB will allocate a new encoder for the out of order datapoints. The multiple encoders need to be merged before flushing the data to disk, but to prevent huge memory spikes during the flushing process we continuously merge out of order encoders in the background. #### Removing expired / flushed series and blocks from memory diff --git a/src/dbnode/integration/disk_cleanup_multi_ns_test.go b/src/dbnode/integration/disk_cleanup_multi_ns_test.go index 2a026c3436..e549622695 100644 --- a/src/dbnode/integration/disk_cleanup_multi_ns_test.go +++ b/src/dbnode/integration/disk_cleanup_multi_ns_test.go @@ -154,6 +154,16 @@ func TestDiskCleanupMultipleNamespace(t *testing.T) { // Move now forward by 12 hours, and see if the expected files have been deleted testSetup.setNowFn(end) + // This isn't great, but right now the commitlog will only ever rotate when writes + // are received, so we need to issue a write after changing the time to force the + // commitlog rotation. This won't be required once we tie commitlog rotation into + // the snapshotting process. + testSetup.writeBatch(testNamespaces[0], generate.Block(generate.BlockConfig{ + IDs: []string{"foo"}, + NumPoints: 1, + Start: end, + })) + // Check if expected files have been deleted log.Infof("waiting until data is cleaned up") waitTimeout := 60 * time.Second diff --git a/src/dbnode/integration/disk_cleanup_test.go b/src/dbnode/integration/disk_cleanup_test.go index b3b2675576..cad5d96040 100644 --- a/src/dbnode/integration/disk_cleanup_test.go +++ b/src/dbnode/integration/disk_cleanup_test.go @@ -76,7 +76,6 @@ func TestDiskCleanup(t *testing.T) { } writeDataFileSetFiles(t, testSetup.storageOpts, md, shard, fileTimes) for _, clTime := range fileTimes { - // Need to generate valid commit log files otherwise cleanup will fail. data := map[xtime.UnixNano]generate.SeriesBlock{ xtime.ToUnixNano(clTime): nil, } @@ -89,6 +88,15 @@ func TestDiskCleanup(t *testing.T) { // and commit logs at now will be deleted newNow := now.Add(retentionPeriod).Add(2 * blockSize) testSetup.setNowFn(newNow) + // This isn't great, but right now the commitlog will only ever rotate when writes + // are received, so we need to issue a write after changing the time to force the + // commitlog rotation. This won't be required once we tie commitlog rotation into + // the snapshotting process. + testSetup.writeBatch(testNamespaces[0], generate.Block(generate.BlockConfig{ + IDs: []string{"foo"}, + NumPoints: 1, + Start: newNow, + })) // Check if files have been deleted waitTimeout := 30 * time.Second diff --git a/src/dbnode/persist/fs/commitlog/commit_log.go b/src/dbnode/persist/fs/commitlog/commit_log.go index d9a1b821ea..9323ecde6d 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log.go +++ b/src/dbnode/persist/fs/commitlog/commit_log.go @@ -22,6 +22,7 @@ package commitlog import ( "errors" + "fmt" "sync" "time" @@ -56,35 +57,73 @@ type writeCommitLogFn func( unit xtime.Unit, annotation ts.Annotation, ) error -type commitLogFailFn func(err error) -type completionFn func(err error) +type commitLogFailFn func(err error) type commitLog struct { - sync.RWMutex + // The commitlog has two different locks that it maintains: + // + // 1) The closedState lock is acquired and held for any actions taking place that + // the commitlog must remain open for the duration of (or for changing the state + // of the commitlog to closed). + // + // + // 2) The flushState is only used for reading and writing the lastFlushAt variable. The scope + // of the flushState lock is very limited and is hidden behind helper methods for getting and + // setting the value of lastFlushAt. + closedState closedState + flushState flushState + + writerState writerState + + // Associated with the closedState, but stored separately since + // it does not require the closedState lock to be acquired before + // being accessed. + closeErr chan error + + writes chan commitLogWrite + pendingFlushFns []callbackFn + opts Options nowFn clock.NowFn - - log xlog.Logger + log xlog.Logger newCommitLogWriterFn newCommitLogWriterFn writeFn writeCommitLogFn commitLogFailFn commitLogFailFn - writer commitLogWriter - // TODO(r): replace buffered channel with concurrent striped - // circular buffer to avoid central write lock contention - writes chan commitLogWrite + metrics commitLogMetrics +} - flushMutex sync.RWMutex - lastFlushAt time.Time - pendingFlushFns []completionFn +// Use the helper methods when interacting with this struct, the mutex +// should never need to be manually interacted with. +type flushState struct { + sync.RWMutex + lastFlushAt time.Time +} + +func (f *flushState) setLastFlushAt(t time.Time) { + f.Lock() + f.lastFlushAt = t + f.Unlock() +} +func (f *flushState) getLastFlushAt() time.Time { + f.RLock() + lastFlush := f.lastFlushAt + f.RUnlock() + return lastFlush +} + +type writerState struct { + writer commitLogWriter writerExpireAt time.Time - closed bool - closeErr chan error + activeFile *File +} - metrics commitLogMetrics +type closedState struct { + sync.RWMutex + closed bool } type commitLogMetrics struct { @@ -98,22 +137,49 @@ type commitLogMetrics struct { flushDone tally.Counter } -type valueType int +type eventType int // nolint: varcheck, unused const ( - writeValueType valueType = iota - flushValueType + writeEventType eventType = iota + flushEventType + activeLogsEventType ) +type callbackFn func(callbackResult) + +type callbackResult struct { + eventType eventType + err error + activeLogs activeLogsCallbackResult +} + +type activeLogsCallbackResult struct { + file *File +} + +func (r callbackResult) activeLogsCallbackResult() (activeLogsCallbackResult, error) { + if r.eventType != activeLogsEventType { + return activeLogsCallbackResult{}, fmt.Errorf( + "wrong event type: expected %d but got %d", + activeLogsEventType, r.eventType) + } + + if r.err != nil { + return activeLogsCallbackResult{}, nil + } + + return r.activeLogs, nil +} + type commitLogWrite struct { - valueType valueType + eventType eventType - series Series - datapoint ts.Datapoint - unit xtime.Unit - annotation ts.Annotation - completionFn completionFn + series Series + datapoint ts.Datapoint + unit xtime.Unit + annotation ts.Annotation + callbackFn callbackFn } // NewCommitLog creates a new commit log @@ -155,13 +221,17 @@ func NewCommitLog(opts Options) (CommitLog, error) { } func (l *commitLog) Open() error { + l.closedState.Lock() + defer l.closedState.Unlock() + // Open the buffered commit log writer if err := l.openWriter(l.nowFn()); err != nil { return err } - // Flush the info header to ensure we can write to disk - if err := l.writer.Flush(); err != nil { + // Sync the info header to ensure we can write to disk and make sure that we can at least + // read the info about the commitlog file later. + if err := l.writerState.writer.Flush(true); err != nil { return err } @@ -184,6 +254,47 @@ func (l *commitLog) Open() error { return nil } +func (l *commitLog) ActiveLogs() ([]File, error) { + l.closedState.RLock() + defer l.closedState.RUnlock() + + if l.closedState.closed { + return nil, errCommitLogClosed + } + + var ( + err error + files []File + wg = sync.WaitGroup{} + ) + wg.Add(1) + + l.writes <- commitLogWrite{ + eventType: activeLogsEventType, + callbackFn: func(r callbackResult) { + defer wg.Done() + + result, e := r.activeLogsCallbackResult() + if e != nil { + err = e + return + } + + if result.file != nil { + files = append(files, *result.file) + } + }, + } + + wg.Wait() + + if err != nil { + return nil, err + } + + return files, nil +} + func (l *commitLog) flushEvery(interval time.Duration) { // Periodically flush the underlying commit log writer to cover // the case when writes stall for a considerable time @@ -202,10 +313,7 @@ func (l *commitLog) flushEvery(interval time.Duration) { time.Sleep(sleepFor) - l.flushMutex.RLock() - lastFlushAt := l.lastFlushAt - l.flushMutex.RUnlock() - + lastFlushAt := l.flushState.getLastFlushAt() if sinceFlush := l.nowFn().Sub(lastFlushAt); sinceFlush < interval { // Flushed already recently, sleep until we would next consider flushing sleepForOverride = interval - sinceFlush @@ -213,32 +321,43 @@ func (l *commitLog) flushEvery(interval time.Duration) { } // Request a flush - l.RLock() - if l.closed { - l.RUnlock() + l.closedState.RLock() + if l.closedState.closed { + l.closedState.RUnlock() return } - l.writes <- commitLogWrite{valueType: flushValueType} - l.RUnlock() + l.writes <- commitLogWrite{eventType: flushEventType} + l.closedState.RUnlock() } } func (l *commitLog) write() { for write := range l.writes { - // For writes requiring acks add to pending acks - if write.completionFn != nil { - l.pendingFlushFns = append(l.pendingFlushFns, write.completionFn) + if write.eventType == flushEventType { + l.writerState.writer.Flush(false) + continue } - if write.valueType == flushValueType { - l.writer.Flush() + if write.eventType == activeLogsEventType { + write.callbackFn(callbackResult{ + eventType: write.eventType, + err: nil, + activeLogs: activeLogsCallbackResult{ + file: l.writerState.activeFile, + }, + }) continue } - if now := l.nowFn(); !now.Before(l.writerExpireAt) { - if err := l.openWriter(now); err != nil { + // For writes requiring acks add to pending acks + if write.eventType == writeEventType && write.callbackFn != nil { + l.pendingFlushFns = append(l.pendingFlushFns, write.callbackFn) + } + if now := l.nowFn(); !now.Before(l.writerState.writerExpireAt) { + err := l.openWriter(now) + if err != nil { l.metrics.errors.Inc(1) l.metrics.openErrors.Inc(1) l.log.Errorf("failed to open commit log: %v", err) @@ -251,7 +370,7 @@ func (l *commitLog) write() { } } - err := l.writer.Write(write.series, + err := l.writerState.writer.Write(write.series, write.datapoint, write.unit, write.annotation) if err != nil { @@ -267,18 +386,14 @@ func (l *commitLog) write() { l.metrics.success.Inc(1) } - l.Lock() - defer l.Unlock() + writer := l.writerState.writer + l.writerState.writer = nil - writer := l.writer - l.writer = nil l.closeErr <- writer.Close() } func (l *commitLog) onFlush(err error) { - l.flushMutex.Lock() - l.lastFlushAt = l.nowFn() - l.flushMutex.Unlock() + l.flushState.setLastFlushAt(l.nowFn()) if err != nil { l.metrics.errors.Inc(1) @@ -300,36 +415,42 @@ func (l *commitLog) onFlush(err error) { } for i := range l.pendingFlushFns { - l.pendingFlushFns[i](err) + l.pendingFlushFns[i](callbackResult{ + eventType: flushEventType, + err: err, + }) l.pendingFlushFns[i] = nil } l.pendingFlushFns = l.pendingFlushFns[:0] l.metrics.flushDone.Inc(1) } +// writerState lock must be held for the duration of this function call. func (l *commitLog) openWriter(now time.Time) error { - if l.writer != nil { - if err := l.writer.Close(); err != nil { + if l.writerState.writer != nil { + if err := l.writerState.writer.Close(); err != nil { l.metrics.closeErrors.Inc(1) l.log.Errorf("failed to close commit log: %v", err) // If we failed to close then create a new commit log writer - l.writer = nil + l.writerState.writer = nil } } - if l.writer == nil { - l.writer = l.newCommitLogWriterFn(l.onFlush, l.opts) + if l.writerState.writer == nil { + l.writerState.writer = l.newCommitLogWriterFn(l.onFlush, l.opts) } blockSize := l.opts.BlockSize() start := now.Truncate(blockSize) - if err := l.writer.Open(start, blockSize); err != nil { + file, err := l.writerState.writer.Open(start, blockSize) + if err != nil { return err } - l.writerExpireAt = start.Add(blockSize) + l.writerState.activeFile = &file + l.writerState.writerExpireAt = start.Add(blockSize) return nil } @@ -351,9 +472,9 @@ func (l *commitLog) writeWait( unit xtime.Unit, annotation ts.Annotation, ) error { - l.RLock() - if l.closed { - l.RUnlock() + l.closedState.RLock() + if l.closedState.closed { + l.closedState.RUnlock() return errCommitLogClosed } @@ -364,17 +485,17 @@ func (l *commitLog) writeWait( wg.Add(1) - completion := func(err error) { - result = err + completion := func(r callbackResult) { + result = r.err wg.Done() } write := commitLogWrite{ - series: series, - datapoint: datapoint, - unit: unit, - annotation: annotation, - completionFn: completion, + series: series, + datapoint: datapoint, + unit: unit, + annotation: annotation, + callbackFn: completion, } enqueued := false @@ -385,7 +506,7 @@ func (l *commitLog) writeWait( default: } - l.RUnlock() + l.closedState.RUnlock() if !enqueued { return ErrCommitLogQueueFull @@ -403,9 +524,9 @@ func (l *commitLog) writeBehind( unit xtime.Unit, annotation ts.Annotation, ) error { - l.RLock() - if l.closed { - l.RUnlock() + l.closedState.RLock() + if l.closedState.closed { + l.closedState.RUnlock() return errCommitLogClosed } @@ -424,7 +545,7 @@ func (l *commitLog) writeBehind( default: } - l.RUnlock() + l.closedState.RUnlock() if !enqueued { return ErrCommitLogQueueFull @@ -434,15 +555,15 @@ func (l *commitLog) writeBehind( } func (l *commitLog) Close() error { - l.Lock() - if l.closed { - l.Unlock() + l.closedState.Lock() + if l.closedState.closed { + l.closedState.Unlock() return nil } - l.closed = true + l.closedState.closed = true close(l.writes) - l.Unlock() + l.closedState.Unlock() // Receive the result of closing the writer from asynchronous writer return <-l.closeErr diff --git a/src/dbnode/persist/fs/commitlog/commit_log_conc_test.go b/src/dbnode/persist/fs/commitlog/commit_log_conc_test.go new file mode 100644 index 0000000000..437aef3694 --- /dev/null +++ b/src/dbnode/persist/fs/commitlog/commit_log_conc_test.go @@ -0,0 +1,99 @@ +// +build big +// +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package commitlog + +import ( + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3x/context" + xtime "github.com/m3db/m3x/time" + + "github.com/stretchr/testify/require" +) + +func TestCommitLogActiveLogsConcurrency(t *testing.T) { + opts, _ := newTestOptions(t, overrides{ + strategy: StrategyWriteBehind, + }) + opts = opts.SetBlockSize(1 * time.Millisecond) + defer cleanup(t, opts) + + var ( + doneCh = make(chan struct{}) + commitLog = newTestCommitLog(t, opts) + ) + + // One goroutine continuously writing + go func() { + for { + select { + case <-doneCh: + return + default: + time.Sleep(time.Millisecond) + err := commitLog.Write( + context.NewContext(), + testSeries(0, "foo.bar", testTags1, 127), + ts.Datapoint{}, + xtime.Second, + nil) + if err == errCommitLogClosed { + return + } + if err == ErrCommitLogQueueFull { + continue + } + if err != nil { + panic(err) + } + } + } + }() + + // One goroutine continuously checking active logs + go func() { + var ( + lastSeenFile string + numFilesSeen int + ) + for numFilesSeen < 10 { + time.Sleep(100 * time.Millisecond) + logs, err := commitLog.ActiveLogs() + if err != nil { + panic(err) + } + require.Equal(t, 1, len(logs)) + if logs[0].FilePath != lastSeenFile { + lastSeenFile = logs[0].FilePath + numFilesSeen++ + } + } + close(doneCh) + }() + + <-doneCh + + require.NoError(t, commitLog.Close()) +} diff --git a/src/dbnode/persist/fs/commitlog/commit_log_mock.go b/src/dbnode/persist/fs/commitlog/commit_log_mock.go index c032d0c2cf..c4f4551720 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_mock.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_mock.go @@ -99,6 +99,19 @@ func (mr *MockCommitLogMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockCommitLog)(nil).Close)) } +// ActiveLogs mocks base method +func (m *MockCommitLog) ActiveLogs() ([]File, error) { + ret := m.ctrl.Call(m, "ActiveLogs") + ret0, _ := ret[0].([]File) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ActiveLogs indicates an expected call of ActiveLogs +func (mr *MockCommitLogMockRecorder) ActiveLogs() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ActiveLogs", reflect.TypeOf((*MockCommitLog)(nil).ActiveLogs)) +} + // MockIterator is a mock of Iterator interface type MockIterator struct { ctrl *gomock.Controller diff --git a/src/dbnode/persist/fs/commitlog/commit_log_test.go b/src/dbnode/persist/fs/commitlog/commit_log_test.go index b3a35467eb..ff990d4a1e 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_test.go @@ -153,21 +153,21 @@ func snapshotCounterValue( } type mockCommitLogWriter struct { - openFn func(start time.Time, duration time.Duration) error + openFn func(start time.Time, duration time.Duration) (File, error) writeFn func(Series, ts.Datapoint, xtime.Unit, ts.Annotation) error - flushFn func() error + flushFn func(sync bool) error closeFn func() error } func newMockCommitLogWriter() *mockCommitLogWriter { return &mockCommitLogWriter{ - openFn: func(start time.Time, duration time.Duration) error { - return nil + openFn: func(start time.Time, duration time.Duration) (File, error) { + return File{}, nil }, writeFn: func(Series, ts.Datapoint, xtime.Unit, ts.Annotation) error { return nil }, - flushFn: func() error { + flushFn: func(sync bool) error { return nil }, closeFn: func() error { @@ -176,7 +176,7 @@ func newMockCommitLogWriter() *mockCommitLogWriter { } } -func (w *mockCommitLogWriter) Open(start time.Time, duration time.Duration) error { +func (w *mockCommitLogWriter) Open(start time.Time, duration time.Duration) (File, error) { return w.openFn(start, duration) } @@ -189,8 +189,8 @@ func (w *mockCommitLogWriter) Write( return w.writeFn(series, datapoint, unit, annotation) } -func (w *mockCommitLogWriter) Flush() error { - return w.flushFn() +func (w *mockCommitLogWriter) Flush(sync bool) error { + return w.flushFn(sync) } func (w *mockCommitLogWriter) Close() error { @@ -277,7 +277,7 @@ func flushUntilDone(l *commitLog, wg *sync.WaitGroup) { blockWg.Add(1) go func() { for atomic.LoadUint64(&done) == 0 { - l.writes <- commitLogWrite{valueType: flushValueType} + l.writes <- commitLogWrite{eventType: flushEventType} time.Sleep(time.Millisecond) } blockWg.Done() @@ -384,7 +384,7 @@ func TestReadCommitLogMissingMetadata(t *testing.T) { // Replace bitset in writer with one that configurably returns true or false // depending on the series commitLog := newTestCommitLog(t, opts) - writer := commitLog.writer.(*writer) + writer := commitLog.writerState.writer.(*writer) bitSet := bitset.NewBitSet(0) @@ -681,14 +681,14 @@ func TestCommitLogFailOnWriteError(t *testing.T) { } var opens int64 - writer.openFn = func(start time.Time, duration time.Duration) error { + writer.openFn = func(start time.Time, duration time.Duration) (File, error) { if atomic.AddInt64(&opens, 1) >= 2 { - return fmt.Errorf("an error") + return File{}, fmt.Errorf("an error") } - return nil + return File{}, nil } - writer.flushFn = func() error { + writer.flushFn = func(bool) error { commitLog.onFlush(nil) return nil } @@ -730,14 +730,14 @@ func TestCommitLogFailOnOpenError(t *testing.T) { writer := newMockCommitLogWriter() var opens int64 - writer.openFn = func(start time.Time, duration time.Duration) error { + writer.openFn = func(start time.Time, duration time.Duration) (File, error) { if atomic.AddInt64(&opens, 1) >= 2 { - return fmt.Errorf("an error") + return File{}, fmt.Errorf("an error") } - return nil + return File{}, nil } - writer.flushFn = func() error { + writer.flushFn = func(bool) error { commitLog.onFlush(nil) return nil } @@ -754,10 +754,8 @@ func TestCommitLogFailOnOpenError(t *testing.T) { wg := setupCloseOnFail(t, commitLog) func() { - commitLog.RLock() - defer commitLog.RUnlock() // Expire the writer so it requires a new open - commitLog.writerExpireAt = timeZero + commitLog.writerState.writerExpireAt = timeZero }() writes := []testWrite{ @@ -790,7 +788,7 @@ func TestCommitLogFailOnFlushError(t *testing.T) { writer := newMockCommitLogWriter() var flushes int64 - writer.flushFn = func() error { + writer.flushFn = func(bool) error { if atomic.AddInt64(&flushes, 1) >= 2 { commitLog.onFlush(fmt.Errorf("an error")) } else { @@ -828,6 +826,35 @@ func TestCommitLogFailOnFlushError(t *testing.T) { require.Equal(t, int64(1), flushErrors.Value()) } +func TestCommitLogActiveLogs(t *testing.T) { + opts, _ := newTestOptions(t, overrides{ + strategy: StrategyWriteBehind, + }) + defer cleanup(t, opts) + + commitLog := newTestCommitLog(t, opts) + + writer := newMockCommitLogWriter() + writer.flushFn = func(bool) error { + return nil + } + commitLog.newCommitLogWriterFn = func( + _ flushFn, + _ Options, + ) commitLogWriter { + return writer + } + + logs, err := commitLog.ActiveLogs() + require.NoError(t, err) + require.Equal(t, 1, len(logs)) + + // Close the commit log and consequently flush + require.NoError(t, commitLog.Close()) + _, err = commitLog.ActiveLogs() + require.Error(t, err) +} + var ( testTag1 = ident.StringTag("name1", "val1") testTag2 = ident.StringTag("name2", "val2") diff --git a/src/dbnode/persist/fs/commitlog/read_write_prop_test.go b/src/dbnode/persist/fs/commitlog/read_write_prop_test.go index 92ae6039c8..b3ea94fd1f 100644 --- a/src/dbnode/persist/fs/commitlog/read_write_prop_test.go +++ b/src/dbnode/persist/fs/commitlog/read_write_prop_test.go @@ -23,6 +23,7 @@ package commitlog import ( + "errors" "fmt" "io/ioutil" "os" @@ -187,7 +188,7 @@ func clCommandFunctor(t *testing.T, basePath string, seed int64) *commands.Proto return ok }, GenCommandFunc: func(state commands.State) gopter.Gen { - return gen.OneGenOf(genOpenCommand, genCloseCommand, genWriteBehindCommand) + return gen.OneGenOf(genOpenCommand, genCloseCommand, genWriteBehindCommand, genActiveLogsCommand) }, } } @@ -220,7 +221,12 @@ var genOpenCommand = gen.Const(&commands.ProtoCommand{ return w } } - return s.cLog.Open() + err = s.cLog.Open() + if err != nil { + return err + } + s.open = true + return nil }, NextStateFunc: func(state commands.State) commands.State { s := state.(*clState) @@ -245,12 +251,16 @@ var genCloseCommand = gen.Const(&commands.ProtoCommand{ }, RunFunc: func(q commands.SystemUnderTest) commands.Result { s := q.(*clState) - return s.cLog.Close() + err := s.cLog.Close() + if err != nil { + return err + } + s.open = false + return nil }, NextStateFunc: func(state commands.State) commands.State { s := state.(*clState) s.open = false - s.cLog = nil return s }, PostConditionFunc: func(state commands.State, result commands.Result) *gopter.PropResult { @@ -310,6 +320,51 @@ var genWriteBehindCommand = gen.SliceOfN(10, genWrite()). } }) +var genActiveLogsCommand = gen.Const(&commands.ProtoCommand{ + Name: "ActiveLogs", + PreConditionFunc: func(state commands.State) bool { + return true + }, + RunFunc: func(q commands.SystemUnderTest) commands.Result { + s := q.(*clState) + + if s.cLog == nil { + return nil + } + + logs, err := s.cLog.ActiveLogs() + if !s.open { + if err != errCommitLogClosed { + return errors.New("did not receive commit log closed error") + } + return nil + } + + if err != nil { + return err + } + + if len(logs) != 1 { + return fmt.Errorf("ActiveLogs did not return exactly one log file: %v", logs) + } + + return nil + }, + NextStateFunc: func(state commands.State) commands.State { + s := state.(*clState) + return s + }, + PostConditionFunc: func(state commands.State, result commands.Result) *gopter.PropResult { + if result == nil { + return &gopter.PropResult{Status: gopter.PropTrue} + } + return &gopter.PropResult{ + Status: gopter.PropFalse, + Error: result.(error), + } + }, +}) + // clState holds the expected state (i.e. its the commands.State), and we use it as the SystemUnderTest type clState struct { basePath string @@ -555,3 +610,7 @@ func (c *corruptingChunkWriter) close() error { func (c *corruptingChunkWriter) isOpen() bool { return c.chunkWriter.isOpen() } + +func (c *corruptingChunkWriter) sync() error { + return c.chunkWriter.sync() +} diff --git a/src/dbnode/persist/fs/commitlog/types.go b/src/dbnode/persist/fs/commitlog/types.go index f5fddc7c18..bcb24e6e16 100644 --- a/src/dbnode/persist/fs/commitlog/types.go +++ b/src/dbnode/persist/fs/commitlog/types.go @@ -64,6 +64,9 @@ type CommitLog interface { // Close the commit log Close() error + + // ActiveLogs returns a slice of the active commitlogs. + ActiveLogs() ([]File, error) } // Iterator provides an iterator for commit logs diff --git a/src/dbnode/persist/fs/commitlog/writer.go b/src/dbnode/persist/fs/commitlog/writer.go index 133f54267b..bc18881e19 100644 --- a/src/dbnode/persist/fs/commitlog/writer.go +++ b/src/dbnode/persist/fs/commitlog/writer.go @@ -65,7 +65,7 @@ var ( type commitLogWriter interface { // Open opens the commit log for writing data - Open(start time.Time, duration time.Duration) error + Open(start time.Time, duration time.Duration) (File, error) // Write will write an entry in the commit log for a given series Write( @@ -75,8 +75,9 @@ type commitLogWriter interface { annotation ts.Annotation, ) error - // Flush will flush the contents to the disk, useful when first testing if first commit log is writable - Flush() error + // Flush will flush any data in the writers buffer to the chunkWriter, essentially forcing + // a new chunk to be created. Optionally forces the data to be FSync'd to disk. + Flush(sync bool) error // Close the reader Close() error @@ -88,6 +89,7 @@ type chunkWriter interface { reset(f xos.File) close() error isOpen() bool + sync() error } type flushFn func(err error) @@ -133,19 +135,19 @@ func newCommitLogWriter( } } -func (w *writer) Open(start time.Time, duration time.Duration) error { +func (w *writer) Open(start time.Time, duration time.Duration) (File, error) { if w.isOpen() { - return errCommitLogWriterAlreadyOpen + return File{}, errCommitLogWriterAlreadyOpen } commitLogsDir := fs.CommitLogsDirPath(w.filePathPrefix) if err := os.MkdirAll(commitLogsDir, w.newDirectoryMode); err != nil { - return err + return File{}, err } filePath, index, err := fs.NextCommitLogsFile(w.filePathPrefix, start) if err != nil { - return err + return File{}, err } logInfo := schema.LogInfo{ Start: start.UnixNano(), @@ -154,23 +156,28 @@ func (w *writer) Open(start time.Time, duration time.Duration) error { } w.logEncoder.Reset() if err := w.logEncoder.EncodeLogInfo(logInfo); err != nil { - return err + return File{}, err } fd, err := fs.OpenWritable(filePath, w.newFileMode) if err != nil { - return err + return File{}, err } w.chunkWriter.reset(fd) w.buffer.Reset(w.chunkWriter) if err := w.write(w.logEncoder.Bytes()); err != nil { w.Close() - return err + return File{}, err } w.start = start w.duration = duration - return nil + return File{ + FilePath: filePath, + Start: start, + Duration: duration, + Index: int64(index), + }, nil } func (w *writer) isOpen() bool { @@ -243,8 +250,25 @@ func (w *writer) Write( return nil } -func (w *writer) Flush() error { - return w.buffer.Flush() +func (w *writer) Flush(sync bool) error { + err := w.buffer.Flush() + if err != nil { + return err + } + + if !sync { + return nil + } + + return w.sync() +} + +func (w *writer) sync() error { + if err := w.chunkWriter.sync(); err != nil { + return err + } + + return nil } func (w *writer) Close() error { @@ -252,7 +276,7 @@ func (w *writer) Close() error { return nil } - if err := w.Flush(); err != nil { + if err := w.Flush(true); err != nil { return err } if err := w.chunkWriter.close(); err != nil { @@ -314,6 +338,10 @@ func (w *fsChunkWriter) isOpen() bool { return w.fd != nil } +func (w *fsChunkWriter) sync() error { + return w.fd.Sync() +} + func (w *fsChunkWriter) Write(p []byte) (int, error) { size := len(p) @@ -351,7 +379,7 @@ func (w *fsChunkWriter) Write(p []byte) (int, error) { // Fsync if required to if w.fsync { - err = w.fd.Sync() + err = w.sync() } // Fire flush callback diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index 6f5917ffd0..550e190369 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -41,10 +41,18 @@ type deleteFilesFn func(files []string) error type deleteInactiveDirectoriesFn func(parentDirPath string, activeDirNames []string) error +// Narrow interface so as not to expose all the functionality of the commitlog +// to the cleanup manager. +type activeCommitlogs interface { + ActiveLogs() ([]commitlog.File, error) +} + type cleanupManager struct { sync.RWMutex - database database + database database + activeCommitlogs activeCommitlogs + opts Options nowFn clock.NowFn filePathPrefix string @@ -71,13 +79,16 @@ func newCleanupManagerMetrics(scope tally.Scope) cleanupManagerMetrics { } } -func newCleanupManager(database database, scope tally.Scope) databaseCleanupManager { +func newCleanupManager( + database database, activeLogs activeCommitlogs, scope tally.Scope) databaseCleanupManager { opts := database.Options() filePathPrefix := opts.CommitLogOptions().FilesystemOptions().FilePathPrefix() commitLogsDir := fs.CommitLogsDirPath(filePathPrefix) return &cleanupManager{ - database: database, + database: database, + activeCommitlogs: activeLogs, + opts: opts, nowFn: opts.ClockOptions().NowFn(), filePathPrefix: filePathPrefix, @@ -286,26 +297,42 @@ func (m *cleanupManager) cleanupNamespaceSnapshotFiles(earliestToRetain time.Tim // commitLogTimes returns the earliest time before which the commit logs are expired, // as well as a list of times we need to clean up commit log files for. func (m *cleanupManager) commitLogTimes(t time.Time) ([]commitLogFileWithErrorAndPath, error) { - // NB(prateek): this logic of polling the namespaces across the commit log's entire - // retention history could get expensive if commit logs are retained for long periods. - // e.g. if we retain them for 40 days, with a block 2 hours; then every time - // we try to flush we are going to be polling each namespace, for each shard, for 480 - // distinct blockstarts. Say we have 2 namespaces, each with 8192 shards, that's ~10M map lookups. - // If we cared about 100% correctness, we would optimize this by retaining a smarter data - // structure (e.g. interval tree), but for our use-case, it's safe to assume that commit logs - // are only retained for a period of 1-2 days (at most), after we which we'd live we with the - // data loss. + namespaces, err := m.database.GetOwnedNamespaces() + if err != nil { + return nil, err + } + // We list the commit log files on disk before we determine what the currently active commitlog + // is to ensure that the logic remains correct even if the commitlog is rotated while this + // function is executing. For example, imagine the following commitlogs are on disk: + // + // [time1, time2, time3] + // + // If we call ActiveLogs first then it will return time3. Next, the commit log file rotates, and + // after that we call commitLogFilesFn which returns: [time1, time2, time3, time4]. In this scenario + // we would be allowed to delete commitlog files 1,2, and 4 which is not the desired behavior. Instead, + // we list the commitlogs on disk first (which returns time1, time2, and time3) and *then* check what + // the active file is. If the commitlog has not rotated, then ActiveLogs() will return time3 which + // we will correctly avoid deleting, and if the commitlog has rotated, then ActiveLogs() will return + // time4 which we wouldn't consider deleting anyways because it wasn't returned from the first call + // to commitLogFilesFn. files, corruptFiles, err := m.commitLogFilesFn(m.opts.CommitLogOptions()) if err != nil { return nil, err } - namespaces, err := m.database.GetOwnedNamespaces() + + activeCommitlogs, err := m.activeCommitlogs.ActiveLogs() if err != nil { return nil, err } shouldCleanupFile := func(f commitlog.File) (bool, error) { + if commitlogsContainPath(activeCommitlogs, f.FilePath) { + // An active commitlog should never satisfy all of the constraints + // for deleting a commitlog, but skip them for posterity. + return false, nil + } + for _, ns := range namespaces { var ( start = f.Start @@ -366,6 +393,12 @@ func (m *cleanupManager) commitLogTimes(t time.Time) ([]commitLogFileWithErrorAn } for _, errorWithPath := range corruptFiles { + if commitlogsContainPath(activeCommitlogs, errorWithPath.Path()) { + // Skip active commit log files as they may appear corrupt due to the + // header info not being written out yet. + continue + } + m.metrics.corruptCommitlogFile.Inc(1) // If we were unable to read the commit log files info header, then we're forced to assume // that the file is corrupt and remove it. This can happen in situations where M3DB experiences @@ -373,11 +406,8 @@ func (m *cleanupManager) commitLogTimes(t time.Time) ([]commitLogFileWithErrorAn m.opts.InstrumentOptions().Logger().Errorf( "encountered err: %v reading commit log file: %v info during cleanup, marking file for deletion", errorWithPath.Error(), errorWithPath.Path()) - // TODO(rartoul): Leave this out until we have a way of distinguishing between a corrupt commit - // log file and the commit log file that is actively being written to (which may still be missing - // the header): https://github.com/m3db/m3/issues/1078 - // filesToCleanup = append(filesToCleanup, newCommitLogFileWithErrorAndPath( - // commitlog.File{}, errorWithPath.Path(), err)) + filesToCleanup = append(filesToCleanup, newCommitLogFileWithErrorAndPath( + commitlog.File{}, errorWithPath.Path(), err)) } return filesToCleanup, nil @@ -435,3 +465,13 @@ func newCommitLogFileWithErrorAndPath( err: err, } } + +func commitlogsContainPath(commitlogs []commitlog.File, path string) bool { + for _, f := range commitlogs { + if path == f.FilePath { + return true + } + } + + return false +} diff --git a/src/dbnode/storage/cleanup_prop_test.go b/src/dbnode/storage/cleanup_prop_test.go index f30a91214b..1d5100af61 100644 --- a/src/dbnode/storage/cleanup_prop_test.go +++ b/src/dbnode/storage/cleanup_prop_test.go @@ -60,7 +60,7 @@ func newPropTestCleanupMgr( db.EXPECT().Options().Return(opts).AnyTimes() db.EXPECT().GetOwnedNamespaces().Return(ns, nil).AnyTimes() scope := tally.NoopScope - cmIface := newCleanupManager(db, scope) + cmIface := newCleanupManager(db, newNoopFakeActiveLogs(), scope) cm := cmIface.(*cleanupManager) var ( diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index 166f45f7b1..7357c7cb48 100644 --- a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -68,7 +68,7 @@ func TestCleanupManagerCleanup(t *testing.T) { } db := newMockdatabase(ctrl, namespaces...) db.EXPECT().GetOwnedNamespaces().Return(namespaces, nil).AnyTimes() - mgr := newCleanupManager(db, tally.NoopScope).(*cleanupManager) + mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) mgr.opts = mgr.opts.SetCommitLogOptions( mgr.opts.CommitLogOptions(). SetBlockSize(rOpts.BlockSize())) @@ -116,7 +116,7 @@ func TestCleanupManagerNamespaceCleanup(t *testing.T) { db := newMockdatabase(ctrl, ns) db.EXPECT().GetOwnedNamespaces().Return(nses, nil).AnyTimes() - mgr := newCleanupManager(db, tally.NoopScope).(*cleanupManager) + mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) idx.EXPECT().CleanupExpiredFileSets(ts).Return(nil) require.NoError(t, mgr.Cleanup(ts)) } @@ -140,7 +140,7 @@ func TestCleanupManagerDoesntNeedCleanup(t *testing.T) { } db := newMockdatabase(ctrl, namespaces...) db.EXPECT().GetOwnedNamespaces().Return(namespaces, nil).AnyTimes() - mgr := newCleanupManager(db, tally.NoopScope).(*cleanupManager) + mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) mgr.opts = mgr.opts.SetCommitLogOptions( mgr.opts.CommitLogOptions(). SetBlockSize(rOpts.BlockSize())) @@ -175,7 +175,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { db := newMockdatabase(ctrl, namespaces...) db.EXPECT().GetOwnedNamespaces().Return(namespaces, nil).AnyTimes() - mgr := newCleanupManager(db, tally.NoopScope).(*cleanupManager) + mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) require.NoError(t, mgr.Cleanup(ts)) } @@ -204,7 +204,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) { db := newMockdatabase(ctrl, namespaces...) db.EXPECT().GetOwnedNamespaces().Return(namespaces, nil).AnyTimes() - mgr := newCleanupManager(db, tally.NoopScope).(*cleanupManager) + mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) deleteInactiveDirectoriesCalls := []deleteInactiveDirectoriesCall{} deleteInactiveDirectoriesFn := func(parentDirPath string, activeDirNames []string) error { @@ -257,7 +257,7 @@ func TestCleanupManagerPropagatesGetOwnedNamespacesError(t *testing.T) { db.EXPECT().Terminate().Return(nil) db.EXPECT().GetOwnedNamespaces().Return(nil, errDatabaseIsClosed).AnyTimes() - mgr := newCleanupManager(db, tally.NoopScope).(*cleanupManager) + mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) require.NoError(t, db.Open()) require.NoError(t, db.Terminate()) @@ -436,7 +436,7 @@ func newCleanupManagerCommitLogTimesTest(t *testing.T, ctrl *gomock.Controller) ns.EXPECT().Options().Return(no).AnyTimes() db := newMockdatabase(ctrl, ns) - mgr := newCleanupManager(db, tally.NoopScope).(*cleanupManager) + mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) mgr.opts = mgr.opts.SetCommitLogOptions( mgr.opts.CommitLogOptions(). @@ -465,7 +465,7 @@ func newCleanupManagerCommitLogTimesTestMultiNS( ns2.EXPECT().Options().Return(no).AnyTimes() db := newMockdatabase(ctrl, ns1, ns2) - mgr := newCleanupManager(db, tally.NoopScope).(*cleanupManager) + mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) mgr.opts = mgr.opts.SetCommitLogOptions( mgr.opts.CommitLogOptions(). @@ -722,10 +722,6 @@ func TestCleanupManagerCommitLogTimesMultiNS(t *testing.T) { } func TestCleanupManagerDeletesCorruptCommitLogFiles(t *testing.T) { - // TODO(rartoul): Re-enable this once https://github.com/m3db/m3/issues/1078 - // is resolved. - t.Skip() - ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -744,3 +740,44 @@ func TestCleanupManagerDeletesCorruptCommitLogFiles(t *testing.T) { require.NoError(t, err) require.True(t, containsCorrupt(filesToCleanup, path)) } + +func TestCleanupManagerIgnoresActiveCommitLogFiles(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + _, mgr = newCleanupManagerCommitLogTimesTest(t, ctrl) + err = errors.New("some_error") + path = "path" + ) + mgr.commitLogFilesFn = func(_ commitlog.Options) ([]commitlog.File, []commitlog.ErrorWithPath, error) { + return []commitlog.File{}, []commitlog.ErrorWithPath{ + commitlog.NewErrorWithPath(err, path), + }, nil + } + mgr.activeCommitlogs = newFakeActiveLogs([]commitlog.File{ + {FilePath: path}, + }) + + filesToCleanup, err := mgr.commitLogTimes(currentTime) + require.NoError(t, err) + require.Empty(t, filesToCleanup, path) +} + +type fakeActiveLogs struct { + activeLogs []commitlog.File +} + +func (f fakeActiveLogs) ActiveLogs() ([]commitlog.File, error) { + return f.activeLogs, nil +} + +func newNoopFakeActiveLogs() fakeActiveLogs { + return newFakeActiveLogs(nil) +} + +func newFakeActiveLogs(activeLogs []commitlog.File) fakeActiveLogs { + return fakeActiveLogs{ + activeLogs: activeLogs, + } +} diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 0928c45cfa..4e4d1647c8 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -188,7 +188,8 @@ func NewDatabase( return nil, err } - mediator, err := newMediator(d, opts.SetInstrumentOptions(databaseIOpts)) + mediator, err := newMediator( + d, commitLog, opts.SetInstrumentOptions(databaseIOpts)) if err != nil { return nil, err } diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index a34eff50b0..5ddcb13b39 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" xlog "github.com/m3db/m3x/log" ) @@ -69,12 +70,13 @@ type fileSystemManager struct { func newFileSystemManager( database database, + commitLog commitlog.CommitLog, opts Options, ) databaseFileSystemManager { instrumentOpts := opts.InstrumentOptions() scope := instrumentOpts.MetricsScope().SubScope("fs") fm := newFlushManager(database, scope) - cm := newCleanupManager(database, scope) + cm := newCleanupManager(database, commitLog, scope) return &fileSystemManager{ databaseFlushManager: fm, diff --git a/src/dbnode/storage/fs_test.go b/src/dbnode/storage/fs_test.go index f95cc64df5..4c5b05ba9f 100644 --- a/src/dbnode/storage/fs_test.go +++ b/src/dbnode/storage/fs_test.go @@ -34,7 +34,7 @@ func TestFileSystemManagerShouldRunDuringBootstrap(t *testing.T) { defer ctrl.Finish() database := newMockdatabase(ctrl) - fsm := newFileSystemManager(database, testDatabaseOptions()) + fsm := newFileSystemManager(database, nil, testDatabaseOptions()) mgr := fsm.(*fileSystemManager) database.EXPECT().IsBootstrapped().Return(false) @@ -48,7 +48,7 @@ func TestFileSystemManagerShouldRunWhileRunning(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() database := newMockdatabase(ctrl) - fsm := newFileSystemManager(database, testDatabaseOptions()) + fsm := newFileSystemManager(database, nil, testDatabaseOptions()) mgr := fsm.(*fileSystemManager) database.EXPECT().IsBootstrapped().Return(true) require.True(t, mgr.shouldRunWithLock()) @@ -60,7 +60,7 @@ func TestFileSystemManagerShouldRunEnableDisable(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() database := newMockdatabase(ctrl) - fsm := newFileSystemManager(database, testDatabaseOptions()) + fsm := newFileSystemManager(database, nil, testDatabaseOptions()) mgr := fsm.(*fileSystemManager) database.EXPECT().IsBootstrapped().Return(true).AnyTimes() require.True(t, mgr.shouldRunWithLock()) @@ -78,7 +78,7 @@ func TestFileSystemManagerRun(t *testing.T) { fm := NewMockdatabaseFlushManager(ctrl) cm := NewMockdatabaseCleanupManager(ctrl) - fsm := newFileSystemManager(database, testDatabaseOptions()) + fsm := newFileSystemManager(database, nil, testDatabaseOptions()) mgr := fsm.(*fileSystemManager) mgr.databaseFlushManager = fm mgr.databaseCleanupManager = cm diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index d5b9f2bea1..0b58be1a36 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -26,6 +26,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/uber-go/tally" ) @@ -79,7 +80,7 @@ type mediator struct { closedCh chan struct{} } -func newMediator(database database, opts Options) (databaseMediator, error) { +func newMediator(database database, commitlog commitlog.CommitLog, opts Options) (databaseMediator, error) { scope := opts.InstrumentOptions().MetricsScope() d := &mediator{ database: database, @@ -91,7 +92,7 @@ func newMediator(database database, opts Options) (databaseMediator, error) { closedCh: make(chan struct{}), } - fsm := newFileSystemManager(database, opts) + fsm := newFileSystemManager(database, commitlog, opts) d.databaseFileSystemManager = fsm d.databaseRepairer = newNoopDatabaseRepairer() diff --git a/src/dbnode/storage/mediator_test.go b/src/dbnode/storage/mediator_test.go index 83d484dbaa..4a61222e44 100644 --- a/src/dbnode/storage/mediator_test.go +++ b/src/dbnode/storage/mediator_test.go @@ -44,7 +44,7 @@ func TestDatabaseMediatorOpenClose(t *testing.T) { db.EXPECT().Options().Return(opts).AnyTimes() db.EXPECT().GetOwnedNamespaces().Return(nil, nil).AnyTimes() db.EXPECT().BootstrapState().Return(DatabaseBootstrapState{}).AnyTimes() - m, err := newMediator(db, opts) + m, err := newMediator(db, nil, opts) require.NoError(t, err) require.Equal(t, errMediatorNotOpen, m.Close()) @@ -70,7 +70,7 @@ func TestDatabaseMediatorDisableFileOps(t *testing.T) { db := NewMockdatabase(ctrl) db.EXPECT().Options().Return(opts).AnyTimes() - med, err := newMediator(db, opts) + med, err := newMediator(db, nil, opts) require.NoError(t, err) m := med.(*mediator)