diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 6f91a3b4deffc..e218ca7b014bc 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2043,19 +2043,21 @@ func (rc *Client) RestoreKVFiles( log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId)) skipFile += len(files) } else { - rc.workerPool.ApplyOnErrorGroup(eg, func() error { + rc.workerPool.ApplyOnErrorGroup(eg, func() (err error) { fileStart := time.Now() defer func() { onProgress(int64(len(files))) updateStats(uint64(kvCount), size) summary.CollectInt("File", len(files)) - filenames := make([]string, 0, len(files)) - for _, f := range files { - filenames = append(filenames, f.Path+", ") + if err == nil { + filenames := make([]string, 0, len(files)) + for _, f := range files { + filenames = append(filenames, f.Path+", ") + } + log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size), + zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames)) } - log.Info("import files done", zap.Int("batch-count", len(files)), zap.Uint64("batch-size", size), - zap.Duration("take", time.Since(fileStart)), zap.Strings("files", filenames)) }() return rc.fileImporter.ImportKVFiles(ectx, files, rule, rc.shiftStartTS, rc.startTS, rc.restoreTS, supportBatch) @@ -2063,14 +2065,14 @@ func (rc *Client) RestoreKVFiles( } } - if supportBatch { - err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc) - } else { - err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc) - } - if err != nil { + rc.workerPool.ApplyOnErrorGroup(eg, func() error { + if supportBatch { + err = ApplyKVFilesWithBatchMethod(ectx, iter, int(pitrBatchCount), uint64(pitrBatchSize), applyFunc) + } else { + err = ApplyKVFilesWithSingelMethod(ectx, iter, applyFunc) + } return errors.Trace(err) - } + }) log.Info("total skip files due to table id not matched", zap.Int("count", skipFile)) if skipFile > 0 { diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 903b721f0a644..8a5cd0425e221 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -393,7 +393,8 @@ func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() { if cfg.PitrBatchSize == 0 { cfg.PitrBatchSize = defaultPiTRBatchSize } - + // another goroutine is used to iterate the backup file + cfg.PitrConcurrency += 1 log.Info("set restore kv files concurrency", zap.Int("concurrency", int(cfg.PitrConcurrency))) cfg.Config.Concurrency = cfg.PitrConcurrency }