From de6ef91c3e5be3af80715306d7b49cad5282c2e3 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 13 Jan 2023 18:25:48 +0800 Subject: [PATCH 1/2] move step iterate backup files into worker pool Signed-off-by: Leavrth --- br/pkg/restore/client.go | 14 +++++++------- br/pkg/task/restore.go | 3 ++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 6f91a3b4deffc..1e494819d463c 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2063,14 +2063,14 @@ func (rc *Client) RestoreKVFiles( } } - if supportBatch { - err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc) - } else { - err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc) - } - if err != nil { + rc.workerPool.ApplyOnErrorGroup(eg, func() error { + if supportBatch { + err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc) + } else { + err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc) + } return errors.Trace(err) - } + }) log.Info("total skip files due to table id not matched", zap.Int("count", skipFile)) if skipFile > 0 { diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 903b721f0a644..8a5cd0425e221 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -393,7 +393,8 @@ func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() { if cfg.PitrBatchSize == 0 { cfg.PitrBatchSize = defaultPiTRBatchSize } - + // another goroutine is used to iterate the backup file + cfg.PitrConcurrency += 1 log.Info("set restore kv files concurrency", zap.Int("concurrency", int(cfg.PitrConcurrency))) cfg.Config.Concurrency = cfg.PitrConcurrency } From ac1c9816148b4becb1feb9bc742c4cebe194812a Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 16 Jan 2023 14:23:32 +0800 Subject: [PATCH 2/2] don't show the failed import on log Signed-off-by: Leavrth --- br/pkg/restore/client.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 1e494819d463c..e218ca7b014bc 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2043,19 +2043,21 @@ func (rc *Client) RestoreKVFiles( log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId)) skipFile += len(files) } else { - rc.workerPool.ApplyOnErrorGroup(eg, func() error { + rc.workerPool.ApplyOnErrorGroup(eg, func() (err error) { fileStart := time.Now() defer func() { onProgress(int64(len(files))) updateStats(uint64(kvCount), size) summary.CollectInt("File", len(files)) - filenames := make([]string, 0, len(files)) - for _, f := range files { - filenames = append(filenames, f.Path+", ") + if err == nil { + filenames := make([]string, 0, len(files)) + for _, f := range files { + filenames = append(filenames, f.Path+", ") + } + log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size), + zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames)) } - log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size), - zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames)) }() return rc.fileImporter.ImportKVFiles(ectx, files, rule, rc.shiftStartTS, rc.startTS, rc.restoreTS, supportBatch)