Skip to content

Commit

Permalink
Addresss feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Artoul committed Oct 23, 2018
1 parent e6e8f19 commit d5d1543
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 39 deletions.
124 changes: 86 additions & 38 deletions src/dbnode/persist/fs/commitlog/commit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package commitlog

import (
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -56,9 +57,8 @@ type writeCommitLogFn func(
unit xtime.Unit,
annotation ts.Annotation,
) error
type commitLogFailFn func(err error)

type valueTypeFn func(f File, err error)
type commitLogFailFn func(err error)

type commitLog struct {
// The commitlog has two different locks that it maintains:
Expand All @@ -82,7 +82,7 @@ type commitLog struct {
closeErr chan error

writes chan commitLogWrite
pendingFlushFns []valueTypeFn
pendingFlushFns []callbackFn

opts Options
nowFn clock.NowFn
Expand Down Expand Up @@ -137,23 +137,49 @@ type commitLogMetrics struct {
flushDone tally.Counter
}

type valueType int
type eventType int

// nolint: varcheck, unused
const (
writeValueType valueType = iota
flushValueType
activeLogsValueType
writeEventType eventType = iota
flushEventType
activeLogsEventType
)

type callbackFn func(callbackResult)

type callbackResult struct {
eventType eventType
err error
activeLogs activeLogsCallbackResult
}

type activeLogsCallbackResult struct {
file *File
}

func (r callbackResult) activeLogsCallbackResult() (activeLogsCallbackResult, error) {
if r.eventType != activeLogsEventType {
return activeLogsCallbackResult{}, fmt.Errorf(
"wrong event type: expected %d but got %d",
activeLogsEventType, r.eventType)
}

if r.err != nil {
return activeLogsCallbackResult{}, nil
}

return r.activeLogs, nil
}

type commitLogWrite struct {
valueType valueType
eventType eventType

series Series
datapoint ts.Datapoint
unit xtime.Unit
annotation ts.Annotation
valueTypeFn valueTypeFn
series Series
datapoint ts.Datapoint
unit xtime.Unit
annotation ts.Annotation
callbackFn callbackFn
}

// NewCommitLog creates a new commit log
Expand Down Expand Up @@ -229,31 +255,44 @@ func (l *commitLog) Open() error {
}

func (l *commitLog) ActiveLogs() ([]File, error) {
l.closedState.Lock()
defer l.closedState.Unlock()
l.closedState.RLock()
defer l.closedState.RUnlock()

if l.closedState.closed {
return nil, errCommitLogClosed
}

var (
err error
file File
wg = sync.WaitGroup{}
err error
files []File
wg = sync.WaitGroup{}
)
wg.Add(1)

l.writes <- commitLogWrite{
valueType: activeLogsValueType,
valueTypeFn: func(f File, e error) {
err = e
file = f
wg.Done()
eventType: activeLogsEventType,
callbackFn: func(r callbackResult) {
defer wg.Done()

result, e := r.activeLogsCallbackResult()
if e != nil {
err = e
return
}

if result.file != nil {
files = append(files, *result.file)
}
},
}

wg.Wait()
return []File{file}, err

if err != nil {
return nil, err
}

return files, nil
}

func (l *commitLog) flushEvery(interval time.Duration) {
Expand Down Expand Up @@ -288,19 +327,19 @@ func (l *commitLog) flushEvery(interval time.Duration) {
return
}

l.writes <- commitLogWrite{valueType: flushValueType}
l.writes <- commitLogWrite{eventType: flushEventType}
l.closedState.RUnlock()
}
}

func (l *commitLog) write() {
for write := range l.writes {
// For writes requiring acks add to pending acks
if write.valueType == writeValueType && write.valueTypeFn != nil {
l.pendingFlushFns = append(l.pendingFlushFns, write.valueTypeFn)
if write.eventType == writeEventType && write.callbackFn != nil {
l.pendingFlushFns = append(l.pendingFlushFns, write.callbackFn)
}

if write.valueType == flushValueType {
if write.eventType == flushEventType {
// TODO(rartoul): This should probably be replaced with a call to Sync() as the expectation
// is that the commitlog will actually FSync the data at regular intervals, whereas Flush
// just ensures that the writers buffer flushes to the chunkWriter (creating a new chunk), but
Expand All @@ -310,8 +349,14 @@ func (l *commitLog) write() {
continue
}

if write.valueType == activeLogsValueType {
write.valueTypeFn(*l.writerState.activeFile, nil)
if write.eventType == activeLogsEventType {
write.callbackFn(callbackResult{
eventType: write.eventType,
err: nil,
activeLogs: activeLogsCallbackResult{
file: l.writerState.activeFile,
},
})
continue
}

Expand Down Expand Up @@ -375,7 +420,10 @@ func (l *commitLog) onFlush(err error) {
}

for i := range l.pendingFlushFns {
l.pendingFlushFns[i](File{}, err)
l.pendingFlushFns[i](callbackResult{
eventType: flushEventType,
err: err,
})
l.pendingFlushFns[i] = nil
}
l.pendingFlushFns = l.pendingFlushFns[:0]
Expand Down Expand Up @@ -442,17 +490,17 @@ func (l *commitLog) writeWait(

wg.Add(1)

completion := func(_ File, err error) {
result = err
completion := func(r callbackResult) {
result = r.err
wg.Done()
}

write := commitLogWrite{
series: series,
datapoint: datapoint,
unit: unit,
annotation: annotation,
valueTypeFn: completion,
series: series,
datapoint: datapoint,
unit: unit,
annotation: annotation,
callbackFn: completion,
}

enqueued := false
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/commitlog/commit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func flushUntilDone(l *commitLog, wg *sync.WaitGroup) {
blockWg.Add(1)
go func() {
for atomic.LoadUint64(&done) == 0 {
l.writes <- commitLogWrite{valueType: flushValueType}
l.writes <- commitLogWrite{eventType: flushEventType}
time.Sleep(time.Millisecond)
}
blockWg.Done()
Expand Down

0 comments on commit d5d1543

Please sign in to comment.