Skip to content

Commit

Permalink
checkpoints: remove node_id field and rename the schema on keep-after…
Browse files Browse the repository at this point in the history
…-success (pingcap#208)

* lightning: reduce the chance of spurious error in web server

* checkpoints: remove node_id

We intend to separate checkpoints from multiple nodes into different
schemas instead.

Since one node now owns the entire schema, when removing all checkpoints
we just drop the entire schema. We also changed the file driver to delete
the file instead of just emptying the content.

* lightning: set the task ID even outside server mode

* checkpoints,restore: move the checkpoints database on keep-after-success

The schema is renamed as `*.{taskID}.bak`.

* checkpoints: addressed comments
  • Loading branch information
kennytm authored Jul 24, 2019
1 parent cf08f2f commit 666c34e
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 124 deletions.
183 changes: 103 additions & 80 deletions lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"io"
"io/ioutil"
"math"
"os"
"sort"
"strings"
"sync"
"time"

"github.com/joho/sqltocsv"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -54,14 +54,12 @@ const (
CheckpointStatusAnalyzed CheckpointStatus = 210
)

const nodeID = 0

const WholeTableEngineID = math.MaxInt32

const (
// the table names to store each kind of checkpoint in the checkpoint database
// remember to increase the version number in case of incompatible change.
checkpointTableNameTable = "table_v4"
checkpointTableNameTable = "table_v5"
checkpointTableNameEngine = "engine_v5"
checkpointTableNameChunk = "chunk_v4"
)
Expand Down Expand Up @@ -329,6 +327,9 @@ type CheckpointsDB interface {
Update(checkpointDiffs map[string]*TableCheckpointDiff)

RemoveCheckpoint(ctx context.Context, tableName string) error
// MoveCheckpoints renames the checkpoint schema to include a suffix
// including the taskID (e.g. `tidb_lightning_checkpoints.1234567890.bak`).
MoveCheckpoints(ctx context.Context, taskID int64) error
IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
DumpTables(ctx context.Context, csv io.Writer) error
Expand Down Expand Up @@ -364,12 +365,12 @@ func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _
func (*NullCheckpointsDB) Update(map[string]*TableCheckpointDiff) {}

type MySQLCheckpointsDB struct {
db *sql.DB
schema string
session uint64
db *sql.DB
schema string
taskID int64
}

func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (*MySQLCheckpointsDB, error) {
func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string, taskID int64) (*MySQLCheckpointsDB, error) {
var escapedSchemaName strings.Builder
common.WriteMySQLIdentifier(&escapedSchemaName, schemaName)
schema := escapedSchemaName.String()
Expand All @@ -388,15 +389,14 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (

err = sql.Exec(ctx, "create table checkpoints table", fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.%s (
node_id int unsigned NOT NULL,
session bigint unsigned NOT NULL,
task_id bigint NOT NULL,
table_name varchar(261) NOT NULL PRIMARY KEY,
hash binary(32) NOT NULL,
status tinyint unsigned DEFAULT 30,
alloc_base bigint NOT NULL DEFAULT 0,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX(node_id, session)
INDEX(task_id)
);
`, schema, checkpointTableNameTable))
if err != nil {
Expand Down Expand Up @@ -441,13 +441,10 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (
return nil, errors.Trace(err)
}

// Create a relatively unique number (on the same node) as the session ID.
session := uint64(time.Now().UnixNano())

return &MySQLCheckpointsDB{
db: db,
schema: schema,
session: session,
db: db,
schema: schema,
taskID: taskID,
}, nil
}

Expand All @@ -457,17 +454,17 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, dbInfo map[strin

s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L()}
err := s.Transact(ctx, "insert checkpoints", func(c context.Context, tx *sql.Tx) error {
// If `node_id` is not the same but the `table_name` duplicates,
// If `hash` is not the same but the `table_name` duplicates,
// the CASE expression will return NULL, which can be used to violate
// the NOT NULL requirement of `session` column, and caused this INSERT
// the NOT NULL requirement of `task_id` column, and caused this INSERT
// statement to fail with an irrecoverable error.
// We do need to capture the error is display a user friendly message
// (multiple nodes cannot import the same table) though.
stmt, err := tx.PrepareContext(c, fmt.Sprintf(`
INSERT INTO %s.%s (node_id, session, table_name, hash) VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE session = CASE
WHEN node_id = VALUES(node_id) AND hash = VALUES(hash)
THEN VALUES(session)
INSERT INTO %s.%s (task_id, table_name, hash) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE task_id = CASE
WHEN hash = VALUES(hash)
THEN VALUES(task_id)
END;
`, cpdb.schema, checkpointTableNameTable))
if err != nil {
Expand All @@ -478,7 +475,7 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, dbInfo map[strin
for _, db := range dbInfo {
for _, table := range db.Tables {
tableName := common.UniqueTable(db.Name, table.Name)
_, err = stmt.ExecContext(c, nodeID, cpdb.session, tableName, 0)
_, err = stmt.ExecContext(c, cpdb.taskID, tableName, 0)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -920,6 +917,9 @@ var cannotManageNullDB = errors.New("cannot perform this function while checkpoi
func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error {
return errors.Trace(cannotManageNullDB)
}
func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error {
return errors.Trace(cannotManageNullDB)
}
func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error {
return errors.Trace(cannotManageNullDB)
}
Expand All @@ -937,57 +937,73 @@ func (*NullCheckpointsDB) DumpChunks(context.Context, io.Writer) error {
}

func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error {
var (
deleteChunkFmt string
deleteEngineFmt string
deleteTableFmt string
arg interface{}
)

if tableName == "all" {
deleteChunkFmt = "DELETE FROM %[1]s.%[2]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[3]s WHERE node_id = ?)"
deleteEngineFmt = "DELETE FROM %[1]s.%[2]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[3]s WHERE node_id = ?)"
deleteTableFmt = "DELETE FROM %s.%s WHERE node_id = ?"
arg = nodeID
} else {
deleteChunkFmt = "DELETE FROM %s.%s WHERE table_name = ?%.0s" // the %.0s is to consume the third parameter.
deleteEngineFmt = "DELETE FROM %s.%s WHERE table_name = ?%.0s"
deleteTableFmt = "DELETE FROM %s.%s WHERE table_name = ?"
arg = tableName
}

deleteChunkQuery := fmt.Sprintf(deleteChunkFmt, cpdb.schema, checkpointTableNameChunk, checkpointTableNameTable)
deleteEngineQuery := fmt.Sprintf(deleteEngineFmt, cpdb.schema, checkpointTableNameEngine, checkpointTableNameTable)
deleteTableQuery := fmt.Sprintf(deleteTableFmt, cpdb.schema, checkpointTableNameTable)
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.With(zap.String("table", tableName)),
}
err := s.Transact(ctx, "remove checkpoints", func(c context.Context, tx *sql.Tx) error {
if _, e := tx.ExecContext(c, deleteChunkQuery, arg); e != nil {

if tableName == "all" {
return s.Exec(ctx, "remove all checkpoints", "DROP SCHEMA "+cpdb.schema)
}

deleteChunkQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, checkpointTableNameChunk)
deleteEngineQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, checkpointTableNameEngine)
deleteTableQuery := fmt.Sprintf("DELETE FROM %s.%s WHERE table_name = ?", cpdb.schema, checkpointTableNameTable)

return s.Transact(ctx, "remove checkpoints", func(c context.Context, tx *sql.Tx) error {
if _, e := tx.ExecContext(c, deleteChunkQuery, tableName); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, deleteEngineQuery, arg); e != nil {
if _, e := tx.ExecContext(c, deleteEngineQuery, tableName); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, deleteTableQuery, arg); e != nil {
if _, e := tx.ExecContext(c, deleteTableQuery, tableName); e != nil {
return errors.Trace(e)
}
return nil
})
return errors.Trace(err)
}

func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error {
// The "cpdb.schema" is an escaped schema name of the form "`foo`".
// We use "x[1:len(x)-1]" instead of unescaping it to keep the
// double-backquotes (if any) intact.
newSchema := fmt.Sprintf("`%s.%d.bak`", cpdb.schema[1:len(cpdb.schema)-1], taskID)
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.With(zap.Int64("taskID", taskID)),
}

createSchemaQuery := "CREATE SCHEMA IF NOT EXISTS " + newSchema
moveChunkQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, checkpointTableNameChunk)
moveEngineQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, checkpointTableNameEngine)
moveTableQuery := fmt.Sprintf("RENAME TABLE %[1]s.%[3]s TO %[2]s.%[3]s", cpdb.schema, newSchema, checkpointTableNameTable)

if e := s.Exec(ctx, "create backup checkpoints schema", createSchemaQuery); e != nil {
return e
}
if e := s.Exec(ctx, "move chunk checkpoints table", moveChunkQuery); e != nil {
return e
}
if e := s.Exec(ctx, "move engine checkpoints table", moveEngineQuery); e != nil {
return e
}
if e := s.Exec(ctx, "move table checkpoints table", moveTableQuery); e != nil {
return e
}
return nil
}

func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error {
var (
colName string
arg interface{}
)
var colName string
if tableName == "all" {
colName, arg = "node_id", nodeID
// This will expand to `WHERE 'all' = 'all'` and effectively allowing
// all tables to be included.
colName = "'all'"
} else {
colName, arg = "table_name", tableName
colName = "table_name"
}

engineQuery := fmt.Sprintf(`
UPDATE %s.%s SET status = %d WHERE %s = ? AND status <= %d;
`, cpdb.schema, checkpointTableNameEngine, CheckpointStatusLoaded, colName, CheckpointStatusMaxInvalid)
Expand All @@ -1000,10 +1016,10 @@ func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, table
Logger: log.With(zap.String("table", tableName)),
}
err := s.Transact(ctx, "ignore error checkpoints", func(c context.Context, tx *sql.Tx) error {
if _, e := tx.ExecContext(c, engineQuery, arg); e != nil {
if _, e := tx.ExecContext(c, engineQuery, tableName); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, tableQuery, arg); e != nil {
if _, e := tx.ExecContext(c, tableQuery, tableName); e != nil {
return errors.Trace(e)
}
return nil
Expand All @@ -1012,17 +1028,16 @@ func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, table
}

func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error) {
var (
conditionColumn string
arg interface{}
)
var colName, aliasedColName string

if tableName == "all" {
conditionColumn = "node_id"
arg = nodeID
// These will expand to `WHERE 'all' = 'all'` and effectively allowing
// all tables to be included.
colName = "'all'"
aliasedColName = "'all'"
} else {
conditionColumn = "table_name"
arg = tableName
colName = "table_name"
aliasedColName = "t.table_name"
}

selectQuery := fmt.Sprintf(`
Expand All @@ -1032,18 +1047,18 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
COALESCE(MAX(e.engine_id), -1)
FROM %[1]s.%[4]s t
LEFT JOIN %[1]s.%[5]s e ON t.table_name = e.table_name
WHERE t.%[2]s = ? AND t.status <= %[3]d
WHERE %[2]s = ? AND t.status <= %[3]d
GROUP BY t.table_name;
`, cpdb.schema, conditionColumn, CheckpointStatusMaxInvalid, checkpointTableNameTable, checkpointTableNameEngine)
`, cpdb.schema, aliasedColName, CheckpointStatusMaxInvalid, checkpointTableNameTable, checkpointTableNameEngine)
deleteChunkQuery := fmt.Sprintf(`
DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d)
`, cpdb.schema, conditionColumn, CheckpointStatusMaxInvalid, checkpointTableNameChunk, checkpointTableNameTable)
`, cpdb.schema, colName, CheckpointStatusMaxInvalid, checkpointTableNameChunk, checkpointTableNameTable)
deleteEngineQuery := fmt.Sprintf(`
DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d)
`, cpdb.schema, conditionColumn, CheckpointStatusMaxInvalid, checkpointTableNameEngine, checkpointTableNameTable)
`, cpdb.schema, colName, CheckpointStatusMaxInvalid, checkpointTableNameEngine, checkpointTableNameTable)
deleteTableQuery := fmt.Sprintf(`
DELETE FROM %s.%s WHERE %s = ? AND status <= %d
`, cpdb.schema, checkpointTableNameTable, conditionColumn, CheckpointStatusMaxInvalid)
`, cpdb.schema, checkpointTableNameTable, colName, CheckpointStatusMaxInvalid)

var targetTables []DestroyedTableCheckpoint

Expand All @@ -1054,7 +1069,7 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
err := s.Transact(ctx, "destroy error checkpoints", func(c context.Context, tx *sql.Tx) error {
// Obtain the list of tables
targetTables = nil
rows, e := tx.QueryContext(c, selectQuery, arg)
rows, e := tx.QueryContext(c, selectQuery, tableName)
if e != nil {
return errors.Trace(e)
}
Expand All @@ -1071,13 +1086,13 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
}

// Delete the checkpoints
if _, e := tx.ExecContext(c, deleteChunkQuery, arg); e != nil {
if _, e := tx.ExecContext(c, deleteChunkQuery, tableName); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, deleteEngineQuery, arg); e != nil {
if _, e := tx.ExecContext(c, deleteEngineQuery, tableName); e != nil {
return errors.Trace(e)
}
if _, e := tx.ExecContext(c, deleteTableQuery, arg); e != nil {
if _, e := tx.ExecContext(c, deleteTableQuery, tableName); e != nil {
return errors.Trace(e)
}
return nil
Expand All @@ -1092,8 +1107,7 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl
func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer) error {
rows, err := cpdb.db.QueryContext(ctx, fmt.Sprintf(`
SELECT
node_id,
session,
task_id,
table_name,
hex(hash) AS hash,
status,
Expand Down Expand Up @@ -1160,12 +1174,21 @@ func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName str

if tableName == "all" {
cpdb.checkpoints.Reset()
} else {
delete(cpdb.checkpoints.Checkpoints, tableName)
return errors.Trace(os.Remove(cpdb.path))
}

delete(cpdb.checkpoints.Checkpoints, tableName)
return errors.Trace(cpdb.save())
}

func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

newPath := fmt.Sprintf("%s.%d.bak", cpdb.path, taskID)
return errors.Trace(os.Rename(cpdb.path, newPath))
}

func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()
Expand Down
Loading

0 comments on commit 666c34e

Please sign in to comment.