lightning: support compress for lightning, add compress unit and integration tests (#39153)

ref #38514
lichunzhu authored Nov 28, 2022
1 parent 8973bcc commit 5b9d96b
Showing 41 changed files with 440 additions and 53 deletions.
7 changes: 7 additions & 0 deletions br/pkg/lightning/mydump/reader.go
@@ -70,6 +70,13 @@ func decodeCharacterSet(data []byte, characterSet string) ([]byte, error) {

// ExportStatement exports the SQL statement in the schema file.
func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, characterSet string) ([]byte, error) {
if sqlFile.FileMeta.Compression != CompressionNone {
compressType, err := ToStorageCompressType(sqlFile.FileMeta.Compression)
if err != nil {
return nil, errors.Trace(err)
}
store = storage.WithCompression(store, compressType)
}
fd, err := store.Open(ctx, sqlFile.FileMeta.Path)
if err != nil {
return nil, errors.Trace(err)
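Note (not part of the commit): the ToStorageCompressType helper used above maps Lightning's Compression values onto the br storage package's CompressType so that storage.WithCompression can wrap the external storage with a decompressing reader. The helper is added elsewhere in this commit and is not shown in this excerpt; a minimal sketch, assuming only gzip is mapped at this point, could look like:

// Sketch only; the real implementation lives elsewhere in this commit.
func ToStorageCompressType(compression Compression) (storage.CompressType, error) {
	switch compression {
	case CompressionGZ:
		return storage.Gzip, nil
	case CompressionNone:
		return storage.NoCompression, nil
	default:
		return storage.NoCompression, errors.Errorf("compression %d has no matching storage compress type", compression)
	}
}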
26 changes: 26 additions & 0 deletions br/pkg/lightning/mydump/reader_test.go
@@ -15,6 +15,7 @@
package mydump_test

import (
"compress/gzip"
"context"
"errors"
"os"
@@ -173,3 +174,28 @@ func TestExportStatementHandleNonEOFError(t *testing.T) {
_, err := ExportStatement(ctx, mockStorage, f, "auto")
require.Contains(t, err.Error(), "read error")
}

func TestExportStatementCompressed(t *testing.T) {
dir := t.TempDir()
file, err := os.Create(filepath.Join(dir, "tidb_lightning_test_reader"))
require.NoError(t, err)
defer os.Remove(file.Name())

store, err := storage.NewLocalStorage(dir)
require.NoError(t, err)

gzipFile := gzip.NewWriter(file)
_, err = gzipFile.Write([]byte("CREATE DATABASE whatever;"))
require.NoError(t, err)
err = gzipFile.Close()
require.NoError(t, err)
stat, err := file.Stat()
require.NoError(t, err)
err = file.Close()
require.NoError(t, err)

f := FileInfo{FileMeta: SourceFileMeta{Path: stat.Name(), FileSize: stat.Size(), Compression: CompressionGZ}}
data, err := ExportStatement(context.TODO(), store, f, "auto")
require.NoError(t, err)
require.Equal(t, []byte("CREATE DATABASE whatever;"), data)
}
27 changes: 23 additions & 4 deletions br/pkg/lightning/mydump/region.go
@@ -31,9 +31,13 @@ import (
)

const (
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
compressedTableRegionSizeWarningThreshold int64 = 410 * 1024 * 1024 // 0.4 * tableRegionSizeWarningThreshold
// the increment ratio of large CSV file size threshold by `region-split-size`
largeCSVLowerThresholdRation = 10
// TableFileSizeINF is the file size assigned to compressed files; for Lightning, 10 TiB is already so large that it would strongly affect efficiency.
// It makes sure compressed files are read until EOF, because we can't get the exact decompressed size of a compressed file.
TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
)

// TableRegion contains information for a table region during import.
@@ -292,19 +296,34 @@ func MakeSourceFileRegion(
return regions, subFileSizes, err
}

fileSize := fi.FileMeta.FileSize
rowIDMax := fileSize / divisor
// for compressed files, assume a compression ratio of 1% when calculating rowIDMax.
// set fileSize to INF to make sure compressed files can be read until EOF, because we can't get their exact decompressed size.
// TODO: update progress bar calculation for compressed files.
if fi.FileMeta.Compression != CompressionNone {
rowIDMax = fileSize * 100 / divisor // FIXME: this is not accurate. Need more tests and fix solution.
fileSize = TableFileSizeINF
}
tableRegion := &TableRegion{
DB: meta.DB,
Table: meta.Name,
FileMeta: fi.FileMeta,
Chunk: Chunk{
Offset: 0,
EndOffset: fi.FileMeta.FileSize,
EndOffset: fileSize,
PrevRowIDMax: 0,
RowIDMax: fi.FileMeta.FileSize / divisor,
RowIDMax: rowIDMax,
},
}

if tableRegion.Size() > tableRegionSizeWarningThreshold {
regionTooBig := false
if fi.FileMeta.Compression == CompressionNone {
regionTooBig = tableRegion.Size() > tableRegionSizeWarningThreshold
} else {
regionTooBig = fi.FileMeta.FileSize > compressedTableRegionSizeWarningThreshold
}
if regionTooBig {
log.FromContext(ctx).Warn(
"file is too big to be processed efficiently; we suggest splitting it at 256 MB each",
zap.String("file", fi.FileMeta.Path),
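To make the new chunk-sizing rule concrete, the following sketch (illustrative only; the helper name and the divisor value in the example are hypothetical) isolates how EndOffset and RowIDMax are now derived. The 1% compression-ratio assumption and TableFileSizeINF come straight from the hunk above.

// chunkBounds mirrors the sizing logic in MakeSourceFileRegion above.
// Uncompressed files keep their real size; for compressed files the
// decompressed size is unknown, so the chunk end offset becomes
// TableFileSizeINF (read until EOF) and the row-ID estimate assumes
// a ~1% compression ratio.
func chunkBounds(fileMeta SourceFileMeta, divisor int64) (endOffset, rowIDMax int64) {
	fileSize := fileMeta.FileSize
	rowIDMax = fileSize / divisor
	if fileMeta.Compression != CompressionNone {
		rowIDMax = fileSize * 100 / divisor
		fileSize = TableFileSizeINF
	}
	return fileSize, rowIDMax
}

For example, a 64 MiB gzip-compressed CSV with a divisor of 4 would reserve 64 MiB * 100 / 4 ≈ 1.68 billion row IDs and get an end offset of 10 TiB, so the reader simply runs until EOF.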
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/region_test.go
@@ -217,7 +217,7 @@ func TestMakeSourceFileRegion(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, regions, 1)
assert.Equal(t, int64(0), regions[0].Chunk.Offset)
assert.Equal(t, fileInfo.FileMeta.FileSize, regions[0].Chunk.EndOffset)
assert.Equal(t, TableFileSizeINF, regions[0].Chunk.EndOffset)
assert.Len(t, regions[0].Chunk.Columns, 0)
}

5 changes: 4 additions & 1 deletion br/pkg/lightning/mydump/router.go
@@ -134,7 +134,7 @@ func parseCompressionType(t string) (Compression, error) {
return CompressionGZ, nil
case "lz4":
return CompressionLZ4, nil
case "zstd":
case "zstd", "zst":
return CompressionZStd, nil
case "xz":
return CompressionXZ, nil
@@ -324,6 +324,9 @@ func (p regexRouterParser) Parse(r *config.FileRouteRule, logger log.Logger) (*R
if err != nil {
return err
}
if result.Type == SourceTypeParquet && compression != CompressionNone {
return errors.Errorf("can't support whole compressed parquet file, should compress parquet files by choosing correct parquet compress writer, path: %s", r.Path)
}
result.Compression = compression
return nil
})
18 changes: 18 additions & 0 deletions br/pkg/lightning/mydump/router_test.go
@@ -292,3 +292,21 @@ func TestRouteWithPath(t *testing.T) {
require.NoError(t, err)
require.Nil(t, res)
}

func TestRouteWithCompressedParquet(t *testing.T) {
fileName := "myschema.my_table.000.parquet.gz"
rule := &config.FileRouteRule{
Pattern: `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)(?:\.(\w+))?$`,
Schema: "$1",
Table: "$2",
Type: "$4",
Key: "$3",
Compression: "$5",
Unescape: true,
}
r := *rule
router, err := NewFileRouter([]*config.FileRouteRule{&r}, log.L())
require.NoError(t, err)
_, err = router.Route(fileName)
require.Error(t, err)
}
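As a companion illustration (not part of the commit), the same rule is expected to accept a gzip-compressed CSV and carry the compression through to the route result. A hedged sketch, reusing the imports of router_test.go and assuming the route result exposes a Compression field:

func TestRouteWithCompressedCSV(t *testing.T) {
	fileName := "myschema.my_table.000.csv.gz"
	rule := &config.FileRouteRule{
		Pattern:     `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)(?:\.(\w+))?$`,
		Schema:      "$1",
		Table:       "$2",
		Type:        "$4",
		Key:         "$3",
		Compression: "$5",
		Unescape:    true,
	}
	router, err := NewFileRouter([]*config.FileRouteRule{rule}, log.L())
	require.NoError(t, err)
	res, err := router.Route(fileName)
	require.NoError(t, err)
	require.Equal(t, CompressionGZ, res.Compression)
}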
18 changes: 2 additions & 16 deletions br/pkg/lightning/restore/get_pre_info.go
@@ -444,15 +444,7 @@ func (p *PreRestoreInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context
// ReadFirstNRowsByFileMeta reads the first N rows of a data file.
// It implements the PreRestoreInfoGetter interface.
func (p *PreRestoreInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) {
var (
reader storage.ReadSeekCloser
err error
)
if dataFileMeta.Type == mydump.SourceTypeParquet {
reader, err = mydump.OpenParquetReader(ctx, p.srcStorage, dataFileMeta.Path, dataFileMeta.FileSize)
} else {
reader, err = p.srcStorage.Open(ctx, dataFileMeta.Path)
}
reader, err := openReader(ctx, dataFileMeta, p.srcStorage)
if err != nil {
return nil, nil, errors.Trace(err)
}
@@ -590,13 +582,7 @@ func (p *PreRestoreInfoGetterImpl) sampleDataFromTable(
return resultIndexRatio, isRowOrdered, nil
}
sampleFile := tableMeta.DataFiles[0].FileMeta
var reader storage.ReadSeekCloser
var err error
if sampleFile.Type == mydump.SourceTypeParquet {
reader, err = mydump.OpenParquetReader(ctx, p.srcStorage, sampleFile.Path, sampleFile.FileSize)
} else {
reader, err = p.srcStorage.Open(ctx, sampleFile.Path)
}
reader, err := openReader(ctx, sampleFile, p.srcStorage)
if err != nil {
return 0.0, false, errors.Trace(err)
}
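The two call sites above now delegate to an openReader helper that the commit defines elsewhere (not visible in this excerpt). Based on the reader.go change and the branches it replaces here, a plausible sketch of that helper is:

// Sketch only; the real helper is defined elsewhere in this commit.
// Parquet files keep their dedicated reader, compressed files go through
// a decompressing storage wrapper, and everything else is opened directly.
func openReader(ctx context.Context, fileMeta mydump.SourceFileMeta,
	store storage.ExternalStorage) (reader storage.ReadSeekCloser, err error) {
	switch {
	case fileMeta.Type == mydump.SourceTypeParquet:
		reader, err = mydump.OpenParquetReader(ctx, store, fileMeta.Path, fileMeta.FileSize)
	case fileMeta.Compression != mydump.CompressionNone:
		compressType, err2 := mydump.ToStorageCompressType(fileMeta.Compression)
		if err2 != nil {
			return nil, errors.Trace(err2)
		}
		store = storage.WithCompression(store, compressType)
		reader, err = store.Open(ctx, fileMeta.Path)
	default:
		reader, err = store.Open(ctx, fileMeta.Path)
	}
	return reader, errors.Trace(err)
}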