From 365264cd21561362491fcefea4641109f8e23d0d Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 16 Jan 2019 15:37:33 +0800 Subject: [PATCH] ddl: add restore deleted table (#7937) --- ddl/ddl.go | 1 + ddl/ddl_api.go | 27 +++ ddl/ddl_worker.go | 29 ++- ddl/delete_range.go | 40 ++++- ddl/mock.go | 5 + ddl/serial_test.go | 338 +++++++++++++++++++++++++++++++++++ ddl/table.go | 150 ++++++++++++++++ ddl/util/util.go | 9 +- domain/domain.go | 9 + executor/builder.go | 12 ++ executor/ddl.go | 172 ++++++++++++++++++ executor/ddl_test.go | 3 - executor/set.go | 28 +-- go.mod | 2 +- go.sum | 4 +- infoschema/builder.go | 2 +- kv/txn.go | 19 ++ meta/meta.go | 11 ++ planner/core/common_plans.go | 8 + planner/core/planbuilder.go | 8 + planner/core/preprocess.go | 5 + store/tikv/txn.go | 6 + util/gcutil/gcutil.go | 98 ++++++++++ 23 files changed, 946 insertions(+), 40 deletions(-) create mode 100644 util/gcutil/gcutil.go diff --git a/ddl/ddl.go b/ddl/ddl.go index a498383c84a91..7f4b3cd971175 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -214,6 +214,7 @@ type DDL interface { CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error) + RestoreTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) DropView(ctx sessionctx.Context, tableIdent ast.Ident) (err error) CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, unique bool, indexName model.CIStr, columnNames []*ast.IndexColName, indexOption *ast.IndexOption) error diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 9f76e6f42ae67..e541baf91617a 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -1064,6 +1064,33 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e return errors.Trace(err) } +func (d *ddl) RestoreTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) { + is := d.GetInformationSchema(ctx) + // Check schema exist. + schema, ok := is.SchemaByID(schemaID) + if !ok { + return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs( + fmt.Sprintf("(Schema ID %d)", schemaID), + )) + } + // Check not exist table with same name. + if ok := is.TableExists(schema.Name, tbInfo.Name); ok { + return infoschema.ErrTableExists.GenWithStackByArgs(tbInfo.Name) + } + + tbInfo.State = model.StateNone + job := &model.Job{ + SchemaID: schemaID, + TableID: tbInfo.ID, + Type: model.ActionRestoreTable, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{tbInfo, autoID, dropJobID, snapshotTS, restoreTableCheckFlagNone}, + } + err = d.doDDLJob(ctx, job) + err = d.callHookOnChanged(err) + return errors.Trace(err) +} + func (d *ddl) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) (err error) { ident := ast.Ident{Name: s.ViewName.Name, Schema: s.ViewName.Schema} is := d.GetInformationSchema(ctx) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 8723804d3099a..09e95f4a6ceef 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -277,9 +277,13 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition, model.ActionTruncateTablePartition: err = w.deleteRange(job) } - if err != nil { - return errors.Trace(err) - } + } + switch job.Type { + case model.ActionRestoreTable: + err = finishRestoreTable(w, t, job) + } + if err != nil { + return errors.Trace(err) } _, err = t.DeQueueDDLJob() @@ -293,6 +297,23 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { return errors.Trace(err) } +func finishRestoreTable(w *worker, t *meta.Meta, job *model.Job) error { + tbInfo := &model.TableInfo{} + var autoID, dropJobID, restoreTableCheckFlag int64 + var snapshotTS uint64 + err := job.DecodeArgs(tbInfo, &autoID, &dropJobID, &snapshotTS, &restoreTableCheckFlag) + if err != nil { + return errors.Trace(err) + } + if restoreTableCheckFlag == restoreTableCheckFlagEnableGC { + err = enableGC(w) + if err != nil { + return errors.Trace(err) + } + } + return nil +} + func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) { if job.DependencyID == noneDependencyJob { return true, nil @@ -497,6 +518,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onAddTablePartition(t, job) case model.ActionModifyTableCharsetAndCollate: ver, err = onModifyTableCharsetAndCollate(t, job) + case model.ActionRestoreTable: + ver, err = w.onRestoreTable(d, t, job) default: // Invalid job, cancel it. job.State = model.JobStateCancelled diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 3b50347b92e1b..6fa032d0aaff1 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -19,6 +19,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "github.com/pingcap/errors" "github.com/pingcap/parser/model" @@ -38,9 +39,16 @@ const ( delBackLog = 128 ) +// enableEmulatorGC means whether to enable emulator GC. The default is enable. +// In some unit tests, we want to stop emulator GC, then wen can set enableEmulatorGC to 0. +var emulatorGCEnable = int32(1) + type delRangeManager interface { // addDelRangeJob add a DDL job into gc_delete_range table. addDelRangeJob(job *model.Job) error + // removeFromGCDeleteRange removes the deleting table job from gc_delete_range table by jobID and tableID. + // It's use for recover the table that was mistakenly deleted. + removeFromGCDeleteRange(jobID, tableID int64) error start() clear() } @@ -90,6 +98,17 @@ func (dr *delRange) addDelRangeJob(job *model.Job) error { return nil } +// removeFromGCDeleteRange implements delRangeManager interface. +func (dr *delRange) removeFromGCDeleteRange(jobID, tableID int64) error { + ctx, err := dr.sessPool.get() + if err != nil { + return errors.Trace(err) + } + defer dr.sessPool.put(ctx) + err = util.RemoveFromGCDeleteRange(ctx, jobID, tableID) + return errors.Trace(err) +} + // start implements delRangeManager interface. func (dr *delRange) start() { if !dr.storeSupport { @@ -117,11 +136,28 @@ func (dr *delRange) startEmulator() { case <-dr.quitCh: return } - err := dr.doDelRangeWork() - terror.Log(errors.Trace(err)) + if IsEmulatorGCEnable() { + err := dr.doDelRangeWork() + terror.Log(errors.Trace(err)) + } } } +// EmulatorGCEnable enables emulator gc. It exports for testing. +func EmulatorGCEnable() { + atomic.StoreInt32(&emulatorGCEnable, 1) +} + +// EmulatorGCDisable disables emulator gc. It exports for testing. +func EmulatorGCDisable() { + atomic.StoreInt32(&emulatorGCEnable, 0) +} + +// IsEmulatorGCEnable indicates whether emulator GC enabled. It exports for testing. +func IsEmulatorGCEnable() bool { + return atomic.LoadInt32(&emulatorGCEnable) == 1 +} + func (dr *delRange) doDelRangeWork() error { ctx, err := dr.sessPool.get() if err != nil { diff --git a/ddl/mock.go b/ddl/mock.go index 4acc92f57688b..aebff5011d07e 100644 --- a/ddl/mock.go +++ b/ddl/mock.go @@ -126,6 +126,11 @@ func (dr *mockDelRange) addDelRangeJob(job *model.Job) error { return nil } +// removeFromGCDeleteRange implements delRangeManager interface. +func (dr *mockDelRange) removeFromGCDeleteRange(jobID, tableID int64) error { + return nil +} + // start implements delRangeManager interface. func (dr *mockDelRange) start() { return diff --git a/ddl/serial_test.go b/ddl/serial_test.go index d846cdafe7df1..a7604a8d61165 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -15,6 +15,8 @@ package ddl_test import ( "context" + "fmt" + "strings" "time" . "github.com/pingcap/check" @@ -23,10 +25,12 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/admin" + "github.com/pingcap/tidb/util/gcutil" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" @@ -123,3 +127,337 @@ func (s *testSerialSuite) TestCancelAddIndexPanic(c *C) { c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job") } + +func (s *testSerialSuite) TestRestoreTableByJobID(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("create database if not exists test_restore") + tk.MustExec("use test_restore") + tk.MustExec("drop table if exists t_recover") + tk.MustExec("create table t_recover (a int);") + defer func(originGC bool) { + if originGC { + ddl.EmulatorGCEnable() + } else { + ddl.EmulatorGCDisable() + } + }(ddl.IsEmulatorGCEnable()) + + // disable emulator GC. + // Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl. + ddl.EmulatorGCDisable() + gcTimeFormat := "20060102-15:04:05 -0700 MST" + timeBeforeDrop := time.Now().Add(0 - time.Duration(48*60*60*time.Second)).Format(gcTimeFormat) + timeAfterDrop := time.Now().Add(time.Duration(48 * 60 * 60 * time.Second)).Format(gcTimeFormat) + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + // clear GC variables first. + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") + + tk.MustExec("insert into t_recover values (1),(2),(3)") + tk.MustExec("drop table t_recover") + + rs, err := tk.Exec("admin show ddl jobs") + c.Assert(err, IsNil) + rows, err := session.GetRows4Test(context.Background(), tk.Se, rs) + c.Assert(err, IsNil) + row := rows[0] + c.Assert(row.GetString(1), Equals, "test_restore") + c.Assert(row.GetString(3), Equals, "drop table") + jobID := row.GetInt64(0) + + // if GC safe point is not exists in mysql.tidb + _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID)) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "can not get 'tikv_gc_safe_point'") + // set GC safe point + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + + // if GC enable is not exists in mysql.tidb + _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID)) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[ddl:-1]can not get 'tikv_gc_enable'") + + err = gcutil.EnableGC(tk.Se) + c.Assert(err, IsNil) + + // recover job is before GC safe point + tk.MustExec(fmt.Sprintf(safePointSQL, timeAfterDrop)) + _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID)) + c.Assert(err, NotNil) + c.Assert(strings.Contains(err.Error(), "snapshot is older than GC safe point"), Equals, true) + + // set GC safe point + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + // if there is a new table with the same name, should return failed. + tk.MustExec("create table t_recover (a int);") + _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID)) + c.Assert(err.Error(), Equals, infoschema.ErrTableExists.GenWithStackByArgs("t_recover").Error()) + + // drop the new table with the same name, then restore table. + tk.MustExec("drop table t_recover") + + // do restore table. + tk.MustExec(fmt.Sprintf("admin restore table by job %d", jobID)) + + // check recover table meta and data record. + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3")) + // check recover table autoID. + tk.MustExec("insert into t_recover values (4),(5),(6)") + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6")) + + // restore table by none exits job. + _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", 10000000)) + c.Assert(err, NotNil) + + // Disable GC by manual first, then after recover table, the GC enable status should also be disabled. + err = gcutil.DisableGC(tk.Se) + c.Assert(err, IsNil) + + tk.MustExec("delete from t_recover where a > 1") + tk.MustExec("drop table t_recover") + rs, err = tk.Exec("admin show ddl jobs") + c.Assert(err, IsNil) + rows, err = session.GetRows4Test(context.Background(), tk.Se, rs) + c.Assert(err, IsNil) + row = rows[0] + c.Assert(row.GetString(1), Equals, "test_restore") + c.Assert(row.GetString(3), Equals, "drop table") + jobID = row.GetInt64(0) + + tk.MustExec(fmt.Sprintf("admin restore table by job %d", jobID)) + + // check recover table meta and data record. + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1")) + // check recover table autoID. + tk.MustExec("insert into t_recover values (7),(8),(9)") + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "7", "8", "9")) + + gcEnable, err := gcutil.CheckGCEnable(tk.Se) + c.Assert(err, IsNil) + c.Assert(gcEnable, Equals, false) +} + +func (s *testSerialSuite) TestRestoreTableByTableName(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("create database if not exists test_restore") + tk.MustExec("use test_restore") + tk.MustExec("drop table if exists t_recover, t_recover2") + tk.MustExec("create table t_recover (a int);") + defer func(originGC bool) { + if originGC { + ddl.EmulatorGCEnable() + } else { + ddl.EmulatorGCDisable() + } + }(ddl.IsEmulatorGCEnable()) + + // disable emulator GC. + // Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl. + ddl.EmulatorGCDisable() + gcTimeFormat := "20060102-15:04:05 -0700 MST" + timeBeforeDrop := time.Now().Add(0 - time.Duration(48*60*60*time.Second)).Format(gcTimeFormat) + timeAfterDrop := time.Now().Add(time.Duration(48 * 60 * 60 * time.Second)).Format(gcTimeFormat) + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + // clear GC variables first. + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") + + tk.MustExec("insert into t_recover values (1),(2),(3)") + tk.MustExec("drop table t_recover") + + // if GC safe point is not exists in mysql.tidb + _, err := tk.Exec("admin restore table t_recover") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "can not get 'tikv_gc_safe_point'") + // set GC safe point + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + + // if GC enable is not exists in mysql.tidb + _, err = tk.Exec("admin restore table t_recover") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[ddl:-1]can not get 'tikv_gc_enable'") + + err = gcutil.EnableGC(tk.Se) + c.Assert(err, IsNil) + + // recover job is before GC safe point + tk.MustExec(fmt.Sprintf(safePointSQL, timeAfterDrop)) + _, err = tk.Exec("admin restore table t_recover") + c.Assert(err, NotNil) + c.Assert(strings.Contains(err.Error(), "snapshot is older than GC safe point"), Equals, true) + + // set GC safe point + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + // if there is a new table with the same name, should return failed. + tk.MustExec("create table t_recover (a int);") + _, err = tk.Exec("admin restore table t_recover") + c.Assert(err.Error(), Equals, infoschema.ErrTableExists.GenWithStackByArgs("t_recover").Error()) + + // drop the new table with the same name, then restore table. + tk.MustExec("rename table t_recover to t_recover2") + + // do restore table. + tk.MustExec("admin restore table t_recover") + + // check recover table meta and data record. + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3")) + // check recover table autoID. + tk.MustExec("insert into t_recover values (4),(5),(6)") + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6")) + // check rebase auto id. + tk.MustQuery("select a,_tidb_rowid from t_recover;").Check(testkit.Rows("1 1", "2 2", "3 3", "4 5001", "5 5002", "6 5003")) + + // restore table by none exits job. + _, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", 10000000)) + c.Assert(err, NotNil) + + // Disable GC by manual first, then after recover table, the GC enable status should also be disabled. + err = gcutil.DisableGC(tk.Se) + c.Assert(err, IsNil) + + tk.MustExec("delete from t_recover where a > 1") + tk.MustExec("drop table t_recover") + + tk.MustExec("admin restore table t_recover") + + // check recover table meta and data record. + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1")) + // check recover table autoID. + tk.MustExec("insert into t_recover values (7),(8),(9)") + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "7", "8", "9")) + + gcEnable, err := gcutil.CheckGCEnable(tk.Se) + c.Assert(err, IsNil) + c.Assert(gcEnable, Equals, false) +} + +func (s *testSerialSuite) TestRestoreTableByJobIDFail(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("create database if not exists test_restore") + tk.MustExec("use test_restore") + tk.MustExec("drop table if exists t_recover") + tk.MustExec("create table t_recover (a int);") + defer func(originGC bool) { + if originGC { + ddl.EmulatorGCEnable() + } else { + ddl.EmulatorGCDisable() + } + }(ddl.IsEmulatorGCEnable()) + + // disable emulator GC. + // Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl. + ddl.EmulatorGCDisable() + gcTimeFormat := "20060102-15:04:05 -0700 MST" + timeBeforeDrop := time.Now().Add(0 - time.Duration(48*60*60*time.Second)).Format(gcTimeFormat) + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + + tk.MustExec("insert into t_recover values (1),(2),(3)") + tk.MustExec("drop table t_recover") + + rs, err := tk.Exec("admin show ddl jobs") + c.Assert(err, IsNil) + rows, err := session.GetRows4Test(context.Background(), tk.Se, rs) + c.Assert(err, IsNil) + row := rows[0] + c.Assert(row.GetString(1), Equals, "test_restore") + c.Assert(row.GetString(3), Equals, "drop table") + jobID := row.GetInt64(0) + + // enableGC first + err = gcutil.EnableGC(tk.Se) + c.Assert(err, IsNil) + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + + // set hook + hook := &ddl.TestDDLCallback{} + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionRestoreTable { + gofail.Enable("github.com/pingcap/tidb/store/tikv/mockCommitError", `return(true)`) + gofail.Enable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr", `return(true)`) + } + } + origHook := s.dom.DDL().GetHook() + defer s.dom.DDL().(ddl.DDLForTest).SetHook(origHook) + s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + + // do restore table. + tk.MustExec(fmt.Sprintf("admin restore table by job %d", jobID)) + gofail.Disable("github.com/pingcap/tidb/store/tikv/mockCommitError") + gofail.Disable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr") + + // make sure enable GC after restore table. + enable, err := gcutil.CheckGCEnable(tk.Se) + c.Assert(err, IsNil) + c.Assert(enable, Equals, true) + + // check recover table meta and data record. + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3")) + // check recover table autoID. + tk.MustExec("insert into t_recover values (4),(5),(6)") + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6")) +} + +func (s *testSerialSuite) TestRestoreTableByTableNameFail(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("create database if not exists test_restore") + tk.MustExec("use test_restore") + tk.MustExec("drop table if exists t_recover") + tk.MustExec("create table t_recover (a int);") + defer func(originGC bool) { + if originGC { + ddl.EmulatorGCEnable() + } else { + ddl.EmulatorGCDisable() + } + }(ddl.IsEmulatorGCEnable()) + + // disable emulator GC. + // Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl. + ddl.EmulatorGCDisable() + gcTimeFormat := "20060102-15:04:05 -0700 MST" + timeBeforeDrop := time.Now().Add(0 - time.Duration(48*60*60*time.Second)).Format(gcTimeFormat) + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + + tk.MustExec("insert into t_recover values (1),(2),(3)") + tk.MustExec("drop table t_recover") + + // enableGC first + err := gcutil.EnableGC(tk.Se) + c.Assert(err, IsNil) + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + + // set hook + hook := &ddl.TestDDLCallback{} + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionRestoreTable { + gofail.Enable("github.com/pingcap/tidb/store/tikv/mockCommitError", `return(true)`) + gofail.Enable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr", `return(true)`) + } + } + origHook := s.dom.DDL().GetHook() + defer s.dom.DDL().(ddl.DDLForTest).SetHook(origHook) + s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + + // do restore table. + tk.MustExec("admin restore table t_recover") + gofail.Disable("github.com/pingcap/tidb/store/tikv/mockCommitError") + gofail.Disable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr") + + // make sure enable GC after restore table. + enable, err := gcutil.CheckGCEnable(tk.Se) + c.Assert(err, IsNil) + c.Assert(enable, Equals, true) + + // check recover table meta and data record. + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3")) + // check recover table autoID. + tk.MustExec("insert into t_recover values (4),(5),(6)") + tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6")) +} diff --git a/ddl/table.go b/ddl/table.go index e53256153c594..a964c6100b554 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/gcutil" log "github.com/sirupsen/logrus" ) @@ -147,6 +148,155 @@ func onDropTableOrView(t *meta.Meta, job *model.Job) (ver int64, _ error) { return ver, errors.Trace(err) } +const ( + restoreTableCheckFlagNone int64 = iota + restoreTableCheckFlagEnableGC + restoreTableCheckFlagDisableGC +) + +func (w *worker) onRestoreTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { + schemaID := job.SchemaID + tblInfo := &model.TableInfo{} + var autoID, dropJobID, restoreTableCheckFlag int64 + var snapshotTS uint64 + if err = job.DecodeArgs(tblInfo, &autoID, &dropJobID, &snapshotTS, &restoreTableCheckFlag); err != nil { + // Invalid arguments, cancel this job. + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + // check GC and safe point + gcEnable, err := checkGCEnable(w) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + err = checkTableNotExists(t, job, schemaID, tblInfo.Name.L) + if err != nil { + return ver, errors.Trace(err) + } + + // Restore table divide into 2 steps: + // 1. Check GC enable status, to decided whether enable GC after restore table. + // a. Why not disable GC before put the job to DDL job queue? + // Think about concurrency problem. If a restore job-1 is doing and already disabled GC, + // then, another restore table job-2 check GC enable will get disable before into the job queue. + // then, after restore table job-2 finished, the GC will be disabled. + // b. Why split into 2 steps? 1 step also can finish this job: check GC -> disable GC -> restore table -> finish job. + // What if the transaction commit failed? then, the job will retry, but the GC already disabled when first running. + // So, after this job retry succeed, the GC will be disabled. + // 2. Do restore table job. + // a. Check whether GC enabled, if enabled, disable GC first. + // b. Check GC safe point. If drop table time if after safe point time, then can do restore. + // otherwise, can't restore table, because the records of the table may already delete by gc. + // c. Remove GC task of the table from gc_delete_range table. + // d. Create table and rebase table auto ID. + // e. Finish. + switch tblInfo.State { + case model.StateNone: + // none -> write only + // check GC enable and update flag. + if gcEnable { + job.Args[len(job.Args)-1] = restoreTableCheckFlagEnableGC + } else { + job.Args[len(job.Args)-1] = restoreTableCheckFlagDisableGC + } + + job.SchemaState = model.StateWriteOnly + tblInfo.State = model.StateWriteOnly + ver, err = updateVersionAndTableInfo(t, job, tblInfo, false) + case model.StateWriteOnly: + // write only -> public + // do restore table. + if gcEnable { + err = disableGC(w) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Errorf("disable gc failed, try again later. err: %v", err) + } + } + // check GC safe point + err = checkSafePoint(w, snapshotTS) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + // Remove dropped table DDL job from gc_delete_range table. + err = w.delRangeManager.removeFromGCDeleteRange(dropJobID, tblInfo.ID) + if err != nil { + return ver, errors.Trace(err) + } + + tblInfo.State = model.StatePublic + tblInfo.UpdateTS = t.StartTS + err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoID) + if err != nil { + return ver, errors.Trace(err) + } + + // gofail: var mockRestoreTableCommitErr bool + // if mockRestoreTableCommitErr && mockRestoreTableCommitErrOnce { + // mockRestoreTableCommitErrOnce = false + // kv.MockCommitErrorEnable() + // } + + ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) + if err != nil { + return ver, errors.Trace(err) + } + + // Finish this job. + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + default: + return ver, ErrInvalidTableState.GenWithStack("invalid restore table state %v", tblInfo.State) + } + return ver, nil +} + +// mockRestoreTableCommitErrOnce uses to make sure `mockRestoreTableCommitErr` only mock error once. +var mockRestoreTableCommitErrOnce = true + +func enableGC(w *worker) error { + ctx, err := w.sessPool.get() + if err != nil { + return errors.Trace(err) + } + defer w.sessPool.put(ctx) + + return gcutil.EnableGC(ctx) +} + +func disableGC(w *worker) error { + ctx, err := w.sessPool.get() + if err != nil { + return errors.Trace(err) + } + defer w.sessPool.put(ctx) + + return gcutil.DisableGC(ctx) +} + +func checkGCEnable(w *worker) (enable bool, err error) { + ctx, err := w.sessPool.get() + if err != nil { + return false, errors.Trace(err) + } + defer w.sessPool.put(ctx) + + return gcutil.CheckGCEnable(ctx) +} + +func checkSafePoint(w *worker, snapshotTS uint64) error { + ctx, err := w.sessPool.get() + if err != nil { + return errors.Trace(err) + } + defer w.sessPool.put(ctx) + + return gcutil.ValidateSnapshot(ctx, snapshotTS) +} + type splitableStore interface { SplitRegion(splitKey kv.Key) error } diff --git a/ddl/util/util.go b/ddl/util/util.go index 07d6000b5db80..2a85b95ba0077 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -109,8 +109,13 @@ func CompleteDeleteRange(ctx sessionctx.Context, dr DelRangeTask) error { return errors.Trace(err) } - sql = fmt.Sprintf(completeDeleteRangeSQL, dr.JobID, dr.ElementID) - _, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + return RemoveFromGCDeleteRange(ctx, dr.JobID, dr.ElementID) +} + +// RemoveFromGCDeleteRange is exported for ddl pkg to use. +func RemoveFromGCDeleteRange(ctx sessionctx.Context, jobID, elementID int64) error { + sql := fmt.Sprintf(completeDeleteRangeSQL, jobID, elementID) + _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) return errors.Trace(err) } diff --git a/domain/domain.go b/domain/domain.go index 8d78d795cbace..4c129f39aa931 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -255,6 +255,15 @@ func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchem return snapHandle.Get(), nil } +// GetSnapshotMeta gets a new snapshot meta at startTS. +func (do *Domain) GetSnapshotMeta(startTS uint64) (*meta.Meta, error) { + snapshot, err := do.store.GetSnapshot(kv.NewVersion(startTS)) + if err != nil { + return nil, errors.Trace(err) + } + return meta.NewSnapshotMeta(snapshot), nil +} + // DDL gets DDL from domain. func (do *Domain) DDL() ddl.DDL { return do.ddl diff --git a/executor/builder.go b/executor/builder.go index 00607f4564547..e84c26bae68da 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -83,6 +83,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor { return b.buildCheckIndexRange(v) case *plannercore.ChecksumTable: return b.buildChecksumTable(v) + case *plannercore.RestoreTable: + return b.buildRestoreTable(v) case *plannercore.DDL: return b.buildDDL(v) case *plannercore.Deallocate: @@ -337,6 +339,16 @@ func (b *executorBuilder) buildRecoverIndex(v *plannercore.RecoverIndex) Executo return e } +func (b *executorBuilder) buildRestoreTable(v *plannercore.RestoreTable) Executor { + e := &RestoreTableExec{ + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + jobID: v.JobID, + Table: v.Table, + JobNum: v.JobNum, + } + return e +} + func buildCleanupIndexCols(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) []*model.ColumnInfo { columns := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)+1) for _, idxCol := range indexInfo.Columns { diff --git a/executor/ddl.go b/executor/ddl.go index c06f9caaadb2c..5f545a9bfe619 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -25,9 +25,13 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/admin" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/gcutil" "github.com/pingcap/tidb/util/sqlexec" log "github.com/sirupsen/logrus" ) @@ -287,3 +291,171 @@ func (e *DDLExec) executeAlterTable(s *ast.AlterTableStmt) error { err := domain.GetDomain(e.ctx).DDL().AlterTable(e.ctx, ti, s.Specs) return errors.Trace(err) } + +// RestoreTableExec represents a recover table executor. +// It is built from "admin restore table by job" statement, +// is used to recover the table that deleted by mistake. +type RestoreTableExec struct { + baseExecutor + jobID int64 + Table *ast.TableName + JobNum int64 +} + +// Open implements the Executor Open interface. +func (e *RestoreTableExec) Open(ctx context.Context) error { + if err := e.baseExecutor.Open(ctx); err != nil { + return errors.Trace(err) + } + return nil +} + +// Next implements the Executor Open interface. +func (e *RestoreTableExec) Next(ctx context.Context, req *chunk.RecordBatch) (err error) { + // Should commit the previous transaction and create a new transaction. + if err = e.ctx.NewTxn(ctx); err != nil { + return errors.Trace(err) + } + defer func() { e.ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = false }() + + err = e.executeRestoreTable() + if err != nil { + return errors.Trace(err) + } + + dom := domain.GetDomain(e.ctx) + // Update InfoSchema in TxnCtx, so it will pass schema check. + is := dom.InfoSchema() + txnCtx := e.ctx.GetSessionVars().TxnCtx + txnCtx.InfoSchema = is + txnCtx.SchemaVersion = is.SchemaMetaVersion() + // DDL will force commit old transaction, after DDL, in transaction status should be false. + e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false) + return nil +} + +func (e *RestoreTableExec) executeRestoreTable() error { + txn, err := e.ctx.Txn(true) + if err != nil { + return errors.Trace(err) + } + t := meta.NewMeta(txn) + dom := domain.GetDomain(e.ctx) + var job *model.Job + var tblInfo *model.TableInfo + if e.jobID != 0 { + job, tblInfo, err = getRestoreTableByJobID(e, t, dom) + } else { + job, tblInfo, err = getRestoreTableByTableName(e, t, dom) + } + if err != nil { + return errors.Trace(err) + } + // Get table original autoID before table drop. + m, err := dom.GetSnapshotMeta(job.StartTS) + if err != nil { + return errors.Trace(err) + } + autoID, err := m.GetAutoTableID(job.SchemaID, job.TableID) + if err != nil { + return errors.Errorf("recover table_id: %d, get original autoID from snapshot meta err: %s", job.TableID, err.Error()) + } + // Call DDL RestoreTable + err = domain.GetDomain(e.ctx).DDL().RestoreTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS) + return errors.Trace(err) +} + +func getRestoreTableByJobID(e *RestoreTableExec, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) { + job, err := t.GetHistoryDDLJob(e.jobID) + if err != nil { + return nil, nil, errors.Trace(err) + } + if job == nil { + return nil, nil, admin.ErrDDLJobNotFound.GenWithStackByArgs(e.jobID) + } + if job.Type != model.ActionDropTable { + return nil, nil, errors.Errorf("Job %v type is %v, not drop table", job.ID, job.Type) + } + + // Check GC safe point for getting snapshot infoSchema. + err = gcutil.ValidateSnapshot(e.ctx, job.StartTS) + if err != nil { + return nil, nil, errors.Trace(err) + } + + // Get the snapshot infoSchema before drop table. + snapInfo, err := dom.GetSnapshotInfoSchema(job.StartTS) + if err != nil { + return nil, nil, errors.Trace(err) + } + // Get table meta from snapshot infoSchema. + table, ok := snapInfo.TableByID(job.TableID) + if !ok { + return nil, nil, infoschema.ErrTableNotExists.GenWithStackByArgs( + fmt.Sprintf("(Schema ID %d)", job.SchemaID), + fmt.Sprintf("(Table ID %d)", job.TableID), + ) + } + return job, table.Meta(), nil +} + +func getRestoreTableByTableName(e *RestoreTableExec, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) { + jobs, err := t.GetAllHistoryDDLJobs() + if err != nil { + return nil, nil, errors.Trace(err) + } + var job *model.Job + var tblInfo *model.TableInfo + gcSafePoint, err := gcutil.GetGCSafePoint(e.ctx) + if err != nil { + return nil, nil, errors.Trace(err) + } + schemaName := e.Table.Schema.L + if schemaName == "" { + schemaName = e.ctx.GetSessionVars().CurrentDB + } + if schemaName == "" { + return nil, nil, errors.Trace(core.ErrNoDB) + } + // TODO: only search recent `e.JobNum` DDL jobs. + for i := len(jobs) - 1; i > 0; i-- { + job = jobs[i] + if job.Type != model.ActionDropTable { + continue + } + // Check GC safe point for getting snapshot infoSchema. + err = gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint) + if err != nil { + return nil, nil, errors.Trace(err) + } + // Get the snapshot infoSchema before drop table. + snapInfo, err := dom.GetSnapshotInfoSchema(job.StartTS) + if err != nil { + return nil, nil, errors.Trace(err) + } + // Get table meta from snapshot infoSchema. + table, ok := snapInfo.TableByID(job.TableID) + if !ok { + return nil, nil, infoschema.ErrTableNotExists.GenWithStackByArgs( + fmt.Sprintf("(Schema ID %d)", job.SchemaID), + fmt.Sprintf("(Table ID %d)", job.TableID), + ) + } + if table.Meta().Name.L == e.Table.Name.L { + schema, ok := dom.InfoSchema().SchemaByID(job.SchemaID) + if !ok { + return nil, nil, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs( + fmt.Sprintf("(Schema ID %d)", job.SchemaID), + )) + } + if schema.Name.L == schemaName { + tblInfo = table.Meta() + break + } + } + } + if tblInfo == nil { + return nil, nil, errors.Errorf("Can't found drop table: %v in ddl history jobs", e.Table.Name) + } + return job, tblInfo, nil +} diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 9f30dcde74636..a3c3806ad429e 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -577,7 +577,4 @@ func (s *testSuite3) TestSetDDLReorgBatchSize(c *C) { tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1000") res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") res.Check(testkit.Rows("1000")) - - // If do not LoadDDLReorgVars, the local variable will be the last loaded value. - c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(100)) } diff --git a/executor/set.go b/executor/set.go index e4ad853181532..fe6711dbfbf67 100644 --- a/executor/set.go +++ b/executor/set.go @@ -24,12 +24,10 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/gcutil" log "github.com/sirupsen/logrus" ) @@ -158,7 +156,7 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e } newSnapshotIsSet := sessionVars.SnapshotTS > 0 && sessionVars.SnapshotTS != oldSnapshotTS if newSnapshotIsSet { - err = validateSnapshot(e.ctx, sessionVars.SnapshotTS) + err = gcutil.ValidateSnapshot(e.ctx, sessionVars.SnapshotTS) if err != nil { sessionVars.SnapshotTS = oldSnapshotTS return errors.Trace(err) @@ -183,28 +181,6 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e return nil } -// validateSnapshot checks that the newly set snapshot time is after GC safe point time. -func validateSnapshot(ctx sessionctx.Context, snapshotTS uint64) error { - sql := "SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_safe_point'" - rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql) - if err != nil { - return errors.Trace(err) - } - if len(rows) != 1 { - return errors.New("can not get 'tikv_gc_safe_point'") - } - safePointString := rows[0].GetString(0) - safePointTime, err := util.CompatibleParseGCTime(safePointString) - if err != nil { - return errors.Trace(err) - } - safePointTS := variable.GoTimeToTS(safePointTime) - if safePointTS > snapshotTS { - return variable.ErrSnapshotTooOld.GenWithStackByArgs(safePointString) - } - return nil -} - func (e *SetExecutor) setCharset(cs, co string) error { var err error if len(co) == 0 { diff --git a/go.mod b/go.mod index 66057655c30d1..72678225a4e54 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e github.com/pingcap/kvproto v0.0.0-20190110035000-d4fe6b336379 - github.com/pingcap/parser v0.0.0-20190114091152-8b799d66df61 + github.com/pingcap/parser v0.0.0-20190114105451-005df5698910 github.com/pingcap/pd v2.1.0-rc.4+incompatible github.com/pingcap/tidb-tools v2.1.3-0.20190104033906-883b07a04a73+incompatible github.com/pingcap/tipb v0.0.0-20181012112600-11e33c750323 diff --git a/go.sum b/go.sum index 55fa6c24a2a22..1a8e8baaa654e 100644 --- a/go.sum +++ b/go.sum @@ -110,8 +110,8 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rG github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190110035000-d4fe6b336379 h1:l4KInBOtxjbgQLjCFHzX66vZgNzsH4a+RiuVZGrO0xk= github.com/pingcap/kvproto v0.0.0-20190110035000-d4fe6b336379/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/parser v0.0.0-20190114091152-8b799d66df61 h1:J9Z8Xn0MwBMOsB3jUcrirtRh0Df1Nzrv+hBmCyUg6E4= -github.com/pingcap/parser v0.0.0-20190114091152-8b799d66df61/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= +github.com/pingcap/parser v0.0.0-20190114105451-005df5698910 h1:3kybw5XEJIcAkOQ1t8UuohQu+O3ndC/mRsJZFOEi83U= +github.com/pingcap/parser v0.0.0-20190114105451-005df5698910/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= github.com/pingcap/pd v2.1.0-rc.4+incompatible h1:/buwGk04aHO5odk/+O8ZOXGs4qkUjYTJ2UpCJXna8NE= github.com/pingcap/pd v2.1.0-rc.4+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E= github.com/pingcap/tidb-tools v2.1.3-0.20190104033906-883b07a04a73+incompatible h1:Ba48wwPwPq5hd1kkQpgua49dqB5cthC2zXVo7fUUDec= diff --git a/infoschema/builder.go b/infoschema/builder.go index 10a1dadf40974..b9e1d5b3ba8a0 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -51,7 +51,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro var oldTableID, newTableID int64 tblIDs := make([]int64, 0, 2) switch diff.Type { - case model.ActionCreateTable: + case model.ActionCreateTable, model.ActionRestoreTable: newTableID = diff.TableID tblIDs = append(tblIDs, newTableID) case model.ActionDropTable, model.ActionDropView: diff --git a/kv/txn.go b/kv/txn.go index ff2360778eed6..00ee680049ad4 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -17,6 +17,7 @@ import ( "context" "math" "math/rand" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -123,3 +124,21 @@ func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) { } return storageValues, nil } + +// mockCommitErrorEnable uses to enable `mockCommitError` and only mock error once. +var mockCommitErrorEnable = int64(0) + +// MockCommitErrorEnable exports for gofail testing. +func MockCommitErrorEnable() { + atomic.StoreInt64(&mockCommitErrorEnable, 1) +} + +// MockCommitErrorDisable exports for gofail testing. +func MockCommitErrorDisable() { + atomic.StoreInt64(&mockCommitErrorEnable, 0) +} + +// IsMockCommitErrorEnable exports for gofail testing. +func IsMockCommitErrorEnable() bool { + return atomic.LoadInt64(&mockCommitErrorEnable) == 1 +} diff --git a/meta/meta.go b/meta/meta.go index 41e93dda03d84..59751b45ce7a4 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -296,6 +296,17 @@ func (m *Meta) CreateTable(dbID int64, tableInfo *model.TableInfo) error { return m.txn.HSet(dbKey, tableKey, data) } +// CreateTableAndSetAutoID creates a table with tableInfo in database, +// and rebases the table autoID. +func (m *Meta) CreateTableAndSetAutoID(dbID int64, tableInfo *model.TableInfo, autoID int64) error { + err := m.CreateTable(dbID, tableInfo) + if err != nil { + return errors.Trace(err) + } + _, err = m.txn.HInc(m.dbKey(dbID), m.autoTableIDKey(tableInfo.ID), autoID) + return errors.Trace(err) +} + // DropDatabase drops whole database. func (m *Meta) DropDatabase(dbID int64) error { // Check if db exists. diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index efca01ffd618a..37adc390b402c 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -84,6 +84,14 @@ type RecoverIndex struct { IndexName string } +// RestoreTable is used for recover deleted files by mistake. +type RestoreTable struct { + baseSchemaProducer + JobID int64 + Table *ast.TableName + JobNum int64 +} + // CleanupIndex is used to delete dangling index data. type CleanupIndex struct { baseSchemaProducer diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index a8f986375a7b8..e42ae92219ac4 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -550,6 +550,14 @@ func (b *PlanBuilder) buildAdmin(as *ast.AdminStmt) (Plan, error) { p := &ShowSlow{ShowSlow: as.ShowSlow} p.SetSchema(buildShowSlowSchema()) ret = p + case ast.AdminRestoreTable: + if len(as.JobIDs) > 0 { + ret = &RestoreTable{JobID: as.JobIDs[0]} + } else if len(as.Tables) > 0 { + ret = &RestoreTable{Table: as.Tables[0], JobNum: as.JobNumber} + } else { + return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as) + } default: return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as) } diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index a4ed20acd77eb..1d04615e75eb9 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -84,6 +84,11 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) { return in, true case *ast.Join: p.checkNonUniqTableAlias(node) + case *ast.AdminStmt: + // The specified table in admin restore syntax maybe already been dropped. + // So skip check table name here, otherwise, admin restore table [table_name] syntax will return + // table not exists error. But admin restore is use to restore the dropped table. So skip children here. + return in, node.Tp == ast.AdminRestoreTable default: p.parentIsJoin = false } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index e19dec8bf50ba..33222f0778807 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -169,6 +169,12 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { } defer txn.close() + // gofail: var mockCommitError bool + // if mockCommitError && kv.IsMockCommitErrorEnable() { + // kv.MockCommitErrorDisable() + // return errors.New("mock commit error") + // } + metrics.TiKVTxnCmdCounter.WithLabelValues("set").Add(float64(txn.setCnt)) metrics.TiKVTxnCmdCounter.WithLabelValues("commit").Inc() start := time.Now() diff --git a/util/gcutil/gcutil.go b/util/gcutil/gcutil.go new file mode 100644 index 0000000000000..986a4eb7533ea --- /dev/null +++ b/util/gcutil/gcutil.go @@ -0,0 +1,98 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcutil + +import ( + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/sqlexec" +) + +const ( + selectVariableValueSQL = `SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name='%s'` + insertVariableValueSQL = `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'` +) + +// CheckGCEnable is use to check whether GC is enable. +func CheckGCEnable(ctx sessionctx.Context) (enable bool, err error) { + sql := fmt.Sprintf(selectVariableValueSQL, "tikv_gc_enable") + rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql) + if err != nil { + return false, errors.Trace(err) + } + if len(rows) != 1 { + return false, errors.New("can not get 'tikv_gc_enable'") + } + return rows[0].GetString(0) == "true", nil +} + +// DisableGC will disable GC enable variable. +func DisableGC(ctx sessionctx.Context) error { + sql := fmt.Sprintf(insertVariableValueSQL, "tikv_gc_enable", "false", "Current GC enable status") + _, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql) + return errors.Trace(err) +} + +// EnableGC will enable GC enable variable. +func EnableGC(ctx sessionctx.Context) error { + sql := fmt.Sprintf(insertVariableValueSQL, "tikv_gc_enable", "true", "Current GC enable status") + _, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql) + return errors.Trace(err) +} + +// ValidateSnapshot checks that the newly set snapshot time is after GC safe point time. +func ValidateSnapshot(ctx sessionctx.Context, snapshotTS uint64) error { + safePointTS, err := GetGCSafePoint(ctx) + if err != nil { + return errors.Trace(err) + } + if safePointTS > snapshotTS { + return variable.ErrSnapshotTooOld.GenWithStackByArgs(model.TSConvert2Time(safePointTS).String()) + } + return nil +} + +// ValidateSnapshotWithGCSafePoint checks that the newly set snapshot time is after GC safe point time. +func ValidateSnapshotWithGCSafePoint(snapshotTS, safePointTS uint64) error { + if safePointTS > snapshotTS { + return variable.ErrSnapshotTooOld.GenWithStackByArgs(model.TSConvert2Time(safePointTS).String()) + } + return nil +} + +// GetGCSafePoint loads GC safe point time from mysql.tidb. +func GetGCSafePoint(ctx sessionctx.Context) (uint64, error) { + sql := fmt.Sprintf(selectVariableValueSQL, "tikv_gc_safe_point") + rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql) + if err != nil { + return 0, errors.Trace(err) + } + if len(rows) != 1 { + return 0, errors.New("can not get 'tikv_gc_safe_point'") + } + safePointString := rows[0].GetString(0) + safePointTime, err := util.CompatibleParseGCTime(safePointString) + if err != nil { + return 0, errors.Trace(err) + } + ts := variable.GoTimeToTS(safePointTime) + return ts, nil +}