-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ActiveLogs() API to commitlog and use it in the CleanupManager #1090
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1090 +/- ##
========================================
- Coverage 71.4% 69.6% -1.8%
========================================
Files 726 715 -11
Lines 60571 60412 -159
========================================
- Hits 43261 42098 -1163
- Misses 14556 15641 +1085
+ Partials 2754 2673 -81
Continue to review full report at Codecov.
|
67379f7
to
920e57d
Compare
920e57d
to
227de8b
Compare
@@ -184,6 +189,21 @@ func (l *commitLog) Open() error { | |||
return nil | |||
} | |||
|
|||
func (l *commitLog) ActiveLogs() ([]File, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why have this function return []File
? It seems like you return at most one File
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@prateek Was talking about having support for multiple commit log files at some point
docs/m3db/architecture/engine.md
Outdated
@@ -181,7 +181,7 @@ The ticking process runs continously in the background and is responsible for a | |||
|
|||
#### Merging all encoders | |||
|
|||
M3TSZ is designed for compressing time series data in which each datapoint has a timestamp that is larger than the last encoded datapoint. For monitoring workloads this works very well because every subsequent datapoint is almost always larger than the previous one. However, real world systems are messy and occassionally out of order writes will be received. When this happens, M3DB will allocate a new encoder for the out of order datapoints. The multiple encoders need to be merged before flushing the data to disk, but to prevent huge memory spikes during the flushing process we continuously merge out of order encoders in the background. | |||
M3TSZ is designed for compressing time series data in which each datapoint has a timestamp that is larger than the last encoded datapoint. For monitoring workloads this works very well because every subsequent datapoint is almost always larger than the previous one. However, real world systems are messy and occasionally out of order writes will be received. When this happens, M3DB will allocate a new encoder for the out of order datapoints. The multiple encoders need to be merged before flushing the data to disk, but to prevent huge memory spikes during the flushing process we continuously merge out of order encoders in the background. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do you want to say almost always chronologically after the previous one
instead of almost always larger than the previous one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ps can't tell what changed in this line
@@ -89,6 +88,15 @@ func TestDiskCleanup(t *testing.T) { | |||
// and commit logs at now will be deleted | |||
newNow := now.Add(retentionPeriod).Add(2 * blockSize) | |||
testSetup.setNowFn(newNow) | |||
// This isn't great, but right now the commitlog will only ever rotate when writes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice to see our tests actually catch this kinda thing
|
||
// We list the commit log files on disk before we determine what the currently active commitlog |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for explanation and example
for write := range l.writes { | ||
// For writes requiring acks add to pending acks | ||
if write.completionFn != nil { | ||
l.pendingFlushFns = append(l.pendingFlushFns, write.completionFn) | ||
l.flushState.pendingFlushFns = append(l.flushState.pendingFlushFns, write.completionFn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't this racy? i.e. you're modifying a field on flushState
without a write Lock on it. but you use a Read lock when accessing the same field on line 326.
metrics commitLogMetrics | ||
type closedState struct { | ||
sync.RWMutex | ||
closed bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you replace the closedState
with an atomic. It'll make a lot of the code simpler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We caught up, keeping lock will keep the code simpler as discussed.
@@ -61,30 +61,45 @@ type commitLogFailFn func(err error) | |||
type completionFn func(err error) | |||
|
|||
type commitLog struct { | |||
sync.RWMutex | |||
flushState flushState |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add unit test for some of the races
closeErr chan error | ||
|
||
// TODO(r): replace buffered channel with concurrent striped | ||
// circular buffer to avoid central write lock contention. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm you can probably remove this comment, I think we just need to encode in parallel rather than need a better buffer.
25a530f
to
4d409bf
Compare
@@ -184,6 +228,34 @@ func (l *commitLog) Open() error { | |||
return nil | |||
} | |||
|
|||
func (l *commitLog) ActiveLogs() ([]File, error) { | |||
l.closedState.Lock() | |||
defer l.closedState.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This just needs to be RLock() and RUnlock() yeah? Don't see it changing the closed state as far as I can tell?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
wg.Wait() | ||
return []File{file}, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to bother doing the if err != nil { return nil, err}; return []File{file}, nil
so you only return something if and only if err == nil
? Just slightly more idiomatic Go to do so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah thats fair, Its kind of messed up to return a non-empty slice in the error case
@@ -58,33 +58,72 @@ type writeCommitLogFn func( | |||
) error | |||
type commitLogFailFn func(err error) | |||
|
|||
type completionFn func(err error) | |||
type valueTypeFn func(f File, err error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For better reuse, which we'll need in the future, can we just rename this to callbackFn
?
And maybe the first arg it would be better to be some composite event struct, so we can add cleanly to it in the future:
func callbackFn func(result callbackResult)
type callbackResultType uint
const (
activeLogsCallback callbackResultType = iota
// ... more in the future
)
type callbackResult struct {
resultType callbackResultType
err error
activeLogs activeLogsCallbackResult // to be used if and only if eventType == activeLogsResultCallbackEventType
}
type activeLogsCallbackResult struct {
file *File
}
func (r callbackResult) activeLogsCallbackResult() (activeLogsCallbackResult, error) {
if expectedType := r.resultType; expectedType != activeLogsCallback {
return activeLogsCallbackResult{}, fmt.Errorf("wrong result type: expected=%d, actual=%d", expectedType, r.resultType)
}
if r.err != nil {
return activeLogsCallbackResult{}, err
}
return r.activeLogs, nil
}
// Now from calling code:
func foo() {
var (
result activeLogsCallbackResult
err error
)
writes <- commitLogWrite{
valueType: activeLogsValueType,
callbackFn: func(r callbackResult) {
result, err = r.activeLogsCallbackResult()
wg.Done()
},
}
wg.Wait()
if err != nil {
return nil, err
}
return result.file, nil
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I can make this change, the reason I didn't is that the Rotate() API will need pretty much the same result as the ActiveLogs one but this is fine too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resultType
seems a little overkill, I think I can just repurpose the valueType
for now to infer the result type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resultType
seems a little overkill, I think I can just repurpose the valueType
for now to infer the result type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// Flush will flush the contents to the disk, useful when first testing if first commit log is writable | ||
// Sync will ensure that all writes that have been issued to the writer have been | ||
// FSync'd to disk. | ||
Sync() error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps FlushAndSync()
considering that's what it's doing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to stop using the word Flush entirely in the public interface (because its a little non-sensical, in this case "flush" means "flushed to chunk writer" whereas all an external caller will care about is "FSync'd to disk")
Getting rid of the flush method is outside the scope of this P.R but I'd rather not put it in this method name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM once build passing
07f2180
to
3d65fce
Compare
This P.R allows us to distinguish between corrupt commitlogs and active commitlogs (which sometimes look corrupt because the header info hasn't been flushed yet), allowing us to delete corrupt commitlog files safely.