diff --git a/br/pkg/lightning/mydump/csv/split_large_file.csv.zst b/br/pkg/lightning/mydump/csv/split_large_file.csv.zst
new file mode 100644
index 0000000000000..9609230bf04a5
Binary files /dev/null and b/br/pkg/lightning/mydump/csv/split_large_file.csv.zst differ
diff --git a/br/pkg/lightning/mydump/loader.go b/br/pkg/lightning/mydump/loader.go
index 98661d3a46f8c..d55bce6f94fc5 100644
--- a/br/pkg/lightning/mydump/loader.go
+++ b/br/pkg/lightning/mydump/loader.go
@@ -16,6 +16,7 @@ package mydump
 
 import (
 	"context"
+	"io"
 	"path/filepath"
 	"sort"
 	"strings"
@@ -30,6 +31,9 @@ import (
 	"go.uber.org/zap"
 )
 
+// sampleCompressedFileSize represents how many bytes need to be sampled for compressed files
+const sampleCompressedFileSize = 4 * 1024
+
 // MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
 type MDDatabaseMeta struct {
 	Name       string
@@ -82,7 +86,9 @@ type SourceFileMeta struct {
 	Compression Compression
 	SortKey     string
 	FileSize    int64
-	ExtendData  ExtendColumnData
+	// WARNING: variables below are not persistent
+	ExtendData ExtendColumnData
+	RealSize   int64
 }
 
 // NewMDTableMeta creates an Mydumper table meta with specified character set.
@@ -386,7 +392,7 @@ func (s *mdLoaderSetup) setup(ctx context.Context) error {
 			// set a dummy `FileInfo` here without file meta because we needn't restore the table schema
 			tableMeta, _, _ := s.insertTable(FileInfo{TableName: fileInfo.TableName})
 			tableMeta.DataFiles = append(tableMeta.DataFiles, fileInfo)
-			tableMeta.TotalSize += fileInfo.FileMeta.FileSize
+			tableMeta.TotalSize += fileInfo.FileMeta.RealSize
 		}
 
 		for _, dbMeta := range s.loader.dbs {
@@ -453,7 +459,7 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size
 
 	info := FileInfo{
 		TableName: filter.Table{Schema: res.Schema, Name: res.Name},
-		FileMeta:  SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size},
+		FileMeta:  SourceFileMeta{Path: path, Type: res.Type, Compression: res.Compression, SortKey: res.Key, FileSize: size, RealSize: size},
 	}
 
 	if s.loader.shouldSkip(&info.TableName) {
@@ -470,6 +476,15 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size
 	case SourceTypeViewSchema:
 		s.viewSchemas = append(s.viewSchemas, info)
 	case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
+		if info.FileMeta.Compression != CompressionNone {
+			compressRatio, err2 := SampleFileCompressRatio(ctx, info.FileMeta, s.loader.GetStore())
+			if err2 != nil {
+				logger.Error("[loader] fail to calculate data file compress ratio",
+					zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type))
+			} else {
+				info.FileMeta.RealSize = int64(compressRatio * float64(info.FileMeta.FileSize))
+			}
+		}
 		s.tableDatas = append(s.tableDatas, info)
 	}
 
@@ -648,3 +663,81 @@ func (l *MDLoader) GetDatabases() []*MDDatabaseMeta {
 func (l *MDLoader) GetStore() storage.ExternalStorage {
 	return l.store
 }
+
+func calculateFileBytes(ctx context.Context,
+	dataFile string,
+	compressType storage.CompressType,
+	store storage.ExternalStorage,
+	offset int64) (tot int, pos int64, err error) {
+	bytes := make([]byte, sampleCompressedFileSize)
+	reader, err := store.Open(ctx, dataFile)
+	if err != nil {
+		return 0, 0, errors.Trace(err)
+	}
+	defer reader.Close()
+
+	compressReader, err := storage.NewLimitedInterceptReader(reader, compressType, offset)
+	if err != nil {
+		return 0, 0, errors.Trace(err)
+	}
+
+	readBytes := func() error {
+		n, err2 := compressReader.Read(bytes)
+		if err2 != nil && errors.Cause(err2) != io.EOF && errors.Cause(err2) != io.ErrUnexpectedEOF {
+			return err2
+		}
+		tot += n
+		return err2
+	}
+
+	if offset == 0 {
+		err = readBytes()
+		if err != nil && errors.Cause(err) != io.EOF && errors.Cause(err) != io.ErrUnexpectedEOF {
+			return 0, 0, err
+		}
+		pos, err = compressReader.Seek(0, io.SeekCurrent)
+		if err != nil {
+			return 0, 0, errors.Trace(err)
+		}
+		return tot, pos, nil
+	}
+
+	for {
+		err = readBytes()
+		if err != nil {
+			break
+		}
+	}
+	if err != nil && errors.Cause(err) != io.EOF && errors.Cause(err) != io.ErrUnexpectedEOF {
+		return 0, 0, errors.Trace(err)
+	}
+	return tot, offset, nil
+}
+
+// SampleFileCompressRatio samples the compress ratio of the compressed file.
+func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (float64, error) {
+	if fileMeta.Compression == CompressionNone {
+		return 1, nil
+	}
+	compressType, err := ToStorageCompressType(fileMeta.Compression)
+	if err != nil {
+		return 0, err
+	}
+	// We use the following method to sample the compress ratio of the first few bytes of the file.
+	// 1. Read once, aiming to find a valid compressed-file offset. If we kept reading, the compress reader would
+	//    request more data from the file reader and buffer it in memory, so we couldn't compute an accurate ratio.
+	// 2. Read a second time, limiting the file reader to only the n bytes found in the first read (n is that valid position),
+	//    and read everything out of the compress reader. The data length m we read out is the uncompressed data length.
+	//    Use m/n to compute the compress ratio.
+	// The first read aims to find a valid end pos in the compressed file.
+	_, pos, err := calculateFileBytes(ctx, fileMeta.Path, compressType, store, 0)
+	if err != nil {
+		return 0, err
+	}
+	// The second read makes the original reader end at the first read's valid pos, then computes the sampled compress ratio.
+	tot, pos, err := calculateFileBytes(ctx, fileMeta.Path, compressType, store, pos)
+	if err != nil {
+		return 0, err
+	}
+	return float64(tot) / float64(pos), nil
+}
diff --git a/br/pkg/lightning/mydump/loader_test.go b/br/pkg/lightning/mydump/loader_test.go
index da910c70bedc0..58236d7b626f5 100644
--- a/br/pkg/lightning/mydump/loader_test.go
+++ b/br/pkg/lightning/mydump/loader_test.go
@@ -15,6 +15,8 @@ package mydump_test
 
 import (
+	"bytes"
+	"compress/gzip"
 	"context"
 	"fmt"
 	"os"
@@ -1053,3 +1055,34 @@ func TestExternalDataRoutes(t *testing.T) {
 		require.Equal(t, expectedExtendVals[i], fileInfo.FileMeta.ExtendData.Values)
 	}
 }
+
+func TestSampleFileCompressRatio(t *testing.T) {
+	s := newTestMydumpLoaderSuite(t)
+	store, err := storage.NewLocalStorage(s.sourceDir)
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	byteArray := make([]byte, 0, 4096)
+	bf := bytes.NewBuffer(byteArray)
+	compressWriter := gzip.NewWriter(bf)
+	csvData := []byte("aaaa\n")
+	for i := 0; i < 1000; i++ {
+		_, err = compressWriter.Write(csvData)
+		require.NoError(t, err)
+	}
+	err = compressWriter.Flush()
+	require.NoError(t, err)
+
+	fileName := "test_1.t1.csv.gz"
+	err = store.WriteFile(ctx, fileName, bf.Bytes())
+	require.NoError(t, err)
+
+	ratio, err := md.SampleFileCompressRatio(ctx, md.SourceFileMeta{
+		Path:        fileName,
+		Compression: md.CompressionGZ,
+	}, store)
+	require.NoError(t, err)
+	require.InDelta(t, ratio, 5000.0/float64(bf.Len()), 1e-5)
+}
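For orientation, the snippet below is a minimal, standard-library-only sketch of the sampling idea behind SampleFileCompressRatio; the file name and helper names are illustrative and not part of the patch. This one-pass version slightly over-counts the compressed bytes consumed, because gzip's decompressor reads ahead through an internal buffer, which is exactly the inaccuracy the two-pass Seek plus limited-reader approach in loader.go avoids.

// ratiosample.go: hypothetical standalone sketch, not part of the patch.
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"os"
)

// countingReader counts how many compressed bytes the decompressor pulls from the file.
type countingReader struct {
	r io.Reader
	n int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n += int64(n)
	return n, err
}

// sampleGzipRatio decompresses roughly sampleSize bytes and returns
// uncompressed bytes read / compressed bytes consumed.
func sampleGzipRatio(path string, sampleSize int64) (float64, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	cr := &countingReader{r: f}
	zr, err := gzip.NewReader(cr)
	if err != nil {
		return 0, err
	}
	defer zr.Close()

	uncompressed, err := io.CopyN(io.Discard, zr, sampleSize)
	if err != nil && err != io.EOF {
		return 0, err
	}
	if cr.n == 0 {
		return 1, nil
	}
	return float64(uncompressed) / float64(cr.n), nil
}

func main() {
	ratio, err := sampleGzipRatio("data.csv.gz", 4*1024) // hypothetical input file
	if err != nil {
		panic(err)
	}
	fmt.Printf("estimated compress ratio: %.2f\n", ratio)
}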
diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go
index f1eb7934c55e8..da3b4d0af1a53 100644
--- a/br/pkg/lightning/mydump/region.go
+++ b/br/pkg/lightning/mydump/region.go
@@ -31,16 +31,14 @@ import (
 )
 
 const (
-	tableRegionSizeWarningThreshold           int64 = 1024 * 1024 * 1024
-	compressedTableRegionSizeWarningThreshold int64 = 410 * 1024 * 1024 // 0.4 * tableRegionSizeWarningThreshold
+	tableRegionSizeWarningThreshold int64 = 1024 * 1024 * 1024
 	// the increment ratio of large CSV file size threshold by `region-split-size`
 	largeCSVLowerThresholdRation = 10
 	// TableFileSizeINF for compressed size, for lightning 10TB is a relatively big value and will strongly affect efficiency
 	// It's used to make sure compressed files can be read until EOF. Because we can't get the exact decompressed size of the compressed files.
 	TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
-	// compressDataRatio is a relatively maximum compress ratio for normal compressed data
-	// It's used to estimate rowIDMax, we use a large value to try to avoid overlapping
-	compressDataRatio = 500
+	// CompressSizeFactor is used to adjust compressed data size
+	CompressSizeFactor = 5
 )
 
 // TableRegion contains information for a table region during import.
@@ -303,11 +301,11 @@ func MakeSourceFileRegion(
 	rowIDMax := fileSize / divisor
 	// for compressed files, suggest the compress ratio is 1% to calculate the rowIDMax.
 	// set fileSize to INF to make sure compressed files can be read until EOF. Because we can't get the exact size of the compressed files.
-	// TODO: update progress bar calculation for compressed files.
 	if fi.FileMeta.Compression != CompressionNone {
-		// FIXME: this is not accurate. Need sample ratio in the future and use sampled ratio to compute rowIDMax
-		// currently we use 500 here. It's a relatively large value for most data.
-		rowIDMax = fileSize * compressDataRatio / divisor
+		// RealSize is the estimated file size. In some cases the first few bytes of a compressed file
+		// have a smaller compress ratio than the whole file, so we still multiply this factor to
+		// make sure the rowIDMax computation is correct.
+		rowIDMax = fi.FileMeta.RealSize * CompressSizeFactor / divisor
 		fileSize = TableFileSizeINF
 	}
 	tableRegion := &TableRegion{
@@ -317,24 +315,23 @@ func MakeSourceFileRegion(
 		Chunk: Chunk{
 			Offset:       0,
 			EndOffset:    fileSize,
+			RealOffset:   0,
 			PrevRowIDMax: 0,
 			RowIDMax:     rowIDMax,
 		},
 	}
 
-	regionTooBig := false
-	if fi.FileMeta.Compression == CompressionNone {
-		regionTooBig = tableRegion.Size() > tableRegionSizeWarningThreshold
-	} else {
-		regionTooBig = fi.FileMeta.FileSize > compressedTableRegionSizeWarningThreshold
+	regionSize := tableRegion.Size()
+	if fi.FileMeta.Compression != CompressionNone {
+		regionSize = fi.FileMeta.RealSize
 	}
-	if regionTooBig {
+	if regionSize > tableRegionSizeWarningThreshold {
 		log.FromContext(ctx).Warn(
 			"file is too big to be processed efficiently; we suggest splitting it at 256 MB each",
 			zap.String("file", fi.FileMeta.Path),
-			zap.Int64("size", dataFileSize))
+			zap.Int64("size", regionSize))
 	}
-	return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.FileSize)}, nil
+	return []*TableRegion{tableRegion}, []float64{float64(fi.FileMeta.RealSize)}, nil
 }
 
 // because parquet files can't seek efficiently, there is no benefit in split.
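To see how the new RealSize and CompressSizeFactor feed rowIDMax in MakeSourceFileRegion above, here is a small worked example with made-up numbers; only the formula mirrors the patch, everything else is hypothetical.

package main

import "fmt"

func main() {
	const compressSizeFactor = 5 // same safety factor as CompressSizeFactor above

	var (
		fileSize int64 = 100 * 1024 * 1024 // compressed size on disk
		ratio          = 12.0              // sampled by SampleFileCompressRatio
		divisor  int64 = 4                 // column count for a CSV source
	)

	realSize := int64(ratio * float64(fileSize)) // estimated uncompressed size
	// The head-of-file sample can under-estimate the whole file's ratio, so the
	// row-ID budget is padded by compressSizeFactor to avoid overlapping row IDs.
	rowIDMax := realSize * compressSizeFactor / divisor

	fmt.Println("RealSize:", realSize, "rowIDMax:", rowIDMax)
}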
diff --git a/br/pkg/lightning/mydump/region_test.go b/br/pkg/lightning/mydump/region_test.go
index 0c990278e65cd..5aa2b3a85b752 100644
--- a/br/pkg/lightning/mydump/region_test.go
+++ b/br/pkg/lightning/mydump/region_test.go
@@ -199,7 +199,6 @@ func TestMakeSourceFileRegion(t *testing.T) {
 	store, err := storage.NewLocalStorage(".")
 	assert.NoError(t, err)
 
-	// test - no compression
 	fileInfo.FileMeta.Compression = CompressionNone
 	regions, _, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
 	assert.NoError(t, err)
@@ -221,6 +220,61 @@ func TestMakeSourceFileRegion(t *testing.T) {
 	assert.Len(t, regions[0].Chunk.Columns, 0)
 }
 
+func TestCompressedMakeSourceFileRegion(t *testing.T) {
+	meta := &MDTableMeta{
+		DB:   "csv",
+		Name: "large_csv_file",
+	}
+	cfg := &config.Config{
+		Mydumper: config.MydumperRuntime{
+			ReadBlockSize: config.ReadBlockSize,
+			MaxRegionSize: 1,
+			CSV: config.CSVConfig{
+				Separator:       ",",
+				Delimiter:       "",
+				Header:          true,
+				TrimLastSep:     false,
+				NotNull:         false,
+				Null:            "NULL",
+				BackslashEscape: true,
+			},
+			StrictFormat: true,
+			Filter:       []string{"*.*"},
+		},
+	}
+	filePath := "./csv/split_large_file.csv.zst"
+	dataFileInfo, err := os.Stat(filePath)
+	require.NoError(t, err)
+	fileSize := dataFileInfo.Size()
+
+	fileInfo := FileInfo{FileMeta: SourceFileMeta{
+		Path:        filePath,
+		Type:        SourceTypeCSV,
+		Compression: CompressionZStd,
+		FileSize:    fileSize,
+	}}
+	colCnt := 3
+
+	ctx := context.Background()
+	ioWorkers := worker.NewPool(ctx, 4, "io")
+	store, err := storage.NewLocalStorage(".")
+	assert.NoError(t, err)
+	compressRatio, err := SampleFileCompressRatio(ctx, fileInfo.FileMeta, store)
+	require.NoError(t, err)
+	fileInfo.FileMeta.RealSize = int64(compressRatio * float64(fileInfo.FileMeta.FileSize))
+
+	regions, sizes, err := MakeSourceFileRegion(ctx, meta, fileInfo, colCnt, cfg, ioWorkers, store)
+	assert.NoError(t, err)
+	assert.Len(t, regions, 1)
+	assert.Equal(t, int64(0), regions[0].Chunk.Offset)
+	assert.Equal(t, int64(0), regions[0].Chunk.RealOffset)
+	assert.Equal(t, TableFileSizeINF, regions[0].Chunk.EndOffset)
+	rowIDMax := fileInfo.FileMeta.RealSize * CompressSizeFactor / int64(colCnt)
+	assert.Equal(t, rowIDMax, regions[0].Chunk.RowIDMax)
+	assert.Len(t, regions[0].Chunk.Columns, 0)
+	assert.Equal(t, fileInfo.FileMeta.RealSize, int64(sizes[0]))
+}
+
 func TestSplitLargeFile(t *testing.T) {
 	meta := &MDTableMeta{
 		DB:   "csv",
diff --git a/br/pkg/lightning/restore/mock/mock.go b/br/pkg/lightning/restore/mock/mock.go
index 5556e1caf3363..24e287f11c5f0 100644
--- a/br/pkg/lightning/restore/mock/mock.go
+++ b/br/pkg/lightning/restore/mock/mock.go
@@ -111,6 +111,7 @@ func NewMockImportSource(dbSrcDataMap map[string]*MockDBSourceData) (*MockImport
 				FileMeta: mydump.SourceFileMeta{
 					Path:     tblDataFile.FileName,
 					FileSize: int64(fileSize),
+					RealSize: int64(fileSize),
 				},
 			}
 			fileName := tblDataFile.FileName
diff --git a/br/pkg/lightning/restore/precheck.go b/br/pkg/lightning/restore/precheck.go
index a76854556a165..f078fe50f473c 100644
--- a/br/pkg/lightning/restore/precheck.go
+++ b/br/pkg/lightning/restore/precheck.go
@@ -139,7 +139,7 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
 	case CheckLocalDiskPlacement:
 		return NewLocalDiskPlacementCheckItem(b.cfg), nil
 	case CheckLocalTempKVDir:
-		return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter), nil
+		return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
 	case CheckTargetUsingCDCPITR:
 		return NewCDCPITRCheckItem(b.cfg), nil
 	default:
 	}
 }
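The next file, precheck_impl.go, downgrades the local temp KV dir check from Critical to Warn whenever any source file is compressed. The hypothetical sketch below shows only that decision in isolation; the names are illustrative, and the real check item also compares the estimated source size against the available disk space.

package main

import "fmt"

type checkSeverity string

const (
	critical checkSeverity = "Critical"
	warn     checkSeverity = "Warn"
)

// pickSeverity mirrors the idea added to localTempKVDirCheckItem.Check:
// sampled compression ratios make the required-space estimate fuzzy, so the
// check should warn rather than block the import outright.
func pickSeverity(hasCompressedFiles bool) checkSeverity {
	if hasCompressedFiles {
		return warn
	}
	return critical
}

func main() {
	fmt.Println(pickSeverity(false)) // Critical: sizes are exact, enforce the disk check
	fmt.Println(pickSeverity(true))  // Warn: sizes are estimates, only report
}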
diff --git a/br/pkg/lightning/restore/precheck_impl.go b/br/pkg/lightning/restore/precheck_impl.go
index b3c3c2fc00f25..f412b101ff08b 100644
--- a/br/pkg/lightning/restore/precheck_impl.go
+++ b/br/pkg/lightning/restore/precheck_impl.go
@@ -434,7 +434,7 @@ func (ci *largeFileCheckItem) Check(ctx context.Context) (*CheckResult, error) {
 	for _, db := range ci.dbMetas {
 		for _, t := range db.Tables {
 			for _, f := range t.DataFiles {
-				if f.FileMeta.FileSize > defaultCSVSize {
+				if f.FileMeta.RealSize > defaultCSVSize {
 					theResult.Message = fmt.Sprintf("large csv: %s file exists and it will slow down import performance", f.FileMeta.Path)
 					theResult.Passed = false
 				}
@@ -484,12 +484,14 @@ func (ci *localDiskPlacementCheckItem) Check(ctx context.Context) (*CheckResult,
 type localTempKVDirCheckItem struct {
 	cfg           *config.Config
 	preInfoGetter PreRestoreInfoGetter
+	dbMetas       []*mydump.MDDatabaseMeta
 }
 
-func NewLocalTempKVDirCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter) PrecheckItem {
+func NewLocalTempKVDirCheckItem(cfg *config.Config, preInfoGetter PreRestoreInfoGetter, dbMetas []*mydump.MDDatabaseMeta) PrecheckItem {
 	return &localTempKVDirCheckItem{
 		cfg:           cfg,
 		preInfoGetter: preInfoGetter,
+		dbMetas:       dbMetas,
 	}
 }
 
@@ -497,10 +499,28 @@ func (ci *localTempKVDirCheckItem) GetCheckItemID() CheckItemID {
 	return CheckLocalTempKVDir
 }
 
+func (ci *localTempKVDirCheckItem) hasCompressedFiles() bool {
+	for _, dbMeta := range ci.dbMetas {
+		for _, tbMeta := range dbMeta.Tables {
+			for _, file := range tbMeta.DataFiles {
+				if file.FileMeta.Compression != mydump.CompressionNone {
+					return true
+				}
+			}
+		}
+	}
+	return false
+}
+
 func (ci *localTempKVDirCheckItem) Check(ctx context.Context) (*CheckResult, error) {
+	severity := Critical
+	// when compressed files are present, the estimated size may not be accurate, so only warn instead of failing the check
+	if ci.hasCompressedFiles() {
+		severity = Warn
+	}
 	theResult := &CheckResult{
 		Item:     ci.GetCheckItemID(),
-		Severity: Critical,
+		Severity: severity,
 	}
 	storageSize, err := common.GetStorageSize(ci.cfg.TikvImporter.SortedKVDir)
 	if err != nil {
diff --git a/br/pkg/lightning/restore/precheck_impl_test.go b/br/pkg/lightning/restore/precheck_impl_test.go
index 2811937a71c82..7842bd1fd75e7 100644
--- a/br/pkg/lightning/restore/precheck_impl_test.go
+++ b/br/pkg/lightning/restore/precheck_impl_test.go
@@ -381,7 +381,7 @@ func (s *precheckImplSuite) TestLocalTempKVDirCheckBasic() {
 	defer cancel()
 
 	s.cfg.TikvImporter.SortedKVDir = "/tmp/"
-	ci = NewLocalTempKVDirCheckItem(s.cfg, s.preInfoGetter)
+	ci = NewLocalTempKVDirCheckItem(s.cfg, s.preInfoGetter, s.mockSrc.GetAllDBFileMetas())
 	s.Require().Equal(CheckLocalTempKVDir, ci.GetCheckItemID())
 	result, err = ci.Check(ctx)
 	s.Require().NoError(err)
@@ -400,7 +400,7 @@ func (s *precheckImplSuite) TestLocalTempKVDirCheckBasic() {
 		},
 	)
 	s.Require().NoError(s.setMockImportData(testMockSrcData))
-	ci = NewLocalTempKVDirCheckItem(s.cfg, s.preInfoGetter)
+	ci = NewLocalTempKVDirCheckItem(s.cfg, s.preInfoGetter, s.mockSrc.GetAllDBFileMetas())
 	s.Require().Equal(CheckLocalTempKVDir, ci.GetCheckItemID())
 	result, err = ci.Check(ctx)
 	s.Require().NoError(err)
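The restore.go hunk below stops counting compressed CSVs as multiple chunks, since a compressed file is read as a single region until EOF. The following hypothetical sketch reproduces only that arithmetic with made-up sizes; the function and values are not part of the patch.

package main

import (
	"fmt"
	"math"
)

// estimateChunks mirrors the condition in estimateChunkCountIntoMetrics below:
// only plain strict-format CSVs without a header are split by MaxRegionSize;
// a compressed file always counts as one chunk.
func estimateChunks(fileSize, maxRegionSize int64, strictFormat, hasHeader, compressed bool) float64 {
	if fileSize > maxRegionSize && strictFormat && !hasHeader && !compressed {
		return math.Round(float64(fileSize) / float64(maxRegionSize))
	}
	return 1
}

func main() {
	const maxRegionSize = 256 * 1024 * 1024
	fmt.Println(estimateChunks(1<<30, maxRegionSize, true, false, false)) // 4: plain CSV split into regions
	fmt.Println(estimateChunks(1<<30, maxRegionSize, true, false, true))  // 1: compressed CSV stays whole
}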
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index 329ef29c98667..543eddc3435fd 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -953,7 +953,8 @@ func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error {
 			}
 			if fileMeta.FileMeta.Type == mydump.SourceTypeCSV {
 				cfg := rc.cfg.Mydumper
-				if fileMeta.FileMeta.FileSize > int64(cfg.MaxRegionSize) && cfg.StrictFormat && !cfg.CSV.Header {
+				if fileMeta.FileMeta.FileSize > int64(cfg.MaxRegionSize) && cfg.StrictFormat &&
+					!cfg.CSV.Header && fileMeta.FileMeta.Compression == mydump.CompressionNone {
 					estimatedChunkCount += math.Round(float64(fileMeta.FileMeta.FileSize) / float64(cfg.MaxRegionSize))
 				} else {
 					estimatedChunkCount++
diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go
index 86d3ed2622ddc..37ba113c82eed 100644
--- a/br/pkg/lightning/restore/table_restore.go
+++ b/br/pkg/lightning/restore/table_restore.go
@@ -250,7 +250,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
 			if !common.TableHasAutoRowID(tr.tableInfo.Core) {
 				idxCnt--
 			}
-			threshold := estimateCompactionThreshold(cp, int64(idxCnt))
+			threshold := estimateCompactionThreshold(tr.tableMeta.DataFiles, cp, int64(idxCnt))
 			idxEngineCfg.Local = &backend.LocalEngineConfig{
 				Compact:            threshold > 0,
 				CompactConcurrency: 4,
@@ -1050,15 +1050,23 @@ func (tr *TableRestore) analyzeTable(ctx context.Context, g glue.SQLExecutor) er
 // Try to limit the total SST files number under 500. But size compress 32GB SST files cost about 20min,
 // we set the upper bound to 32GB to avoid too long compression time.
 // factor is the non-clustered(1 for data engine and number of non-clustered index count for index engine).
-func estimateCompactionThreshold(cp *checkpoints.TableCheckpoint, factor int64) int64 {
+func estimateCompactionThreshold(files []mydump.FileInfo, cp *checkpoints.TableCheckpoint, factor int64) int64 {
 	totalRawFileSize := int64(0)
 	var lastFile string
+	fileSizeMap := make(map[string]int64, len(files))
+	for _, file := range files {
+		fileSizeMap[file.FileMeta.Path] = file.FileMeta.RealSize
+	}
+
 	for _, engineCp := range cp.Engines {
 		for _, chunk := range engineCp.Chunks {
 			if chunk.FileMeta.Path == lastFile {
 				continue
 			}
-			size := chunk.FileMeta.FileSize
+			size, ok := fileSizeMap[chunk.FileMeta.Path]
+			if !ok {
+				size = chunk.FileMeta.FileSize
+			}
 			if chunk.FileMeta.Type == mydump.SourceTypeParquet {
 				// parquet file is compressed, thus estimates with a factor of 2
 				size *= 2
diff --git a/br/pkg/lightning/restore/table_restore_test.go b/br/pkg/lightning/restore/table_restore_test.go
index 17fb97e346e36..5cfaeabc804d9 100644
--- a/br/pkg/lightning/restore/table_restore_test.go
+++ b/br/pkg/lightning/restore/table_restore_test.go
@@ -129,6 +129,7 @@ func (s *tableRestoreSuiteBase) setupSuite(t *testing.T) {
 				Type:     mydump.SourceTypeSQL,
 				SortKey:  strconv.Itoa(i),
 				FileSize: 37,
+				RealSize: 37,
 			},
 		})
 	}
@@ -144,6 +145,7 @@ func (s *tableRestoreSuiteBase) setupSuite(t *testing.T) {
 			Type:     mydump.SourceTypeCSV,
 			SortKey:  "99",
 			FileSize: 14,
+			RealSize: 14,
 		},
 	})
 
@@ -427,7 +429,7 @@ func (s *tableRestoreSuite) TestPopulateChunksCSVHeader() {
 		require.NoError(s.T(), err)
 		fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{
 			TableName: filter.Table{Schema: "db", Name: "table"},
-			FileMeta:  mydump.SourceFileMeta{Path: csvName, Type: mydump.SourceTypeCSV, SortKey: fmt.Sprintf("%02d", i), FileSize: int64(len(str))},
+			FileMeta:  mydump.SourceFileMeta{Path: csvName, Type: mydump.SourceTypeCSV, SortKey: fmt.Sprintf("%02d", i), FileSize: int64(len(str)), RealSize: int64(len(str))},
 		})
 		total += len(str)
 	}
@@ -1349,6 +1351,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() {
 			{
 				FileMeta: mydump.SourceFileMeta{
 					FileSize: 1 * units.TiB,
+					RealSize: 1 * units.TiB,
 					Path:     "/testPath",
 				},
 			},
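The compress.go change below builds the limited reader used by the second sampling pass: io.LimitReader caps how many compressed bytes the decompressor can pull from the underlying file. A small standalone demonstration of that behaviour, assuming nothing beyond the standard library:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

func main() {
	// Build a gzip blob in memory.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	for i := 0; i < 100000; i++ {
		_, _ = zw.Write([]byte("aaaa\n"))
	}
	_ = zw.Close()

	// Cap the decompressor's input at the first 64 compressed bytes, the same way
	// NewLimitedInterceptReader wraps the file reader with io.LimitReader.
	limited := io.LimitReader(bytes.NewReader(buf.Bytes()), 64)
	zr, err := gzip.NewReader(limited)
	if err != nil {
		panic(err)
	}
	// Decompression stops once the 64-byte input budget is exhausted; the error
	// then reports a (possibly unexpected) EOF rather than reading past the cap.
	n, err := io.Copy(io.Discard, zr)
	fmt.Printf("decompressed %d bytes from at most 64 compressed bytes (err=%v)\n", n, err)
}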
diff --git a/br/pkg/storage/compress.go b/br/pkg/storage/compress.go
index 5794c813c9d5f..36c07846f3271 100644
--- a/br/pkg/storage/compress.go
+++ b/br/pkg/storage/compress.go
@@ -103,6 +103,20 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType
 	}, nil
 }
 
+func NewLimitedInterceptReader(fileReader ExternalFileReader, compressType CompressType, n int64) (ExternalFileReader, error) {
+	newFileReader := fileReader
+	if n < 0 {
+		return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support negative limit, n: %d", n)
+	} else if n > 0 {
+		newFileReader = &compressReader{
+			Reader: io.LimitReader(fileReader, n),
+			Seeker: fileReader,
+			Closer: fileReader,
+		}
+	}
+	return newInterceptReader(newFileReader, compressType)
+}
+
 func (c *compressReader) Seek(offset int64, whence int) (int64, error) {
 	// only support get original reader's current offset
 	if offset == 0 && whence == io.SeekCurrent {