Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: Try to create tables in parallel #502

Merged
merged 57 commits into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
d7ce306
restore: try to create tables in parallel
hidehalo Dec 1, 2020
886908d
glue: fix error condition test for db close
hidehalo Dec 3, 2020
fd8a5de
restore, glue: remove duplicate db pool implementation
hidehalo Dec 3, 2020
5f91c56
restore: try to prevent db connection too early
hidehalo Dec 3, 2020
18eb88c
restore: try to prevent db connection too early
hidehalo Dec 3, 2020
c5b48a4
restore: try to make restore schema run parallel totally
hidehalo Dec 8, 2020
75a5baa
Merge branch 'test-434' into issue-434
hidehalo Dec 8, 2020
92699bd
restore: remove impl&test of tidb#InitSchema
hidehalo Dec 8, 2020
c7f0420
restore: make restore schema run parallelly
hidehalo Dec 8, 2020
8a4118c
restore: remove db connection control
hidehalo Dec 9, 2020
5d07779
restore: a little change of restore schema schedule
hidehalo Dec 9, 2020
d751937
Merge branch 'master' into issue-434
lance6716 Dec 11, 2020
af5a19b
wip
hidehalo Dec 11, 2020
4331dca
restore: keep restore schema job hold the same session(DB connection)
hidehalo Dec 13, 2020
a4c41e1
Merge branch 'master' into issue-434
hidehalo Dec 13, 2020
4491add
restore: fix log message error
hidehalo Dec 14, 2020
8567813
Merge branch 'master' into issue-434
glorv Dec 14, 2020
8bce7fc
restore: remove purpose array for `restoreSchemaWorker`
hidehalo Dec 15, 2020
494f67c
restore: remove useless sql mode set code
hidehalo Dec 15, 2020
25aea0d
restore: restore view statements run after database|table created
hidehalo Dec 15, 2020
74f9025
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Dec 15, 2020
fee4594
Merge branch 'issue-434' of https://github.com/hidehalo/tidb-lightnin…
hidehalo Dec 15, 2020
0e315ca
restore: interrupt job producing when error happens
hidehalo Dec 15, 2020
3e2cc16
util: add SQLDriver interface
hidehalo Dec 16, 2020
52168fc
restore: make sure single restore schema job vs. single db session
hidehalo Dec 16, 2020
1dd9152
restore: run restore view schema statements in txn
hidehalo Dec 16, 2020
8d01745
glue: add checkpoints.Session implementation(sqlConnSession)
hidehalo Dec 16, 2020
09d06d4
restore: close whole database connections after restore schema done
hidehalo Dec 17, 2020
90e9305
restore: revert remove of `InitSchema`
hidehalo Dec 15, 2020
bc32cf8
glue: return a new error when sqlConnSesson.CommitTxn called
hidehalo Dec 18, 2020
a9c00ca
Revert "util: add SQLDriver interface"
hidehalo Dec 21, 2020
dafcf31
glue: update GetSession(context.Context) for Glue interface
hidehalo Dec 21, 2020
2539b7d
glue: disable more methods of sqlConnSession
hidehalo Dec 21, 2020
3ee2e97
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Dec 21, 2020
1542e52
restore: disable implicit initiation of `sync.WaitGroup`
hidehalo Dec 21, 2020
9a27c33
restore: cancel nil error throw when restore schema done
hidehalo Dec 21, 2020
cdc2ae9
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Dec 21, 2020
983d913
Merge branch 'master' into issue-434
lance6716 Dec 21, 2020
b35b26e
restore: replace session map to pool
hidehalo Dec 22, 2020
c0e6692
Merge branch 'issue-434' of https://github.com/hidehalo/tidb-lightnin…
hidehalo Dec 22, 2020
b376656
restore: keep restore table statements ordered
hidehalo Dec 22, 2020
e2cf8e7
restore: assign single session to `restoreSchemaWorker#doJob`'s gorou…
hidehalo Dec 29, 2020
48195f8
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Dec 29, 2020
9fea8d0
restore: add log for `restoreSchema`
hidehalo Dec 29, 2020
5bd96c8
restore: add quit case when error thrown blocked
hidehalo Dec 29, 2020
336fe52
restore: Improve the robustness of concurrency pattern
hidehalo Dec 29, 2020
8eed047
restore: fix channel send/recv logic to avoid blocked forever occurs.
hidehalo Dec 31, 2020
1690e4a
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Dec 31, 2020
42f46b7
restore: `sync.WaitGroup#Add` first when `restoreSchemaWorker#appendJob`
hidehalo Dec 31, 2020
911440e
restore: add impl of `schemaStmtType#String`
hidehalo Dec 31, 2020
e914004
restore: avoid to wait whole jobs done forever when goroutine of `doJ…
hidehalo Dec 31, 2020
d123a85
restore: call cancel function when `makeJobs` exit
hidehalo Jan 1, 2021
e2c2022
restore: a few improvement
hidehalo Jan 4, 2021
1fcc9df
test: add unit tests of `RestoreController#restoreSchema()`
hidehalo Jan 4, 2021
ef85de8
Merge branch 'issue-434' of https://github.com/hidehalo/tidb-lightnin…
hidehalo Jan 5, 2021
08acbc1
Merge remote-tracking branch 'pingcap/master' into issue-434
hidehalo Jan 5, 2021
e02a4a7
Merge branch 'master' into issue-434
glorv Jan 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions lightning/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ import (
"errors"

"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-lightning/lightning/checkpoints"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/log"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/sqlexec"
)

type Glue interface {
Expand All @@ -33,7 +36,7 @@ type Glue interface {
GetDB() (*sql.DB, error)
GetParser() *parser.Parser
GetTables(context.Context, string) ([]*model.TableInfo, error)
GetSession() (checkpoints.Session, error)
GetSession(context.Context) (checkpoints.Session, error)
OpenCheckpointsDB(context.Context, *config.Config) (checkpoints.CheckpointsDB, error)
// Record is used to report some information (key, value) to host TiDB, including progress, stage currently
Record(string, uint64)
Expand All @@ -48,6 +51,39 @@ type SQLExecutor interface {
Close()
}

// sqlConnSession implement checkpoints.Session used only for lighting itself
type sqlConnSession struct {
checkpoints.Session
conn *sql.Conn
}

func (session *sqlConnSession) Close() {
session.conn.Close()
}

func (session *sqlConnSession) Execute(ctx context.Context, sql string) ([]sqlexec.RecordSet, error) {
_, err := session.conn.ExecContext(ctx, sql)
return nil, err
}

func (session *sqlConnSession) CommitTxn(context.Context) error {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
return errors.New("sqlConnSession doesn't have a valid CommitTxn implementation")
}

func (session *sqlConnSession) RollbackTxn(context.Context) {}

func (session *sqlConnSession) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error) {
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
return 0, 0, nil, errors.New("sqlConnSession doesn't have a valid PrepareStmt implementation")
}

func (session *sqlConnSession) ExecutePreparedStmt(ctx context.Context, stmtID uint32, param []types.Datum) (sqlexec.RecordSet, error) {
return nil, errors.New("sqlConnSession doesn't have a valid ExecutePreparedStmt implementation")
}

func (session *sqlConnSession) DropPreparedStmt(stmtID uint32) error {
return errors.New("sqlConnSession doesn't have a valid DropPreparedStmt implementation")
}

type ExternalTiDBGlue struct {
db *sql.DB
parser *parser.Parser
Expand Down Expand Up @@ -125,8 +161,12 @@ func (e ExternalTiDBGlue) GetTables(context.Context, string) ([]*model.TableInfo
return nil, errors.New("ExternalTiDBGlue doesn't have a valid GetTables function")
}

func (e ExternalTiDBGlue) GetSession() (checkpoints.Session, error) {
return nil, errors.New("ExternalTiDBGlue doesn't have a valid GetSession function")
func (e ExternalTiDBGlue) GetSession(ctx context.Context) (checkpoints.Session, error) {
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
conn, err := e.db.Conn(ctx)
if err != nil {
return nil, err
}
return &sqlConnSession{conn: conn}, nil
}

func (e *ExternalTiDBGlue) OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (checkpoints.CheckpointsDB, error) {
Expand Down
251 changes: 221 additions & 30 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/failpoint"
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-lightning/lightning/checkpoints"
"github.com/pingcap/tidb-lightning/lightning/glue"
tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/meta/autoid"
Expand Down Expand Up @@ -304,48 +305,238 @@ outside:
return errors.Trace(err)
}

func (rc *RestoreController) restoreSchema(ctx context.Context) error {
if !rc.cfg.Mydumper.NoSchema {
if rc.tidbGlue.OwnsSQLExecutor() {
db, err := DBFromConfig(rc.cfg.TiDB)
if err != nil {
return errors.Annotate(err, "connect to tidb failed")
}
defer db.Close()
db.ExecContext(ctx, "SET SQL_MODE = ?", rc.cfg.TiDB.StrSQLMode)
}
type schemaStmtType int

const (
schemaCreateDatabase = iota
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
schemaCreateTable
schemaCreateView
)

for _, dbMeta := range rc.dbMetas {
task := log.With(zap.String("db", dbMeta.Name)).Begin(zap.InfoLevel, "restore table schema")
type schemaJob struct {
dbName string
tblName string // empty for create db jobs
stmtType schemaStmtType
stmts []*schemaStmt
}

tablesSchema := make(map[string]string)
for _, tblMeta := range dbMeta.Tables {
tablesSchema[tblMeta.Name] = tblMeta.GetSchema(ctx, rc.store)
}
err := InitSchema(ctx, rc.tidbGlue, dbMeta.Name, tablesSchema)
type schemaStmt struct {
sql string
}

task.End(zap.ErrorLevel, err)
if err != nil {
return errors.Annotatef(err, "restore table schema %s failed", dbMeta.Name)
type restoreSchemaWorker struct {
ctx context.Context
quit context.CancelFunc
jobCh chan *schemaJob
errCh chan error
wg sync.WaitGroup
glue glue.Glue
store storage.ExternalStorage
}

func (worker *restoreSchemaWorker) makeJobs(dbMetas []*mydump.MDDatabaseMeta) error {
defer close(worker.jobCh)
var err error
// 1. restore databases, execute statements concurrency
for _, dbMeta := range dbMetas {
restoreSchemaJob := &schemaJob{
dbName: dbMeta.Name,
stmtType: schemaCreateDatabase,
stmts: make([]*schemaStmt, 0, 1),
}
restoreSchemaJob.stmts = append(restoreSchemaJob.stmts, &schemaStmt{
sql: createDatabaseIfNotExistStmt(dbMeta.Name),
})
err = worker.appendJob(restoreSchemaJob)
if err != nil {
return err
}
}
err = worker.wait()
if err != nil {
return err
}
// 2. restore tables, execute statements concurrency
for _, dbMeta := range dbMetas {
for _, tblMeta := range dbMeta.Tables {
sql := tblMeta.GetSchema(worker.ctx, worker.store)
if sql != "" {
stmts, err := createTableIfNotExistsStmt(worker.glue.GetParser(), sql, dbMeta.Name, tblMeta.Name)
if err != nil {
return err
}
restoreSchemaJob := &schemaJob{
dbName: dbMeta.Name,
tblName: tblMeta.Name,
stmtType: schemaCreateTable,
stmts: make([]*schemaStmt, 0, len(stmts)),
}
for _, sql := range stmts {
restoreSchemaJob.stmts = append(restoreSchemaJob.stmts, &schemaStmt{
sql: sql,
})
}
err = worker.appendJob(restoreSchemaJob)
if err != nil {
return err
}
}
}

// restore views. Since views can cross database we must restore views after all table schemas are restored.
for _, dbMeta := range rc.dbMetas {
if len(dbMeta.Views) > 0 {
task := log.With(zap.String("db", dbMeta.Name)).Begin(zap.InfoLevel, "restore view schema")
viewsSchema := make(map[string]string)
for _, viewMeta := range dbMeta.Views {
viewsSchema[viewMeta.Name] = viewMeta.GetSchema(ctx, rc.store)
}
err = worker.wait()
if err != nil {
return err
}
// 3. restore views. Since views can cross database we must restore views after all table schemas are restored.
for _, dbMeta := range dbMetas {
for _, viewMeta := range dbMeta.Views {
sql := viewMeta.GetSchema(worker.ctx, worker.store)
if sql != "" {
stmts, err := createTableIfNotExistsStmt(worker.glue.GetParser(), sql, dbMeta.Name, viewMeta.Name)
if err != nil {
return err
}
restoreSchemaJob := &schemaJob{
dbName: dbMeta.Name,
tblName: viewMeta.Name,
stmtType: schemaCreateView,
stmts: make([]*schemaStmt, 0, len(stmts)),
}
for _, sql := range stmts {
restoreSchemaJob.stmts = append(restoreSchemaJob.stmts, &schemaStmt{
sql: sql,
})
}
err = worker.appendJob(restoreSchemaJob)
if err != nil {
return err
}
err := InitSchema(ctx, rc.tidbGlue, dbMeta.Name, viewsSchema)
// we don't support restore views concurrency, cauz it maybe will raise a error
err = worker.wait()
if err != nil {
return err
}
}
}
}
return nil
}

func (worker *restoreSchemaWorker) doJob() {
var logger log.Logger
var session checkpoints.Session
defer func() {
if session != nil {
session.Close()
}
}()
for {
select {
case <-worker.ctx.Done():
return
case job := <-worker.jobCh:
if job == nil {
return
}
var err error
if session == nil {
session, err = worker.glue.GetSession(worker.ctx)
if err != nil {
worker.throw(err)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
if job.stmtType == schemaCreateDatabase {
logger = log.With(zap.String("db", job.dbName))
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
} else if job.stmtType == schemaCreateTable {
logger = log.With(zap.String("table", common.UniqueTable(job.dbName, job.tblName)))
} else if job.stmtType == schemaCreateView {
logger = log.With(zap.String("table", common.UniqueTable(job.dbName, job.tblName)))
}
for _, stmt := range job.stmts {
task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt.sql))
_, err = session.Execute(worker.ctx, stmt.sql)
task.End(zap.ErrorLevel, err)
if err != nil {
return errors.Annotatef(err, "restore view schema %s failed", dbMeta.Name)
switch job.stmtType {
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
case schemaCreateDatabase:
err = errors.Annotatef(err, "restore database schema %s failed", job.dbName)
case schemaCreateTable:
err = errors.Annotatef(err, "restore table schema %s failed", job.tblName)
case schemaCreateView:
err = errors.Annotatef(err, "restore view schema %s failed", job.tblName)
}
worker.wg.Done()
worker.throw(err)
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
worker.wg.Done()
}
}
}

func (worker *restoreSchemaWorker) wait() error {
select {
case err := <-worker.errCh:
if err != nil {
worker.quit()
}
return err
case <-worker.ctx.Done():
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
return worker.ctx.Err()
default:
worker.wg.Wait()
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
}

func (worker *restoreSchemaWorker) throw(err error) {
if err != nil {
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-worker.ctx.Done():
err := worker.ctx.Err()
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
worker.errCh <- err
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
}
case worker.errCh <- err:
worker.quit()
}
}
}

func (worker *restoreSchemaWorker) appendJob(job *schemaJob) error {
select {
case err := <-worker.errCh:
return err
case <-worker.ctx.Done():
return worker.ctx.Err()
case worker.jobCh <- job:
worker.wg.Add(1)
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
}

func (rc *RestoreController) restoreSchema(ctx context.Context) error {
if !rc.cfg.Mydumper.NoSchema {
logTask := log.L().Begin(zap.InfoLevel, "restore all schema")
concurrency := 16
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
childCtx, cancel := context.WithCancel(ctx)
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
worker := restoreSchemaWorker{
ctx: childCtx,
quit: cancel,
jobCh: make(chan *schemaJob, concurrency),
errCh: make(chan error),
glue: rc.tidbGlue,
store: rc.store,
}
for i := 0; i < concurrency; i++ {
go worker.doJob()
hidehalo marked this conversation as resolved.
Show resolved Hide resolved
}
err := worker.makeJobs(rc.dbMetas)
logTask.End(zap.ErrorLevel, err)
if err != nil {
return err
}
}
getTableFunc := rc.backend.FetchRemoteTableModels
Expand Down
7 changes: 7 additions & 0 deletions lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@ loopCreate:
return errors.Trace(err)
}

func createDatabaseIfNotExistStmt(dbName string) string {
var createDatabase strings.Builder
createDatabase.WriteString("CREATE DATABASE IF NOT EXISTS ")
common.WriteMySQLIdentifier(&createDatabase, dbName)
return createDatabase.String()
}

func createTableIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName string) ([]string, error) {
stmts, _, err := p.Parse(createTable, "", "")
if err != nil {
Expand Down