*: support sample for compressed files for adjustment (#39680)
ref #38514
lichunzhu authored Dec 19, 2022
1 parent 3edde02 commit 0c18082
Showing 13 changed files with 256 additions and 32 deletions.
Binary file not shown.
99 changes: 96 additions & 3 deletions br/pkg/lightning/mydump/loader.go
@@ -16,6 +16,7 @@ package mydump

import (
"context"
"io"
"path/filepath"
"sort"
"strings"
@@ -30,6 +31,9 @@ import (
"go.uber.org/zap"
)

// sampleCompressedFileSize represents how many bytes need to be sampled for compressed files
const sampleCompressedFileSize = 4 * 1024

// MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
type MDDatabaseMeta struct {
Name string
@@ -82,7 +86,9 @@ type SourceFileMeta struct {
Compression Compression
SortKey string
FileSize int64
ExtendData ExtendColumnData
// WARNING: variables below are not persistent
ExtendData ExtendColumnData
RealSize int64
}

// NewMDTableMeta creates an Mydumper table meta with specified character set.
@@ -386,7 +392,7 @@ func (s *mdLoaderSetup) setup(ctx context.Context) error {
// set a dummy `FileInfo` here without file meta because we needn't restore the table schema
tableMeta, _, _ := s.insertTable(FileInfo{TableName: fileInfo.TableName})
tableMeta.DataFiles = append(tableMeta.DataFiles, fileInfo)
tableMeta.TotalSize += fileInfo.FileMeta.FileSize
tableMeta.TotalSize += fileInfo.FileMeta.RealSize
}

for _, dbMeta := range s.loader.dbs {
@@ -453,7 +459,7 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size

info := FileInfo{
TableName: filter.Table{Schema: res.Schema, Name: res.Name},
FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size},
FileMeta: SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size, RealSize: size},
}

if s.loader.shouldSkip(&info.TableName) {
@@ -470,6 +476,15 @@
case SourceTypeViewSchema:
s.viewSchemas = append(s.viewSchemas, info)
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
if info.FileMeta.Compression != CompressionNone {
compressRatio, err2 := SampleFileCompressRatio(ctx, info.FileMeta, s.loader.GetStore())
if err2 != nil {
logger.Error("[loader] fail to calculate data file compress ratio",
zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type))
} else {
info.FileMeta.RealSize = int64(compressRatio * float64(info.FileMeta.FileSize))
}
}
s.tableDatas = append(s.tableDatas, info)
}

@@ -648,3 +663,81 @@ func (l *MDLoader) GetDatabases() []*MDDatabaseMeta {
func (l *MDLoader) GetStore() storage.ExternalStorage {
return l.store
}

func calculateFileBytes(ctx context.Context,
dataFile string,
compressType storage.CompressType,
store storage.ExternalStorage,
offset int64) (tot int, pos int64, err error) {
bytes := make([]byte, sampleCompressedFileSize)
reader, err := store.Open(ctx, dataFile)
if err != nil {
return 0, 0, errors.Trace(err)
}
defer reader.Close()

compressReader, err := storage.NewLimitedInterceptReader(reader, compressType, offset)
if err != nil {
return 0, 0, errors.Trace(err)
}

readBytes := func() error {
n, err2 := compressReader.Read(bytes)
if err2 != nil && errors.Cause(err2) != io.EOF && errors.Cause(err2) != io.ErrUnexpectedEOF {
return err2
}
tot += n
return err2
}

if offset == 0 {
err = readBytes()
if err != nil && errors.Cause(err) != io.EOF && errors.Cause(err) != io.ErrUnexpectedEOF {
return 0, 0, err
}
pos, err = compressReader.Seek(0, io.SeekCurrent)
if err != nil {
return 0, 0, errors.Trace(err)
}
return tot, pos, nil
}

for {
err = readBytes()
if err != nil {
break
}
}
if err != nil && errors.Cause(err) != io.EOF && errors.Cause(err) != io.ErrUnexpectedEOF {
return 0, 0, errors.Trace(err)
}
return tot, offset, nil
}

// SampleFileCompressRatio samples the compress ratio of the compressed file.
func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
if fileMeta.Compression == CompressionNone {
return 1, nil
}
compressType, err := ToStorageCompressType(fileMeta.Compression)
if err != nil {
return 0, err
}
// We use the following method to sample the compression ratio from the first few bytes of the file.
// 1. Read once, aiming to find a valid offset in the compressed file. If we kept reading here, the compress reader
// would request more data from the file reader and buffer it in memory, so we couldn't compute an accurate ratio.
// 2. Read a second time, limiting the file reader to n bytes (the valid position found in the first pass), and read
// everything the compress reader produces. The length m of that output is the uncompressed data length.
// Use m/n as the compression ratio.
// first pass: find a valid end position in the compressed file
_, pos, err := calculateFileBytes(ctx, fileMeta.Path, compressType, store, 0)
if err != nil {
return 0, err
}
// second pass: the file reader stops at the position found in the first pass; compute the sample's compression ratio
tot, pos, err := calculateFileBytes(ctx, fileMeta.Path, compressType, store, pos)
if err != nil {
return 0, err
}
return float64(tot) / float64(pos), nil
}
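
For illustration, here is a minimal standalone sketch (hypothetical, not part of this commit) of the idea behind SampleFileCompressRatio: decompress a small sample of the file, count how many compressed bytes were consumed, and scale the on-disk size by the resulting ratio. A plain gzip.Reader buffers ahead of what it decompresses, which is exactly why the helper above needs the two-pass read through NewLimitedInterceptReader; the sketch simply tolerates that small inaccuracy.

// ratio_sketch.go (hypothetical): estimate the decompressed size of a gzip file
// by sampling its first few KiB, roughly mirroring SampleFileCompressRatio.
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"os"
)

// countingReader counts how many bytes are pulled from the wrapped reader.
type countingReader struct {
	r io.Reader
	n int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n += int64(n)
	return n, err
}

func main() {
	f, err := os.Open("data.csv.gz") // hypothetical input file
	if err != nil {
		panic(err)
	}
	defer f.Close()
	st, err := f.Stat()
	if err != nil {
		panic(err)
	}

	cr := &countingReader{r: f}
	zr, err := gzip.NewReader(cr)
	if err != nil {
		panic(err)
	}
	sample := make([]byte, 4*1024)  // same 4 KiB sample size as sampleCompressedFileSize
	m, _ := io.ReadFull(zr, sample) // m = uncompressed bytes read; EOF on tiny files is acceptable here

	// Note: gzip.NewReader reads ahead, so cr.n can overshoot the bytes strictly needed
	// for the sample; the real helper avoids this with a second, length-limited read.
	ratio := float64(m) / float64(cr.n)
	fmt.Printf("sampled ratio %.2f, estimated decompressed size %d bytes\n",
		ratio, int64(ratio*float64(st.Size())))
}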
33 changes: 33 additions & 0 deletions br/pkg/lightning/mydump/loader_test.go
@@ -15,6 +15,8 @@
package mydump_test

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"os"
@@ -1053,3 +1055,34 @@ func TestExternalDataRoutes(t *testing.T) {
require.Equal(t, expectedExtendVals[i], fileInfo.FileMeta.ExtendData.Values)
}
}

func TestSampleFileCompressRatio(t *testing.T) {
s := newTestMydumpLoaderSuite(t)
store, err := storage.NewLocalStorage(s.sourceDir)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

byteArray := make([]byte, 0, 4096)
bf := bytes.NewBuffer(byteArray)
compressWriter := gzip.NewWriter(bf)
csvData := []byte("aaaa\n")
for i := 0; i < 1000; i++ {
_, err = compressWriter.Write(csvData)
require.NoError(t, err)
}
err = compressWriter.Flush()
require.NoError(t, err)

fileName := "test_1.t1.csv.gz"
err = store.WriteFile(ctx, fileName, bf.Bytes())
require.NoError(t, err)

ratio, err := md.SampleFileCompressRatio(ctx, md.SourceFileMeta{
Path: fileName,
Compression: md.CompressionGZ,
}, store)
require.NoError(t, err)
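// 1000 writes of "aaaa\n" give 5000 uncompressed bytes, so the sampled ratio should be
// close to 5000 divided by the compressed length bf.Len().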
require.InDelta(t, ratio, 5000.0/float64(bf.Len()), 1e-5)
}
31 changes: 14 additions & 17 deletions br/pkg/lightning/mydump/region.go
@@ -31,16 +31,14 @@ import (
)

const (
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
compressedTableRegionSizeWarningThreshold int64 = 410 * 1024 * 1024 // 0.4 * tableRegionSizeWarningThreshold
tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
// the increment ratio of large CSV file size threshold by `region-split-size`
largeCSVLowerThresholdRation = 10
// TableFileSizeINF is the effectively-infinite file size used for compressed files; for Lightning, 10TB is a relatively large value (data that big would strongly affect efficiency anyway).
// It's used to make sure compressed files are read until EOF, because we can't know the exact decompressed size of a compressed file in advance.
TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
// compressDataRatio is a relatively maximum compress ratio for normal compressed data
// It's used to estimate rowIDMax, we use a large value to try to avoid overlapping
compressDataRatio = 500
// CompressSizeFactor is used to adjust compressed data size
CompressSizeFactor = 5
)

// TableRegion contains information for a table region during import.
@@ -303,11 +301,11 @@ func MakeSourceFileRegion(
rowIDMax := fileSize / divisor
// for compressed files, suggest the compress ratio is 1% to calculate the rowIDMax.
// set fileSize to INF to make sure compressed files can be read until EOF. Because we can't get the exact size of the compressed files.
// TODO: update progress bar calculation for compressed files.
if fi.FileMeta.Compression != CompressionNone {
// FIXME: this is not accurate. Need sample ratio in the future and use sampled ratio to compute rowIDMax
// currently we use 500 here. It's a relatively large value for most data.
rowIDMax = fileSize * compressDataRatio / divisor
// RealSize is the estimated decompressed size. In some cases the sampled prefix of a compressed file
// has a lower compression ratio than the whole file, so we multiply by this factor to
// make sure the rowIDMax computation is not underestimated.
rowIDMax = fi.FileMeta.RealSize * CompressSizeFactor / divisor
fileSize = TableFileSizeINF
}
tableRegion := &TableRegion{
@@ -317,24 +315,23 @@
Chunk: Chunk{
Offset: 0,
EndOffset: fileSize,
RealOffset: 0,
PrevRowIDMax: 0,
RowIDMax: rowIDMax,
},
}

regionTooBig := false
if fi.FileMeta.Compression == CompressionNone {
regionTooBig = tableRegion.Size() > tableRegionSizeWarningThreshold
} else {
regionTooBig = fi.FileMeta.FileSize > compressedTableRegionSizeWarningThreshold
regionSize := tableRegion.Size()
if fi.FileMeta.Compression != CompressionNone {
regionSize = fi.FileMeta.RealSize
}
if regionTooBig {
if regionSize > tableRegionSizeWarningThreshold {
log.FromContext(ctx).Warn(
"file is too big to be processed efficiently; we suggest splitting it at 256 MB each",
zap.String("file", fi.FileMeta.Path),
zap.Int64("size", dataFileSize))
zap.Int64("size", regionSize))
}
return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil
return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.RealSize)}, nil
}

// because parquet files can't seek efficiently, there is no benefit in split.
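As a worked example with hypothetical numbers: for a 64 MiB .csv.gz data file whose sampled compression ratio comes out at 8, RealSize ≈ 512 MiB. With 3 CSV columns the divisor is 3, so rowIDMax = 512 MiB * CompressSizeFactor (5) / 3 ≈ 8.9e8, while EndOffset is set to TableFileSizeINF (10 * 1024 * 1 GiB = 10 TiB) so the chunk is simply read until EOF. The oversize-region check and the per-file weight returned by MakeSourceFileRegion now use RealSize (512 MiB, below the 1 GiB warning threshold) rather than the 64 MiB on-disk size.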
56 changes: 55 additions & 1 deletion br/pkg/lightning/mydump/region_test.go
@@ -199,7 +199,6 @@ func TestMakeSourceFileRegion(t *testing.T) {
store, err := storage.NewLocalStorage(".")
assert.NoError(t, err)

// test - no compression
fileInfo.FileMeta.Compression = CompressionNone
regions, _, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
assert.NoError(t, err)
@@ -221,6 +220,61 @@
assert.Len(t, regions[0].Chunk.Columns, 0)
}

func TestCompressedMakeSourceFileRegion(t *testing.T) {
meta := &MDTableMeta{
DB: "csv",
Name: "large_csv_file",
}
cfg := &config.Config{
Mydumper: config.MydumperRuntime{
ReadBlockSize: config.ReadBlockSize,
MaxRegionSize: 1,
CSV: config.CSVConfig{
Separator: ",",
Delimiter: "",
Header: true,
TrimLastSep: false,
NotNull: false,
Null: "NULL",
BackslashEscape: true,
},
StrictFormat: true,
Filter: []string{"*.*"},
},
}
filePath := "./csv/split_large_file.csv.zst"
dataFileInfo, err := os.Stat(filePath)
require.NoError(t, err)
fileSize := dataFileInfo.Size()

fileInfo := FileInfo{FileMeta: SourceFileMeta{
Path: filePath,
Type: SourceTypeCSV,
Compression: CompressionZStd,
FileSize: fileSize,
}}
colCnt := 3

ctx := context.Background()
ioWorkers := worker.NewPool(ctx, 4, "io")
store, err := storage.NewLocalStorage(".")
assert.NoError(t, err)
compressRatio, err := SampleFileCompressRatio(ctx, fileInfo.FileMeta, store)
require.NoError(t, err)
fileInfo.FileMeta.RealSize = int64(compressRatio * float64(fileInfo.FileMeta.FileSize))

regions, sizes, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
assert.NoError(t, err)
assert.Len(t, regions, 1)
assert.Equal(t, int64(0), regions[0].Chunk.Offset)
assert.Equal(t, int64(0), regions[0].Chunk.RealOffset)
assert.Equal(t, TableFileSizeINF, regions[0].Chunk.EndOffset)
rowIDMax := fileInfo.FileMeta.RealSize * CompressSizeFactor / int64(colCnt)
assert.Equal(t, rowIDMax, regions[0].Chunk.RowIDMax)
assert.Len(t, regions[0].Chunk.Columns, 0)
assert.Equal(t, fileInfo.FileMeta.RealSize, int64(sizes[0]))
}

func TestSplitLargeFile(t *testing.T) {
meta := &MDTableMeta{
DB: "csv",
1 change: 1 addition & 0 deletions br/pkg/lightning/restore/mock/mock.go
@@ -111,6 +111,7 @@ func NewMockImportSource(dbSrcDataMap map[string]*MockDBSourceData) (*MockImport
FileMeta: mydump.SourceFileMeta{
Path: tblDataFile.FileName,
FileSize: int64(fileSize),
RealSize: int64(fileSize),
},
}
fileName := tblDataFile.FileName
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/precheck.go
@@ -139,7 +139,7 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
case CheckLocalDiskPlacement:
return NewLocalDiskPlacementCheckItem(b.cfg), nil
case CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter), nil
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
case CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg), nil
default:
26 changes: 23 additions & 3 deletions br/pkg/lightning/restore/precheck_impl.go
@@ -434,7 +434,7 @@ func (ci *largeFileCheckItem) Check(ctx context.Context) (*CheckResult, error) {
for _, db := range ci.dbMetas {
for _, t := range db.Tables {
for _, f := range t.DataFiles {
if f.FileMeta.FileSize > defaultCSVSize {
if f.FileMeta.RealSize > defaultCSVSize {
theResult.Message = fmt.Sprintf("large csv: %s file exists and it will slow down import performance", f.FileMeta.Path)
theResult.Passed = false
}
@@ -484,23 +484,43 @@
type localTempKVDirCheckItem struct {
cfg *config.Config
preInfoGetter PreRestoreInfoGetter
dbMetas []*mydump.MDDatabaseMeta
}

func NewLocalTempKVDirCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter) PrecheckItem {
func NewLocalTempKVDirCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta) PrecheckItem {
return &localTempKVDirCheckItem{
cfg: cfg,
preInfoGetter: preInfoGetter,
dbMetas: dbMetas,
}
}

func (ci *localTempKVDirCheckItem) GetCheckItemID() CheckItemID {
return CheckLocalTempKVDir
}

func (ci *localTempKVDirCheckItem) hasCompressedFiles() bool {
for _, dbMeta := range ci.dbMetas {
for _, tbMeta := range dbMeta.Tables {
for _, file := range tbMeta.DataFiles {
if file.FileMeta.Compression != mydump.CompressionNone {
return true
}
}
}
}
return false
}

func (ci *localTempKVDirCheckItem) Check(ctx context.Context) (*CheckResult, error) {
severity := Critical
// for cases that have compressed files, the estimated size may not be accurate, so set severity to Warn to avoid a hard failure
if ci.hasCompressedFiles() {
severity = Warn
}
theResult := &CheckResult{
Item: ci.GetCheckItemID(),
Severity: Critical,
Severity: severity,
}
storageSize, err := common.GetStorageSize(ci.cfg.TikvImporter.SortedKVDir)
if err != nil {