From 5b9d96b9486a070a79279252d94ce0bc45f2e0bc Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 28 Nov 2022 18:58:01 +0800 Subject: [PATCH] lightning: support compress for lightning, add compress unit and integration tests (#39153) ref pingcap/tidb#38514 --- br/pkg/lightning/mydump/reader.go | 7 + br/pkg/lightning/mydump/reader_test.go | 26 +++ br/pkg/lightning/mydump/region.go | 27 ++- br/pkg/lightning/mydump/region_test.go | 2 +- br/pkg/lightning/mydump/router.go | 5 +- br/pkg/lightning/mydump/router_test.go | 18 ++ br/pkg/lightning/restore/get_pre_info.go | 18 +- br/pkg/lightning/restore/get_pre_info_test.go | 209 ++++++++++++++++++ br/pkg/lightning/restore/mock/mock.go | 20 +- br/pkg/lightning/restore/restore.go | 35 +-- br/tests/lightning_compress/config.toml | 18 ++ .../data.gzip/compress-schema-create.sql.gz | Bin 0 -> 113 bytes .../compress.empty_strings-schema.sql.gz | Bin 0 -> 175 bytes .../compress.empty_strings.000000000.csv.gz | Bin 0 -> 58 bytes .../data.gzip/compress.escapes-schema.sql.gz | Bin 0 -> 221 bytes .../compress.escapes.000000000.csv.gz | Bin 0 -> 103 bytes .../compress.multi_rows-schema.sql.gz | Bin 0 -> 141 bytes .../compress.multi_rows.000000000.csv.gz | Bin 0 -> 2763 bytes .../data.gzip/compress.threads-schema.sql.gz | Bin 0 -> 444 bytes .../compress.threads.000000000.csv.gz | Bin 0 -> 1010 bytes .../compress-schema-create.sql.snappy | Bin 0 -> 117 bytes .../compress.empty_strings-schema.sql.snappy | Bin 0 -> 231 bytes ...ompress.empty_strings.000000000.sql.snappy | Bin 0 -> 128 bytes .../compress.escapes-schema.sql.snappy | Bin 0 -> 251 bytes .../compress.escapes.000000000.sql.snappy | Bin 0 -> 184 bytes .../compress.multi_rows-schema.sql.snappy | Bin 0 -> 146 bytes .../compress.multi_rows.000000000.sql.snappy | Bin 0 -> 75746 bytes .../compress.threads-schema.sql.snappy | Bin 0 -> 614 bytes .../compress.threads.000000000.sql.snappy | Bin 0 -> 1395 bytes .../data.zstd/compress-schema-create.sql.zst | Bin 0 -> 108 bytes .../compress.empty_strings-schema.sql.zst | Bin 0 -> 167 bytes .../compress.empty_strings.000000000.csv.zst | Bin 0 -> 50 bytes .../data.zstd/compress.escapes-schema.sql.zst | Bin 0 -> 210 bytes .../compress.escapes.000000000.csv.zst | Bin 0 -> 88 bytes .../compress.multi_rows-schema.sql.zst | Bin 0 -> 132 bytes .../compress.multi_rows.000000000.csv.zst | Bin 0 -> 178 bytes .../data.zstd/compress.threads-schema.sql.zst | Bin 0 -> 456 bytes .../compress.threads.000000000.csv.zst | Bin 0 -> 1030 bytes br/tests/lightning_compress/run.sh | 61 +++++ dumpling/tests/e2e/run.sh | 20 ++ dumpling/tests/e2e_csv/run.sh | 27 ++- 41 files changed, 440 insertions(+), 53 deletions(-) create mode 100644 br/tests/lightning_compress/config.toml create mode 100644 br/tests/lightning_compress/data.gzip/compress-schema-create.sql.gz create mode 100644 br/tests/lightning_compress/data.gzip/compress.empty_strings-schema.sql.gz create mode 100644 br/tests/lightning_compress/data.gzip/compress.empty_strings.000000000.csv.gz create mode 100644 br/tests/lightning_compress/data.gzip/compress.escapes-schema.sql.gz create mode 100644 br/tests/lightning_compress/data.gzip/compress.escapes.000000000.csv.gz create mode 100644 br/tests/lightning_compress/data.gzip/compress.multi_rows-schema.sql.gz create mode 100644 br/tests/lightning_compress/data.gzip/compress.multi_rows.000000000.csv.gz create mode 100644 br/tests/lightning_compress/data.gzip/compress.threads-schema.sql.gz create mode 100644 br/tests/lightning_compress/data.gzip/compress.threads.000000000.csv.gz create mode 
100644 br/tests/lightning_compress/data.snappy/compress-schema-create.sql.snappy create mode 100644 br/tests/lightning_compress/data.snappy/compress.empty_strings-schema.sql.snappy create mode 100644 br/tests/lightning_compress/data.snappy/compress.empty_strings.000000000.sql.snappy create mode 100644 br/tests/lightning_compress/data.snappy/compress.escapes-schema.sql.snappy create mode 100644 br/tests/lightning_compress/data.snappy/compress.escapes.000000000.sql.snappy create mode 100644 br/tests/lightning_compress/data.snappy/compress.multi_rows-schema.sql.snappy create mode 100644 br/tests/lightning_compress/data.snappy/compress.multi_rows.000000000.sql.snappy create mode 100644 br/tests/lightning_compress/data.snappy/compress.threads-schema.sql.snappy create mode 100644 br/tests/lightning_compress/data.snappy/compress.threads.000000000.sql.snappy create mode 100644 br/tests/lightning_compress/data.zstd/compress-schema-create.sql.zst create mode 100644 br/tests/lightning_compress/data.zstd/compress.empty_strings-schema.sql.zst create mode 100644 br/tests/lightning_compress/data.zstd/compress.empty_strings.000000000.csv.zst create mode 100644 br/tests/lightning_compress/data.zstd/compress.escapes-schema.sql.zst create mode 100644 br/tests/lightning_compress/data.zstd/compress.escapes.000000000.csv.zst create mode 100644 br/tests/lightning_compress/data.zstd/compress.multi_rows-schema.sql.zst create mode 100644 br/tests/lightning_compress/data.zstd/compress.multi_rows.000000000.csv.zst create mode 100644 br/tests/lightning_compress/data.zstd/compress.threads-schema.sql.zst create mode 100644 br/tests/lightning_compress/data.zstd/compress.threads.000000000.csv.zst create mode 100755 br/tests/lightning_compress/run.sh diff --git a/br/pkg/lightning/mydump/reader.go b/br/pkg/lightning/mydump/reader.go index 2988c3675dfa9..4837b35aceab2 100644 --- a/br/pkg/lightning/mydump/reader.go +++ b/br/pkg/lightning/mydump/reader.go @@ -70,6 +70,13 @@ func decodeCharacterSet(data []byte, characterSet string) ([]byte, error) { // ExportStatement exports the SQL statement in the schema file. 
func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, characterSet string) ([]byte, error) { + if sqlFile.FileMeta.Compression != CompressionNone { + compressType, err := ToStorageCompressType(sqlFile.FileMeta.Compression) + if err != nil { + return nil, errors.Trace(err) + } + store = storage.WithCompression(store, compressType) + } fd, err := store.Open(ctx, sqlFile.FileMeta.Path) if err != nil { return nil, errors.Trace(err) diff --git a/br/pkg/lightning/mydump/reader_test.go b/br/pkg/lightning/mydump/reader_test.go index e7506ea869782..1f67f2c31c43a 100644 --- a/br/pkg/lightning/mydump/reader_test.go +++ b/br/pkg/lightning/mydump/reader_test.go @@ -15,6 +15,7 @@ package mydump_test import ( + "compress/gzip" "context" "errors" "os" @@ -173,3 +174,28 @@ func TestExportStatementHandleNonEOFError(t *testing.T) { _, err := ExportStatement(ctx, mockStorage, f, "auto") require.Contains(t, err.Error(), "read error") } + +func TestExportStatementCompressed(t *testing.T) { + dir := t.TempDir() + file, err := os.Create(filepath.Join(dir, "tidb_lightning_test_reader")) + require.NoError(t, err) + defer os.Remove(file.Name()) + + store, err := storage.NewLocalStorage(dir) + require.NoError(t, err) + + gzipFile := gzip.NewWriter(file) + _, err = gzipFile.Write([]byte("CREATE DATABASE whatever;")) + require.NoError(t, err) + err = gzipFile.Close() + require.NoError(t, err) + stat, err := file.Stat() + require.NoError(t, err) + err = file.Close() + require.NoError(t, err) + + f := FileInfo{FileMeta: SourceFileMeta{Path: stat.Name(), FileSize: stat.Size(), Compression: CompressionGZ}} + data, err := ExportStatement(context.TODO(), store, f, "auto") + require.NoError(t, err) + require.Equal(t, []byte("CREATE DATABASE whatever;"), data) +} diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index 8562acc2867b3..ffd9173483896 100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -31,9 +31,13 @@ import ( ) const ( - tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024 + tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024 + compressedTableRegionSizeWarningThreshold int64 = 410 * 1024 * 1024 // 0.4 * tableRegionSizeWarningThreshold // the increment ratio of large CSV file size threshold by `region-split-size` largeCSVLowerThresholdRation = 10 + // TableFileSizeINF for compressed size, for lightning 10TB is a relatively big value and will strongly affect efficiency + // It's used to make sure compressed files can be read until EOF. Because we can't get the exact decompressed size of the compressed files. + TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold ) // TableRegion contains information for a table region during import. @@ -292,19 +296,34 @@ func MakeSourceFileRegion( return regions, subFileSizes, err } + fileSize := fi.FileMeta.FileSize + rowIDMax := fileSize / divisor + // for compressed files, suggest the compress ratio is 1% to calculate the rowIDMax. + // set fileSize to INF to make sure compressed files can be read until EOF. Because we can't get the exact size of the compressed files. + // TODO: update progress bar calculation for compressed files. + if fi.FileMeta.Compression != CompressionNone { + rowIDMax = fileSize * 100 / divisor // FIXME: this is not accurate. Need more tests and fix solution. 
+ fileSize = TableFileSizeINF + } tableRegion := &TableRegion{ DB: meta.DB, Table: meta.Name, FileMeta: fi.FileMeta, Chunk: Chunk{ Offset: 0, - EndOffset: fi.FileMeta.FileSize, + EndOffset: fileSize, PrevRowIDMax: 0, - RowIDMax: fi.FileMeta.FileSize / divisor, + RowIDMax: rowIDMax, }, } - if tableRegion.Size() > tableRegionSizeWarningThreshold { + regionTooBig := false + if fi.FileMeta.Compression == CompressionNone { + regionTooBig = tableRegion.Size() > tableRegionSizeWarningThreshold + } else { + regionTooBig = fi.FileMeta.FileSize > compressedTableRegionSizeWarningThreshold + } + if regionTooBig { log.FromContext(ctx).Warn( "file is too big to be processed efficiently; we suggest splitting it at 256 MB each", zap.String("file", fi.FileMeta.Path), diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go index 0830d378f47ff..0c990278e65cd 100644 --- a/br/pkg/lightning/mydump/region_test.go +++ b/br/pkg/lightning/mydump/region_test.go @@ -217,7 +217,7 @@ func TestMakeSourceFileRegion(t *testing.T) { assert.NoError(t, err) assert.Len(t, regions, 1) assert.Equal(t, int64(0), regions[0].Chunk.Offset) - assert.Equal(t, fileInfo.FileMeta.FileSize, regions[0].Chunk.EndOffset) + assert.Equal(t, TableFileSizeINF, regions[0].Chunk.EndOffset) assert.Len(t, regions[0].Chunk.Columns, 0) } diff --git a/br/pkg/lightning/mydump/router.go b/br/pkg/lightning/mydump/router.go index bdc2a922f12f7..bf0ccba834fe0 100644 --- a/br/pkg/lightning/mydump/router.go +++ b/br/pkg/lightning/mydump/router.go @@ -134,7 +134,7 @@ func parseCompressionType(t string) (Compression, error) { return CompressionGZ, nil case "lz4": return CompressionLZ4, nil - case "zstd": + case "zstd", "zst": return CompressionZStd, nil case "xz": return CompressionXZ, nil @@ -324,6 +324,9 @@ func (p regexRouterParser) Parse(r *config.FileRouteRule, logger log.Logger) (*R if err != nil { return err } + if result.Type == SourceTypeParquet && compression != CompressionNone { + return errors.Errorf("can't support whole compressed parquet file, should compress parquet files by choosing correct parquet compress writer, path: %s", r.Path) + } result.Compression = compression return nil }) diff --git a/br/pkg/lightning/mydump/router_test.go b/br/pkg/lightning/mydump/router_test.go index 4e3d8a4215a0d..ab97769e30ce8 100644 --- a/br/pkg/lightning/mydump/router_test.go +++ b/br/pkg/lightning/mydump/router_test.go @@ -292,3 +292,21 @@ func TestRouteWithPath(t *testing.T) { require.NoError(t, err) require.Nil(t, res) } + +func TestRouteWithCompressedParquet(t *testing.T) { + fileName := "myschema.my_table.000.parquet.gz" + rule := &config.FileRouteRule{ + Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)(?:\.(\w+))?$`, + Schema: "$1", + Table: "$2", + Type: "$4", + Key: "$3", + Compression: "$5", + Unescape: true, + } + r := *rule + router, err := NewFileRouter([]*config.FileRouteRule{&r}, log.L()) + require.NoError(t, err) + _, err = router.Route(fileName) + require.Error(t, err) +} diff --git a/br/pkg/lightning/restore/get_pre_info.go b/br/pkg/lightning/restore/get_pre_info.go index 287d59c6145a4..93927c6956809 100644 --- a/br/pkg/lightning/restore/get_pre_info.go +++ b/br/pkg/lightning/restore/get_pre_info.go @@ -444,15 +444,7 @@ func (p *PreRestoreInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context // ReadFirstNRowsByFileMeta reads the first N rows of an data file. // It implements the PreRestoreInfoGetter interface. 
func (p *PreRestoreInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) { - var ( - reader storage.ReadSeekCloser - err error - ) - if dataFileMeta.Type == mydump.SourceTypeParquet { - reader, err = mydump.OpenParquetReader(ctx, p.srcStorage, dataFileMeta.Path, dataFileMeta.FileSize) - } else { - reader, err = p.srcStorage.Open(ctx, dataFileMeta.Path) - } + reader, err := openReader(ctx, dataFileMeta, p.srcStorage) if err != nil { return nil, nil, errors.Trace(err) } @@ -590,13 +582,7 @@ func (p *PreRestoreInfoGetterImpl) sampleDataFromTable( return resultIndexRatio, isRowOrdered, nil } sampleFile := tableMeta.DataFiles[0].FileMeta - var reader storage.ReadSeekCloser - var err error - if sampleFile.Type == mydump.SourceTypeParquet { - reader, err = mydump.OpenParquetReader(ctx, p.srcStorage, sampleFile.Path, sampleFile.FileSize) - } else { - reader, err = p.srcStorage.Open(ctx, sampleFile.Path) - } + reader, err := openReader(ctx, sampleFile, p.srcStorage) if err != nil { return 0.0, false, errors.Trace(err) } diff --git a/br/pkg/lightning/restore/get_pre_info_test.go b/br/pkg/lightning/restore/get_pre_info_test.go index 8ea57d023c679..f66a76901116f 100644 --- a/br/pkg/lightning/restore/get_pre_info_test.go +++ b/br/pkg/lightning/restore/get_pre_info_test.go @@ -14,6 +14,8 @@ package restore import ( + "bytes" + "compress/gzip" "context" "database/sql" "fmt" @@ -24,6 +26,7 @@ import ( mysql_sql_driver "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/lightning/restore/mock" ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts" "github.com/pingcap/tidb/errno" @@ -412,6 +415,118 @@ INSERT INTO db01.tbl01 (ival, sval) VALUES (444, 'ddd');` require.Equal(t, theDataInfo.ExpectFirstRowDatums, rowDatums) } +func compressGz(t *testing.T, data []byte) []byte { + t.Helper() + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + _, err := w.Write(data) + require.NoError(t, err) + require.NoError(t, w.Close()) + return buf.Bytes() +} + +func TestGetPreInfoReadCompressedFirstRow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var ( + testCSVData01 = []byte(`ival,sval +111,"aaa" +222,"bbb" +`) + testSQLData01 = []byte(`INSERT INTO db01.tbl01 (ival, sval) VALUES (333, 'ccc'); +INSERT INTO db01.tbl01 (ival, sval) VALUES (444, 'ddd');`) + ) + + test1CSVCompressed := compressGz(t, testCSVData01) + test1SQLCompressed := compressGz(t, testSQLData01) + + testDataInfos := []struct { + FileName string + Data []byte + FirstN int + CSVConfig *config.CSVConfig + ExpectFirstRowDatums [][]types.Datum + ExpectColumns []string + }{ + { + FileName: "/db01/tbl01/data.001.csv.gz", + Data: test1CSVCompressed, + FirstN: 1, + ExpectFirstRowDatums: [][]types.Datum{ + { + types.NewStringDatum("111"), + types.NewStringDatum("aaa"), + }, + }, + ExpectColumns: []string{"ival", "sval"}, + }, + { + FileName: "/db01/tbl01/data.001.sql.gz", + Data: test1SQLCompressed, + FirstN: 1, + ExpectFirstRowDatums: [][]types.Datum{ + { + types.NewUintDatum(333), + types.NewStringDatum("ccc"), + }, + }, + ExpectColumns: []string{"ival", "sval"}, + }, + } + + tbl01SchemaBytes := []byte("CREATE TABLE db01.tbl01(id INTEGER PRIMARY KEY AUTO_INCREMENT, ival INTEGER, sval VARCHAR(64));") + tbl01SchemaBytesCompressed := compressGz(t, tbl01SchemaBytes) + + tblMockSourceData 
:= &mock.MockTableSourceData{ + DBName: "db01", + TableName: "tbl01", + SchemaFile: &mock.MockSourceFile{ + FileName: "/db01/tbl01/tbl01.schema.sql.gz", + Data: tbl01SchemaBytesCompressed, + }, + DataFiles: []*mock.MockSourceFile{}, + } + for _, testInfo := range testDataInfos { + tblMockSourceData.DataFiles = append(tblMockSourceData.DataFiles, &mock.MockSourceFile{ + FileName: testInfo.FileName, + Data: testInfo.Data, + }) + } + mockDataMap := map[string]*mock.MockDBSourceData{ + "db01": { + Name: "db01", + Tables: map[string]*mock.MockTableSourceData{ + "tbl01": tblMockSourceData, + }, + }, + } + mockSrc, err := mock.NewMockImportSource(mockDataMap) + require.Nil(t, err) + mockTarget := mock.NewMockTargetInfo() + cfg := config.NewConfig() + cfg.TikvImporter.Backend = config.BackendLocal + ig, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil) + require.NoError(t, err) + + cfg.Mydumper.CSV.Header = true + tblMeta := mockSrc.GetDBMetaMap()["db01"].Tables[0] + for i, dataFile := range tblMeta.DataFiles { + theDataInfo := testDataInfos[i] + dataFile.FileMeta.Compression = mydump.CompressionGZ + cols, rowDatums, err := ig.ReadFirstNRowsByFileMeta(ctx, dataFile.FileMeta, theDataInfo.FirstN) + require.Nil(t, err) + t.Logf("%v, %v", cols, rowDatums) + require.Equal(t, theDataInfo.ExpectColumns, cols) + require.Equal(t, theDataInfo.ExpectFirstRowDatums, rowDatums) + } + + theDataInfo := testDataInfos[0] + cols, rowDatums, err := ig.ReadFirstNRowsByTableName(ctx, "db01", "tbl01", theDataInfo.FirstN) + require.NoError(t, err) + require.Equal(t, theDataInfo.ExpectColumns, cols) + require.Equal(t, theDataInfo.ExpectFirstRowDatums, rowDatums) +} + func TestGetPreInfoSampleSource(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -497,6 +612,100 @@ func TestGetPreInfoSampleSource(t *testing.T) { } } +func TestGetPreInfoSampleSourceCompressed(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dataFileName := "/db01/tbl01/tbl01.data.001.csv.gz" + schemaFileData := []byte("CREATE TABLE db01.tbl01 (id INTEGER PRIMARY KEY AUTO_INCREMENT, ival INTEGER, sval VARCHAR(64));") + schemaFileDataCompressed := compressGz(t, schemaFileData) + mockDataMap := map[string]*mock.MockDBSourceData{ + "db01": { + Name: "db01", + Tables: map[string]*mock.MockTableSourceData{ + "tbl01": { + DBName: "db01", + TableName: "tbl01", + SchemaFile: &mock.MockSourceFile{ + FileName: "/db01/tbl01/tbl01.schema.sql.gz", + Data: schemaFileDataCompressed, + }, + DataFiles: []*mock.MockSourceFile{ + { + FileName: dataFileName, + Data: []byte(nil), + }, + }, + }, + }, + }, + } + mockSrc, err := mock.NewMockImportSource(mockDataMap) + require.Nil(t, err) + mockTarget := mock.NewMockTargetInfo() + cfg := config.NewConfig() + cfg.TikvImporter.Backend = config.BackendLocal + ig, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil, ropts.WithIgnoreDBNotExist(true)) + require.NoError(t, err) + + mdDBMeta := mockSrc.GetAllDBFileMetas()[0] + mdTblMeta := mdDBMeta.Tables[0] + dbInfos, err := ig.GetAllTableStructures(ctx) + require.NoError(t, err) + + data := [][]byte{ + []byte(`id,ival,sval +1,111,"aaa" +2,222,"bbb" +`), + []byte(`sval,ival,id +"aaa",111,1 +"bbb",222,2 +`), + []byte(`id,ival,sval +2,222,"bbb" +1,111,"aaa" +`), + []byte(`sval,ival,id +"aaa",111,2 +"bbb",222,1 +`), + } + compressedData := make([][]byte, 0, 4) + for _, d := range data { + 
compressedData = append(compressedData, compressGz(t, d)) + } + + subTests := []struct { + Data []byte + ExpectIsOrdered bool + }{ + { + Data: compressedData[0], + ExpectIsOrdered: true, + }, + { + Data: compressedData[1], + ExpectIsOrdered: true, + }, + { + Data: compressedData[2], + ExpectIsOrdered: false, + }, + { + Data: compressedData[3], + ExpectIsOrdered: false, + }, + } + for _, subTest := range subTests { + require.NoError(t, mockSrc.GetStorage().WriteFile(ctx, dataFileName, subTest.Data)) + sampledIndexRatio, isRowOrderedFromSample, err := ig.sampleDataFromTable(ctx, "db01", mdTblMeta, dbInfos["db01"].Tables["tbl01"].Core, nil, defaultImportantVariables) + require.NoError(t, err) + t.Logf("%v, %v", sampledIndexRatio, isRowOrderedFromSample) + require.Greater(t, sampledIndexRatio, 1.0) + require.Equal(t, subTest.ExpectIsOrdered, isRowOrderedFromSample) + } +} + func TestGetPreInfoEstimateSourceSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/br/pkg/lightning/restore/mock/mock.go b/br/pkg/lightning/restore/mock/mock.go index f43e6c022673e..5556e1caf3363 100644 --- a/br/pkg/lightning/restore/mock/mock.go +++ b/br/pkg/lightning/restore/mock/mock.go @@ -77,14 +77,19 @@ func NewMockImportSource(dbSrcDataMap map[string]*MockDBSourceData) (*MockImport tblMeta := mydump.NewMDTableMeta("binary") tblMeta.DB = dbName tblMeta.Name = tblName + compression := mydump.CompressionNone + if strings.HasSuffix(tblData.SchemaFile.FileName, ".gz") { + compression = mydump.CompressionGZ + } tblMeta.SchemaFile = mydump.FileInfo{ TableName: filter.Table{ Schema: dbName, Name: tblName, }, FileMeta: mydump.SourceFileMeta{ - Path: tblData.SchemaFile.FileName, - Type: mydump.SourceTypeTableSchema, + Path: tblData.SchemaFile.FileName, + Type: mydump.SourceTypeTableSchema, + Compression: compression, }, } tblMeta.DataFiles = []mydump.FileInfo{} @@ -108,12 +113,17 @@ func NewMockImportSource(dbSrcDataMap map[string]*MockDBSourceData) (*MockImport FileSize: int64(fileSize), }, } + fileName := tblDataFile.FileName + if strings.HasSuffix(fileName, ".gz") { + fileName = strings.TrimSuffix(tblDataFile.FileName, ".gz") + fileInfo.FileMeta.Compression = mydump.CompressionGZ + } switch { - case strings.HasSuffix(tblDataFile.FileName, ".csv"): + case strings.HasSuffix(fileName, ".csv"): fileInfo.FileMeta.Type = mydump.SourceTypeCSV - case strings.HasSuffix(tblDataFile.FileName, ".sql"): + case strings.HasSuffix(fileName, ".sql"): fileInfo.FileMeta.Type = mydump.SourceTypeSQL - case strings.HasSuffix(tblDataFile.FileName, ".parquet"): + case strings.HasSuffix(fileName, ".parquet"): fileInfo.FileMeta.Type = mydump.SourceTypeParquet default: return nil, errors.Errorf("unsupported file type: %s", tblDataFile.FileName) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 99b56d05414ce..210435640473f 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2199,23 +2199,7 @@ func newChunkRestore( ) (*chunkRestore, error) { blockBufSize := int64(cfg.Mydumper.ReadBlockSize) - var ( - reader storage.ReadSeekCloser - compressType storage.CompressType - err error - ) - switch { - case chunk.FileMeta.Type == mydump.SourceTypeParquet: - reader, err = mydump.OpenParquetReader(ctx, store, chunk.FileMeta.Path, chunk.FileMeta.FileSize) - case chunk.FileMeta.Compression != mydump.CompressionNone: - compressType, err = mydump.ToStorageCompressType(chunk.FileMeta.Compression) - if err != nil { - break - } - 
reader, err = storage.WithCompression(store, compressType).Open(ctx, chunk.FileMeta.Path) - default: - reader, err = store.Open(ctx, chunk.FileMeta.Path) - } + reader, err := openReader(ctx, chunk.FileMeta, store) if err != nil { return nil, errors.Trace(err) } @@ -2790,3 +2774,20 @@ func (cr *chunkRestore) restore( } return errors.Trace(firstErr(encodeErr, deliverErr)) } + +func openReader(ctx context.Context, fileMeta mydump.SourceFileMeta, store storage.ExternalStorage) ( + reader storage.ReadSeekCloser, err error) { + switch { + case fileMeta.Type == mydump.SourceTypeParquet: + reader, err = mydump.OpenParquetReader(ctx, store, fileMeta.Path, fileMeta.FileSize) + case fileMeta.Compression != mydump.CompressionNone: + compressType, err2 := mydump.ToStorageCompressType(fileMeta.Compression) + if err2 != nil { + return nil, err2 + } + reader, err = storage.WithCompression(store, compressType).Open(ctx, fileMeta.Path) + default: + reader, err = store.Open(ctx, fileMeta.Path) + } + return +} diff --git a/br/tests/lightning_compress/config.toml b/br/tests/lightning_compress/config.toml new file mode 100644 index 0000000000000..000018c5c41d4 --- /dev/null +++ b/br/tests/lightning_compress/config.toml @@ -0,0 +1,18 @@ +[mydumper.csv] +separator = ',' +delimiter = '"' +header = true +not-null = false +null = '\N' +backslash-escape = true +trim-last-separator = false + +[checkpoint] +enable = true +schema = "tidb_lightning_checkpoint_test" +driver = "mysql" +keep-after-success = true + +[tikv-importer] +send-kv-pairs=10 +region-split-size = 1024 diff --git a/br/tests/lightning_compress/data.gzip/compress-schema-create.sql.gz b/br/tests/lightning_compress/data.gzip/compress-schema-create.sql.gz new file mode 100644 index 0000000000000000000000000000000000000000..6571d2a15b507c2b8d3d5c3206b7b4f1410dcbd4 GIT binary patch literal 113 zcmb2|=3oGW|Cg=@nHUt=5gwbx6Z07m$pAE;wviUDGV(rC47avOtV9xCZ$cBowA6*h*f}r P;s5`~*U~+bfi?jEN8l<- literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.gzip/compress.empty_strings-schema.sql.gz b/br/tests/lightning_compress/data.gzip/compress.empty_strings-schema.sql.gz new file mode 100644 index 0000000000000000000000000000000000000000..542898561bab1b01cf4e274116413dabfb713c11 GIT binary patch literal 175 zcmV;g08sxQiwFP!00000|8&Vci-Ry20N_2p;=N7fKyx@8h(px(u0ooE#@QtO%wSq# zTJYbO4(-&A4^NtWH`Oca73+GG2VM literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.gzip/compress.escapes-schema.sql.gz b/br/tests/lightning_compress/data.gzip/compress.escapes-schema.sql.gz new file mode 100644 index 0000000000000000000000000000000000000000..bed4b7859ac92be875a58e014f39fb8d74173721 GIT binary patch literal 221 zcmV<303!b%iwFP!00000|76TTYr-%X#_@YU#pA6lgIe$~5D&3^x0S^>()1u>_!`Y1 z;?iLvxNkpp*cfj^6uwPjO4BfEj@@q{(~dq7{_o6mSIn$y@40qt%Tm-nKsDTbAhTf zlh^&}5|cZ-!`6p`yS|~3LvKh^i?YfJkb3MCExHiKcKH`=o`mknEH3H&{=?4Dtfdr& Xe)rb;FwedN00960il|vd4FLcEMN@9> literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.gzip/compress.escapes.000000000.csv.gz b/br/tests/lightning_compress/data.gzip/compress.escapes.000000000.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..37028e36d9de82f124269e054e0bf9aa9f57426d GIT binary patch literal 103 zcmb2|=3oGW|2mxuxehysu>5WQ&Y${a*0y!azYvMezG literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.gzip/compress.multi_rows-schema.sql.gz b/br/tests/lightning_compress/data.gzip/compress.multi_rows-schema.sql.gz new file mode 100644 index 
0000000000000000000000000000000000000000..328fed9cb3df88807fc198eea7177de65e421208 GIT binary patch literal 141 zcmb2|=3oGW|28K#@--OK;COZI#TEzUIYg9ffai+n1>u`yDx}6y32aY(LQM|Nl2hN{BZASpY9tK~4Yw literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.gzip/compress.multi_rows.000000000.csv.gz b/br/tests/lightning_compress/data.gzip/compress.multi_rows.000000000.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..c732af263d57611423998ac6624b8cc95e160013 GIT binary patch literal 2763 zcmb2|=3oGW|8I^f3Nje*uo&#yJ+mVH{e726Q4uwr%zxg~XO4o=5Eu=C(GVC7fzc2c h4S~@R7!85Z5Euj@aPy}c1H=FSw@S)49*q%Y000A9X!`&F literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.gzip/compress.threads-schema.sql.gz b/br/tests/lightning_compress/data.gzip/compress.threads-schema.sql.gz new file mode 100644 index 0000000000000000000000000000000000000000..1782675bfc7fea8c917e35d7f18d78b2b8464379 GIT binary patch literal 444 zcmV;t0Ym;DiwFP!00000|CEzaZ`v>r$KUxB*C!l=L|~|@RQ16)*J3TPk$riHrYV7v z(MYW;g)nvBe#%%&2$ZN#k$u0r^Z#=H(co(8kNq)NBv3p;3#-B{%RhtBZBH2Fgg|^2 zB4pK5nJ?Gd3|J2!qq#vIq%z20Rcs1Zv5D`&&TWg0%h#YZfr`U$ShIEA41O=m$ERh< z#*-WGI8Mo;8TVV^h0y}74W*?|`vrK<z;sJ-sUAXyKuq4^?hi4%U|JyPB444z-zrpn){Gb}f#DDC*$Cuv59U9XVJl8GZ! zgh2+m+r6-Ug0?^GE8TBxz+76=Ch5Y2Rtg14t|A34F%{Si>$Jj0SY|=NFdCTrh#Jr7VlNQHOOxz}L!F=R34 m#vsK!XBZJg9UfA%@wIRT^(aWI)>^BFKfI9Ue31nKjm`1Ke^ z(;!I}^CXqG4+ox-AU+v-h?4YVXm)opFpZYWNq9P0I$xdwlXQ|QiwN%{F*J#TFwG7Z z9!CrD@H|Y?_-Pr0X&^=)=1CgG&%gxJC=7#XI*-Dv7uWxHl=Kq>N%Ry?gY1tudU^z3 zzt|?=TMF}X$lj`9og)lPa5tI$DIO34R$>1E^^Uz&-GFO5E;ZkA{%WKucO?ap~$V%gt_T3qibVsJIwgnm{EuEuZrWv=T?L5=P&%iWIM1F(xh71@vVCI*O$TAf%MgtR@&~%ss#FH56|VMBOZz zc8+2%qS2s`p?|eGnJ?4MQ7omCm9tCJ?xKcm?90q^6h9Hd%C>H_$&zX3C~i`sf@h59 zoavO+XPjavYtAy#a8zG`AOO(+bD&3O1->}Fj+^uv!znAFu8}`*&?YK)vI!kUiRTT#5`lg z7M5+*8OMDT2me#NgDH~=!ls)J{GB6?@Wvq6fOW%n##;+p2!3cOwyZ0jcZ6qXgYzoe z=D4hW-L`CLBHqmrqabxdA!FHwqcY?9O9fA4-X6%D<*+6g^q~ThmNgKU2ycEOPu3UD zKSC1>vw~m@3y#F3M^y1vM+xD;1QjXrYD<{-o8@)Wt+3y4-?Q8g4;Lh)c!gY;=&fa0 z{WlZ#Cl=wJT|@)vn$SAO1?+;O&VdR3Y9ZYL(=tIKx5OxOE^aVb!GasV8EZ;vL!z01 zFee6S`5INw7z}yc4DH_w_C7G~=fPr@7-sSss1MMT zP#+&1kYr+lLRn%_azM6@d3IK#*YU&-kJ*l*4;uy literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.snappy/compress.empty_strings.000000000.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.empty_strings.000000000.sql.snappy new file mode 100644 index 0000000000000000000000000000000000000000..9c81e8f78f234b325aa6e86d6591107abf1beef0 GIT binary patch literal 128 zcmey*#=ubQml#kG$ymg|u;vYKv%Z$1iGiVkp+c~0h=QM^uWPVEQf6LaQKgo?HJ7Jf zuxn6=f~Q}Izd}N4Zb3<9d~r!pW?p)6fTqcoA(+NG>M=?% KaTBPxH5UK`3m_8! 
literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.snappy/compress.escapes-schema.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.escapes-schema.sql.snappy new file mode 100644 index 0000000000000000000000000000000000000000..9e27befa522a077cf841ceeef3223aabf93bc252 GIT binary patch literal 251 zcmXw!!D@p*6h)_9bm6|BUA^p73P#8x5V9D@msUD4LC1wHC8Kc)hI9;Z0_jH-`X&9d zLjIt2)8$>9!@0jVg!E~(K9uB#kniWa=JWc(1AjCbhvN`(6p+#-=1{k~I)3=U%wic* zfgota5v1z3)j{nvH)tb`{l!5wTK1il!(Z1<<$sH18&t!~u$6&Npaff>f&2bKqzJ|rp5 i6Ro>Ax-GugBh7~N`N_OZ-|Gpm73V|q|K1E|X4W6cQ%TGK literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.snappy/compress.escapes.000000000.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.escapes.000000000.sql.snappy new file mode 100644 index 0000000000000000000000000000000000000000..1380b47d9881e7064fb08e1077c928caa8ba127a GIT binary patch literal 184 zcmey*#=ubQml#kG$+(JvVV>3bB7H4I69Yp7Lxo`15CuO+U)NxTq|Cg;qDn1&Yc5Z} zVAr4!1y8>ae}#n9;^f4F)Zzq%Fh`$I*I+IULml;)7>KG;a7$qx(=-66yoeK3d zC{Wkb;nFYys!;-}iBXD)$%7&wze3$0#UK%c!K#db3i5!eqGObFbaY~rVnHfQlZ}!= JSY6Ya3jo3{G4KEY literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.snappy/compress.multi_rows-schema.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.multi_rows-schema.sql.snappy new file mode 100644 index 0000000000000000000000000000000000000000..5cc0365d1c65d62156db13d3b38ee7c2dd7ee5c4 GIT binary patch literal 146 zcmey*#=ubQml#kG$=JfcV4;_>OQbmrmT% z!@UFF!s+DTqJwWj1aT0@>J@Ryl2Oi2c&6i)=ksuoFTaEPbt#0SZn}1`6}r{%_03W= z98P?{H$Qvx*23b#V$y5(lkW2Ub}xCb^Ef?xIy>KtI^AA-y`OZt{ncc!cd~oDvwgUK zdNfEjmsd90y{J)7)7ph(7d!2x+Y{ zeI5jjdTlDCrQf{`5+Fc;009C72oNCfM<7!Mh9U$A5FkK+009C72n1!|btwV_2oNAZ zfB*pk1Y%`ictn5z0RjXF5FkK+Kp~KMo5Wib9rT>-8(yISp*1-3fw6k zjdw3B0t7|{mWoGXtz{AT_XS?AwJNm`9=(74D*gLAFNkoZ8g4v%a#~cyoM{8dM z2oNAZfB*pk1jZD|Rl%52ZX-Z|009C72oNApR)Jgk7p@C}3x9yP^jElb<3t)s^wy>Qms@G#{v<(gE`t8hJN(CtZrothIN6F&|5Y}-sB(({QlTOat0w!v}4fh zS*8~g2E%Y7YZV1e!|`ws27N?mEa%wvTC$tydg#QD_7D6>u4)bCe_XiT^LVV2&~ta> zE~5tK+_t#b8$s5@JGx`9ijCW_>~PGy+>6CqnsTDTr8ecFpr-4Uxe95swYm>QKv{oXtv{_8$+p$l|+|v?N3`z(#ghfeDxEfE9 zK)U=nP5C<~)nJ%YhZ(rH%c#@!98LyOrOT|2&RldKyGXur^ga$&L26kPAMGDb$CDrm z`n_S;4`xr0VLM!4hD}sflYFIy_A#}bd_vy73ZUd});~cmTKZNf`QN4X!eU`%L%opK z4TCyFNH;cCAlQL5?HWMaFF(MraDDRwEMyDnHjyCFxT|CogvqW&aMKFIW3%N9&y2Rt b;+8b0lV^>iy&5v8ZN~}zn|pC@wlaSK&XT7p literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.snappy/compress.threads.000000000.sql.snappy b/br/tests/lightning_compress/data.snappy/compress.threads.000000000.sql.snappy new file mode 100644 index 0000000000000000000000000000000000000000..dc7c1ee8adc0b0f5912ba123c64c483f7cfcaab4 GIT binary patch literal 1395 zcmXX`O>7fa5T56F!SPRG@<)sa-5|ooU}CTB#C9lAW3r*4#!2j?At++m*e}k4z3c3* z6YL^{lh`4+krFDka#avjOGR5MwLQ?9142>7rD_jV%b~{}dgzH;n+h#`RNQ91`F7^b z%(wGm7l0ZS##O4yVZrXJ0pk^<6^>tC*L|18=Z^}F|O!j zB+@~(Fqu3d*!J3etT))((cy5}nQ1Q0*mhBE?Ez13DB$x^wic($-0gMkx!g|ZdDX2^ zr68|~8LaSc^hdO{(!QZ+X6eUSc&bmdJ7iV*GeEr7n zSHW@fToFPP^qww)DJ`e;Yf!ib;HN(6M9_Eo8dzWUqZgr2gb^PqLV(y$ivWGS1_-{g z1Fa;-D?8X)G_s!4%V1wpL=Ee^424|=2su`&rTn_S0ykCwf>jH!)lK3(Sb_EXg4M|S z^mX)f9T|h((0aIun#c+%%+IBB>O7y2WhF_bt;veM&#<_U)U9;_8$L*V{91Qn9w#nk zWlWSor{D74YltZ%^W4XBM0V3gXgDvNwtPF!l@>rHY_)7#NvJAzPk zoP?(+R5^qdMOOHXEQxx=xEwK(ScJqbQ0QG!?)@@UPZ2jkH7uCz3RcB}zOcKnu$$0c z;!{+Y#VDvx#;J#OR#Pz4_nSn}7}aU9CUcpre#cO_W2nA-+!%EzDJC>t)dWp_Sc2_( zbZa0yPSJCk%F9_Ij3;e|?dxzWREszLX=}OcUXqnd`kF1aW;2F7`DXCdPUuTeUV=OY zzUoJ|CA*MF;B4_dlY}+&?`_mDprW>4>%A{*c4jP{OEt_iHkca8$J5K;i&g5Z*_@Jk 
z)My+iL*aUZF~lC$&EN~)s4LokE`cN0Z-$t)(Yp%aN*!A5T-%;BZJUwvn}(8w&?5i&!EF(nCUeqNB0^_v%mTO5Cr z`iobf765-OfUb-s3<;PBfjM)ln~CStES89p zQ#W^`okyg(uTaG$OPDBJl5j#35$`R-n=CmQiY#m7QV^L5>3La2R;wyxvedT_ciuIH z{2u2&9bb1jd%6sMCQhV?2|3{D^LT7*@dxP5`^sBD_Ok`uS**;-N=A?pm`l8#5lGVU z99Hs-%j06=PN$Vl3L0|wRh%X-ppVdVikzX_PP9H7P-*g4$uyDpoS4S+4CB@^SvT90 zP#0$ydU%A348`dQE_P-lIyk`%(cx*ragh)gJsBB|?`fiE4iYai66J=EfqiC1nqiLJ Uzu1BN-aZc->ihrpIggwF1F-R_F8}}l literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.zstd/compress-schema-create.sql.zst b/br/tests/lightning_compress/data.zstd/compress-schema-create.sql.zst new file mode 100644 index 0000000000000000000000000000000000000000..12bdbd710973e7229b4877b70edcde04708b142a GIT binary patch literal 108 zcmV-y0F(bHwJ-ey0Q~|0GR77iaBCAVMi~p0clZa z<~)kx0rBdy(`|PbH1ELhfHKZp$r4ml<Y4`k-@!^(@|2yKGN!PI4~^$z%=2L&c7KTtl}lC zUZSHT&K>#M&j36Bc6?$4*W!#>Sw6S~<{LgwzUAu)YK@X)m|j~?HRHw0rtTKEFoi@3 V1OR791K?tZvM(KpIs?7f?pB&~NLl~@ literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.zstd/compress.empty_strings.000000000.csv.zst b/br/tests/lightning_compress/data.zstd/compress.empty_strings.000000000.csv.zst new file mode 100644 index 0000000000000000000000000000000000000000..aa89918bb2cee01b2aded5caf591f0e61a0ef00e GIT binary patch literal 50 zcmV-20L}j>wJ-ey04)IkLPH1+um=H_jVzp{jN85W&`d+_gdz8^V8|UX_FtA_B(+A z8UO%`s6Z@X>wK_u?=_+$W&!}hwJ0Q0(G5fk$+C7*Fq{M+-(gR?#E~Ucz32xQDzWt; zp%r>B3ngN}$oAan+RYtIa7T2EDHcqNGT+l$*W^8Ul?!0$_d@Uc_RkFORq(O_??6y< zZGG4NS@XWA0wi+LKEH*NXX!XbEb+933dGiViWv@GGIi(x9S}@=0tEnPND~+?49|J< M^!Bka&4tllDH9D}a{vGU literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.zstd/compress.escapes.000000000.csv.zst b/br/tests/lightning_compress/data.zstd/compress.escapes.000000000.csv.zst new file mode 100644 index 0000000000000000000000000000000000000000..40994e745bdf3d6e23bed1ed837e66677df3e11f GIT binary patch literal 88 zcmV-e0H^;bwJ-ey09^tA;>Hmfpa%djGcz+2*{00`$k4xsT82`SSaF1zDnst~PQ$l3 uan^1yWPMy8JH5WX-kt6QLzYsCb72qw!3(Z%z6%U#a3Y)u=fMC~U6KtWXi_^kh2r)`b2ymjgQVPZXt6=PkSKwc3K>(&j7 zX_UHV)ig(G3D5gY3$eT7EJ~w>VKjhzguOSx_nQ`CccitW&IF>)#^sbhJ1!RhRy9yp m#ZbuebGFaDEIo`-l7KoXb$H~$!Z*jar7eO$q67d9h+{;889a{w literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.zstd/compress.multi_rows.000000000.csv.zst b/br/tests/lightning_compress/data.zstd/compress.multi_rows.000000000.csv.zst new file mode 100644 index 0000000000000000000000000000000000000000..4db1bea4c69f959a9050322b82d9b7a70418ef16 GIT binary patch literal 178 zcmdPcs{fZIqlbaPLMc&+mrE&;iQ(;kLt}?xw-^Qn2F4Hu0h7P~|Nm!%@&tjr-~XAR zJRu-sk`9IY50(0SrQ4|8p`tVPv>p0^&O`dL1q+- zhhNWrTBG9x3953dsX|%Q1#xLmKmhn>!46}o4_KNlf>R=?avDtSW|g!jm6J>OuHB`R zrhEz`jr2y*(3TqJaJdxQ*Y6$G`P5;Qb9(RDE(z+g+=m9XvcTcDryaq6FrK$4lFspf zJm~RMMw~{#Y+7S&1^u4FNNGDcC6GjVN1(xA)`UU9nnbYesJUfk6fsukm(CUHS}KTp zVFzlk>sFSE!uG-GwVsi;%nV}-QJGK{!)3W6q&@eyo{kwrQ0B4eRlqj|axhRT+)DDW!;L`$v6Yt?naJjE`(Y6^lT;S{7)F23ADpmBLL0q;m;_kJcH)Z&Pe+q7xh7 yX=D1?4bIj8$SMlB(WgFp2-i>m literal 0 HcmV?d00001 diff --git a/br/tests/lightning_compress/data.zstd/compress.threads.000000000.csv.zst b/br/tests/lightning_compress/data.zstd/compress.threads.000000000.csv.zst new file mode 100644 index 0000000000000000000000000000000000000000..13eef0ba83011073d588bcfc35896b64f5f056e5 GIT binary patch literal 1030 zcmV+h1o`_YwJ-f-^b*A%0OIkOCh)4x0FtXLx8e&KUtxocuqIu_Gl9g6fhC9`0WuH$ z2s|TFp2q_K0TdD``8X7W$Pmb+7lT1OM8g;gIvGSnFbf3CVr~g~HL|%e+JZ5!0tRv6 z-N4g;UvChM#aGfp&FG#l!QN!5eB%uy=l^5pUcTew6$(uOv)Fb*C3ER!`e$+url z;X!M$_R`F-0vvjl((&4}z1Rj@coadkitkQ-NiUk3Ss?+sB`c1pWAn 
zt1c2VJJ>#exu6kMrU(QEK$2D>2*3h>fud~#5V(OC;czLOh^(iyvktc!;mQJs0?{?T z5$ujv+lTdbf8b7vv7?J5F0Iw`-5eccF}J-5*)hQCy)MFaJtLCRYz}BVox57lw#$Ukq*Lu)}F=-GjWsY8V2(@ zXOuwkvQz_SrP?Dpf2+9_reN-J#s#?mYDGpnxZ)v*0C|MR?h?6YwrW012}^dZvJnR* zM~U#DOsl9ADTU(I7aezoCwv!Nb3zN1kEoJcvL}U~~rFK2nST zbQd&SM3zePDubwHBrw6Wm^a9M3etutx}knIct?j8?F@{n4K3=Rn>loIJV-1|NV}?? zEfmoP8h*C4wszERXSAsro!m=;GQtP;c`vl|NHvUx4q8cIQ2J8fan$zu_jTtv0zZ&~ z@KK(k;u-DTpF}&!cjz59c*Y1O6IRJ%Zcd2Nhma|qlXSvF2d6t}@yufAdEKW)KF6*4 A /dev/null + set -e + + # restart lightning from checkpoint, the second line should be written successfully + export GO_FAILPOINTS= + set +e + run_lightning --backend $BACKEND -d "tests/$TEST_NAME/data.$compress" --enable-checkpoint=1 2> /dev/null + set -e + + run_sql 'SELECT count(*), sum(PROCESSLIST_TIME), sum(THREAD_OS_ID), count(PROCESSLIST_STATE) FROM compress.threads' + check_contains 'count(*): 43' + check_contains 'sum(PROCESSLIST_TIME): 322253' + check_contains 'sum(THREAD_OS_ID): 303775702' + check_contains 'count(PROCESSLIST_STATE): 3' + + run_sql 'SELECT count(*) FROM compress.threads WHERE PROCESSLIST_TIME IS NOT NULL' + check_contains 'count(*): 12' + + run_sql 'SELECT count(*) FROM compress.multi_rows WHERE a="aaaaaaaaaa"' + check_contains 'count(*): 100000' + + run_sql 'SELECT hex(t), j, hex(b) FROM compress.escapes WHERE i = 1' + check_contains 'hex(t): 5C' + check_contains 'j: {"?": []}' + check_contains 'hex(b): FFFFFFFF' + + run_sql 'SELECT hex(t), j, hex(b) FROM compress.escapes WHERE i = 2' + check_contains 'hex(t): 22' + check_contains 'j: "\n\n\n"' + check_contains 'hex(b): 0D0A0D0A' + + run_sql 'SELECT hex(t), j, hex(b) FROM compress.escapes WHERE i = 3' + check_contains 'hex(t): 0A' + check_contains 'j: [",,,"]' + check_contains 'hex(b): 5C2C5C2C' + + run_sql 'SELECT id FROM compress.empty_strings WHERE a = """"' + check_contains 'id: 3' + run_sql 'SELECT id FROM compress.empty_strings WHERE b <> ""' + check_not_contains 'id:' + done +done diff --git a/dumpling/tests/e2e/run.sh b/dumpling/tests/e2e/run.sh index f5da32acc33e0..73b580ca594d9 100644 --- a/dumpling/tests/e2e/run.sh +++ b/dumpling/tests/e2e/run.sh @@ -37,4 +37,24 @@ run_lightning $cur/conf/lightning.toml # check mysql and tidb data check_sync_diff $cur/conf/diff_config.toml +# test e2e with compress option again + +# drop database on tidb +export DUMPLING_TEST_PORT=4000 +run_sql "drop database if exists $DB_NAME;" + +export DUMPLING_TEST_PORT=3306 + +# dumping +export DUMPLING_TEST_DATABASE=$DB_NAME +rm -rf $DUMPLING_OUTPUT_DIR +run_dumpling --compress "snappy" + +cat "$cur/conf/lightning.toml" +# use lightning import data to tidb +run_lightning $cur/conf/lightning.toml + +# check mysql and tidb data +check_sync_diff $cur/conf/diff_config.toml + diff --git a/dumpling/tests/e2e_csv/run.sh b/dumpling/tests/e2e_csv/run.sh index d80e321d9294a..9c5afaca469d7 100644 --- a/dumpling/tests/e2e_csv/run.sh +++ b/dumpling/tests/e2e_csv/run.sh @@ -24,6 +24,7 @@ run_sql_file "$DUMPLING_TEST_DIR/data/e2e_csv.t.sql" run() { echo "*** running subtest case ***" + echo "compress is $compress" echo "escape_backslash is $escape_backslash" echo "csv_delimiter is $csv_delimiter" echo "csv_separator is $csv_separator" @@ -36,7 +37,11 @@ run() { # dumping export DUMPLING_TEST_PORT=3306 export DUMPLING_TEST_DATABASE=$DB_NAME - run_dumpling --filetype="csv" --escape-backslash=$escape_backslash --csv-delimiter="$csv_delimiter" --csv-separator="$csv_separator" + rm -rf "$DUMPLING_OUTPUT_DIR" + if [ $compress = "space" ]; then + compress="" + fi + run_dumpling 
--filetype="csv" --escape-backslash=$escape_backslash --csv-delimiter="$csv_delimiter" --csv-separator="$csv_separator" --compress="$compress" # construct lightning configuration mkdir -p $DUMPLING_TEST_DIR/conf @@ -67,18 +72,22 @@ run() { escape_backslash_arr="true false" csv_delimiter_arr="\" '" csv_separator_arr=', a aa |*|' +compress_arr='space gzip snappy zstd' -for escape_backslash in $escape_backslash_arr +for compress in $compress_arr do - for csv_separator in $csv_separator_arr + for escape_backslash in $escape_backslash_arr do - for csv_delimiter in $csv_delimiter_arr + for csv_separator in $csv_separator_arr do - run + for csv_delimiter in $csv_delimiter_arr + do + run + done + if [ "$escape_backslash" = "true" ]; then + csv_delimiter="" + run + fi done - if [ "$escape_backslash" = "true" ]; then - csv_delimiter="" - run - fi done done
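For reference, every compressed source file in this patch goes through the same read path: the Lightning compression enum is mapped to a storage compress type and the external storage is wrapped with storage.WithCompression, so ExportStatement, ReadFirstNRowsByFileMeta, sampleDataFromTable, and newChunkRestore all receive decompressed bytes from Open. Below is a minimal, self-contained sketch of that path using only calls that appear in the patch; the temporary directory, file name, and fixture content are illustrative assumptions, not part of the change.

```go
// Sketch of the decompressing read path added by this patch (assumptions noted above).
package main

import (
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"

	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
	ctx := context.Background()
	dir, _ := os.MkdirTemp("", "lightning-compress-sketch")
	defer os.RemoveAll(dir)

	// Write a small gzip-compressed SQL file, mirroring the unit-test fixtures.
	f, _ := os.Create(filepath.Join(dir, "schema.sql.gz"))
	zw := gzip.NewWriter(f)
	zw.Write([]byte("CREATE DATABASE whatever;"))
	zw.Close()
	f.Close()

	store, err := storage.NewLocalStorage(dir)
	if err != nil {
		panic(err)
	}
	// Map Lightning's compression enum to the storage compress type and wrap the
	// store, the same way ExportStatement and openReader do in this patch.
	compressType, err := mydump.ToStorageCompressType(mydump.CompressionGZ)
	if err != nil {
		panic(err)
	}
	reader, err := storage.WithCompression(store, compressType).Open(ctx, "schema.sql.gz")
	if err != nil {
		panic(err)
	}
	defer reader.Close()

	data, _ := io.ReadAll(reader) // callers see decompressed bytes
	fmt.Printf("%s\n", data)
}
```

Because the wrapping happens at the storage layer, the gzip, snappy, and zstd fixtures under br/tests/lightning_compress can be imported without touching the CSV or SQL parsers themselves.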
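Chunk sizing is the other non-obvious part of the change: MakeSourceFileRegion cannot know the decompressed size of a compressed file, so it sets EndOffset to the 10 TiB TableFileSizeINF sentinel (read until EOF) and budgets RowIDMax by assuming roughly a 1% compression ratio, that is a 100x expansion of the compressed size. The following small worked example applies those two formulas from region.go; the 64 MiB input and the divisor value are illustrative assumptions.

```go
// Worked example of the compressed-file chunk sizing formulas added in region.go
// (input size and divisor are hypothetical).
package main

import "fmt"

const (
	tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
	// TableFileSizeINF in the patch: 10 * 1024 * 1 GiB = 10 TiB sentinel,
	// so a compressed chunk is read until EOF instead of to a real end offset.
	tableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
)

func main() {
	var (
		compressedSize int64 = 64 * 1024 * 1024 // a hypothetical 64 MiB .csv.gz
		divisor        int64 = 3                // per-format divisor used by MakeSourceFileRegion
	)
	rowIDMax := compressedSize * 100 / divisor // assume ~1% compression ratio (100x expansion)
	endOffset := tableFileSizeINF              // decompressed size unknown: use the sentinel

	fmt.Println("RowIDMax  =", rowIDMax)  // 2236962133 with these numbers
	fmt.Println("EndOffset =", endOffset) // 10995116277760 bytes (10 TiB)
}
```

As the FIXME in region.go notes, the 100x factor is a heuristic, which is also why updating the progress-bar calculation for compressed files is left as a TODO in this patch.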