diff --git a/src/dbnode/persist/fs/commitlog/commit_log.go b/src/dbnode/persist/fs/commitlog/commit_log.go index a70326d0c5..2e95b70c42 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log.go +++ b/src/dbnode/persist/fs/commitlog/commit_log.go @@ -22,6 +22,7 @@ package commitlog import ( "errors" + "fmt" "sync" "time" @@ -56,9 +57,8 @@ type writeCommitLogFn func( unit xtime.Unit, annotation ts.Annotation, ) error -type commitLogFailFn func(err error) -type valueTypeFn func(f File, err error) +type commitLogFailFn func(err error) type commitLog struct { // The commitlog has two different locks that it maintains: @@ -82,7 +82,7 @@ type commitLog struct { closeErr chan error writes chan commitLogWrite - pendingFlushFns []valueTypeFn + pendingFlushFns []callbackFn opts Options nowFn clock.NowFn @@ -137,23 +137,49 @@ type commitLogMetrics struct { flushDone tally.Counter } -type valueType int +type eventType int // nolint: varcheck, unused const ( - writeValueType valueType = iota - flushValueType - activeLogsValueType + writeEventType eventType = iota + flushEventType + activeLogsEventType ) +type callbackFn func(callbackResult) + +type callbackResult struct { + eventType eventType + err error + activeLogs activeLogsCallbackResult +} + +type activeLogsCallbackResult struct { + file *File +} + +func (r callbackResult) activeLogsCallbackResult() (activeLogsCallbackResult, error) { + if r.eventType != activeLogsEventType { + return activeLogsCallbackResult{}, fmt.Errorf( + "wrong event type: expected %d but got %d", + activeLogsEventType, r.eventType) + } + + if r.err != nil { + return activeLogsCallbackResult{}, nil + } + + return r.activeLogs, nil +} + type commitLogWrite struct { - valueType valueType + eventType eventType - series Series - datapoint ts.Datapoint - unit xtime.Unit - annotation ts.Annotation - valueTypeFn valueTypeFn + series Series + datapoint ts.Datapoint + unit xtime.Unit + annotation ts.Annotation + callbackFn callbackFn } // NewCommitLog creates a new commit log @@ -229,31 +255,44 @@ func (l *commitLog) Open() error { } func (l *commitLog) ActiveLogs() ([]File, error) { - l.closedState.Lock() - defer l.closedState.Unlock() + l.closedState.RLock() + defer l.closedState.RUnlock() if l.closedState.closed { return nil, errCommitLogClosed } var ( - err error - file File - wg = sync.WaitGroup{} + err error + files []File + wg = sync.WaitGroup{} ) wg.Add(1) l.writes <- commitLogWrite{ - valueType: activeLogsValueType, - valueTypeFn: func(f File, e error) { - err = e - file = f - wg.Done() + eventType: activeLogsEventType, + callbackFn: func(r callbackResult) { + defer wg.Done() + + result, e := r.activeLogsCallbackResult() + if e != nil { + err = e + return + } + + if result.file != nil { + files = append(files, *result.file) + } }, } wg.Wait() - return []File{file}, err + + if err != nil { + return nil, err + } + + return files, nil } func (l *commitLog) flushEvery(interval time.Duration) { @@ -288,7 +327,7 @@ func (l *commitLog) flushEvery(interval time.Duration) { return } - l.writes <- commitLogWrite{valueType: flushValueType} + l.writes <- commitLogWrite{eventType: flushEventType} l.closedState.RUnlock() } } @@ -296,11 +335,11 @@ func (l *commitLog) flushEvery(interval time.Duration) { func (l *commitLog) write() { for write := range l.writes { // For writes requiring acks add to pending acks - if write.valueType == writeValueType && write.valueTypeFn != nil { - l.pendingFlushFns = append(l.pendingFlushFns, write.valueTypeFn) + if write.eventType == writeEventType && write.callbackFn != nil { + l.pendingFlushFns = append(l.pendingFlushFns, write.callbackFn) } - if write.valueType == flushValueType { + if write.eventType == flushEventType { // TODO(rartoul): This should probably be replaced with a call to Sync() as the expectation // is that the commitlog will actually FSync the data at regular intervals, whereas Flush // just ensures that the writers buffer flushes to the chunkWriter (creating a new chunk), but @@ -310,8 +349,14 @@ func (l *commitLog) write() { continue } - if write.valueType == activeLogsValueType { - write.valueTypeFn(*l.writerState.activeFile, nil) + if write.eventType == activeLogsEventType { + write.callbackFn(callbackResult{ + eventType: write.eventType, + err: nil, + activeLogs: activeLogsCallbackResult{ + file: l.writerState.activeFile, + }, + }) continue } @@ -375,7 +420,10 @@ func (l *commitLog) onFlush(err error) { } for i := range l.pendingFlushFns { - l.pendingFlushFns[i](File{}, err) + l.pendingFlushFns[i](callbackResult{ + eventType: flushEventType, + err: err, + }) l.pendingFlushFns[i] = nil } l.pendingFlushFns = l.pendingFlushFns[:0] @@ -442,17 +490,17 @@ func (l *commitLog) writeWait( wg.Add(1) - completion := func(_ File, err error) { - result = err + completion := func(r callbackResult) { + result = r.err wg.Done() } write := commitLogWrite{ - series: series, - datapoint: datapoint, - unit: unit, - annotation: annotation, - valueTypeFn: completion, + series: series, + datapoint: datapoint, + unit: unit, + annotation: annotation, + callbackFn: completion, } enqueued := false diff --git a/src/dbnode/persist/fs/commitlog/commit_log_test.go b/src/dbnode/persist/fs/commitlog/commit_log_test.go index fa8052b861..3a2607ca42 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_test.go @@ -285,7 +285,7 @@ func flushUntilDone(l *commitLog, wg *sync.WaitGroup) { blockWg.Add(1) go func() { for atomic.LoadUint64(&done) == 0 { - l.writes <- commitLogWrite{valueType: flushValueType} + l.writes <- commitLogWrite{eventType: flushEventType} time.Sleep(time.Millisecond) } blockWg.Done()