Skip to content

Commit

Permalink
lightning: support custom file listing policy (#38599)
Browse files Browse the repository at this point in the history
close #38598
  • Loading branch information
dsdashun authored Oct 30, 2022
1 parent 6d6e9c4 commit bc0b419
Showing 1 changed file with 77 additions and 38 deletions.
115 changes: 77 additions & 38 deletions br/pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,16 @@ type MDLoaderSetupConfig struct {
// ReturnPartialResultOnError specifies whether the files scanned so far are still analyzed
// and their partial result is returned when an error occurs during the scan.
ReturnPartialResultOnError bool
// FileIter controls the file iteration policy when constructing a MDLoader.
FileIter FileIterator
}

// DefaultMDLoaderSetupConfig generates a default MDLoaderSetupConfig.
// DefaultMDLoaderSetupConfig generates a default MDLoaderSetupConfig.
func DefaultMDLoaderSetupConfig() *MDLoaderSetupConfig {
	cfg := MDLoaderSetupConfig{
		// Scan every file in the data source; a positive value would cap the scan.
		MaxScanFiles: 0,
		// Fail fast: do not hand back a partial result on error.
		ReturnPartialResultOnError: false,
		// No custom iteration policy; the loader installs its built-in one later.
		FileIter: nil,
	}
	return &cfg
}

Expand All @@ -156,6 +159,13 @@ func ReturnPartialResultOnError(supportPartialResult bool) MDLoaderSetupOption {
}
}

// WithFileIterator generates an option that specifies the file iteration policy.
func WithFileIterator(fileIter FileIterator) MDLoaderSetupOption {
	// Capture the iterator and assign it when the option is applied.
	return func(cfg *MDLoaderSetupConfig) { cfg.FileIter = fileIter }
}

// MDLoader is for 'Mydumper File Loader', which loads the files in the data source and generates a set of metadata.
type MDLoader struct {
store storage.ExternalStorage
Expand Down Expand Up @@ -202,6 +212,12 @@ func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, store sto
for _, o := range opts {
o(mdLoaderSetupCfg)
}
if mdLoaderSetupCfg.FileIter == nil {
mdLoaderSetupCfg.FileIter = &allFileIterator{
store: store,
maxScanFiles: mdLoaderSetupCfg.MaxScanFiles,
}
}

if len(cfg.Routes) > 0 && len(cfg.Mydumper.FileRouters) > 0 {
return nil, common.ErrInvalidConfig.GenWithStack("table route is deprecated, can't config both [routes] and [mydumper.files]")
Expand Down Expand Up @@ -254,7 +270,7 @@ func NewMyDumpLoaderWithStore(ctx context.Context, cfg *config.Config, store sto
setupCfg: mdLoaderSetupCfg,
}

if err := setup.setup(ctx, mdl.store); err != nil {
if err := setup.setup(ctx); err != nil {
if mdLoaderSetupCfg.ReturnPartialResultOnError {
return mdl, errors.Trace(err)
}
Expand Down Expand Up @@ -312,15 +328,19 @@ type ExtendColumnData struct {
// Will sort tables by table size, this means that the big table is imported
// at the latest, which to avoid large table take a long time to import and block
// small table to release index worker.
func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage) error {
func (s *mdLoaderSetup) setup(ctx context.Context) error {
/*
Mydumper file names format
db —— {db}-schema-create.sql
table —— {db}.{table}-schema.sql
sql —— {db}.{table}.{part}.sql / {db}.{table}.sql
*/
var gerr error
if err := s.listFiles(ctx, store); err != nil {
fileIter := s.setupCfg.FileIter
if fileIter == nil {
return errors.New("file iterator is not defined")
}
if err := fileIter.IterateFiles(ctx, s.constructFileInfo); err != nil {
if s.setupCfg.ReturnPartialResultOnError {
gerr = err
} else {
Expand Down Expand Up @@ -389,55 +409,74 @@ func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage
return gerr
}

// FileHandler is the interface to handle the file given the path and size.
// It is mainly used in the `FileIterator` as parameters.
type FileHandler func(ctx context.Context, path string, size int64) error

// FileIterator is the interface to iterate files in a data source.
// Use this interface to customize the file iteration policy.
type FileIterator interface {
	IterateFiles(ctx context.Context, hdl FileHandler) error
}

// allFileIterator walks every file in the backing external storage. It is
// installed as the FileIter in NewMyDumpLoaderWithStore when the caller did
// not supply a custom FileIterator.
type allFileIterator struct {
	// store is the external storage whose files are walked.
	store storage.ExternalStorage
	// maxScanFiles caps the number of files scanned; values <= 0 mean no limit.
	maxScanFiles int
}

func (iter *allFileIterator) IterateFiles(ctx context.Context, hdl FileHandler) error {
// `filepath.Walk` yields the paths in a deterministic (lexicographical) order,
// meaning the file and chunk orders will be the same everytime it is called
// (as long as the source is immutable).
totalScannedFileCount := 0
err := store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
logger := log.FromContext(ctx).With(zap.String("path", path))
err := iter.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
totalScannedFileCount++
if s.setupCfg.MaxScanFiles > 0 && totalScannedFileCount > s.setupCfg.MaxScanFiles {
if iter.maxScanFiles > 0 && totalScannedFileCount > iter.maxScanFiles {
return common.ErrTooManySourceFiles
}
res, err := s.loader.fileRouter.Route(filepath.ToSlash(path))
if err != nil {
return errors.Annotatef(err, "apply file routing on file '%s' failed", path)
}
if res == nil {
logger.Info("[loader] file is filtered by file router")
return nil
}

info := FileInfo{
TableName: filter.Table{Schema: res.Schema, Name: res.Name},
FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size},
}
return hdl(ctx, path, size)
})

if s.loader.shouldSkip(&info.TableName) {
logger.Debug("[filter] ignoring table file")
return errors.Trace(err)
}

return nil
}
func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size int64) error {
logger := log.FromContext(ctx).With(zap.String("path", path))
res, err := s.loader.fileRouter.Route(filepath.ToSlash(path))
if err != nil {
return errors.Annotatef(err, "apply file routing on file '%s' failed", path)
}
if res == nil {
logger.Info("[loader] file is filtered by file router")
return nil
}

switch res.Type {
case SourceTypeSchemaSchema:
s.dbSchemas = append(s.dbSchemas, info)
case SourceTypeTableSchema:
s.tableSchemas = append(s.tableSchemas, info)
case SourceTypeViewSchema:
s.viewSchemas = append(s.viewSchemas, info)
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
s.tableDatas = append(s.tableDatas, info)
}
info := FileInfo{
TableName: filter.Table{Schema: res.Schema, Name: res.Name},
FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size},
}

logger.Debug("file route result", zap.String("schema", res.Schema),
zap.String("table", res.Name), zap.Stringer("type", res.Type))
if s.loader.shouldSkip(&info.TableName) {
logger.Debug("[filter] ignoring table file")

return nil
})
}

return errors.Trace(err)
switch res.Type {
case SourceTypeSchemaSchema:
s.dbSchemas = append(s.dbSchemas, info)
case SourceTypeTableSchema:
s.tableSchemas = append(s.tableSchemas, info)
case SourceTypeViewSchema:
s.viewSchemas = append(s.viewSchemas, info)
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
s.tableDatas = append(s.tableDatas, info)
}

logger.Debug("file route result", zap.String("schema", res.Schema),
zap.String("table", res.Name), zap.Stringer("type", res.Type))

return nil
}

func (l *MDLoader) shouldSkip(table *filter.Table) bool {
Expand Down

0 comments on commit bc0b419

Please sign in to comment.