diff --git a/lightning/checkpoints/checkpoints.go b/lightning/checkpoints/checkpoints.go index 621eb857d..8e6f06f19 100644 --- a/lightning/checkpoints/checkpoints.go +++ b/lightning/checkpoints/checkpoints.go @@ -21,10 +21,10 @@ import ( "io" "io/ioutil" "math" + "os" "sort" "strings" "sync" - "time" "github.com/joho/sqltocsv" "github.com/pingcap/errors" @@ -54,14 +54,12 @@ const ( CheckpointStatusAnalyzed CheckpointStatus = 210 ) -const nodeID = 0 - const WholeTableEngineID = math.MaxInt32 const ( // the table names to store each kind of checkpoint in the checkpoint database // remember to increase the version number in case of incompatible change. - checkpointTableNameTable = "table_v4" + checkpointTableNameTable = "table_v5" checkpointTableNameEngine = "engine_v5" checkpointTableNameChunk = "chunk_v4" ) @@ -329,6 +327,9 @@ type CheckpointsDB interface { Update(checkpointDiffs map[string]*TableCheckpointDiff) RemoveCheckpoint(ctx context.Context, tableName string) error + // MoveCheckpoints renames the checkpoint schema to include a suffix + // including the taskID (e.g. `tidb_lightning_checkpoints.1234567890.bak`). 
+ MoveCheckpoints(ctx context.Context, taskID int64) error IgnoreErrorCheckpoint(ctx context.Context, tableName string) error DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error) DumpTables(ctx context.Context, csv io.Writer) error @@ -364,12 +365,12 @@ func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ func (*NullCheckpointsDB) Update(map[string]*TableCheckpointDiff) {} type MySQLCheckpointsDB struct { - db *sql.DB - schema string - session uint64 + db *sql.DB + schema string + taskID int64 } -func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (*MySQLCheckpointsDB, error) { +func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string, taskID int64) (*MySQLCheckpointsDB, error) { var escapedSchemaName strings.Builder common.WriteMySQLIdentifier(&escapedSchemaName, schemaName) schema := escapedSchemaName.String() @@ -388,15 +389,14 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) ( err = sql.Exec(ctx, "create table checkpoints table", fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s.%s ( - node_id int unsigned NOT NULL, - session bigint unsigned NOT NULL, + task_id bigint NOT NULL, table_name varchar(261) NOT NULL PRIMARY KEY, hash binary(32) NOT NULL, status tinyint unsigned DEFAULT 30, alloc_base bigint NOT NULL DEFAULT 0, create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - INDEX(node_id, session) + INDEX(task_id) ); `, schema, checkpointTableNameTable)) if err != nil { @@ -441,13 +441,10 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) ( return nil, errors.Trace(err) } - // Create a relatively unique number (on the same node) as the session ID. 
- session := uint64(time.Now().UnixNano()) - return &MySQLCheckpointsDB{ - db: db, - schema: schema, - session: session, + db: db, + schema: schema, + taskID: taskID, }, nil } @@ -457,17 +454,17 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, dbInfo map[strin s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L()} err := s.Transact(ctx, "insert checkpoints", func(c context.Context, tx *sql.Tx) error { - // If `node_id` is not the same but the `table_name` duplicates, + // If `hash` is not the same but the `table_name` duplicates, // the CASE expression will return NULL, which can be used to violate - // the NOT NULL requirement of `session` column, and caused this INSERT + // the NOT NULL requirement of `task_id` column, and caused this INSERT // statement to fail with an irrecoverable error. // We do need to capture the error is display a user friendly message // (multiple nodes cannot import the same table) though. stmt, err := tx.PrepareContext(c, fmt.Sprintf(` - INSERT INTO %s.%s (node_id, session, table_name, hash) VALUES (?, ?, ?, ?) - ON DUPLICATE KEY UPDATE session = CASE - WHEN node_id = VALUES(node_id) AND hash = VALUES(hash) - THEN VALUES(session) + INSERT INTO %s.%s (task_id, table_name, hash) VALUES (?, ?, ?) 
+ ON DUPLICATE KEY UPDATE task_id = CASE + WHEN hash = VALUES(hash) + THEN VALUES(task_id) END; `, cpdb.schema, checkpointTableNameTable)) if err != nil { @@ -478,7 +475,7 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, dbInfo map[strin for _, db := range dbInfo { for _, table := range db.Tables { tableName := common.UniqueTable(db.Name, table.Name) - _, err = stmt.ExecContext(c, nodeID, cpdb.session, tableName, 0) + _, err = stmt.ExecContext(c, cpdb.taskID, tableName, 0) if err != nil { return errors.Trace(err) } @@ -920,6 +917,9 @@ var cannotManageNullDB = errors.New("cannot perform this function while checkpoi func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error { return errors.Trace(cannotManageNullDB) } +func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error { + return errors.Trace(cannotManageNullDB) +} func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error { return errors.Trace(cannotManageNullDB) } @@ -937,57 +937,73 @@ func (*NullCheckpointsDB) DumpChunks(context.Context, io.Writer) error { } func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error { - var ( - deleteChunkFmt string - deleteEngineFmt string - deleteTableFmt string - arg interface{} - ) - - if tableName == "all" { - deleteChunkFmt = "DELETE FROM %[1]s.%[2]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[3]s WHERE node_id = ?)" - deleteEngineFmt = "DELETE FROM %[1]s.%[2]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[3]s WHERE node_id = ?)" - deleteTableFmt = "DELETE FROM %s.%s WHERE node_id = ?" - arg = nodeID - } else { - deleteChunkFmt = "DELETE FROM %s.%s WHERE table_name = ?%.0s" // the %.0s is to consume the third parameter. - deleteEngineFmt = "DELETE FROM %s.%s WHERE table_name = ?%.0s" - deleteTableFmt = "DELETE FROM %s.%s WHERE table_name = ?" 
- arg = tableName - } - - deleteChunkQuery := fmt.Sprintf(deleteChunkFmt, cpdb.schema, checkpointTableNameChunk, checkpointTableNameTable) - deleteEngineQuery := fmt.Sprintf(deleteEngineFmt, cpdb.schema, checkpointTableNameEngine, checkpointTableNameTable) - deleteTableQuery := fmt.Sprintf(deleteTableFmt, cpdb.schema, checkpointTableNameTable) s := common.SQLWithRetry{ DB: cpdb.db, Logger: log.With(zap.String("table", tableName)), } - err := s.Transact(ctx, "remove checkpoints", func(c context.Context, tx *sql.Tx) error { - if _, e := tx.ExecContext(c, deleteChunkQuery, arg); e != nil { + + if tableName == "all" { + return s.Exec(ctx, "remove all checkpoints", "DROP SCHEMA "+cpdb.schema) + } + + deleteChunkQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, checkpointTableNameChunk) + deleteEngineQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, checkpointTableNameEngine) + deleteTableQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, checkpointTableNameTable) + + return s.Transact(ctx, "remove checkpoints", func(c context.Context, tx *sql.Tx) error { + if _, e := tx.ExecContext(c, deleteChunkQuery, tableName); e != nil { return errors.Trace(e) } - if _, e := tx.ExecContext(c, deleteEngineQuery, arg); e != nil { + if _, e := tx.ExecContext(c, deleteEngineQuery, tableName); e != nil { return errors.Trace(e) } - if _, e := tx.ExecContext(c, deleteTableQuery, arg); e != nil { + if _, e := tx.ExecContext(c, deleteTableQuery, tableName); e != nil { return errors.Trace(e) } return nil }) - return errors.Trace(err) +} + +func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error { + // The "cpdb.schema" is an escaped schema name of the form "`foo`". + // We use "x[1:len(x)-1]" instead of unescaping it to keep the + // double-backquotes (if any) intact. 
+ newSchema := fmt.Sprintf("`%s.%d.bak`", cpdb.schema[1:len(cpdb.schema)-1], taskID) + s := common.SQLWithRetry{ + DB: cpdb.db, + Logger: log.With(zap.Int64("taskID", taskID)), + } + + createSchemaQuery := "CREATE SCHEMA IF NOT EXISTS " + newSchema + moveChunkQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, checkpointTableNameChunk) + moveEngineQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, checkpointTableNameEngine) + moveTableQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, checkpointTableNameTable) + + if e := s.Exec(ctx, "create backup checkpoints schema", createSchemaQuery); e != nil { + return e + } + if e := s.Exec(ctx, "move chunk checkpoints table", moveChunkQuery); e != nil { + return e + } + if e := s.Exec(ctx, "move engine checkpoints table", moveEngineQuery); e != nil { + return e + } + if e := s.Exec(ctx, "move table checkpoints table", moveTableQuery); e != nil { + return e + } + return nil } func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error { - var ( - colName string - arg interface{} - ) + var colName string if tableName == "all" { - colName, arg = "node_id", nodeID + // This will expand to `WHERE 'all' = 'all'` and effectively allow + // all tables to be included. + colName = "'all'" } else { - colName, arg = "table_name", tableName + colName = "table_name" } + engineQuery := fmt.Sprintf(` UPDATE %s.%s SET status = %d WHERE %s = ? 
AND status <= %d; `, cpdb.schema, checkpointTableNameEngine, CheckpointStatusLoaded, colName, CheckpointStatusMaxInvalid) @@ -1000,10 +1016,10 @@ func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, table Logger: log.With(zap.String("table", tableName)), } err := s.Transact(ctx, "ignore error checkpoints", func(c context.Context, tx *sql.Tx) error { - if _, e := tx.ExecContext(c, engineQuery, arg); e != nil { + if _, e := tx.ExecContext(c, engineQuery, tableName); e != nil { return errors.Trace(e) } - if _, e := tx.ExecContext(c, tableQuery, arg); e != nil { + if _, e := tx.ExecContext(c, tableQuery, tableName); e != nil { return errors.Trace(e) } return nil @@ -1012,17 +1028,16 @@ func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, table } func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error) { - var ( - conditionColumn string - arg interface{} - ) + var colName, aliasedColName string if tableName == "all" { - conditionColumn = "node_id" - arg = nodeID + // These will expand to `WHERE 'all' = 'all'` and effectively allow + // all tables to be included. + colName = "'all'" + aliasedColName = "'all'" } else { - conditionColumn = "table_name" - arg = tableName + colName = "table_name" + aliasedColName = "t.table_name" } selectQuery := fmt.Sprintf(` @@ -1032,18 +1047,18 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl COALESCE(MAX(e.engine_id), -1) FROM %[1]s.%[4]s t LEFT JOIN %[1]s.%[5]s e ON t.table_name = e.table_name - WHERE t.%[2]s = ? AND t.status <= %[3]d + WHERE %[2]s = ? 
AND t.status <= %[3]d GROUP BY t.table_name; - `, cpdb.schema, conditionColumn, CheckpointStatusMaxInvalid, checkpointTableNameTable, checkpointTableNameEngine) + `, cpdb.schema, aliasedColName, CheckpointStatusMaxInvalid, checkpointTableNameTable, checkpointTableNameEngine) deleteChunkQuery := fmt.Sprintf(` DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d) - `, cpdb.schema, conditionColumn, CheckpointStatusMaxInvalid, checkpointTableNameChunk, checkpointTableNameTable) + `, cpdb.schema, colName, CheckpointStatusMaxInvalid, checkpointTableNameChunk, checkpointTableNameTable) deleteEngineQuery := fmt.Sprintf(` DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d) - `, cpdb.schema, conditionColumn, CheckpointStatusMaxInvalid, checkpointTableNameEngine, checkpointTableNameTable) + `, cpdb.schema, colName, CheckpointStatusMaxInvalid, checkpointTableNameEngine, checkpointTableNameTable) deleteTableQuery := fmt.Sprintf(` DELETE FROM %s.%s WHERE %s = ? 
AND status <= %d - `, cpdb.schema, checkpointTableNameTable, conditionColumn, CheckpointStatusMaxInvalid) + `, cpdb.schema, checkpointTableNameTable, colName, CheckpointStatusMaxInvalid) var targetTables []DestroyedTableCheckpoint @@ -1054,7 +1069,7 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl err := s.Transact(ctx, "destroy error checkpoints", func(c context.Context, tx *sql.Tx) error { // Obtain the list of tables targetTables = nil - rows, e := tx.QueryContext(c, selectQuery, arg) + rows, e := tx.QueryContext(c, selectQuery, tableName) if e != nil { return errors.Trace(e) } @@ -1071,13 +1086,13 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl } // Delete the checkpoints - if _, e := tx.ExecContext(c, deleteChunkQuery, arg); e != nil { + if _, e := tx.ExecContext(c, deleteChunkQuery, tableName); e != nil { return errors.Trace(e) } - if _, e := tx.ExecContext(c, deleteEngineQuery, arg); e != nil { + if _, e := tx.ExecContext(c, deleteEngineQuery, tableName); e != nil { return errors.Trace(e) } - if _, e := tx.ExecContext(c, deleteTableQuery, arg); e != nil { + if _, e := tx.ExecContext(c, deleteTableQuery, tableName); e != nil { return errors.Trace(e) } return nil @@ -1092,8 +1107,7 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer) error { rows, err := cpdb.db.QueryContext(ctx, fmt.Sprintf(` SELECT - node_id, - session, + task_id, table_name, hex(hash) AS hash, status, @@ -1160,12 +1174,21 @@ func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName str if tableName == "all" { cpdb.checkpoints.Reset() - } else { - delete(cpdb.checkpoints.Checkpoints, tableName) + return errors.Trace(os.Remove(cpdb.path)) } + + delete(cpdb.checkpoints.Checkpoints, tableName) return errors.Trace(cpdb.save()) } +func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, 
taskID int64) error { + cpdb.lock.Lock() + defer cpdb.lock.Unlock() + + newPath := fmt.Sprintf("%s.%d.bak", cpdb.path, taskID) + return errors.Trace(os.Rename(cpdb.path, newPath)) +} + func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error { cpdb.lock.Lock() defer cpdb.lock.Unlock() diff --git a/lightning/checkpoints/checkpoints_sql_test.go b/lightning/checkpoints/checkpoints_sql_test.go index 1a140f234..901b90449 100644 --- a/lightning/checkpoints/checkpoints_sql_test.go +++ b/lightning/checkpoints/checkpoints_sql_test.go @@ -8,8 +8,8 @@ import ( "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" - "github.com/pingcap/tidb-lightning/lightning/mydump" "github.com/pingcap/tidb-lightning/lightning/checkpoints" + "github.com/pingcap/tidb-lightning/lightning/mydump" "github.com/pingcap/tidb-lightning/lightning/verification" ) @@ -42,7 +42,7 @@ func (s *cpSQLSuite) SetUpTest(c *C) { ExpectExec("CREATE TABLE IF NOT EXISTS `mock-schema`\\.chunk_v\\d+ .+"). WillReturnResult(sqlmock.NewResult(4, 1)) - cpdb, err := checkpoints.NewMySQLCheckpointsDB(context.Background(), s.db, "mock-schema") + cpdb, err := checkpoints.NewMySQLCheckpointsDB(context.Background(), s.db, "mock-schema", 1234) c.Assert(err, IsNil) c.Assert(s.mock.ExpectationsWereMet(), IsNil) s.cpdb = cpdb @@ -64,13 +64,13 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) { initializeStmt := s.mock. ExpectPrepare("INSERT INTO `mock-schema`\\.table_v\\d+") initializeStmt.ExpectExec(). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), "`db1`.`t1`", sqlmock.AnyArg()). + WithArgs(1234, "`db1`.`t1`", sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(5, 1)) initializeStmt.ExpectExec(). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), "`db1`.`t2`", sqlmock.AnyArg()). + WithArgs(1234, "`db1`.`t2`", sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(6, 1)) initializeStmt.ExpectExec(). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), "`db2`.`t3`", sqlmock.AnyArg()). 
+ WithArgs(1234, "`db2`.`t3`", sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(7, 1)) s.mock.ExpectCommit() @@ -263,20 +263,7 @@ func (s *cpSQLSuite) TestNormalOperations(c *C) { } func (s *cpSQLSuite) TestRemoveAllCheckpoints(c *C) { - s.mock.ExpectBegin() - s.mock. - ExpectExec("DELETE FROM `mock-schema`\\.chunk_v\\d+ WHERE table_name IN"). - WithArgs(sqlmock.AnyArg()). - WillReturnResult(sqlmock.NewResult(0, 5)) - s.mock. - ExpectExec("DELETE FROM `mock-schema`\\.engine_v\\d+ WHERE table_name IN"). - WithArgs(sqlmock.AnyArg()). - WillReturnResult(sqlmock.NewResult(0, 3)) - s.mock. - ExpectExec("DELETE FROM `mock-schema`\\.table_v\\d+ WHERE node_id = \\?"). - WithArgs(sqlmock.AnyArg()). - WillReturnResult(sqlmock.NewResult(0, 2)) - s.mock.ExpectCommit() + s.mock.ExpectExec("DROP SCHEMA `mock-schema`").WillReturnResult(sqlmock.NewResult(0, 1)) err := s.cpdb.RemoveCheckpoint(context.Background(), "all") c.Assert(err, IsNil) @@ -305,11 +292,11 @@ func (s *cpSQLSuite) TestRemoveOneCheckpoint(c *C) { func (s *cpSQLSuite) TestIgnoreAllErrorCheckpoints(c *C) { s.mock.ExpectBegin() s.mock. - ExpectExec("UPDATE `mock-schema`\\.engine_v\\d+ SET status = 30 WHERE node_id = \\? AND status <= 25"). + ExpectExec("UPDATE `mock-schema`\\.engine_v\\d+ SET status = 30 WHERE 'all' = \\? AND status <= 25"). WithArgs(sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(5, 3)) s.mock. - ExpectExec("UPDATE `mock-schema`\\.table_v\\d+ SET status = 30 WHERE node_id = \\? AND status <= 25"). + ExpectExec("UPDATE `mock-schema`\\.table_v\\d+ SET status = 30 WHERE 'all' = \\? AND status <= 25"). WithArgs(sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(6, 2)) s.mock.ExpectCommit() @@ -337,22 +324,22 @@ func (s *cpSQLSuite) TestIgnoreOneErrorCheckpoint(c *C) { func (s *cpSQLSuite) TestDestroyAllErrorCheckpoints(c *C) { s.mock.ExpectBegin() s.mock. - ExpectQuery("SELECT (?s:.+)node_id = \\?"). + ExpectQuery("SELECT (?s:.+)'all' = \\?"). WithArgs(sqlmock.AnyArg()). 
WillReturnRows( sqlmock.NewRows([]string{"table_name", "__min__", "__max__"}). AddRow("`db1`.`t2`", -1, 0), ) s.mock. - ExpectExec("DELETE FROM `mock-schema`\\.chunk_v\\d+ WHERE table_name IN .+ node_id = \\?"). + ExpectExec("DELETE FROM `mock-schema`\\.chunk_v\\d+ WHERE table_name IN .+ 'all' = \\?"). WithArgs(sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(0, 5)) s.mock. - ExpectExec("DELETE FROM `mock-schema`\\.engine_v\\d+ WHERE table_name IN .+ node_id = \\?"). + ExpectExec("DELETE FROM `mock-schema`\\.engine_v\\d+ WHERE table_name IN .+ 'all' = \\?"). WithArgs(sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(0, 3)) s.mock. - ExpectExec("DELETE FROM `mock-schema`\\.table_v\\d+ WHERE node_id = \\?"). + ExpectExec("DELETE FROM `mock-schema`\\.table_v\\d+ WHERE 'all' = \\?"). WithArgs(sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(0, 2)) s.mock.ExpectCommit() @@ -446,15 +433,35 @@ func (s *cpSQLSuite) TestDump(c *C) { s.mock. ExpectQuery("SELECT .+ FROM `mock-schema`\\.table_v\\d+"). WillReturnRows( - sqlmock.NewRows([]string{"node_id", "session", "table_name", "hash", "status", "alloc_base", "create_time", "update_time"}). - AddRow(0, 1555555555, "`db1`.`t2`", 0, 90, 132861, t, t), + sqlmock.NewRows([]string{"task_id", "table_name", "hash", "status", "alloc_base", "create_time", "update_time"}). + AddRow(1555555555, "`db1`.`t2`", 0, 90, 132861, t, t), ) csvBuilder.Reset() err = s.cpdb.DumpTables(ctx, &csvBuilder) c.Assert(err, IsNil) c.Assert(csvBuilder.String(), Equals, - "node_id,session,table_name,hash,status,alloc_base,create_time,update_time\n"+ - "0,1555555555,`db1`.`t2`,0,90,132861,2019-04-18 02:45:55 +0000 UTC,2019-04-18 02:45:55 +0000 UTC\n", + "task_id,table_name,hash,status,alloc_base,create_time,update_time\n"+ + "1555555555,`db1`.`t2`,0,90,132861,2019-04-18 02:45:55 +0000 UTC,2019-04-18 02:45:55 +0000 UTC\n", ) } + +func (s *cpSQLSuite) TestMoveCheckpoints(c *C) { + ctx := context.Background() + + s.mock. 
+ ExpectExec("CREATE SCHEMA IF NOT EXISTS `mock-schema\\.12345678\\.bak`"). + WillReturnResult(sqlmock.NewResult(1, 1)) + s.mock. + ExpectExec("RENAME TABLE `mock-schema`\\.chunk_v\\d+ TO `mock-schema\\.12345678\\.bak`\\.chunk_v\\d+"). + WillReturnResult(sqlmock.NewResult(0, 1)) + s.mock. + ExpectExec("RENAME TABLE `mock-schema`\\.engine_v\\d+ TO `mock-schema\\.12345678\\.bak`\\.engine_v\\d+"). + WillReturnResult(sqlmock.NewResult(0, 1)) + s.mock. + ExpectExec("RENAME TABLE `mock-schema`\\.table_v\\d+ TO `mock-schema\\.12345678\\.bak`\\.table_v\\d+"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + err := s.cpdb.MoveCheckpoints(ctx, 12345678) + c.Assert(err, IsNil) +} diff --git a/lightning/lightning.go b/lightning/lightning.go index 08f566695..51f1c8c67 100644 --- a/lightning/lightning.go +++ b/lightning/lightning.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -130,6 +131,10 @@ func (l *Lightning) RunOnce() error { if err := cfg.Adjust(); err != nil { return err } + cfg.TaskID = time.Now().UnixNano() + failpoint.Inject("SetTaskID", func(val failpoint.Value) { + cfg.TaskID = int64(val.(int)) + }) return l.run(cfg) } diff --git a/lightning/lightning_test.go b/lightning/lightning_test.go index 4c1798799..1354d8e53 100644 --- a/lightning/lightning_test.go +++ b/lightning/lightning_test.go @@ -119,6 +119,7 @@ func (s *lightningServerSuite) TestRunServer(c *C) { resp.Body.Close() go s.lightning.RunServer() + time.Sleep(100 * time.Millisecond) req, err := http.NewRequest(http.MethodPut, url, nil) c.Assert(err, IsNil) @@ -205,6 +206,7 @@ func (s *lightningServerSuite) TestGetDeleteTask(c *C) { } go s.lightning.RunServer() + time.Sleep(100 * time.Millisecond) // Check `GET /tasks` without any active tasks diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 8c8d56ed2..4ebefcba1 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -200,7 
+200,7 @@ func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (CheckpointsDB, if err != nil { return nil, errors.Trace(err) } - cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema) + cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema, cfg.TaskID) if err != nil { db.Close() return nil, errors.Trace(err) @@ -1069,12 +1069,22 @@ func checkVersion(component string, expected, actual semver.Version) error { } func (rc *RestoreController) cleanCheckpoints(ctx context.Context) error { - if !rc.cfg.Checkpoint.Enable || rc.cfg.Checkpoint.KeepAfterSuccess { - log.L().Info("skip clean checkpoints") + if !rc.cfg.Checkpoint.Enable { return nil } - task := log.L().Begin(zap.InfoLevel, "clean checkpoints") - err := rc.checkpointsDB.RemoveCheckpoint(ctx, "all") + + logger := log.With( + zap.Bool("keepAfterSuccess", rc.cfg.Checkpoint.KeepAfterSuccess), + zap.Int64("taskID", rc.cfg.TaskID), + ) + + task := logger.Begin(zap.InfoLevel, "clean checkpoints") + var err error + if rc.cfg.Checkpoint.KeepAfterSuccess { + err = rc.checkpointsDB.MoveCheckpoints(ctx, rc.cfg.TaskID) + } else { + err = rc.checkpointsDB.RemoveCheckpoint(ctx, "all") + } task.End(zap.ErrorLevel, err) return errors.Trace(err) } diff --git a/tests/checkpoint/run.sh b/tests/checkpoint/run.sh index 17a405680..c7a33405f 100755 --- a/tests/checkpoint/run.sh +++ b/tests/checkpoint/run.sh @@ -57,11 +57,13 @@ PARTIAL_IMPORT_QUERY="$PARTIAL_IMPORT_QUERY AS s;" # Set the failpoint to kill the lightning instance as soon as one table is imported # If checkpoint does work, this should only kill 9 instances of lightnings. 
-export GO_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailBeforeIndexEngineImported=return' +SLOWDOWN_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500)' +export GO_FAILPOINTS="$SLOWDOWN_FAILPOINTS;github.com/pingcap/tidb-lightning/lightning/restore/FailBeforeIndexEngineImported=return" # Start importing the tables. run_sql 'DROP DATABASE IF EXISTS cppk_tsr' run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_test_cppk' +run_sql 'DROP DATABASE IF EXISTS `tidb_lightning_checkpoint_test_cppk.1357924680.bak`' # panic after saving index engine checkpoint status before saving table checkpoint status set +e @@ -72,7 +74,7 @@ for i in $(seq "$TABLE_COUNT"); do done set -e -export GO_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500)' +export GO_FAILPOINTS="$SLOWDOWN_FAILPOINTS" set +e for i in $(seq "$TABLE_COUNT"); do echo "******** Importing Table Now (step $i/$TABLE_COUNT) ********" @@ -83,8 +85,9 @@ set -e # Start importing the tables. 
run_sql 'DROP DATABASE IF EXISTS cppk_tsr' run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_test_cppk' +run_sql 'DROP DATABASE IF EXISTS `tidb_lightning_checkpoint_test_cppk.1357924680.bak`' -export GO_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailIfIndexEngineImported=return(1)' +export GO_FAILPOINTS="$SLOWDOWN_FAILPOINTS;github.com/pingcap/tidb-lightning/lightning/SetTaskID=return(1357924680);github.com/pingcap/tidb-lightning/lightning/restore/FailIfIndexEngineImported=return(1)" set +e for i in $(seq "$TABLE_COUNT"); do @@ -100,7 +103,7 @@ echo "******** Verify checkpoint no-op ********" run_lightning run_sql "$PARTIAL_IMPORT_QUERY" check_contains "s: $(( (1000 * $CHUNK_COUNT + 1001) * $CHUNK_COUNT * $TABLE_COUNT ))" -run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cppk.table_v4 WHERE status >= 200" +run_sql 'SELECT count(*) FROM `tidb_lightning_checkpoint_test_cppk.1357924680.bak`.table_v5 WHERE status >= 200' check_contains "count(*): $TABLE_COUNT" # Ensure there is no dangling open engines diff --git a/tests/checkpoint_chunks/file.toml b/tests/checkpoint_chunks/file.toml index 4489cdb10..beb90c34f 100644 --- a/tests/checkpoint_chunks/file.toml +++ b/tests/checkpoint_chunks/file.toml @@ -9,6 +9,7 @@ enable = true schema = "tidb_lightning_checkpoint_test_cpch" driver = "file" dsn = "/tmp/lightning_test_result/cpch.pb" +keep-after-success = true [tikv-importer] addr = "127.0.0.1:8808" diff --git a/tests/checkpoint_chunks/run.sh b/tests/checkpoint_chunks/run.sh index ef5f91a8a..97427e315 100755 --- a/tests/checkpoint_chunks/run.sh +++ b/tests/checkpoint_chunks/run.sh @@ -28,7 +28,7 @@ verify_checkpoint_noop() { run_sql 'SELECT count(i), sum(i) FROM cpch_tsr.tbl;' check_contains "count(i): $(($ROW_COUNT*$CHUNK_COUNT))" check_contains "sum(i): $(( $ROW_COUNT*$CHUNK_COUNT*(($CHUNK_COUNT+2)*$ROW_COUNT + 1)/2 ))" - run_sql "SELECT count(*) FROM 
tidb_lightning_checkpoint_test_cpch.table_v4 WHERE status >= 200" + run_sql 'SELECT count(*) FROM `tidb_lightning_checkpoint_test_cpch.1234567890.bak`.table_v5 WHERE status >= 200' check_contains "count(*): 1" } @@ -46,11 +46,13 @@ done # Set the failpoint to kill the lightning instance as soon as # one file (after writing totally $ROW_COUNT rows) is imported. # If checkpoint does work, this should kill exactly $CHUNK_COUNT instances of lightnings. -export GO_FAILPOINTS="github.com/pingcap/tidb-lightning/lightning/restore/FailIfImportedChunk=return($ROW_COUNT)" +TASKID_FAILPOINTS="github.com/pingcap/tidb-lightning/lightning/SetTaskID=return(1234567890)" +export GO_FAILPOINTS="$TASKID_FAILPOINTS;github.com/pingcap/tidb-lightning/lightning/restore/FailIfImportedChunk=return($ROW_COUNT)" # Start importing the tables. run_sql 'DROP DATABASE IF EXISTS cpch_tsr' run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_test_cpch' +run_sql 'DROP DATABASE IF EXISTS `tidb_lightning_checkpoint_test_cpch.1234567890.bak`' set +e for i in $(seq "$CHUNK_COUNT"); do @@ -65,10 +67,11 @@ verify_checkpoint_noop # Next, test kill lightning via signal mechanism run_sql 'DROP DATABASE IF EXISTS cpch_tsr' run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_test_cpch' +run_sql 'DROP DATABASE IF EXISTS `tidb_lightning_checkpoint_test_cpch.1234567890.bak`' # Set the failpoint to kill the lightning instance as soon as one chunk is imported, via signal mechanism # If checkpoint does work, this should only kill $CHUNK_COUNT instances of lightnings. 
-export GO_FAILPOINTS="github.com/pingcap/tidb-lightning/lightning/restore/KillIfImportedChunk=return($ROW_COUNT)" +export GO_FAILPOINTS="$TASKID_FAILPOINTS;github.com/pingcap/tidb-lightning/lightning/restore/KillIfImportedChunk=return($ROW_COUNT)" for i in $(seq "$CHUNK_COUNT"); do echo "******** Importing Chunk Now (step $i/$CHUNK_COUNT) ********" @@ -98,11 +101,11 @@ verify_checkpoint_noop # Repeat, but using the file checkpoint run_sql 'DROP DATABASE IF EXISTS cpch_tsr' run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_test_cpch' -rm -f "$TEST_DIR/cpch.pb" +rm -f "$TEST_DIR"/cpch.pb* # Set the failpoint to kill the lightning instance as soon as one chunk is imported # If checkpoint does work, this should only kill $CHUNK_COUNT instances of lightnings. -export GO_FAILPOINTS="github.com/pingcap/tidb-lightning/lightning/restore/FailIfImportedChunk=return($ROW_COUNT)" +export GO_FAILPOINTS="$TASKID_FAILPOINTS;github.com/pingcap/tidb-lightning/lightning/restore/FailIfImportedChunk=return($ROW_COUNT)" set +e for i in $(seq "$CHUNK_COUNT"); do echo "******** Importing Chunk using File checkpoint Now (step $i/$CHUNK_COUNT) ********" @@ -116,4 +119,5 @@ run_lightning file run_sql 'SELECT count(i), sum(i) FROM cpch_tsr.tbl;' check_contains "count(i): $(($ROW_COUNT*$CHUNK_COUNT))" check_contains "sum(i): $(( $ROW_COUNT*$CHUNK_COUNT*(($CHUNK_COUNT+2)*$ROW_COUNT + 1)/2 ))" -[ -f "$TEST_DIR/cpch.pb" ] +[ ! -e "$TEST_DIR/cpch.pb" ] +[ -e "$TEST_DIR/cpch.pb.1234567890.bak" ]