From 2e7f366d2e2e4cb410a48f73d91538036719b441 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 27 Oct 2021 14:30:32 +0800 Subject: [PATCH 1/8] add log Signed-off-by: Little-Wallace --- br/pkg/lightning/lightning.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index b068e4bad33f7..3ddb313dd25c8 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -261,6 +261,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. failpoint.Return(nil) }) + log.L().Info("Before RegisterMySQL") if err := taskCfg.TiDB.Security.RegisterMySQL(); err != nil { return err } @@ -278,17 +279,21 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. // initiation of default glue should be after RegisterMySQL, which is ready to be called after taskCfg.Adjust // and also put it here could avoid injecting another two SkipRunTask failpoint to caller if g == nil { + log.L().Info("Before create NewExternalTiDBGlue connection") db, err := restore.DBFromConfig(ctx, taskCfg.TiDB) if err != nil { return err } g = glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode) + log.L().Info("End create NewExternalTiDBGlue connection") } + log.L().Info("Before create storage backend") u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil) if err != nil { return errors.Annotate(err, "parse backend failed") } + log.L().Info("Before create storage") s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{}) if err != nil { return errors.Annotate(err, "create storage failed") @@ -296,16 +301,19 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. loadTask := log.L().Begin(zap.InfoLevel, "load data source") var mdl *mydump.MDLoader + log.L().Info("Before create MyDumper") mdl, err = mydump.NewMyDumpLoaderWithStore(ctx, taskCfg, s) loadTask.End(zap.ErrorLevel, err) if err != nil { return errors.Trace(err) } + log.L().Info("Before checkSystemRequirement") err = checkSystemRequirement(taskCfg, mdl.GetDatabases()) if err != nil { log.L().Error("check system requirements failed", zap.Error(err)) return errors.Trace(err) } + log.L().Info("Before check checkSchemaConflict") // check table schema conflicts err = checkSchemaConflict(taskCfg, mdl.GetDatabases()) if err != nil { @@ -318,6 +326,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. var procedure *restore.Controller + log.L().Info("Before NewRestoreController") procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, &l.status, s, g) if err != nil { log.L().Error("restore failed", log.ShortError(err)) @@ -325,7 +334,9 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. } defer procedure.Close() + log.L().Info("Before procedure run") err = procedure.Run(ctx) + log.L().Info("Before procedure close") return errors.Trace(err) } From 174f4b93425a355fa287c76fe07c8af5b4db4c4d Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 27 Oct 2021 14:45:55 +0800 Subject: [PATCH 2/8] add some log Signed-off-by: Little-Wallace --- br/pkg/lightning/restore/tidb.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/br/pkg/lightning/restore/tidb.go b/br/pkg/lightning/restore/tidb.go index 680cd1558e70a..f80441757653b 100644 --- a/br/pkg/lightning/restore/tidb.go +++ b/br/pkg/lightning/restore/tidb.go @@ -93,16 +93,14 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) { TLS: dsn.TLS, } - if dsn.Vars != nil { - for k, v := range dsn.Vars { - param.Vars[k] = v - } - } + log.L().Info("Before connect DB") db, err := param.Connect() if err != nil { + log.L().Info("After connect Error") return nil, errors.Trace(err) } + log.L().Info("After connect DB") vars := map[string]string{ "tidb_build_stats_concurrency": strconv.Itoa(dsn.BuildStatsConcurrency), @@ -119,18 +117,27 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) { "autocommit": "1", } + if dsn.Vars != nil { + for k, v := range dsn.Vars { + vars[k] = v + } + } for k, v := range vars { q := fmt.Sprintf("SET SESSION %s = %s;", k, v) + log.L().Info("Set session", zap.String("query", q)) if _, err1 := db.ExecContext(ctx, q); err1 != nil { log.L().Warn("set session variable failed, will skip this query", zap.String("query", q), zap.Error(err1)) delete(vars, k) } + log.L().Info("Set session End") } _ = db.Close() + log.L().Info("Close connections") param.Vars = vars db, err = param.Connect() + log.L().Info("connect again") return db, errors.Trace(err) } From 4dff473dcf962f3798a50fb488fc37937fe081ef Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 27 Oct 2021 16:31:12 +0800 Subject: [PATCH 3/8] fix init logger Signed-off-by: Little-Wallace --- br/pkg/lightning/lightning.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 3ddb313dd25c8..be4a8d297a51d 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -75,6 +75,9 @@ type Lightning struct { } func initEnv(cfg *config.GlobalConfig) error { + if cfg.App.Config.File == "" { + return nil + } return log.InitLogger(&cfg.App.Config, cfg.TiDB.LogLevel) } From b768e8c528f0d4310c4e1626144383d348d919ce Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 29 Oct 2021 16:54:07 +0800 Subject: [PATCH 4/8] add info log Signed-off-by: Little-Wallace --- br/pkg/lightning/backend/tidb/tidb.go | 10 ++++++++-- br/pkg/lightning/restore/restore.go | 1 + br/pkg/lightning/restore/table_restore.go | 7 +++++++ br/pkg/lightning/restore/tidb.go | 1 - 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index c85e17fdbb4a8..dcdf16ef36ad8 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -323,11 +323,17 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co } } encoded.WriteByte(')') - return tidbRow{ + r := tidbRow{ insertStmt: encoded.String(), path: path, offset: offset, - }, nil + } + logger.Info("tidb encode failed", + zap.Int64("offset", offset), + zap.String("path", path), + zap.String("sql", r.insertStmt), + ) + return r, nil } // EncodeRowForRecord encodes a row to a string compatible with INSERT statements. diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 7ea7be3c9d161..0ad25039ec37d 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -1520,6 +1520,7 @@ func (tr *TableRestore) restoreTable( }, } } + tr.logger.Info("Begin restoreEngines") // 2. Restore engines (if still needed) err := tr.restoreEngines(ctx, rc, cp) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 5d0ad6addca89..d6bc9d56086c0 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -192,11 +192,13 @@ func createColumnPermutation(columns []string, ignoreColumns []string, tableInfo func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp *checkpoints.TableCheckpoint) error { indexEngineCp := cp.Engines[indexEngineID] if indexEngineCp == nil { + tr.logger.Error("fail to restoreEngines because indexengine is nil") return errors.Errorf("table %v index engine checkpoint not found", tr.tableName) } // If there is an index engine only, it indicates no data needs to restore. // So we can change status to imported directly and avoid opening engine. if len(cp.Engines) == 1 { + tr.logger.Info("There is an index engine only") if err := rc.saveStatusCheckpoint(pCtx, tr.tableName, indexEngineID, nil, checkpoints.CheckpointStatusImported); err != nil { return errors.Trace(err) } @@ -225,6 +227,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp idxEngineCfg := &backend.EngineConfig{ TableInfo: tr.tableInfo, } + tr.logger.Info("begin restoreEngines", zap.Uint8("index-engine-status", uint8(indexEngineCp.Status))) if indexEngineCp.Status < checkpoints.CheckpointStatusClosed { indexWorker := rc.indexWorkers.Apply() defer rc.indexWorkers.Recycle(indexWorker) @@ -290,13 +293,17 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp default: } if engineErr.Get() != nil { + logTask.Error("break because met error", zap.Error(engineErr.Get())) break } // Should skip index engine if engineID < 0 { + logTask.Info("Skip IndexEngine") continue } + logTask.Info("Begin Import Data Engine", zap.Int8("status", int8(engine.Status)), zap.Int32("engineID", engineID), + zap.Int("chunk size", len(engine.Chunks))) if engine.Status < checkpoints.CheckpointStatusImported { wg.Add(1) diff --git a/br/pkg/lightning/restore/tidb.go b/br/pkg/lightning/restore/tidb.go index f80441757653b..70f7793ab8b59 100644 --- a/br/pkg/lightning/restore/tidb.go +++ b/br/pkg/lightning/restore/tidb.go @@ -93,7 +93,6 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) { TLS: dsn.TLS, } - log.L().Info("Before connect DB") db, err := param.Connect() if err != nil { From 2ece9f2bf94579ac5c59605aa3b4d7a70f86f7b5 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 1 Nov 2021 11:33:40 +0800 Subject: [PATCH 5/8] remove debug log Signed-off-by: Little-Wallace --- br/pkg/lightning/backend/tidb/tidb.go | 10 ++-------- br/pkg/lightning/lightning.go | 11 ----------- br/pkg/lightning/restore/restore.go | 1 - br/pkg/lightning/restore/table_restore.go | 6 ------ br/pkg/lightning/restore/tidb.go | 5 ----- 5 files changed, 2 insertions(+), 31 deletions(-) diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index dcdf16ef36ad8..c85e17fdbb4a8 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -323,17 +323,11 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co } } encoded.WriteByte(')') - r := tidbRow{ + return tidbRow{ insertStmt: encoded.String(), path: path, offset: offset, - } - logger.Info("tidb encode failed", - zap.Int64("offset", offset), - zap.String("path", path), - zap.String("sql", r.insertStmt), - ) - return r, nil + }, nil } // EncodeRowForRecord encodes a row to a string compatible with INSERT statements. diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index be4a8d297a51d..0a9cf1585456a 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -264,7 +264,6 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. failpoint.Return(nil) }) - log.L().Info("Before RegisterMySQL") if err := taskCfg.TiDB.Security.RegisterMySQL(); err != nil { return err } @@ -282,21 +281,17 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. // initiation of default glue should be after RegisterMySQL, which is ready to be called after taskCfg.Adjust // and also put it here could avoid injecting another two SkipRunTask failpoint to caller if g == nil { - log.L().Info("Before create NewExternalTiDBGlue connection") db, err := restore.DBFromConfig(ctx, taskCfg.TiDB) if err != nil { return err } g = glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode) - log.L().Info("End create NewExternalTiDBGlue connection") } - log.L().Info("Before create storage backend") u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil) if err != nil { return errors.Annotate(err, "parse backend failed") } - log.L().Info("Before create storage") s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{}) if err != nil { return errors.Annotate(err, "create storage failed") @@ -304,19 +299,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. loadTask := log.L().Begin(zap.InfoLevel, "load data source") var mdl *mydump.MDLoader - log.L().Info("Before create MyDumper") mdl, err = mydump.NewMyDumpLoaderWithStore(ctx, taskCfg, s) loadTask.End(zap.ErrorLevel, err) if err != nil { return errors.Trace(err) } - log.L().Info("Before checkSystemRequirement") err = checkSystemRequirement(taskCfg, mdl.GetDatabases()) if err != nil { log.L().Error("check system requirements failed", zap.Error(err)) return errors.Trace(err) } - log.L().Info("Before check checkSchemaConflict") // check table schema conflicts err = checkSchemaConflict(taskCfg, mdl.GetDatabases()) if err != nil { @@ -329,7 +321,6 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. var procedure *restore.Controller - log.L().Info("Before NewRestoreController") procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, &l.status, s, g) if err != nil { log.L().Error("restore failed", log.ShortError(err)) @@ -337,9 +328,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue. } defer procedure.Close() - log.L().Info("Before procedure run") err = procedure.Run(ctx) - log.L().Info("Before procedure close") return errors.Trace(err) } diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 0ad25039ec37d..7ea7be3c9d161 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -1520,7 +1520,6 @@ func (tr *TableRestore) restoreTable( }, } } - tr.logger.Info("Begin restoreEngines") // 2. Restore engines (if still needed) err := tr.restoreEngines(ctx, rc, cp) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index d6bc9d56086c0..4abf8e8591270 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -198,7 +198,6 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp // If there is an index engine only, it indicates no data needs to restore. // So we can change status to imported directly and avoid opening engine. if len(cp.Engines) == 1 { - tr.logger.Info("There is an index engine only") if err := rc.saveStatusCheckpoint(pCtx, tr.tableName, indexEngineID, nil, checkpoints.CheckpointStatusImported); err != nil { return errors.Trace(err) } @@ -227,7 +226,6 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp idxEngineCfg := &backend.EngineConfig{ TableInfo: tr.tableInfo, } - tr.logger.Info("begin restoreEngines", zap.Uint8("index-engine-status", uint8(indexEngineCp.Status))) if indexEngineCp.Status < checkpoints.CheckpointStatusClosed { indexWorker := rc.indexWorkers.Apply() defer rc.indexWorkers.Recycle(indexWorker) @@ -293,17 +291,13 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp default: } if engineErr.Get() != nil { - logTask.Error("break because met error", zap.Error(engineErr.Get())) break } // Should skip index engine if engineID < 0 { - logTask.Info("Skip IndexEngine") continue } - logTask.Info("Begin Import Data Engine", zap.Int8("status", int8(engine.Status)), zap.Int32("engineID", engineID), - zap.Int("chunk size", len(engine.Chunks))) if engine.Status < checkpoints.CheckpointStatusImported { wg.Add(1) diff --git a/br/pkg/lightning/restore/tidb.go b/br/pkg/lightning/restore/tidb.go index 70f7793ab8b59..6ef7c6246a199 100644 --- a/br/pkg/lightning/restore/tidb.go +++ b/br/pkg/lightning/restore/tidb.go @@ -93,13 +93,10 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) { TLS: dsn.TLS, } - log.L().Info("Before connect DB") db, err := param.Connect() if err != nil { - log.L().Info("After connect Error") return nil, errors.Trace(err) } - log.L().Info("After connect DB") vars := map[string]string{ "tidb_build_stats_concurrency": strconv.Itoa(dsn.BuildStatsConcurrency), @@ -132,11 +129,9 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) { log.L().Info("Set session End") } _ = db.Close() - log.L().Info("Close connections") param.Vars = vars db, err = param.Connect() - log.L().Info("connect again") return db, errors.Trace(err) } From bf5ba21788dace37bedcc67b18a7d909a5aa2dc4 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 1 Nov 2021 18:12:32 +0800 Subject: [PATCH 6/8] remove code Signed-off-by: Little-Wallace --- br/pkg/lightning/restore/tidb.go | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/lightning/restore/tidb.go b/br/pkg/lightning/restore/tidb.go index 41d5cbe3c8e64..a4e1b50f9cc8a 100644 --- a/br/pkg/lightning/restore/tidb.go +++ b/br/pkg/lightning/restore/tidb.go @@ -123,6 +123,7 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) { vars[k] = v } } + for k, v := range vars { q := fmt.Sprintf("SET SESSION %s = %s;", k, v) log.L().Info("Set session", zap.String("query", q)) From e4a26ac92666eeed4d5b7054b92914fe013f4a9f Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 1 Nov 2021 18:14:19 +0800 Subject: [PATCH 7/8] remove code Signed-off-by: Little-Wallace --- br/pkg/lightning/restore/tidb.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/br/pkg/lightning/restore/tidb.go b/br/pkg/lightning/restore/tidb.go index a4e1b50f9cc8a..9b1c2af4de910 100644 --- a/br/pkg/lightning/restore/tidb.go +++ b/br/pkg/lightning/restore/tidb.go @@ -112,11 +112,6 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) { // always set auto-commit to ON "autocommit": "1", } - if dsn.Vars != nil { - for k, v := range dsn.Vars { - param.Vars[k] = v - } - } if dsn.Vars != nil { for k, v := range dsn.Vars { From ad3e6fa3f2348c0dc8dd72541ab3535cc86c1fa1 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 1 Nov 2021 19:10:02 +0800 Subject: [PATCH 8/8] remove debug log Signed-off-by: Little-Wallace --- br/pkg/lightning/restore/tidb.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/br/pkg/lightning/restore/tidb.go b/br/pkg/lightning/restore/tidb.go index 9b1c2af4de910..ee1252fd3862d 100644 --- a/br/pkg/lightning/restore/tidb.go +++ b/br/pkg/lightning/restore/tidb.go @@ -121,13 +121,11 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) { for k, v := range vars { q := fmt.Sprintf("SET SESSION %s = %s;", k, v) - log.L().Info("Set session", zap.String("query", q)) if _, err1 := db.ExecContext(ctx, q); err1 != nil { log.L().Warn("set session variable failed, will skip this query", zap.String("query", q), zap.Error(err1)) delete(vars, k) } - log.L().Info("Set session End") } _ = db.Close()