Skip to content

Commit

Permalink
checkpoints: fixed some serious checkpoint issues (pingcap#96)
Browse files Browse the repository at this point in the history
1. Ensure FileCheckpointsDB is mutex-protected.
   This prevents the "concurrent map iteration and map write" error.
2. Split the DDLs creating a MySQLCheckpointsDB into 3 separate statements
3. Reduced the path length of chunks_v3 in the PRIMARY KEY to 500 chars
   to avoid the 3072 key-length limitation.
  • Loading branch information
kennytm authored Dec 12, 2018
1 parent 7ca4e96 commit 510f71e
Showing 1 changed file with 43 additions and 6 deletions.
49 changes: 43 additions & 6 deletions lightning/restore/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"sort"
"strings"
"sync"
"time"

"github.com/cznic/mathutil"
Expand Down Expand Up @@ -239,10 +240,15 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (
common.WriteMySQLIdentifier(&escapedSchemaName, schemaName)
schema := escapedSchemaName.String()

// Apparently we could execute multiple DDL statements in Exec()
err := common.ExecWithRetry(ctx, db, "(create checkpoints database)", fmt.Sprintf(`
CREATE DATABASE IF NOT EXISTS %[1]s;
CREATE TABLE IF NOT EXISTS %[1]s.%[2]s (
CREATE DATABASE IF NOT EXISTS %s;
`, schema))
if err != nil {
return nil, errors.Trace(err)
}

err = common.ExecWithRetry(ctx, db, "(create table checkpoints table)", fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.%s (
node_id int unsigned NOT NULL,
session bigint unsigned NOT NULL,
table_name varchar(261) NOT NULL PRIMARY KEY,
Expand All @@ -254,7 +260,13 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX(node_id, session)
);
CREATE TABLE IF NOT EXISTS %[1]s.%[3]s (
`, schema, checkpointTableNameTable))
if err != nil {
return nil, errors.Trace(err)
}

err = common.ExecWithRetry(ctx, db, "(create chunks checkpoints table)", fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.%s (
table_name varchar(261) NOT NULL,
path varchar(2048) NOT NULL,
offset bigint NOT NULL,
Expand All @@ -269,9 +281,9 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (
kvc_checksum bigint unsigned NOT NULL DEFAULT 0,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(table_name, path, offset)
PRIMARY KEY(table_name, path(500), offset)
);
`, schema, checkpointTableNameTable, checkpointTableNameChunk))
`, schema, checkpointTableNameChunk))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -502,6 +514,7 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
}

type FileCheckpointsDB struct {
lock sync.Mutex // we need to ensure only a thread can access to `checkpoints` at a time
checkpoints CheckpointsModel
path string
}
Expand Down Expand Up @@ -530,6 +543,9 @@ func (cpdb *FileCheckpointsDB) save() error {
}

func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, dbInfo map[string]*TidbDBInfo) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

if cpdb.checkpoints.Checkpoints == nil {
cpdb.checkpoints.Checkpoints = make(map[string]*TableCheckpointModel)
}
Expand All @@ -551,10 +567,16 @@ func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, dbInfo map[string
}

func (cpdb *FileCheckpointsDB) Close() error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

return errors.Trace(cpdb.save())
}

func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error) {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

tableModel := cpdb.checkpoints.Checkpoints[tableName]

engine, err := uuid.FromBytes(tableModel.Engine)
Expand Down Expand Up @@ -594,6 +616,9 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC
}

func (cpdb *FileCheckpointsDB) InsertChunkCheckpoints(_ context.Context, tableName string, checkpoints []*ChunkCheckpoint) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

tableModel := cpdb.checkpoints.Checkpoints[tableName]
if tableModel.Chunks == nil {
tableModel.Chunks = make(map[string]*ChunkCheckpointModel)
Expand Down Expand Up @@ -624,6 +649,9 @@ func (cpdb *FileCheckpointsDB) InsertChunkCheckpoints(_ context.Context, tableNa
}

func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff) {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

for tableName, cpd := range checkpointDiffs {
tableModel := cpdb.checkpoints.Checkpoints[tableName]
if cpd.hasStatus {
Expand Down Expand Up @@ -832,6 +860,9 @@ func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer
}

func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

if tableName == "all" {
cpdb.checkpoints.Reset()
} else {
Expand All @@ -841,6 +872,9 @@ func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName str
}

func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

for tableName, tableModel := range cpdb.checkpoints.Checkpoints {
if !(targetTableName == "all" || targetTableName == tableName) {
continue
Expand All @@ -853,6 +887,9 @@ func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTa
}

func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error) {
cpdb.lock.Lock()
defer cpdb.lock.Unlock()

var targetTables []DestroyedTableCheckpoint

for tableName, tableModel := range cpdb.checkpoints.Checkpoints {
Expand Down

0 comments on commit 510f71e

Please sign in to comment.