Skip to content

Commit

Permalink
address winkyao's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 committed Dec 23, 2018
1 parent 3db4a20 commit e0e531d
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 36 deletions.
57 changes: 57 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,63 @@ func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
c.Assert(t.Indices()[1].Meta().Name.String(), Equals, "primary_3")
}

func (s *testDBSuite) TestCancelCreateTable(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
sql := "create database if not exists test_db; use test_db;"
s.tk.MustExec(sql)
sql = "drop table if exists test_source; create table test_source(a int); insert into test_source values (1), (2);"
s.tk.MustExec(sql)

testCases := []struct {
jobState model.JobState
JobSchemaState model.SchemaState
}{
// Check create table.
{model.JobStateNone, model.StateNone},
{model.JobStateRunning, model.StateWriteReorganization},
}
var checkErr error
hook := &ddl.TestDDLCallback{}
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionCreateTable && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn(context.TODO())
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(hookCtx.Txn(true), jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
checkErr = hookCtx.Txn(true).Commit(context.Background())
}
}
originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
var err error
for i := range testCases {
testCase = &testCases[i]
sql = "create table test_cancel_create select * from test_source"
_, err = s.tk.Exec(sql)
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
}
// make sure the table can be created normally
s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
s.tk.Exec("create table test_cancel_create select * from test_source")
}

func (s *testDBSuite) testAlterLock(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down
5 changes: 3 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,9 +991,10 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt, snapsh

err = d.doDDLJob(ctx, job)
if err == nil {
if tbInfo.AutoIncID > 1 {
if tbInfo.AutoIncID > 1 && s.Select == nil {
// Default tableAutoIncID base is 0.
// If the first ID is expected to greater than 1, we need to do rebase.
// But if s.Select != nil, we've rebased auto increase ID before inserting data.
err = d.handleAutoIncID(tbInfo, schema.ID)
}
}
Expand Down Expand Up @@ -2100,7 +2101,7 @@ func (d *ddl) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, spec *a
return errors.Trace(err)
}

// AlterTableCharset changes the table charset and collate.
// AlterTableCharsetAndCollate changes the table charset and collate.
func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Ident, toCharset, toCollate string) error {
// use the last one.
if toCharset == "" && toCollate == "" {
Expand Down
23 changes: 7 additions & 16 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,28 +285,19 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {

// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
err = w.deleteRange(job)
case model.ActionCreateTable:
if job.State != model.JobStateRollbackDone {
break
}

// After rolling back an CreateTable operation, we need to use delete-range to delete the half-done data.
err = w.deleteRange(job)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition, model.ActionTruncateTablePartition:
err = w.deleteRange(job)
}
if err != nil {
return errors.Trace(err)
}
} else if job.Type == model.ActionCreateTable {
// We're canceling the job, table may be created already with some inserted data. A clean-up is needed
err = t.DropTable(job.SchemaID, job.TableID, true)
if err != nil {
if terror.ErrorEqual(err, meta.ErrDBNotExists) {
log.Warnf("Cancelling create table job, but database'(Schema ID %d)' does not exists", job.SchemaID)
} else if terror.ErrorEqual(err, meta.ErrTableNotExists) {
log.Warnf("Cancelling create table job, but table'(Schema ID %d).(Table ID %d)' does not exists", job.SchemaID, job.TableID)
} else {
return errors.Trace(err)
}
}
err = w.deleteRange(job)
if err != nil {
return errors.Trace(err)
}
}

_, err = t.DeQueueDDLJob()
Expand Down
8 changes: 8 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ func rollingbackAddindex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return
}

func rollingbackCreateTable(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
job.State = model.JobStateRollingback
log.Infof("[ddl-%s] run the cancelling create table job: %s", w, job)
return
}

func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
Expand All @@ -184,6 +190,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
ver, err = rollingbackAddindex(w, d, t, job)
case model.ActionDropIndex:
ver, err = rollingbackDropIndex(t, job)
case model.ActionCreateTable:
ver, err = rollingbackCreateTable(w, d, t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down
30 changes: 25 additions & 5 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err erro
return ver, errors.Trace(err)
}

if job.IsRollingback() {
err = t.DropTable(job.SchemaID, job.TableID, true)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
log.Warnf("Cancelling create table job, but database'(Schema ID %d)' does not exists", job.SchemaID)
} else if meta.ErrTableNotExists.Equal(err) {
log.Warnf("Cancelling create table job, but table'(Schema ID %d).(Table ID %d)' does not exists", job.SchemaID, job.TableID)
} else {
return ver, errors.Trace(err)
}
}
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tbInfo)
// for 'create table', a rolling-back may be caused by errors from inserting-data, or cancel command.
if job.Error != nil {
// for insert error, `job.Error` is set already.
err = job.Error
} else {
// for cancel command, use 'cancel DDL job' as err
err = errCancelledDDLJob
}
return ver, err
}

originalState := job.SchemaState
switch tbInfo.State {
case model.StateNone:
Expand Down Expand Up @@ -76,7 +99,6 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err erro
// reorganization -> public (insert data before we make the table public)
err = doCreateTableInsert(d, t, job, tbInfo, snapshotTS)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand All @@ -88,7 +110,6 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err erro
tbInfo.UpdateTS = t.StartTS
ver, err = updateVersionAndTableInfo(t, job, tbInfo, originalState != tbInfo.State)
if err != nil {
job.State = model.JobStateCancelled
return ver, err
}
if tbInfo.State == model.StatePublic {
Expand Down Expand Up @@ -593,6 +614,7 @@ func doCreateTableInsert(d *ddlCtx, t *meta.Meta, job *model.Job, tbInfo *model.
}
closable := sctx.(interface{ Close() })
if closable == nil {
job.State = model.JobStateCancelling
return errors.Errorf("temporary session cannot be closed, should never happen")
}
defer closable.Close()
Expand All @@ -608,9 +630,7 @@ func doCreateTableInsert(d *ddlCtx, t *meta.Meta, job *model.Job, tbInfo *model.

_, err = sctx.(sqlexec.SQLExecutor).Execute(context.Background(), job.Query)
if err != nil {
job.State = model.JobStateCancelled
startKey := tablecodec.EncodeTablePrefix(tbInfo.ID)
job.Args = append(job.Args, startKey, getPartitionIDs(tbInfo))
job.State = model.JobStateCancelling
return errors.Trace(err)
}
job.SetRowCount(int64(sctx.GetSessionVars().StmtCtx.AffectedRows()))
Expand Down
25 changes: 14 additions & 11 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,9 +675,10 @@ func (b *executorBuilder) buildRevoke(revoke *ast.RevokeStmt) Executor {
}

func (b *executorBuilder) buildDDL(v *plannercore.DDL) Executor {
if b.ctx.GetSessionVars().CreateTableInsertingID != 0 {
// in a 'inserting data from select' state of creating table.
return b.buildTableInserter(v, b.ctx.GetSessionVars().CreateTableInsertingID)
if b.ctx.GetSessionVars().InsertingDataForCreateTable() {
// Create insert executor if we need to insert data for 'create table ... select', see comments of
// `InsertingDataForCreateTable()` for more explanations.
return b.buildCreateTableInsert(v, b.ctx.GetSessionVars().CreateTableInsertingID)
}
return &DDLExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
Expand All @@ -686,8 +687,8 @@ func (b *executorBuilder) buildDDL(v *plannercore.DDL) Executor {
}
}

// buildTableInserter builds a CreateTableInsertExec to insert data when creating table by 'create table ... select'
func (b *executorBuilder) buildTableInserter(v *plannercore.DDL, tableID int64) Executor {
// buildCreateTableInsert builds a CreateTableInsertExec to insert data when creating table by 'create table ... select'
func (b *executorBuilder) buildCreateTableInsert(v *plannercore.DDL, tableID int64) Executor {
stmt, ok := v.Statement.(*ast.CreateTableStmt)
if !ok || v.InsertPlan.SelectPlan == nil {
b.err = errors.Errorf("Unexpected plan: %s", v.Statement.Text())
Expand All @@ -714,12 +715,14 @@ func (b *executorBuilder) buildTableInserter(v *plannercore.DDL, tableID int64)
b.err = errors.Trace(err)
return nil
}
// The operation of the minus 1 to make sure that the current value doesn't be used,
// the next Alloc operation will get this value.
// Its behavior is consistent with MySQL.
if err = tbl.RebaseAutoID(nil, tbInfo.AutoIncID-1, false); err != nil {
b.err = errors.Trace(err)
return nil
if tbInfo.AutoIncID > 1 {
// The operation of the minus 1 to make sure that the current value doesn't be used,
// the next Alloc operation will get this value.
// Its behavior is consistent with MySQL.
if err = tbl.RebaseAutoID(nil, tbInfo.AutoIncID-1, false); err != nil {
b.err = errors.Trace(err)
return nil
}
}

insertVal := &InsertValues{
Expand Down
18 changes: 18 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -155,10 +156,27 @@ func (s *testSuite) TestCreateTable(c *C) {
r.Check(testkit.Rows("aa 1", "bb 2", "bb 3", "bb 5"))

// test auto-increment with create table ... select
originalAutoStep := autoid.GetStep()
defer func() {
autoid.SetStep(originalAutoStep)
}()
autoid.SetStep(10)

tk.MustExec("drop table create_target;")
tk.MustExec("create table create_target(idx int key auto_increment) select * from create_source;")
r = tk.MustQuery("select * from create_target;")
r.Check(testkit.Rows("1 aa 1", "2 bb 2", "3 bb 3"))
tk.MustExec("insert into create_target(a, b) values ('cc', 4)")
r = tk.MustQuery("select * from create_target;")
r.Check(testkit.Rows("1 aa 1", "2 bb 2", "3 bb 3", "11 cc 4"))

tk.MustExec("drop table create_target;")
tk.MustExec("create table create_target(idx int key auto_increment) auto_increment=5 select * from create_source;")
r = tk.MustQuery("select * from create_target;")
r.Check(testkit.Rows("5 aa 1", "6 bb 2", "7 bb 3"))
tk.MustExec("insert into create_target(a, b) values ('cc', 4)")
r = tk.MustQuery("select * from create_target;")
r.Check(testkit.Rows("5 aa 1", "6 bb 2", "7 bb 3", "15 cc 4"))

// test 'ignore' and 'replace' keywords
tk.MustExec("drop table if exists create_target;")
Expand Down
5 changes: 3 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,8 +1294,9 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.IgnoreZeroInDate = !vars.StrictSQLMode || stmt.IgnoreErr
sc.Priority = stmt.Priority
case *ast.CreateTableStmt:
if ctx.GetSessionVars().CreateTableInsertingID != 0 {
// in a 'inserting data from select' state of creating table.
// We may need to insert data for 'create table ... select', see comments of
// `InsertingDataForCreateTable()` for more explanations.
if ctx.GetSessionVars().InsertingDataForCreateTable() {
ignoreError := stmt.OnDuplicate == ast.OnDuplicateCreateTableSelectIgnore
sc.InInsertStmt = true
sc.DupKeyAsWarning = ignoreError
Expand Down
13 changes: 13 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,19 @@ func (s *SessionVars) GetSystemVar(name string) (string, bool) {
return val, ok
}

// InsertingDataForCreateTable indicates if it's in a 'inserting data from select' state of creating table.
// ********************************************************************************************************
// In order to make 'create table ... select' one transaction, it's required to insert data from select
// statement during the execution of ddl job at ddl owner side. For now TiDB doesn't support serialization &
// transmission of an execution plan, so original query is parsed again inside a ddl job, and a insert/replace
// executor is created to insert data from the 'select' part of statement.
// This functions tells if we're trying to insert data inside a ddl job.
// ********************************************************************************************************
// TODO: current implementation is an expedience, refactor it after query plan serialization is supported.
func (s *SessionVars) InsertingDataForCreateTable() bool {
return s.CreateTableInsertingID != 0
}

// deleteSystemVar deletes a system variable.
func (s *SessionVars) deleteSystemVar(name string) error {
if name != CharacterSetResults {
Expand Down

0 comments on commit e0e531d

Please sign in to comment.