lightning: refine progress for compress files import (#39219)

ref #38514

lichunzhu authored Dec 2, 2022
1 parent 40d1ddb commit 10b3bc7
Showing 8 changed files with 88 additions and 17 deletions.
23 changes: 23 additions & 0 deletions br/pkg/lightning/checkpoints/checkpoints.go
@@ -262,6 +262,29 @@ func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint {
    }
}

func (ccp *ChunkCheckpoint) UnfinishedSize() int64 {
    if ccp.FileMeta.Compression == mydump.CompressionNone {
        return ccp.Chunk.EndOffset - ccp.Chunk.Offset
    }
    return ccp.FileMeta.FileSize - ccp.Chunk.RealOffset
}

func (ccp *ChunkCheckpoint) TotalSize() int64 {
    if ccp.FileMeta.Compression == mydump.CompressionNone {
        return ccp.Chunk.EndOffset - ccp.Key.Offset
    }
    // TODO: a compressed file won't be split into chunks, so using FileSize as TotalSize is OK for now.
    // Change this when we support splitting compressed files into chunks.
    return ccp.FileMeta.FileSize
}

func (ccp *ChunkCheckpoint) FinishedSize() int64 {
    if ccp.FileMeta.Compression == mydump.CompressionNone {
        return ccp.Chunk.Offset - ccp.Key.Offset
    }
    return ccp.Chunk.RealOffset - ccp.Key.Offset
}

type EngineCheckpoint struct {
    Status CheckpointStatus
    Chunks []*ChunkCheckpoint // a sorted array
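
For context, here is a minimal, self-contained sketch (not part of this commit) of how these helpers combine into a per-chunk progress ratio; the byte counts below are hypothetical:

package main

import "fmt"

// chunkProgress mirrors how lightning can derive a chunk's progress from the new
// checkpoint helpers: FinishedSize() over TotalSize(). For an uncompressed chunk these
// are logical file offsets; for a compressed chunk the finished part is measured in
// real (compressed) bytes and the total is the whole compressed file size.
func chunkProgress(finishedSize, totalSize int64) float64 {
    if totalSize <= 0 {
        return 0
    }
    return float64(finishedSize) / float64(totalSize)
}

func main() {
    // Hypothetical 64 MiB .csv.gz file whose reader has consumed 16 MiB of compressed
    // bytes, i.e. FinishedSize() = 16 MiB and TotalSize() = FileSize = 64 MiB.
    fmt.Printf("progress: %.0f%%\n", 100*chunkProgress(16<<20, 64<<20)) // progress: 25%
}
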
6 changes: 6 additions & 0 deletions br/pkg/lightning/mydump/parquet_parser.go
@@ -351,6 +351,12 @@ func (pp *ParquetParser) SetPos(pos int64, rowID int64) error {
    return nil
}

// RealPos implements the Parser interface.
// For parquet it's equal to Pos().
func (pp *ParquetParser) RealPos() (int64, error) {
    return pp.curStart + int64(pp.curIndex), nil
}

// Close closes the parquet file of the parser.
// It implements the Parser interface.
func (pp *ParquetParser) Close() error {
7 changes: 7 additions & 0 deletions br/pkg/lightning/mydump/parser.go
@@ -94,6 +94,7 @@ type ChunkParser struct {
type Chunk struct {
    Offset       int64
    EndOffset    int64
    RealOffset   int64
    PrevRowIDMax int64
    RowIDMax     int64
    Columns      []string
@@ -126,6 +127,7 @@ const (
type Parser interface {
    Pos() (pos int64, rowID int64)
    SetPos(pos int64, rowID int64) error
    RealPos() (int64, error)
    Close() error
    ReadRow() error
    LastRow() Row
@@ -175,6 +177,11 @@ func (parser *blockParser) SetPos(pos int64, rowID int64) error {
    return nil
}

// RealPos returns the read position of the underlying reader.
func (parser *blockParser) RealPos() (int64, error) {
    return parser.reader.Seek(0, io.SeekCurrent)
}

// Pos returns the current file offset.
// Attention: for compressed sql/csv files, pos is the position in the uncompressed file.
func (parser *blockParser) Pos() (pos int64, lastRowID int64) {
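
To make the Pos()/RealPos() distinction concrete: Pos() reports the position in the uncompressed data, while RealPos() asks the underlying reader (via Seek(0, io.SeekCurrent)) where it is in the actual file. A small standalone analogy using a gzip stream, not lightning code:

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
)

func main() {
    // Build a small, highly compressible payload in memory.
    var buf bytes.Buffer
    zw := gzip.NewWriter(&buf)
    _, _ = zw.Write(bytes.Repeat([]byte("1,foo,bar\n"), 10000))
    _ = zw.Close()

    raw := bytes.NewReader(buf.Bytes()) // stands in for the compressed file; supports Seek
    zr, err := gzip.NewReader(raw)
    if err != nil {
        panic(err)
    }

    logical, _ := io.Copy(io.Discard, zr)     // decompressed bytes read, what Pos() tracks
    realPos, _ := raw.Seek(0, io.SeekCurrent) // offset in the compressed stream, what RealPos() reports
    fmt.Println("uncompressed bytes read:", logical)
    fmt.Println("compressed bytes read:", realPos)
}
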
7 changes: 6 additions & 1 deletion br/pkg/lightning/mydump/region.go
@@ -38,6 +38,9 @@ const (
    // TableFileSizeINF is the file size used for compressed files; for lightning, 10TB is a relatively large value that would strongly affect efficiency.
    // It's used to make sure compressed files are read until EOF, because we can't get the exact decompressed size of a compressed file.
    TableFileSizeINF = 10 * 1024 * tableRegionSizeWarningThreshold
    // compressDataRatio is a conservative upper bound on the compression ratio of normal compressed data.
    // It's used to estimate rowIDMax; we use a large value to try to avoid row ID overlap.
    compressDataRatio = 500
)

// TableRegion contains information for a table region during import.
@@ -302,7 +305,9 @@ func MakeSourceFileRegion(
    // Set fileSize to INF to make sure compressed files are read until EOF, because we can't get the exact size of a compressed file.
    // TODO: update progress bar calculation for compressed files.
    if fi.FileMeta.Compression != CompressionNone {
        rowIDMax = fileSize * 100 / divisor // FIXME: this is not accurate. Need more tests and fix solution.
        // FIXME: this is not accurate. We need to sample the compression ratio in the future and use the sampled ratio to compute rowIDMax.
        // Currently we use 500 here, which is a relatively large value for most data.
        rowIDMax = fileSize * compressDataRatio / divisor
        fileSize = TableFileSizeINF
    }
    tableRegion := &TableRegion{
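
A worked sketch of the estimate above with hypothetical numbers (a 64 MiB compressed file and a divisor of 100, standing in for the per-row size estimate lightning already derives); the constants mirror the ones defined in region.go:

package main

import "fmt"

const (
    tableRegionSizeWarningThreshold = 1024 * 1024 * 1024                          // 1 GiB
    tableFileSizeINF                = 10 * 1024 * tableRegionSizeWarningThreshold // ~10 TiB, read-until-EOF sentinel
    compressDataRatio               = 500                                         // assumed upper bound on the compression ratio
)

func main() {
    fileSize := int64(64 << 20) // hypothetical 64 MiB compressed file
    divisor := int64(100)       // hypothetical estimated bytes per row

    // Over-estimate the row count so row IDs allocated to different files don't overlap,
    // then treat the file size as "infinite" so the chunk is read until EOF.
    rowIDMax := fileSize * compressDataRatio / divisor
    fmt.Println("estimated rowIDMax:", rowIDMax) // 335544320
    fmt.Println("fileSize used for the region:", int64(tableFileSizeINF))
}
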
41 changes: 32 additions & 9 deletions br/pkg/lightning/restore/restore.go
@@ -938,7 +938,7 @@ func (rc *Controller) estimateChunkCountIntoMetrics(ctx context.Context) error {
                if _, ok := fileChunks[c.Key.Path]; !ok {
                    fileChunks[c.Key.Path] = 0.0
                }
                remainChunkCnt := float64(c.Chunk.EndOffset-c.Chunk.Offset) / float64(c.Chunk.EndOffset-c.Key.Offset)
                remainChunkCnt := float64(c.UnfinishedSize()) / float64(c.TotalSize())
                fileChunks[c.Key.Path] += remainChunkCnt
            }
        }
@@ -1619,7 +1619,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
            } else {
                for _, eng := range cp.Engines {
                    for _, chunk := range eng.Chunks {
                        totalDataSizeToRestore += chunk.Chunk.EndOffset - chunk.Chunk.Offset
                        totalDataSizeToRestore += chunk.UnfinishedSize()
                    }
                }
            }
@@ -2299,6 +2299,8 @@ type deliveredKVs struct {
    columns []string
    offset  int64
    rowID   int64

    realOffset int64 // indicates file reader's current position, only used for compressed files
}

type deliverResult struct {
@@ -2327,6 +2329,8 @@ func (cr *chunkRestore) deliverLoop(

    dataSynced := true
    hasMoreKVs := true
    var startRealOffset, currRealOffset int64 // both start at 0

    for hasMoreKVs {
        var dataChecksum, indexChecksum verify.KVChecksum
        var columns []string
@@ -2335,6 +2339,8 @@
        // chunk checkpoint should stay the same
        startOffset := cr.chunk.Chunk.Offset
        currOffset := startOffset
        startRealOffset = cr.chunk.Chunk.RealOffset
        currRealOffset = startRealOffset
        rowID := cr.chunk.Chunk.PrevRowIDMax

    populate:
@@ -2349,12 +2355,14 @@
                    if p.kvs == nil {
                        // This is the last message.
                        currOffset = p.offset
                        currRealOffset = p.realOffset
                        hasMoreKVs = false
                        break populate
                    }
                    p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
                    columns = p.columns
                    currOffset = p.offset
                    currRealOffset = p.realOffset
                    rowID = p.rowID
                }
            case <-ctx.Done():
@@ -2421,23 +2429,29 @@
        cr.chunk.Checksum.Add(&dataChecksum)
        cr.chunk.Checksum.Add(&indexChecksum)
        cr.chunk.Chunk.Offset = currOffset
        cr.chunk.Chunk.RealOffset = currRealOffset
        cr.chunk.Chunk.PrevRowIDMax = rowID

        if m, ok := metric.FromContext(ctx); ok {
            // The value of currOffset comes from parser.pos, which increases monotonically. The initial value
            // of parser.pos comes from chunk.Chunk.Offset, so currOffset - startOffset < 0 shouldn't happen.
            // But we hit it once and cannot reproduce it now, so we add this check to make the code more robust.
            // TODO: reproduce it, find the root cause, and fix it completely.

            delta := currOffset - startOffset
            var lowOffset, highOffset int64
            if cr.chunk.FileMeta.Compression != mydump.CompressionNone {
                lowOffset, highOffset = startRealOffset, currRealOffset
            } else {
                lowOffset, highOffset = startOffset, currOffset
            }
            delta := highOffset - lowOffset
            if delta >= 0 {
                m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(delta))
                if rc.status != nil && rc.status.backend == config.BackendTiDB {
                    rc.status.FinishedFileSize.Add(delta)
                }
            } else {
                deliverLogger.Warn("offset go back", zap.Int64("curr", currOffset),
                    zap.Int64("start", startOffset))
                deliverLogger.Warn("offset go back", zap.Int64("curr", highOffset),
                    zap.Int64("start", lowOffset))
            }
        }
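
Restated as a tiny standalone helper with made-up numbers (illustrative only): the finished-bytes delta fed into the progress counters comes from real (compressed) offsets when the chunk is compressed, and from logical offsets otherwise, so a compressed chunk's progress never exceeds its on-disk file size:

package main

import "fmt"

// restoredDelta picks the pair of offsets that measures the bytes finished in one
// delivery round: real offsets for compressed chunks, logical offsets otherwise.
func restoredDelta(compressed bool, startOffset, currOffset, startRealOffset, currRealOffset int64) int64 {
    if compressed {
        return currRealOffset - startRealOffset
    }
    return currOffset - startOffset
}

func main() {
    // Hypothetical compressed chunk: the parser decoded 8 MiB of rows (logical offsets),
    // while the underlying reader advanced only 1 MiB in the compressed file.
    fmt.Println(restoredDelta(true, 0, 8<<20, 0, 1<<20))  // 1048576, counts compressed bytes
    fmt.Println(restoredDelta(false, 0, 8<<20, 0, 1<<20)) // 8388608, counts logical bytes
}
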

@@ -2618,14 +2632,22 @@ func (cr *chunkRestore) encodeLoop(
        canDeliver := false
        kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt)
        curOffset := offset
        var newOffset, rowID, realOffset int64
        var kvSize uint64
        var realOffsetErr error
    outLoop:
        for !canDeliver {
            readDurStart := time.Now()
            err = cr.parser.ReadRow()
            columnNames := cr.parser.Columns()
            newOffset, rowID = cr.parser.Pos()
            if cr.chunk.FileMeta.Compression != mydump.CompressionNone {
                realOffset, realOffsetErr = cr.parser.RealPos()
                if realOffsetErr != nil {
                    logger.Warn("fail to get data engine RealPos, progress may not be accurate",
                        log.ShortError(realOffsetErr), zap.String("file", cr.chunk.FileMeta.Path))
                }
            }

            switch errors.Cause(err) {
            case nil:
@@ -2687,7 +2709,8 @@
                continue
            }

            kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset, rowID: rowID})
            kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset,
                rowID: rowID, realOffset: realOffset})
            kvSize += kvs.Size()
            failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
                kvSize += uint64(val.(int))
@@ -2719,7 +2742,7 @@
        }
    }

    err = send([]deliveredKVs{{offset: cr.chunk.Chunk.EndOffset}})
    err = send([]deliveredKVs{{offset: cr.chunk.Chunk.EndOffset, realOffset: cr.chunk.FileMeta.FileSize}})
    return
}
8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/table_restore.go
@@ -331,7 +331,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
                    err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp)
                    if rc.status != nil && rc.status.backend == config.BackendLocal {
                        for _, chunk := range ecp.Chunks {
                            rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
                            rc.status.FinishedFileSize.Add(chunk.TotalSize())
                        }
                    }
                }
@@ -341,7 +341,7 @@
            }(restoreWorker, engineID, engine)
        } else {
            for _, chunk := range engine.Chunks {
                rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
                rc.status.FinishedFileSize.Add(chunk.TotalSize())
            }
        }
    }
@@ -541,7 +541,7 @@ func (tr *TableRestore) restoreEngine(
        }
        var remainChunkCnt float64
        if chunk.Chunk.Offset < chunk.Chunk.EndOffset {
            remainChunkCnt = float64(chunk.Chunk.EndOffset-chunk.Chunk.Offset) / float64(chunk.Chunk.EndOffset-chunk.Key.Offset)
            remainChunkCnt = float64(chunk.UnfinishedSize()) / float64(chunk.TotalSize())
            if metrics != nil {
                metrics.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Add(remainChunkCnt)
            }
@@ -616,7 +616,7 @@
    totalSQLSize := int64(0)
    for _, chunk := range cp.Chunks {
        totalKVSize += chunk.Checksum.SumSize()
        totalSQLSize += chunk.Chunk.EndOffset - chunk.Chunk.Offset
        totalSQLSize += chunk.UnfinishedSize()
    }

    err = chunkErr.Get()
2 changes: 1 addition & 1 deletion br/pkg/lightning/web/progress.go
@@ -64,7 +64,7 @@ func (cpm *checkpointsMap) update(diffs map[string]*checkpoints.TableCheckpointD
        for _, engine := range cp.Engines {
            for _, chunk := range engine.Chunks {
                if engine.Status >= checkpoints.CheckpointStatusAllWritten {
                    tw += chunk.Chunk.EndOffset - chunk.Key.Offset
                    tw += chunk.TotalSize()
                } else {
                    tw += chunk.Chunk.Offset - chunk.Key.Offset
                }
11 changes: 9 additions & 2 deletions br/pkg/storage/compress.go
@@ -80,8 +80,10 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
    return io.ReadAll(compressBf)
}

// compressReader is a wrapper for the decompressing reader.
type compressReader struct {
    io.Reader
    io.Seeker
    io.Closer
}

@@ -97,11 +99,16 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType
    return &compressReader{
        Reader: r,
        Closer: fileReader,
        Seeker: fileReader,
    }, nil
}

func (*compressReader) Seek(_ int64, _ int) (int64, error) {
    return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now")
func (c *compressReader) Seek(offset int64, whence int) (int64, error) {
    // Only getting the original reader's current offset is supported.
    if offset == 0 && whence == io.SeekCurrent {
        return c.Seeker.Seek(offset, whence)
    }
    return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now, offset %d, whence %d", offset, whence)
}

func (c *compressReader) Close() error {
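
A minimal sketch of the tell-only Seek contract introduced here, using a plain io.Seeker wrapper instead of the real ExternalFileReader and BR error types (the names below are stand-ins): only Seek(0, io.SeekCurrent) is forwarded to the wrapped seeker, which is exactly what RealPos() needs, and every other call is rejected:

package main

import (
    "fmt"
    "io"
    "strings"
)

// tellOnlySeeker forwards Seek(0, io.SeekCurrent) to the wrapped seeker and rejects
// every other Seek call, mirroring the behaviour of compressReader.Seek.
type tellOnlySeeker struct {
    inner io.Seeker
}

func (t tellOnlySeeker) Seek(offset int64, whence int) (int64, error) {
    if offset == 0 && whence == io.SeekCurrent {
        return t.inner.Seek(offset, whence)
    }
    return 0, fmt.Errorf("seek not supported: offset %d, whence %d", offset, whence)
}

func main() {
    r := strings.NewReader("pretend these are compressed bytes")
    _, _ = io.CopyN(io.Discard, r, 5) // consume a few bytes

    s := tellOnlySeeker{inner: r}
    cur, err := s.Seek(0, io.SeekCurrent) // "tell": report the underlying offset
    fmt.Println(cur, err)                 // 5 <nil>

    _, err = s.Seek(0, io.SeekStart) // anything else is rejected
    fmt.Println(err)                 // seek not supported: offset 0, whence 0
}
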
