Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: make default file router support compression #38515

Merged
merged 8 commits into from
Oct 19, 2022
9 changes: 6 additions & 3 deletions br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func MakeTableRegions(
go func() {
defer wg.Done()
for info := range fileChan {
regions, sizes, err := makeSourceFileRegion(execCtx, meta, info, columns, cfg, ioWorkers, store)
regions, sizes, err := MakeSourceFileRegion(execCtx, meta, info, columns, cfg, ioWorkers, store)
select {
case resultChan <- fileRegionRes{info: info, regions: regions, sizes: sizes, err: err}:
case <-ctx.Done():
Expand Down Expand Up @@ -255,7 +255,8 @@ func MakeTableRegions(
return filesRegions, nil
}

func makeSourceFileRegion(
// MakeSourceFileRegion create a new source file region.
func MakeSourceFileRegion(
ctx context.Context,
meta *MDTableMeta,
fi FileInfo,
Expand Down Expand Up @@ -283,7 +284,9 @@ func makeSourceFileRegion(
// We increase the check threshold by 1/10 of the `max-region-size` because the source file size dumped by tools
// like dumpling might be slight exceed the threshold when it is equal `max-region-size`, so we can
// avoid split a lot of small chunks.
if isCsvFile && cfg.Mydumper.StrictFormat && dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
// If a csv file is compressed, we can't split it now because we can't get the exact size of a row.
if isCsvFile && cfg.Mydumper.StrictFormat && fi.FileMeta.Compression == CompressionNone &&
dataFileSize > int64(cfg.Mydumper.MaxRegionSize+cfg.Mydumper.MaxRegionSize/largeCSVLowerThresholdRation) {
_, regions, subFileSizes, err := SplitLargeFile(ctx, meta, cfg, fi, divisor, 0, ioWorkers, store)
return regions, subFileSizes, err
}
Expand Down
57 changes: 57 additions & 0 deletions br/pkg/lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,63 @@ func TestAllocateEngineIDs(t *testing.T) {
})
}

func TestMakeSourceFileRegion(t *testing.T) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
MaxRegionSize: 1,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
},
}
filePath := "./csv/split_large_file.csv"
dataFileInfo, err := os.Stat(filePath)
require.NoError(t, err)
fileSize := dataFileInfo.Size()
fileInfo := FileInfo{FileMeta: SourceFileMeta{Path: filePath, Type: SourceTypeCSV, FileSize: fileSize}}
colCnt := 3
columns := []string{"a", "b", "c"}

ctx := context.Background()
ioWorkers := worker.NewPool(ctx, 4, "io")
store, err := storage.NewLocalStorage(".")
assert.NoError(t, err)

// test - no compression
fileInfo.FileMeta.Compression = CompressionNone
regions, _, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
assert.NoError(t, err)
offsets := [][]int64{{6, 12}, {12, 18}, {18, 24}, {24, 30}}
assert.Len(t, regions, len(offsets))
for i := range offsets {
assert.Equal(t, offsets[i][0], regions[i].Chunk.Offset)
assert.Equal(t, offsets[i][1], regions[i].Chunk.EndOffset)
assert.Equal(t, columns, regions[i].Chunk.Columns)
}

// test - gzip compression
fileInfo.FileMeta.Compression = CompressionGZ
regions, _, err = MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
assert.NoError(t, err)
assert.Len(t, regions, 1)
assert.Equal(t, int64(0), regions[0].Chunk.Offset)
assert.Equal(t, fileInfo.FileMeta.FileSize, regions[0].Chunk.EndOffset)
assert.Len(t, regions[0].Chunk.Columns, 0)
}

func TestSplitLargeFile(t *testing.T) {
meta := &MDTableMeta{
DB: "csv",
Expand Down
31 changes: 18 additions & 13 deletions br/pkg/lightning/mydump/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ const (
CompressionZStd
// CompressionXZ is the compression type that uses XZ algorithm.
CompressionXZ
// CompressionLZO is the compression type that uses LZO algorithm.
CompressionLZO
// CompressionSnappy is the compression type that uses Snappy algorithm.
CompressionSnappy
)

func parseSourceType(t string) (SourceType, error) {
Expand Down Expand Up @@ -109,14 +113,18 @@ func (s SourceType) String() string {

func parseCompressionType(t string) (Compression, error) {
switch strings.ToLower(strings.TrimSpace(t)) {
case "gz":
case "gz", "gzip":
return CompressionGZ, nil
case "lz4":
return CompressionLZ4, nil
case "zstd":
return CompressionZStd, nil
case "xz":
return CompressionXZ, nil
case "lzo":
return CompressionLZO, nil
case "snappy":
return CompressionSnappy, nil
case "":
return CompressionNone, nil
default:
Expand All @@ -128,15 +136,15 @@ var expandVariablePattern = regexp.MustCompile(`\$(?:\$|[\pL\p{Nd}_]+|\{[\pL\p{N

var defaultFileRouteRules = []*config.FileRouteRule{
// ignore *-schema-trigger.sql, *-schema-post.sql files
{Pattern: `(?i).*(-schema-trigger|-schema-post)\.sql$`, Type: "ignore"},
// db schema create file pattern, matches files like '{schema}-schema-create.sql'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql$`, Schema: "$1", Table: "", Type: SchemaSchema, Unescape: true},
// table schema create file pattern, matches files like '{schema}.{table}-schema.sql'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql$`, Schema: "$1", Table: "$2", Type: TableSchema, Unescape: true},
// view schema create file pattern, matches files like '{schema}.{table}-schema-view.sql'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema-view\.sql$`, Schema: "$1", Table: "$2", Type: ViewSchema, Unescape: true},
// source file pattern, matches files like '{schema}.{table}.0001.{sql|csv}'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)$`, Schema: "$1", Table: "$2", Type: "$4", Key: "$3", Unescape: true},
{Pattern: `(?i).*(-schema-trigger|-schema-post)\.sql(?:\.(\w*?))?$`, Type: "ignore"},
// db schema create file pattern, matches files like '{schema}-schema-create.sql[.{compress}]'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)-schema-create\.sql(?:\.(\w*?))?$`, Schema: "$1", Table: "", Type: SchemaSchema, Compression: "$2", Unescape: true},
// table schema create file pattern, matches files like '{schema}.{table}-schema.sql[.{compress}]'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema\.sql(?:\.(\w*?))?$`, Schema: "$1", Table: "$2", Type: TableSchema, Compression: "$3", Unescape: true},
// view schema create file pattern, matches files like '{schema}.{table}-schema-view.sql[.{compress}]'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)-schema-view\.sql(?:\.(\w*?))?$`, Schema: "$1", Table: "$2", Type: ViewSchema, Compression: "$3", Unescape: true},
// source file pattern, matches files like '{schema}.{table}.0001.{sql|csv}[.{compress}]'
{Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)(?:\.(\w+))?$`, Schema: "$1", Table: "$2", Type: "$4", Key: "$3", Compression: "$5", Unescape: true},
}

// FileRouter provides some operations to apply a rule to route file path to target schema/table
Expand Down Expand Up @@ -292,9 +300,6 @@ func (p regexRouterParser) Parse(r *config.FileRouteRule, logger log.Logger) (*R
if err != nil {
return err
}
if compression != CompressionNone {
return errors.New("Currently we don't support restore compressed source file yet")
}
result.Compression = compression
return nil
})
Expand Down
28 changes: 28 additions & 0 deletions br/pkg/lightning/mydump/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,34 @@ func TestRouteParser(t *testing.T) {
}
}

func TestDefaultRouter(t *testing.T) {
r, err := NewFileRouter(defaultFileRouteRules, log.L())
assert.NoError(t, err)

inputOutputMap := map[string][]string{
"a/test-schema-create.sql": {"test", "", "", "", SchemaSchema},
"test-schema-create.sql.gz": {"test", "", "", "gz", SchemaSchema},
"c/d/test.t-schema.sql": {"test", "t", "", "", TableSchema},
"test.t-schema.sql.lzo": {"test", "t", "", "lzo", TableSchema},
"/bc/dc/test.v1-schema-view.sql": {"test", "v1", "", "", ViewSchema},
"test.v1-schema-view.sql.snappy": {"test", "v1", "", "snappy", ViewSchema},
"my_schema.my_table.sql": {"my_schema", "my_table", "", "", "sql"},
"/test/123/my_schema.my_table.sql.gz": {"my_schema", "my_table", "", "gz", "sql"},
"my_dir/my_schema.my_table.csv.lzo": {"my_schema", "my_table", "", "lzo", "csv"},
"my_schema.my_table.0001.sql.snappy": {"my_schema", "my_table", "0001", "snappy", "sql"},
}
for path, fields := range inputOutputMap {
res, err := r.Route(path)
assert.NoError(t, err)
compress, e := parseCompressionType(fields[3])
assert.NoError(t, e)
ty, e := parseSourceType(fields[4])
assert.NoError(t, e)
exp := &RouteResult{filter.Table{Schema: fields[0], Name: fields[1]}, fields[2], compress, ty}
assert.Equal(t, exp, res)
}
}

func TestInvalidRouteRule(t *testing.T) {
rule := &config.FileRouteRule{}
rules := []*config.FileRouteRule{rule}
Expand Down