Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle commit log files with corrupt info headers during cleanup and bootstrap #1066

Merged
merged 47 commits into from
Oct 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
8cf4917
Handle commit log files with corrupt info headers during cleanup
Oct 10, 2018
949025c
Rename openError to fsError
Oct 10, 2018
9674341
Clarify comment
Oct 10, 2018
f0ad534
Fix typo
Oct 10, 2018
16827fe
Skip corrupt commit log files in the commitlog bootstrapper
Oct 10, 2018
a53a56d
modify commit log source prop test to test that it handle commit log …
Oct 10, 2018
cecfa02
wip
Oct 11, 2018
4eea9bd
wip
Oct 11, 2018
60ca533
wip
Oct 11, 2018
a274193
wi[
Oct 11, 2018
d870758
Refactor to return unfulfilled on error
Oct 11, 2018
3883221
Update test
Oct 11, 2018
f138227
update prop test
Oct 11, 2018
ad5d08b
remove prints
Oct 11, 2018
c14f51c
cleanup handling of errors
Oct 12, 2018
6515237
Refactor cleanup code and tests
Oct 12, 2018
e60e7e2
Fix flaky test
Oct 12, 2018
acf3b34
share logging code
Oct 12, 2018
cbf359e
more refactoring
Oct 12, 2018
422c423
Improve logic for determining if peer bootstrapper could provide any …
Oct 12, 2018
e829864
Fix tests
Oct 12, 2018
be684ff
More refactoring
Oct 12, 2018
0fd6c9d
Fix broken tests
Oct 12, 2018
f57e71b
fix test
Oct 12, 2018
9883df1
Fix broken test
Oct 12, 2018
d4aacfe
Add metrics for cleanup commitlogs
Oct 12, 2018
c3de970
Fix compulation error
Oct 12, 2018
c976906
Add metrics for commitlog bootstrapper
Oct 12, 2018
ce9beca
Fix imports
Oct 12, 2018
02ce8e4
More tweaks
Oct 12, 2018
07284d5
Fix broken test
Oct 12, 2018
979ecf7
Fix broken test
Oct 12, 2018
d5a159b
Disable deletion of corrupt commit log files temporarily
Oct 12, 2018
18e405e
Move stuff to bottom of file
Oct 15, 2018
7a94eaf
Refactor files interface
Oct 15, 2018
d5e10e6
Refactor code to use new interface
Oct 15, 2018
48b671a
Fix tests
Oct 15, 2018
c300a20
Add function comment
Oct 15, 2018
a380187
emit gauges in loop
Oct 15, 2018
d490cde
simplify couldObtainDataFromPeers logic
Oct 15, 2018
6e167e2
improve logic and comment
Oct 15, 2018
9a6849c
mark prop test as large
Oct 15, 2018
fa4b0c3
Address feedback
Oct 16, 2018
926308c
Fix subscopes
Oct 16, 2018
656df6e
Fix import order
Oct 16, 2018
da67bf7
fix flaky test
Oct 16, 2018
b8020fe
Fix broken metric
Oct 16, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type BootstrapConfiguration struct {
// Peers bootstrapper configuration.
Peers *BootstrapPeersConfiguration `yaml:"peers"`

// Commitlog bootstrapper configuration.
Commitlog *BootstrapCommitlogConfiguration `yaml:"commitlog"`

// CacheSeriesMetadata determines whether individual bootstrappers cache
// series metadata across all calls (namespaces / shards / blocks).
CacheSeriesMetadata *bool `yaml:"cacheSeriesMetadata"`
Expand Down Expand Up @@ -91,6 +94,17 @@ type BootstrapPeersConfiguration struct {
FetchBlocksMetadataEndpointVersion client.FetchBlocksMetadataEndpointVersion `yaml:"fetchBlocksMetadataEndpointVersion"`
}

// BootstrapCommitlogConfiguration specifies config for the commitlog bootstrapper.
type BootstrapCommitlogConfiguration struct {
// ReturnUnfulfilledForCorruptCommitlogFiles controls whether the commitlog bootstrapper
// will return unfulfilled for all shard time ranges when it encounters a corrupt commit
// file. Note that regardless of this value, the commitlog bootstrapper will still try and
// read all the uncorrupted commitlog files and return as much data as it can, but setting
// this to true allows the node to attempt a repair if the peers bootstrapper is configured
// after the commitlog bootstrapper.
ReturnUnfulfilledForCorruptCommitlogFiles bool `yaml:"returnUnfulfilledForCorruptCommitlogFiles"`
}

// New creates a bootstrap process based on the bootstrap configuration.
func (bsc BootstrapConfiguration) New(
opts storage.Options,
Expand Down Expand Up @@ -140,7 +154,8 @@ func (bsc BootstrapConfiguration) New(
case commitlog.CommitLogBootstrapperName:
cOpts := commitlog.NewOptions().
SetResultOptions(rsOpts).
SetCommitLogOptions(opts.CommitLogOptions())
SetCommitLogOptions(opts.CommitLogOptions()).
SetRuntimeOptionsManager(opts.RuntimeOptionsManager())

inspection, err := fs.InspectFilesystem(fsOpts)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ db:
fs:
numProcessorsPerCPU: 0.125
peers: null
commitlog: null
cacheSeriesMetadata: null
blockRetrieve: null
cache:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
Expand Down Expand Up @@ -108,7 +109,8 @@ func TestBootstrapAfterBufferRotation(t *testing.T) {
bootstrapOpts := newDefaulTestResultOptions(setup.storageOpts)
bootstrapCommitlogOpts := bcl.NewOptions().
SetResultOptions(bootstrapOpts).
SetCommitLogOptions(commitLogOpts)
SetCommitLogOptions(commitLogOpts).
SetRuntimeOptionsManager(runtime.NewOptionsManager())
fsOpts := setup.storageOpts.CommitLogOptions().FilesystemOptions()
commitlogBootstrapperProvider, err := bcl.NewCommitLogBootstrapperProvider(
bootstrapCommitlogOpts, mustInspectFilesystem(fsOpts), nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
Expand Down Expand Up @@ -122,7 +123,8 @@ func TestBootstrapBeforeBufferRotationNoTick(t *testing.T) {
bootstrapOpts := newDefaulTestResultOptions(setup.storageOpts)
bootstrapCommitlogOpts := bcl.NewOptions().
SetResultOptions(bootstrapOpts).
SetCommitLogOptions(commitLogOpts)
SetCommitLogOptions(commitLogOpts).
SetRuntimeOptionsManager(runtime.NewOptionsManager())
fsOpts := setup.storageOpts.CommitLogOptions().FilesystemOptions()
commitlogBootstrapperProvider, err := bcl.NewCommitLogBootstrapperProvider(
bootstrapCommitlogOpts, mustInspectFilesystem(fsOpts), nil)
Expand Down
4 changes: 3 additions & 1 deletion src/dbnode/integration/bootstrap_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
Expand Down Expand Up @@ -173,7 +174,8 @@ func setupCommitLogBootstrapperWithFSInspection(
bsOpts := newDefaulTestResultOptions(setup.storageOpts)
bclOpts := bcl.NewOptions().
SetResultOptions(bsOpts).
SetCommitLogOptions(commitLogOpts)
SetCommitLogOptions(commitLogOpts).
SetRuntimeOptionsManager(runtime.NewOptionsManager())
fsOpts := setup.storageOpts.CommitLogOptions().FilesystemOptions()
bs, err := bcl.NewCommitLogBootstrapperProvider(
bclOpts, mustInspectFilesystem(fsOpts), noOpAll)
Expand Down
4 changes: 3 additions & 1 deletion src/dbnode/integration/commitlog_bootstrap_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/dbnode/integration/generate"
persistfs "github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
Expand Down Expand Up @@ -124,7 +125,8 @@ func TestCommitLogAndFSMergeBootstrap(t *testing.T) {
bsOpts := newDefaulTestResultOptions(setup.storageOpts)
bclOpts := bcl.NewOptions().
SetResultOptions(bsOpts).
SetCommitLogOptions(commitLogOpts)
SetCommitLogOptions(commitLogOpts).
SetRuntimeOptionsManager(runtime.NewOptionsManager())
fsOpts := setup.storageOpts.CommitLogOptions().FilesystemOptions()

commitLogBootstrapper, err := bcl.NewCommitLogBootstrapperProvider(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/m3db/m3/src/dbnode/integration/generate"
persistfs "github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
Expand Down Expand Up @@ -225,7 +226,8 @@ func setCommitLogAndFilesystemBootstrapper(t *testing.T, opts testOptions, setup
bsOpts := newDefaulTestResultOptions(setup.storageOpts)
bclOpts := bcl.NewOptions().
SetResultOptions(bsOpts).
SetCommitLogOptions(commitLogOpts)
SetCommitLogOptions(commitLogOpts).
SetRuntimeOptionsManager(runtime.NewOptionsManager())

commitLogBootstrapper, err := bcl.NewCommitLogBootstrapperProvider(
bclOpts, mustInspectFilesystem(fsOpts), noOpAll)
Expand Down
11 changes: 8 additions & 3 deletions src/dbnode/persist/fs/commitlog/commit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,9 @@ func assertCommitLogWritesByIterating(t *testing.T, l *commitLog, writes []testW
FileFilterPredicate: ReadAllPredicate(),
SeriesFilterPredicate: ReadAllSeriesPredicate(),
}
iter, err := NewIterator(iterOpts)
iter, corruptFiles, err := NewIterator(iterOpts)
require.NoError(t, err)
require.Equal(t, 0, len(corruptFiles))
defer iter.Close()

// Convert the writes to be in-order, but keyed by series ID because the
Expand Down Expand Up @@ -425,8 +426,10 @@ func TestReadCommitLogMissingMetadata(t *testing.T) {
FileFilterPredicate: ReadAllPredicate(),
SeriesFilterPredicate: ReadAllSeriesPredicate(),
}
iter, err := NewIterator(iterOpts)
iter, corruptFiles, err := NewIterator(iterOpts)
require.NoError(t, err)
require.Equal(t, 0, len(corruptFiles))

for iter.Next() {
require.NoError(t, iter.Err())
}
Expand Down Expand Up @@ -526,8 +529,10 @@ func TestCommitLogIteratorUsesPredicateFilter(t *testing.T) {
FileFilterPredicate: commitLogPredicate,
SeriesFilterPredicate: ReadAllSeriesPredicate(),
}
iter, err := NewIterator(iterOpts)
iter, corruptFiles, err := NewIterator(iterOpts)
require.NoError(t, err)
require.Equal(t, 0, len(corruptFiles))

iterStruct := iter.(*iterator)
require.True(t, len(iterStruct.files) == 2)
}
Expand Down
63 changes: 57 additions & 6 deletions src/dbnode/persist/fs/commitlog/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func ReadLogInfo(filePath string, opts Options) (time.Time, time.Duration, int64

fd, err = os.Open(filePath)
if err != nil {
return time.Time{}, 0, 0, err
return time.Time{}, 0, 0, fsError{err}
}

chunkReader := newChunkReader(opts.FlushSize())
Expand All @@ -72,27 +72,44 @@ func ReadLogInfo(filePath string, opts Options) (time.Time, time.Duration, int64
err = fd.Close()
fd = nil
if err != nil {
return time.Time{}, 0, 0, err
return time.Time{}, 0, 0, fsError{err}
}

return time.Unix(0, logInfo.Start), time.Duration(logInfo.Duration), logInfo.Index, decoderErr
}

// Files returns a slice of all available commit log files on disk along with
// their associated metadata.
func Files(opts Options) ([]File, error) {
func Files(opts Options) ([]File, []ErrorWithPath, error) {
commitLogsDir := fs.CommitLogsDirPath(
opts.FilesystemOptions().FilePathPrefix())
filePaths, err := fs.SortedCommitLogFiles(commitLogsDir)
if err != nil {
return nil, err
return nil, nil, err
}

commitLogFiles := make([]File, 0, len(filePaths))
errorsWithPath := make([]ErrorWithPath, 0)
for _, filePath := range filePaths {
file := File{
FilePath: filePath,
}

start, duration, index, err := ReadLogInfo(filePath, opts)
if _, ok := err.(fsError); ok {
return nil, nil, err
}

if err != nil {
return nil, err
errorsWithPath = append(errorsWithPath, NewErrorWithPath(
err, filePath))
continue
}

if err == nil {
file.Start = start
file.Duration = duration
file.Index = index
}

commitLogFiles = append(commitLogFiles, File{
Expand All @@ -104,8 +121,42 @@ func Files(opts Options) ([]File, error) {
}

sort.Slice(commitLogFiles, func(i, j int) bool {
// Sorting is best effort here since we may not know the start.
Copy link
Collaborator

@prateek prateek Oct 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why sort at all? and if you're going to sort - you can sort errors first (or last) and then guarantee order too.

or better yet change the return type for this function: ([]File, []ErrorWithPath, error) and then won't need the FileOrError type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorting is important because you want to try and read the commit log files in order in the bootstrapper

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your interface change suggestion is good though, just made it

return commitLogFiles[i].Start.Before(commitLogFiles[j].Start)
})

return commitLogFiles, nil
return commitLogFiles, errorsWithPath, nil
}

// ErrorWithPath is an error that includes the path of the file that
// had the error.
type ErrorWithPath struct {
err error
path string
}

// Error returns the error.
func (e ErrorWithPath) Error() string {
return e.err.Error()
}

// Path returns the path of hte file that the error is associated with.
func (e ErrorWithPath) Path() string {
return e.path
}

// NewErrorWithPath creates a new ErrorWithPath.
func NewErrorWithPath(err error, path string) ErrorWithPath {
return ErrorWithPath{
err: err,
path: path,
}
}

type fsError struct {
err error
}

func (e fsError) Error() string {
return e.err.Error()
}
32 changes: 16 additions & 16 deletions src/dbnode/persist/fs/commitlog/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,24 @@ import (
)

func TestFiles(t *testing.T) {
// TODO(r): Find some time/people to help investigate this flakey test.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol is this no longer flaky?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I fixed it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

t.Skip()

dir, err := ioutil.TempDir("", "commitlogs")
require.NoError(t, err)
defer os.RemoveAll(dir)

createTestCommitLogFiles(t, dir, 10*time.Minute, 5)

opts := NewOptions()
var (
minNumBlocks = 5
opts = NewOptions()
)
opts = opts.SetFilesystemOptions(
opts.FilesystemOptions().
SetFilePathPrefix(dir),
)
files, err := Files(opts)
files, corruptFiles, err := Files(opts)
require.NoError(t, err)
require.Equal(t, 5, len(files))
require.True(t, len(corruptFiles) == 0)
require.True(t, len(files) >= minNumBlocks)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah is this the fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep


// Make sure its sorted
var lastFileStart time.Time
Expand All @@ -71,12 +72,12 @@ func TestFiles(t *testing.T) {
}
}

// createTestCommitLogFiles creates the specified number of commit log files
// on disk with the appropriate block size. Commit log files will be valid
// and contain readable metadata.
// createTestCommitLogFiles creates at least the specified number of commit log files
// on disk with the appropriate block size. Commit log files will be valid and contain
// readable metadata.
func createTestCommitLogFiles(
t *testing.T, filePathPrefix string, blockSize time.Duration, numBlocks int) {
require.True(t, numBlocks >= 2)
t *testing.T, filePathPrefix string, blockSize time.Duration, minNumBlocks int) {
require.True(t, minNumBlocks >= 2)

var (
nowLock = sync.RWMutex{}
Expand Down Expand Up @@ -109,13 +110,12 @@ func createTestCommitLogFiles(
}
// Commit log writer is asynchronous and performs batching so getting the exact number
// of files that we want is tricky. The implementation below loops infinitely, writing
// a single datapoint and increasing the time after each iteration until numBlocks -1
// files are on disk. After that, it terminates, and the final batch flush from calling
// commitlog.Close() will generate the last file.
// a single datapoint and increasing the time after each iteration until minNumBlocks
// files are on disk.
for {
files, err := fs.SortedCommitLogFiles(commitLogsDir)
require.NoError(t, err)
if len(files) == numBlocks-1 {
if len(files) >= minNumBlocks {
break
}
err = commitLog.Write(context.NewContext(), series, ts.Datapoint{}, xtime.Second, nil)
Expand All @@ -126,5 +126,5 @@ func createTestCommitLogFiles(
require.NoError(t, commitLog.Close())
files, err := fs.SortedCommitLogFiles(commitLogsDir)
require.NoError(t, err)
require.Equal(t, numBlocks, len(files))
require.True(t, len(files) >= minNumBlocks)
}
14 changes: 7 additions & 7 deletions src/dbnode/persist/fs/commitlog/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ func ReadAllPredicate() FileFilterPredicate {
}

// NewIterator creates a new commit log iterator
func NewIterator(iterOpts IteratorOpts) (Iterator, error) {
func NewIterator(iterOpts IteratorOpts) (iter Iterator, corruptFiles []ErrorWithPath, err error) {
opts := iterOpts.CommitLogOptions
iops := opts.InstrumentOptions()
iops = iops.SetMetricsScope(iops.MetricsScope().SubScope("iterator"))

files, err := Files(opts)
files, corruptFiles, err := Files(opts)
if err != nil {
return nil, err
return nil, nil, err
}
filteredFiles := filterFiles(opts, files, iterOpts.FileFilterPredicate)

Expand All @@ -90,7 +90,7 @@ func NewIterator(iterOpts IteratorOpts) (Iterator, error) {
log: iops.Logger(),
files: filteredFiles,
seriesPred: iterOpts.SeriesFilterPredicate,
}, nil
}, corruptFiles, nil
}

func (i *iterator) Next() bool {
Expand Down Expand Up @@ -191,13 +191,13 @@ func (i *iterator) nextReader() bool {
}

func filterFiles(opts Options, files []File, predicate FileFilterPredicate) []File {
filteredFiles := make([]File, 0, len(files))
filtered := make([]File, 0, len(files))
for _, f := range files {
if predicate(f) {
filteredFiles = append(filteredFiles, f)
filtered = append(filtered, f)
}
}
return filteredFiles
return filtered
}

func (i *iterator) closeAndResetReader() error {
Expand Down
Loading