Skip to content

Commit

Permalink
lightning: introduce param to skip CSV header parsing (pingcap#41128)
Browse files Browse the repository at this point in the history
  • Loading branch information
dsdashun authored and blacktear23 committed Feb 15, 2023
1 parent 10c0c20 commit e9ed0c8
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 71 deletions.
32 changes: 17 additions & 15 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,14 +479,15 @@ type PostRestore struct {

type CSVConfig struct {
// Separator, Delimiter and Terminator should all be in utf8mb4 encoding.
Separator string `toml:"separator" json:"separator"`
Delimiter string `toml:"delimiter" json:"delimiter"`
Terminator string `toml:"terminator" json:"terminator"`
Null string `toml:"null" json:"null"`
Header bool `toml:"header" json:"header"`
TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"`
NotNull bool `toml:"not-null" json:"not-null"`
BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"`
Separator string `toml:"separator" json:"separator"`
Delimiter string `toml:"delimiter" json:"delimiter"`
Terminator string `toml:"terminator" json:"terminator"`
Null string `toml:"null" json:"null"`
Header bool `toml:"header" json:"header"`
HeaderSchemaMatch bool `toml:"header-schema-match" json:"header-schema-match"`
TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"`
NotNull bool `toml:"not-null" json:"not-null"`
BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"`
// hide these options for lightning configuration file, they can only be used by LOAD DATA
// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
StartingBy string `toml:"-" json:"-"`
Expand Down Expand Up @@ -743,13 +744,14 @@ func NewConfig() *Config {
Mydumper: MydumperRuntime{
ReadBlockSize: ReadBlockSize,
CSV: CSVConfig{
Separator: ",",
Delimiter: `"`,
Header: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
Separator: ",",
Delimiter: `"`,
Header: true,
HeaderSchemaMatch: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
},
StrictFormat: false,
MaxRegionSize: MaxRegionSize,
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,9 @@ func (parser *CSVParser) ReadColumns() error {
if err != nil {
return errors.Trace(err)
}
if !parser.cfg.HeaderSchemaMatch {
return nil
}
parser.columns = make([]string, 0, len(columns))
for _, colName := range columns {
colName, _, err = parser.unescapeString(colName)
Expand Down
84 changes: 78 additions & 6 deletions br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,13 @@ func TestSyntaxErrorCSV(t *testing.T) {

func TestTSV(t *testing.T) {
cfg := config.CSVConfig{
Separator: "\t",
Delimiter: "",
BackslashEscape: false,
NotNull: false,
Null: "",
Header: true,
Separator: "\t",
Delimiter: "",
BackslashEscape: false,
NotNull: false,
Null: "",
Header: true,
HeaderSchemaMatch: true,
}

parser, err := mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(`a b c d e f
Expand Down Expand Up @@ -577,6 +578,7 @@ func TestCsvWithWhiteSpaceLine(t *testing.T) {
require.Nil(t, parser.Close())

cfg.Header = true
cfg.HeaderSchemaMatch = true
data = " \r\na,b,c\r\n0,,abc\r\n"
parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), ioWorkers, true, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -609,6 +611,7 @@ func TestEmpty(t *testing.T) {
// Try again with headers.

cfg.Header = true
cfg.HeaderSchemaMatch = true

parser, err = mydump.NewCSVParser(context.Background(), &cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), ioWorkers, true, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1292,3 +1295,72 @@ func BenchmarkReadRowUsingEncodingCSV(b *testing.B) {
}
require.Equal(b, b.N, rowsCount)
}

// TestHeaderSchemaMatch verifies the interplay of the CSV options Header and
// HeaderSchemaMatch in the CSV parser:
//   - Header=true,  HeaderSchemaMatch=true:  the first line is consumed and
//     its fields become the parser's column names.
//   - Header=true,  HeaderSchemaMatch=false: the first line is still consumed
//     (skipped) but NOT recorded as column names, so Columns() stays nil.
//   - Header=false: the first line is treated as ordinary data and no column
//     names are recorded, regardless of HeaderSchemaMatch.
func TestHeaderSchemaMatch(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
},
}

// A header-like first line followed by four data rows.
inputData := `id,val1,val2,val3
1,111,aaa,1.0
2,222,bbb,2.0
3,333,ccc,3.0
4,444,ddd,4.0`

// The four data rows as the parser is expected to yield them (all fields
// are read as strings at this stage).
parsedDataPart := [][]types.Datum{
{types.NewStringDatum("1"), types.NewStringDatum("111"), types.NewStringDatum("aaa"), types.NewStringDatum("1.0")},
{types.NewStringDatum("2"), types.NewStringDatum("222"), types.NewStringDatum("bbb"), types.NewStringDatum("2.0")},
{types.NewStringDatum("3"), types.NewStringDatum("333"), types.NewStringDatum("ccc"), types.NewStringDatum("3.0")},
{types.NewStringDatum("4"), types.NewStringDatum("444"), types.NewStringDatum("ddd"), types.NewStringDatum("4.0")},
}

type testCase struct {
Header bool
HeaderSchemaMatch bool
ExpectedData [][]types.Datum
ExpectedColumns []string
}

for _, tc := range []testCase{
// Header parsed and used as column names.
{
Header: true,
HeaderSchemaMatch: true,
ExpectedData: parsedDataPart,
ExpectedColumns: []string{"id", "val1", "val2", "val3"},
},
// Header line skipped but its fields are not recorded as columns.
{
Header: true,
HeaderSchemaMatch: false,
ExpectedData: parsedDataPart,
ExpectedColumns: nil,
},
// No header: the first line comes back as a regular data row, so the
// expected data is the header fields prepended to the four data rows.
{
Header: false,
HeaderSchemaMatch: true,
ExpectedData: append([][]types.Datum{
{types.NewStringDatum("id"), types.NewStringDatum("val1"), types.NewStringDatum("val2"), types.NewStringDatum("val3")},
}, parsedDataPart...),
ExpectedColumns: nil,
},
} {
comment := fmt.Sprintf("header = %v, header-schema-match = %v", tc.Header, tc.HeaderSchemaMatch)
cfg.CSV.Header = tc.Header
cfg.CSV.HeaderSchemaMatch = tc.HeaderSchemaMatch
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
assert.NoError(t, err)
// Note: tc.Header is also passed as the "should consume header" argument
// of NewCSVParser.
parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(inputData), int64(config.ReadBlockSize), ioWorkers, tc.Header, charsetConvertor)
assert.NoError(t, err)
for i, row := range tc.ExpectedData {
// Shadows the outer `comment` to add the row index to failures.
comment := fmt.Sprintf("row = %d, header = %v, header-schema-match = %v", i+1, tc.Header, tc.HeaderSchemaMatch)
e := parser.ReadRow()
assert.NoErrorf(t, e, "row = %d, error = %s", i+1, errors.ErrorStack(e))
assert.Equal(t, int64(i)+1, parser.LastRow().RowID, comment)
assert.Equal(t, row, parser.LastRow().Row, comment)
}
// All expected rows consumed: the next read must hit EOF, and the
// recorded column names must match the case's expectation.
assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, comment)
assert.Equal(t, tc.ExpectedColumns, parser.Columns(), comment)
}
}
4 changes: 3 additions & 1 deletion br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,9 @@ func SplitLargeFile(
if err = parser.ReadColumns(); err != nil {
return 0, nil, nil, err
}
columns = parser.Columns()
if cfg.Mydumper.CSV.HeaderSchemaMatch {
columns = parser.Columns()
}
startOffset, _ = parser.Pos()
endOffset = startOffset + maxRegionSize
if endOffset > dataFile.FileMeta.FileSize {
Expand Down
75 changes: 40 additions & 35 deletions br/pkg/lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,14 @@ func TestMakeSourceFileRegion(t *testing.T) {
ReadBlockSize: config.ReadBlockSize,
MaxRegionSize: 1,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -230,13 +231,14 @@ func TestCompressedMakeSourceFileRegion(t *testing.T) {
ReadBlockSize: config.ReadBlockSize,
MaxRegionSize: 1,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -284,13 +286,14 @@ func TestSplitLargeFile(t *testing.T) {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -342,13 +345,14 @@ func TestSplitLargeFileNoNewLineAtEOF(t *testing.T) {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down Expand Up @@ -447,13 +451,14 @@ func TestSplitLargeFileOnlyOneChunk(t *testing.T) {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
Separator: ",",
Delimiter: "",
Header: true,
HeaderSchemaMatch: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
Expand Down
32 changes: 18 additions & 14 deletions br/pkg/lightning/restore/table_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func (s *tableRestoreSuite) TestPopulateChunks() {

// set csv header to true, this will cause check columns fail
s.cfg.Mydumper.CSV.Header = true
s.cfg.Mydumper.CSV.HeaderSchemaMatch = true
s.cfg.Mydumper.StrictFormat = true
regionSize := s.cfg.Mydumper.MaxRegionSize
s.cfg.Mydumper.MaxRegionSize = 5
Expand Down Expand Up @@ -455,6 +456,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() {
cfg.Mydumper.MaxRegionSize = 40

cfg.Mydumper.CSV.Header = true
cfg.Mydumper.CSV.HeaderSchemaMatch = true
cfg.Mydumper.StrictFormat = true
rc := &Controller{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store}

Expand Down Expand Up @@ -2135,13 +2137,14 @@ func (s *tableRestoreSuite) TestSchemaIsValid() {
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Header: ca.hasHeader,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
Separator: ",",
Delimiter: `"`,
Header: ca.hasHeader,
HeaderSchemaMatch: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
},
IgnoreColumns: ca.ignoreColumns,
},
Expand Down Expand Up @@ -2170,13 +2173,14 @@ func (s *tableRestoreSuite) TestGBKEncodedSchemaIsValid() {
DataCharacterSet: "gb18030",
DataInvalidCharReplace: string(utf8.RuneError),
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Header: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
Separator: ",",
Delimiter: `"`,
Header: true,
HeaderSchemaMatch: true,
NotNull: false,
Null: `\N`,
BackslashEscape: true,
TrimLastSep: false,
},
IgnoreColumns: nil,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Target table for the lightning_config_skip_csv_header integration test.
-- NOTE(review): the column names (id, val1) differ from the CSV header
-- fields in the test data file — presumably intentional, so that
-- header-schema-match=true fails while header-schema-match=false imports
-- by column position; verify against the test's run script.
CREATE TABLE testtbl (
id INTEGER PRIMARY KEY,
val1 VARCHAR(40) NOT NULL,
INDEX `idx_val1` (`val1`)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
aaa,bbb
1,"aaa01"
2,"aaa01"
3,"aaa02"
4,"aaa02"
5,"aaa05"
9 changes: 9 additions & 0 deletions br/tests/lightning_config_skip_csv_header/err_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Error-case config for the lightning_config_skip_csv_header test.
# NOTE(review): presumably this config makes the import fail because the CSV
# header fields are matched against the table schema and do not correspond
# to real column names — confirm against the test's run script.
[lightning]
check-requirements=true

[mydumper.csv]
# The first line of each CSV data file is a header row.
header = true
# Parse the header fields as column names and match them to the schema.
header-schema-match = true

[tikv-importer]
duplicate-resolution = 'remove'
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Config for the lightning_config_skip_csv_header test that omits
# header-schema-match entirely, exercising its default value
# (true, per CSVConfig defaults in br/pkg/lightning/config/config.go).
[lightning]
check-requirements=true

[mydumper.csv]
# The first line of each CSV data file is a header row; with the default
# header-schema-match, its fields are also matched against the schema.
header = true

[tikv-importer]
duplicate-resolution = 'remove'
9 changes: 9 additions & 0 deletions br/tests/lightning_config_skip_csv_header/normal_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Success-case config for the lightning_config_skip_csv_header test:
# the CSV header line is skipped but NOT parsed as column names, so a
# header that does not match the table schema no longer breaks the import.
[lightning]
check-requirements=true

[mydumper.csv]
# The first line of each CSV data file is a header row.
header = true
# Skip the header line without matching its fields against the schema.
header-schema-match = false

[tikv-importer]
duplicate-resolution = 'remove'
Loading

0 comments on commit e9ed0c8

Please sign in to comment.