Skip to content

Commit

Permalink
Remove sync API
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Artoul committed Oct 23, 2018
1 parent e65f16e commit 3d65fce
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 39 deletions.
9 changes: 2 additions & 7 deletions src/dbnode/persist/fs/commitlog/commit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (l *commitLog) Open() error {

// Sync the info header to ensure we can write to disk and make sure that we can at least
// read the info about the commitlog file later.
if err := l.writerState.writer.Sync(); err != nil {
if err := l.writerState.writer.Flush(true); err != nil {
return err
}

Expand Down Expand Up @@ -335,12 +335,7 @@ func (l *commitLog) flushEvery(interval time.Duration) {
func (l *commitLog) write() {
for write := range l.writes {
if write.eventType == flushEventType {
// TODO(rartoul): This should probably be replaced with a call to Sync() as the expectation
// is that the commitlog will actually FSync the data at regular intervals, whereas Flush
// just ensures that the writers buffer flushes to the chunkWriter (creating a new chunk), but
// does not guarantee that the O.S isn't still buffering the data. Leaving as is for now as making
// this change will require extensive benchmarking in production clusters.
l.writerState.writer.Flush()
l.writerState.writer.Flush(false)
continue
}

Expand Down
24 changes: 8 additions & 16 deletions src/dbnode/persist/fs/commitlog/commit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ func snapshotCounterValue(
type mockCommitLogWriter struct {
openFn func(start time.Time, duration time.Duration) (File, error)
writeFn func(Series, ts.Datapoint, xtime.Unit, ts.Annotation) error
flushFn func() error
syncFn func() error
flushFn func(sync bool) error
closeFn func() error
}

Expand All @@ -168,10 +167,7 @@ func newMockCommitLogWriter() *mockCommitLogWriter {
writeFn: func(Series, ts.Datapoint, xtime.Unit, ts.Annotation) error {
return nil
},
flushFn: func() error {
return nil
},
syncFn: func() error {
flushFn: func(sync bool) error {
return nil
},
closeFn: func() error {
Expand All @@ -193,12 +189,8 @@ func (w *mockCommitLogWriter) Write(
return w.writeFn(series, datapoint, unit, annotation)
}

func (w *mockCommitLogWriter) Flush() error {
return w.flushFn()
}

func (w *mockCommitLogWriter) Sync() error {
return w.syncFn()
func (w *mockCommitLogWriter) Flush(sync bool) error {
return w.flushFn(sync)
}

func (w *mockCommitLogWriter) Close() error {
Expand Down Expand Up @@ -696,7 +688,7 @@ func TestCommitLogFailOnWriteError(t *testing.T) {
return File{}, nil
}

writer.flushFn = func() error {
writer.flushFn = func(bool) error {
commitLog.onFlush(nil)
return nil
}
Expand Down Expand Up @@ -745,7 +737,7 @@ func TestCommitLogFailOnOpenError(t *testing.T) {
return File{}, nil
}

writer.flushFn = func() error {
writer.flushFn = func(bool) error {
commitLog.onFlush(nil)
return nil
}
Expand Down Expand Up @@ -796,7 +788,7 @@ func TestCommitLogFailOnFlushError(t *testing.T) {
writer := newMockCommitLogWriter()

var flushes int64
writer.flushFn = func() error {
writer.flushFn = func(bool) error {
if atomic.AddInt64(&flushes, 1) >= 2 {
commitLog.onFlush(fmt.Errorf("an error"))
} else {
Expand Down Expand Up @@ -843,7 +835,7 @@ func TestCommitLogActiveLogs(t *testing.T) {
commitLog := newTestCommitLog(t, opts)

writer := newMockCommitLogWriter()
writer.flushFn = func() error {
writer.flushFn = func(bool) error {
return nil
}
commitLog.newCommitLogWriterFn = func(
Expand Down
30 changes: 14 additions & 16 deletions src/dbnode/persist/fs/commitlog/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,9 @@ type commitLogWriter interface {
annotation ts.Annotation,
) error

// Sync will ensure that all writes that have been issued to the writer have been
// FSync'd to disk.
Sync() error

// Flush will flush any data in the writers buffer to the chunkWriter, essentially forcing
// a new chunk to be created. Only guarantees that the data is FSync'd to disk if the
// StrategyWriteWait is enabled.
// TODO(rartoul): Consider deleting this once we have time to do some performance benchmarking
// with using Sync instead.
Flush() error
// a new chunk to be created. Optionally forces the data to be FSync'd to disk.
Flush(sync bool) error

// Close the reader
Close() error
Expand Down Expand Up @@ -257,28 +250,33 @@ func (w *writer) Write(
return nil
}

func (w *writer) Sync() error {
if err := w.Flush(); err != nil {
func (w *writer) Flush(sync bool) error {
err := w.buffer.Flush()
if err != nil {
return err
}

if !sync {
return nil
}

return w.sync()
}

func (w *writer) sync() error {
if err := w.chunkWriter.sync(); err != nil {
return err
}

return nil
}

func (w *writer) Flush() error {
return w.buffer.Flush()
}

func (w *writer) Close() error {
if !w.isOpen() {
return nil
}

if err := w.Flush(); err != nil {
if err := w.Flush(true); err != nil {
return err
}
if err := w.chunkWriter.close(); err != nil {
Expand Down

0 comments on commit 3d65fce

Please sign in to comment.