
lightning: support compress for lightning, add compress unit and integration tests #39153

Merged: 21 commits, Nov 28, 2022
Changes from 13 commits

Commits
e0ed623  support compress for lightning, add compress reader tests (lichunzhu, Nov 15, 2022)
f4c9873  Merge branch 'master' into lightningCompress (lichunzhu, Nov 15, 2022)
1fa5b1a  fix tests (lichunzhu, Nov 15, 2022)
7a07c2d  Merge branch 'lightningCompress' of https://github.com/lichunzhu/tidb… (lichunzhu, Nov 15, 2022)
39e30db  Merge branch 'master' into lightningCompress (lichunzhu, Nov 15, 2022)
2238172  fix lightning/dumpling integration tests (lichunzhu, Nov 16, 2022)
dab5dc5  Merge branch 'lightningCompress' of https://github.com/lichunzhu/tidb… (lichunzhu, Nov 16, 2022)
90ab390  Merge branch 'master' into lightningCompress (lichunzhu, Nov 16, 2022)
a23bf67  Merge branch 'master' into lightningCompress (lichunzhu, Nov 17, 2022)
e55dd2a  fix ut (lichunzhu, Nov 17, 2022)
536eac5  Merge branch 'master' into lightningCompress (lichunzhu, Nov 18, 2022)
4c7eaf9  address comment (lichunzhu, Nov 18, 2022)
46ddf53  address comments (lichunzhu, Nov 23, 2022)
a3ce953  address comment (lichunzhu, Nov 28, 2022)
9a221d3  Merge branch 'master' into lightningCompress (lichunzhu, Nov 28, 2022)
f64281f  address comment (lichunzhu, Nov 28, 2022)
369ab8a  Merge branch 'lightningCompress' of https://github.com/lichunzhu/tidb… (lichunzhu, Nov 28, 2022)
a91ad73  Merge branch 'master' into lightningCompress (lichunzhu, Nov 28, 2022)
c8a4817  Merge branch 'master' into lightningCompress (ti-chi-bot, Nov 28, 2022)
aa1b841  Merge branch 'master' into lightningCompress (ti-chi-bot, Nov 28, 2022)
9931544  Merge branch 'master' into lightningCompress (ti-chi-bot, Nov 28, 2022)
7 changes: 7 additions & 0 deletions br/pkg/lightning/mydump/reader.go
@@ -70,6 +70,13 @@ func decodeCharacterSet(data []byte, characterSet string) ([]byte, error) {

// ExportStatement exports the SQL statement in the schema file.
func ExportStatement(ctx context.Context, store storage.ExternalStorage, sqlFile FileInfo, characterSet string) ([]byte, error) {
+    if sqlFile.FileMeta.Compression != CompressionNone {
+        compressType, err := ToStorageCompressType(sqlFile.FileMeta.Compression)
+        if err != nil {
+            return nil, errors.Trace(err)
+        }
+        store = storage.WithCompression(store, compressType)
+    }
    fd, err := store.Open(ctx, sqlFile.FileMeta.Path)
    if err != nil {
        return nil, errors.Trace(err)
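For context on the two helpers used in the added block: storage.WithCompression wraps the ExternalStorage so that the subsequent store.Open returns a reader that decompresses the file on the fly, and ToStorageCompressType maps mydump's Compression value onto the storage package's compression type. Neither helper's body is part of this diff, so this reading is inferred from the call sites above.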
26 changes: 26 additions & 0 deletions br/pkg/lightning/mydump/reader_test.go
@@ -15,6 +15,7 @@
package mydump_test

import (
+    "compress/gzip"
    "context"
    "errors"
    "os"
@@ -173,3 +174,28 @@ func TestExportStatementHandleNonEOFError(t *testing.T) {
    _, err := ExportStatement(ctx, mockStorage, f, "auto")
    require.Contains(t, err.Error(), "read error")
}
+
+func TestExportStatementCompressed(t *testing.T) {
+    dir := t.TempDir()
+    file, err := os.Create(filepath.Join(dir, "tidb_lightning_test_reader"))
+    require.NoError(t, err)
+    defer os.Remove(file.Name())
+
+    store, err := storage.NewLocalStorage(dir)
+    require.NoError(t, err)
+
+    gzipFile := gzip.NewWriter(file)
+    _, err = gzipFile.Write([]byte("CREATE DATABASE whatever;"))
+    require.NoError(t, err)
+    err = gzipFile.Close()
+    require.NoError(t, err)
+    stat, err := file.Stat()
+    require.NoError(t, err)
+    err = file.Close()
+    require.NoError(t, err)
+
+    f := FileInfo{FileMeta: SourceFileMeta{Path: stat.Name(), FileSize: stat.Size(), Compression: CompressionGZ}}
+    data, err := ExportStatement(context.TODO(), store, f, "auto")
+    require.NoError(t, err)
+    require.Equal(t, []byte("CREATE DATABASE whatever;"), data)
+}
18 changes: 15 additions & 3 deletions br/pkg/lightning/mydump/region.go
@@ -34,6 +34,9 @@ const (
    tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
    // the increment ratio of large CSV file size threshold by `region-split-size`
    largeCSVLowerThresholdRation = 10
+    // TableFileSizeINF for compressed size, for lightning 10TB is a relatively big value and will strongly affect efficiency
+    // It's used to make sure compressed files can be read until EOF. Because we can't get the exact decompressed size of the compressed files.
+    TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
Review comment (Contributor): What does INF mean? infinite?

Reply (Contributor Author): Yes, INF means infinite. It's used to make sure compressed files can be read until the EOF error.

Review comment (Contributor):
"set fileSize to INF to make sure compressed files can be read until EOF. Because we can't get the exact size of the compressed files."
I would expect to put its usage in the comment (as well as why this works, maybe).

Reply (Contributor Author): addressed in 46ddf53
)
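For scale: with tableRegionSizeWarningThreshold fixed at 1 GiB in the same const block, TableFileSizeINF works out to 10 * 1024 * 1 GiB = 10 TiB, far larger than any single source file Lightning is expected to handle, so a chunk whose EndOffset is set to it is effectively read until EOF.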

// TableRegion contains information for a table region during import.
@@ -292,19 +295,28 @@ func MakeSourceFileRegion(
        return regions, subFileSizes, err
    }

+    fileSize := fi.FileMeta.FileSize
+    rowIDMax := fileSize / divisor
+    // for compressed files, suggest the compress ratio is 1% to calculate the rowIDMax.
+    // set fileSize to INF to make sure compressed files can be read until EOF. Because we can't get the exact size of the compressed files.
+    // TODO: update progress bar calculation for compressed files.
+    if fi.FileMeta.Compression != CompressionNone {
+        rowIDMax = fileSize * 100 / divisor // FIXME: this is not accurate. Need more tests and fix solution.
+        fileSize = TableFileSizeINF
+    }
    tableRegion := &TableRegion{
        DB:       meta.DB,
        Table:    meta.Name,
        FileMeta: fi.FileMeta,
        Chunk: Chunk{
            Offset:       0,
-            EndOffset:    fi.FileMeta.FileSize,
+            EndOffset:    fileSize,
            PrevRowIDMax: 0,
-            RowIDMax:     fi.FileMeta.FileSize / divisor,
+            RowIDMax:     rowIDMax,
        },
    }

-    if tableRegion.Size() > tableRegionSizeWarningThreshold {
+    if fi.FileMeta.Compression == CompressionNone && tableRegion.Size() > tableRegionSizeWarningThreshold {
Review comment (Contributor): If a compressed file's size exceeds the threshold, the actual size is even bigger. So the threshold check is suitable for both compressed and uncompressed files? Otherwise, the compressed file won't receive this warning message.

Reply (Contributor Author): addressed in a3ce953
        log.FromContext(ctx).Warn(
            "file is too big to be processed efficiently; we suggest splitting it at 256 MB each",
            zap.String("file", fi.FileMeta.Path),
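To make the estimate above concrete (the divisor value is hypothetical, picked only for the arithmetic): a 64 MiB .csv.gz data file with divisor = 2 gets rowIDMax = 67,108,864 * 100 / 2 = 3,355,443,200 reserved row IDs, while its Chunk.EndOffset is pinned to TableFileSizeINF so the parser keeps reading the decompressed stream until EOF instead of stopping at the compressed size.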
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/region_test.go
@@ -217,7 +217,7 @@ func TestMakeSourceFileRegion(t *testing.T) {
    assert.NoError(t, err)
    assert.Len(t, regions, 1)
    assert.Equal(t, int64(0), regions[0].Chunk.Offset)
-    assert.Equal(t, fileInfo.FileMeta.FileSize, regions[0].Chunk.EndOffset)
+    assert.Equal(t, TableFileSizeINF, regions[0].Chunk.EndOffset)
    assert.Len(t, regions[0].Chunk.Columns, 0)
}

5 changes: 4 additions & 1 deletion br/pkg/lightning/mydump/router.go
@@ -134,7 +134,7 @@ func parseCompressionType(t string) (Compression, error) {
        return CompressionGZ, nil
    case "lz4":
        return CompressionLZ4, nil
-    case "zstd":
+    case "zstd", "zst":
        return CompressionZStd, nil
    case "xz":
        return CompressionXZ, nil
@@ -324,6 +324,9 @@ func (p regexRouterParser) Parse(r *config.FileRouteRule, logger log.Logger) (*R
        if err != nil {
            return err
        }
+        if result.Type == SourceTypeParquet && compression != CompressionNone {
+            return errors.Errorf("can't support whole compressed parquet file, should compress parquet files by choosing correct parquet compress writer, path: %s", r.Path)
Review comment (Contributor): What does this workaround mean here? I remember we support compressed parquet files.

Reply (Contributor Author): We support compressed binary data in parquet files, but we don't support compressed parquet files like parquet.gz.
+        }
        result.Compression = compression
        return nil
    })
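To spell out the distinction from the review thread above: Parquet compresses data inside the file (the codec, e.g. Snappy or gzip, is chosen by the parquet writer per column chunk), and such files remain readable; what this check rejects is wrapping an entire .parquet file in an outer stream compressor such as gzip (data.parquet.gz), because reading Parquet needs random access to the footer and row groups, which a purely sequential decompressing reader cannot provide.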
18 changes: 18 additions & 0 deletions br/pkg/lightning/mydump/router_test.go
@@ -292,3 +292,21 @@ func TestRouteWithPath(t *testing.T) {
    require.NoError(t, err)
    require.Nil(t, res)
}
+
+func TestRouteWithCompressedParquet(t *testing.T) {
+    fileName := "myschema.my_table.000.parquet.gz"
+    rule := &config.FileRouteRule{
+        Pattern:     `(?i)^(?:[^/]*/)*([^/.]+)\.(.*?)(?:\.([0-9]+))?\.(sql|csv|parquet)(?:\.(\w+))?$`,
+        Schema:      "$1",
+        Table:       "$2",
+        Type:        "$4",
+        Key:         "$3",
+        Compression: "$5",
+        Unescape:    true,
+    }
+    r := *rule
+    router, err := NewFileRouter([]*config.FileRouteRule{&r}, log.L())
+    require.NoError(t, err)
+    _, err = router.Route(fileName)
+    require.Error(t, err)
+}
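Worked through by hand, the rule in this test routes "myschema.my_table.000.parquet.gz" as $1 = "myschema", $2 = "my_table", $3 = "000", $4 = "parquet", $5 = "gz", so the router sees a parquet source with gz compression and returns the error added in router.go above, which is exactly what the test asserts.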
18 changes: 2 additions & 16 deletions br/pkg/lightning/restore/get_pre_info.go
@@ -444,15 +444,7 @@ func (p *PreRestoreInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context
// ReadFirstNRowsByFileMeta reads the first N rows of a data file.
// It implements the PreRestoreInfoGetter interface.
func (p *PreRestoreInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) {
-    var (
-        reader storage.ReadSeekCloser
-        err    error
-    )
-    if dataFileMeta.Type == mydump.SourceTypeParquet {
-        reader, err = mydump.OpenParquetReader(ctx, p.srcStorage, dataFileMeta.Path, dataFileMeta.FileSize)
-    } else {
-        reader, err = p.srcStorage.Open(ctx, dataFileMeta.Path)
-    }
+    reader, err := openReader(ctx, dataFileMeta, p.srcStorage)
    if err != nil {
        return nil, nil, errors.Trace(err)
    }
@@ -590,13 +582,7 @@ func (p *PreRestoreInfoGetterImpl) sampleDataFromTable(
        return resultIndexRatio, isRowOrdered, nil
    }
    sampleFile := tableMeta.DataFiles[0].FileMeta
-    var reader storage.ReadSeekCloser
-    var err error
-    if sampleFile.Type == mydump.SourceTypeParquet {
-        reader, err = mydump.OpenParquetReader(ctx, p.srcStorage, sampleFile.Path, sampleFile.FileSize)
-    } else {
-        reader, err = p.srcStorage.Open(ctx, sampleFile.Path)
-    }
+    reader, err := openReader(ctx, sampleFile, p.srcStorage)
    if err != nil {
        return 0.0, false, errors.Trace(err)
    }
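The openReader helper that both call sites now share is not part of the hunks shown in this view. Below is a minimal sketch of what it presumably does, reconstructed from the branches deleted above plus the compression handling added in reader.go; the signature and package qualifiers are assumptions rather than the PR's actual code, and it relies on the restore package's existing imports (context, errors, mydump, storage).

// Sketch only: reconstructed from the removed call sites, not the helper shipped in this PR.
func openReader(ctx context.Context, fileMeta mydump.SourceFileMeta,
    store storage.ExternalStorage) (reader storage.ReadSeekCloser, err error) {
    switch {
    case fileMeta.Type == mydump.SourceTypeParquet:
        // parquet keeps its dedicated reader so the footer and row groups can be accessed directly
        reader, err = mydump.OpenParquetReader(ctx, store, fileMeta.Path, fileMeta.FileSize)
    case fileMeta.Compression != mydump.CompressionNone:
        // wrap the storage so Open returns a transparently decompressing reader
        compressType, err2 := mydump.ToStorageCompressType(fileMeta.Compression)
        if err2 != nil {
            return nil, errors.Trace(err2)
        }
        reader, err = storage.WithCompression(store, compressType).Open(ctx, fileMeta.Path)
    default:
        reader, err = store.Open(ctx, fileMeta.Path)
    }
    return reader, errors.Trace(err)
}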