Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ActiveLogs() API to commitlog and use it in the CleanupManager #1090

Merged
merged 39 commits into from
Oct 23, 2018

Conversation

richardartoul
Copy link
Contributor

@richardartoul richardartoul commented Oct 15, 2018

This P.R allows us to distinguish between corrupt commitlogs and active commitlogs (which sometimes look corrupt because the header info hasn't been flushed yet), allowing us to delete corrupt commitlog files safely.

@codecov
Copy link

codecov bot commented Oct 16, 2018

Codecov Report

Merging #1090 into master will decrease coverage by 1.7%.
The diff coverage is 82.5%.

Impacted file tree graph

@@           Coverage Diff            @@
##           master   #1090     +/-   ##
========================================
- Coverage    71.4%   69.6%   -1.8%     
========================================
  Files         726     715     -11     
  Lines       60571   60412    -159     
========================================
- Hits        43261   42098   -1163     
- Misses      14556   15641   +1085     
+ Partials     2754    2673     -81
Flag Coverage Δ
#aggregator 81.6% <ø> (ø) ⬆️
#cluster 84.8% <ø> (-1.3%) ⬇️
#collector 78.1% <ø> (ø) ⬆️
#dbnode 77.2% <82.5%> (-4%) ⬇️
#m3em 73.2% <ø> (ø) ⬆️
#m3ninx 71.2% <ø> (-4.2%) ⬇️
#m3nsch 51.1% <ø> (ø) ⬆️
#metrics 18.3% <ø> (ø) ⬆️
#msg 75.1% <ø> (-0.2%) ⬇️
#query 65.1% <ø> (+1.5%) ⬆️
#x 69.4% <ø> (-5.8%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 6c086cf...3af9c80. Read the comment docs.

@richardartoul richardartoul changed the title [WIP] - Add ActiveLogs() API to commitlog Add ActiveLogs() API to commitlog Oct 16, 2018
@richardartoul richardartoul changed the title Add ActiveLogs() API to commitlog [WIP] - Add ActiveLogs() API to commitlog Oct 16, 2018
@richardartoul richardartoul changed the title [WIP] - Add ActiveLogs() API to commitlog Add ActiveLogs() API to commitlog and expose it to the CleanupManager Oct 16, 2018
@richardartoul richardartoul force-pushed the ra/active-log branch 3 times, most recently from 67379f7 to 920e57d Compare October 16, 2018 22:09
@richardartoul richardartoul changed the title Add ActiveLogs() API to commitlog and expose it to the CleanupManager Add ActiveLogs() API to commitlog and use it in the CleanupManager Oct 16, 2018
@@ -184,6 +189,21 @@ func (l *commitLog) Open() error {
return nil
}

func (l *commitLog) ActiveLogs() ([]File, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have this function return []File? It seems like you return at most one File.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prateek Was talking about having support for multiple commit log files at some point

@@ -181,7 +181,7 @@ The ticking process runs continously in the background and is responsible for a

#### Merging all encoders

M3TSZ is designed for compressing time series data in which each datapoint has a timestamp that is larger than the last encoded datapoint. For monitoring workloads this works very well because every subsequent datapoint is almost always larger than the previous one. However, real world systems are messy and occassionally out of order writes will be received. When this happens, M3DB will allocate a new encoder for the out of order datapoints. The multiple encoders need to be merged before flushing the data to disk, but to prevent huge memory spikes during the flushing process we continuously merge out of order encoders in the background.
M3TSZ is designed for compressing time series data in which each datapoint has a timestamp that is larger than the last encoded datapoint. For monitoring workloads this works very well because every subsequent datapoint is almost always larger than the previous one. However, real world systems are messy and occasionally out of order writes will be received. When this happens, M3DB will allocate a new encoder for the out of order datapoints. The multiple encoders need to be merged before flushing the data to disk, but to prevent huge memory spikes during the flushing process we continuously merge out of order encoders in the background.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do you want to say almost always chronologically after the previous one instead of almost always larger than the previous one

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ps can't tell what changed in this line

@@ -89,6 +88,15 @@ func TestDiskCleanup(t *testing.T) {
// and commit logs at now will be deleted
newNow := now.Add(retentionPeriod).Add(2 * blockSize)
testSetup.setNowFn(newNow)
// This isn't great, but right now the commitlog will only ever rotate when writes
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice to see our tests actually catch this kinda thing


// We list the commit log files on disk before we determine what the currently active commitlog
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for explanation and example

for write := range l.writes {
// For writes requiring acks add to pending acks
if write.completionFn != nil {
l.pendingFlushFns = append(l.pendingFlushFns, write.completionFn)
l.flushState.pendingFlushFns = append(l.flushState.pendingFlushFns, write.completionFn)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this racy? i.e. you're modifying a field on flushState without a write Lock on it. but you use a Read lock when accessing the same field on line 326.

metrics commitLogMetrics
type closedState struct {
sync.RWMutex
closed bool
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you replace the closedState with an atomic. It'll make a lot of the code simpler

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We caught up, keeping lock will keep the code simpler as discussed.

@@ -61,30 +61,45 @@ type commitLogFailFn func(err error)
type completionFn func(err error)

type commitLog struct {
sync.RWMutex
flushState flushState
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add unit test for some of the races

closeErr chan error

// TODO(r): replace buffered channel with concurrent striped
// circular buffer to avoid central write lock contention.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm you can probably remove this comment, I think we just need to encode in parallel rather than need a better buffer.

@@ -184,6 +228,34 @@ func (l *commitLog) Open() error {
return nil
}

func (l *commitLog) ActiveLogs() ([]File, error) {
l.closedState.Lock()
defer l.closedState.Unlock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This just needs to be RLock() and RUnlock() yeah? Don't see it changing the closed state as far as I can tell?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

wg.Wait()
return []File{file}, err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to bother doing the if err != nil { return nil, err}; return []File{file}, nil so you only return something if and only if err == nil? Just slightly more idiomatic Go to do so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah thats fair, Its kind of messed up to return a non-empty slice in the error case

@@ -58,33 +58,72 @@ type writeCommitLogFn func(
) error
type commitLogFailFn func(err error)

type completionFn func(err error)
type valueTypeFn func(f File, err error)
Copy link
Collaborator

@robskillington robskillington Oct 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For better reuse, which we'll need in the future, can we just rename this to callbackFn?

And maybe the first arg it would be better to be some composite event struct, so we can add cleanly to it in the future:

func callbackFn func(result callbackResult)

type callbackResultType uint

const (
  activeLogsCallback callbackResultType = iota
  // ... more in the future
)

type callbackResult struct {
  resultType callbackResultType
  err error
  activeLogs activeLogsCallbackResult // to be used if and only if eventType == activeLogsResultCallbackEventType
}

type activeLogsCallbackResult struct {
  file *File
}

func (r callbackResult) activeLogsCallbackResult() (activeLogsCallbackResult, error) {
  if expectedType := r.resultType; expectedType != activeLogsCallback {
    return activeLogsCallbackResult{}, fmt.Errorf("wrong result type: expected=%d, actual=%d", expectedType, r.resultType)
  }
  if r.err != nil {
    return activeLogsCallbackResult{}, err
  }
  return r.activeLogs, nil
}

// Now from calling code:
func foo() {
  var (
    result activeLogsCallbackResult
    err error
  )
  writes <- commitLogWrite{
    valueType: activeLogsValueType,
    callbackFn: func(r callbackResult) {
      result, err = r.activeLogsCallbackResult()
      wg.Done()
    },
  }
  wg.Wait()
  if err != nil {
    return nil, err
  }
  return result.file, nil
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I can make this change, the reason I didn't is that the Rotate() API will need pretty much the same result as the ActiveLogs one but this is fine too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resultType seems a little overkill, I think I can just repurpose the valueType for now to infer the result type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resultType seems a little overkill, I think I can just repurpose the valueType for now to infer the result type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// Flush will flush the contents to the disk, useful when first testing if first commit log is writable
// Sync will ensure that all writes that have been issued to the writer have been
// FSync'd to disk.
Sync() error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps FlushAndSync() considering that's what it's doing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to stop using the word Flush entirely in the public interface (because its a little non-sensical, in this case "flush" means "flushed to chunk writer" whereas all an external caller will care about is "FSync'd to disk")

Getting rid of the flush method is outside the scope of this P.R but I'd rather not put it in this method name

Copy link
Collaborator

@robskillington robskillington left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM once build passing

@richardartoul richardartoul merged commit a6b8c2a into master Oct 23, 2018
@justinjc justinjc deleted the ra/active-log branch October 30, 2018 20:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants