Skip to content

Commit

Permalink
ddl:support the definition of null change to not null using `alte…
Browse files Browse the repository at this point in the history
…r table` (#7771)

* ddl:support the definition of null change to not null using alter table
  • Loading branch information
ciscoxll authored Oct 17, 2018
1 parent a7f78c7 commit e544882
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 19 deletions.
104 changes: 97 additions & 7 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@
package ddl

import (
"fmt"

"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -267,27 +272,42 @@ func onSetDefaultValue(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return updateColumn(t, job, newCol, &newCol.Name)
}

func onModifyColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
func (w *worker) onModifyColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
newCol := &model.ColumnInfo{}
oldColName := &model.CIStr{}
pos := &ast.ColumnPosition{}
err := job.DecodeArgs(newCol, oldColName, pos)
var modifyColumnTp byte
err := job.DecodeArgs(newCol, oldColName, pos, &modifyColumnTp)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

return doModifyColumn(t, job, newCol, oldColName, pos)
return w.doModifyColumn(t, job, newCol, oldColName, pos, modifyColumnTp)
}

// doModifyColumn updates the column information and reorders all columns.
func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldName *model.CIStr, pos *ast.ColumnPosition) (ver int64, _ error) {
func (w *worker) doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldName *model.CIStr, pos *ast.ColumnPosition, modifyColumnTp byte) (ver int64, _ error) {
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

oldCol := model.FindColumnInfo(tblInfo.Columns, oldName.L)
if job.IsRollingback() {
ver, err = rollbackModifyColumnJob(t, tblInfo, job, oldCol, modifyColumnTp)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
return ver, nil
}

if oldCol == nil || oldCol.State != model.StatePublic {
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(oldName, tblInfo.Name)
Expand All @@ -308,6 +328,17 @@ func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldN
// }
// }

if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) {
ver, err = modifyColumnFromNull2NotNull(w, t, dbInfo, tblInfo, job, oldCol, newCol)
if err != nil {
return ver, errors.Trace(err)
}
// Introduce the `mysql.HasPreventNullInsertFlag` flag to prevent users from inserting or updating null values.
if !mysql.HasPreventNullInsertFlag(oldCol.Flag) {
return ver, nil
}
}

// We need the latest column's offset and state. This information can be obtained from the store.
newCol.Offset = oldCol.Offset
newCol.State = oldCol.State
Expand All @@ -316,13 +347,14 @@ func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldN
if pos.Tp == ast.ColumnPositionAfter {
if oldName.L == pos.RelativeColumn.Name.L {
// `alter table tableName modify column b int after b` will return ver,ErrColumnNotExists.
job.State = model.JobStateCancelled
// Modified the type definition of 'null' to 'not null' before this, so rollback the job when an error occurs.
job.State = model.JobStateRollingback
return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(oldName, tblInfo.Name)
}

relative := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L)
if relative == nil || relative.State != model.StatePublic {
job.State = model.JobStateCancelled
job.State = model.JobStateRollingback
return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
}

Expand Down Expand Up @@ -371,14 +403,33 @@ func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldN

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
job.State = model.JobStateCancelled
// Modified the type definition of 'null' to 'not null' before this, so rollBack the job when an error occurs.
job.State = model.JobStateRollingback
return ver, errors.Trace(err)
}

job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

// checkForNullValue ensure there are no null values of the column of this table.
// `isDataTruncated` indicates whether the new field and the old field type are the same, in order to be compatible with mysql.
func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, table, oldCol, newCol model.CIStr) error {
sql := fmt.Sprintf("select count(*) from `%s`.`%s` where `%s` is null limit 1;", schema.L, table.L, oldCol.L)
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
if err != nil {
return errors.Trace(err)
}
rowCount := rows[0].GetInt64(0)
if rowCount != 0 {
if isDataTruncated {
return errInvalidUseOfNull
}
return ErrWarnDataTruncated.GenWithStackByArgs(newCol.L, rowCount)
}
return nil
}

func updateColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldColName *model.CIStr) (ver int64, _ error) {
tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
Expand Down Expand Up @@ -423,3 +474,42 @@ func checkAddColumnTooManyColumns(oldCols int) error {
}
return nil
}

// rollbackModifyColumnJob rollbacks the job when an error occurs.
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) {
var err error
if modifyColumnTp == mysql.TypeNull {
// field NotNullFlag flag reset.
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.NotNullFlag
// field PreventNullInsertFlag flag reset.
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.PreventNullInsertFlag
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
}
return ver, nil
}

// modifyColumnFromNull2NotNull modifies the type definitions of 'null' to 'not null'.
func modifyColumnFromNull2NotNull(w *worker, t *meta.Meta, dbInfo *model.DBInfo, tblInfo *model.TableInfo, job *model.Job, oldCol, newCol *model.ColumnInfo) (ver int64, _ error) {
// Get sessionctx from context resource pool.
var ctx sessionctx.Context
ctx, err := w.sessPool.get()
if err != nil {
return ver, errors.Trace(err)
}
defer w.sessPool.put(ctx)

// If there is a null value inserted, it cannot be modified and needs to be rollback.
err = checkForNullValue(ctx, oldCol.Tp == newCol.Tp, dbInfo.Name, tblInfo.Name, oldCol.Name, newCol.Name)
if err != nil {
job.State = model.JobStateRollingback
return ver, errors.Trace(err)
}

// Prevent this field from inserting null values.
tblInfo.Columns[oldCol.Offset].Flag |= mysql.PreventNullInsertFlag
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
return ver, errors.Trace(err)
}
12 changes: 12 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,18 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
s.testControlParallelExecSQL(c, sql, sql, f)
}

func (s *testStateChangeSuite) TestParallelColumnModifyingDefinition(c *C) {
sql1 := "insert into t(b) values (null);"
sql2 := "alter table t change b b2 bigint not null;"
f := func(c *C, err1, err2 error) {
c.Assert(err1, IsNil)
if err2 != nil {
c.Assert(err2.Error(), Equals, "[ddl:1265]Data truncated for column 'b2' at row 1")
}
}
s.testControlParallelExecSQL(c, sql1, sql2, f)
}

func (s *testStateChangeSuite) TestParallelChangeColumnName(c *C) {
sql1 := "ALTER TABLE t CHANGE a aa int;"
sql2 := "ALTER TABLE t CHANGE b aa int;"
Expand Down
114 changes: 112 additions & 2 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,8 +1226,12 @@ func (s *testDBSuite) TestChangeColumn(c *C) {
s.testErrorCode(c, sql, tmysql.ErrWrongDBName)
sql = "alter table t3 change t.a aa bigint"
s.testErrorCode(c, sql, tmysql.ErrWrongTableName)
sql = "alter table t3 change aa a bigint not null"
s.testErrorCode(c, sql, tmysql.ErrUnknown)
s.mustExec(c, "create table t4 (c1 int, c2 int, c3 int default 1, index (c1));")
s.tk.MustExec("insert into t4(c2) values (null);")
sql = "alter table t4 change c1 a1 int not null;"
s.testErrorCode(c, sql, tmysql.ErrInvalidUseOfNull)
sql = "alter table t4 change c2 a bigint not null;"
s.testErrorCode(c, sql, tmysql.WarnDataTruncated)
sql = "alter table t3 modify en enum('a', 'z', 'b', 'c') not null default 'a'"
s.testErrorCode(c, sql, tmysql.ErrUnknown)
// Rename to an existing column.
Expand Down Expand Up @@ -3416,6 +3420,112 @@ func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLC
return hook.OnJobUpdatedExported, c3IdxInfo
}

func (s *testDBSuite) TestColumnModifyingDefinition(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use test")
s.tk.MustExec("drop table if exists test2;")
s.tk.MustExec("create table test2 (c1 int, c2 int, c3 int default 1, index (c1));")
s.tk.MustExec("alter table test2 change c2 a int not null;")
ctx := s.tk.Se.(sessionctx.Context)
is := domain.GetDomain(ctx).InfoSchema()
t, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("test2"))
c.Assert(err, IsNil)
var c2 *table.Column
for _, col := range t.Cols() {
if col.Name.L == "a" {
c2 = col
}
}
c.Assert(mysql.HasNotNullFlag(c2.Flag), IsTrue)

s.tk.MustExec("drop table if exists test2;")
s.tk.MustExec("create table test2 (c1 int, c2 int, c3 int default 1, index (c1));")
s.tk.MustExec("insert into test2(c2) values (null);")
s.testErrorCode(c, "alter table test2 change c2 a int not null", tmysql.ErrInvalidUseOfNull)
s.testErrorCode(c, "alter table test2 change c1 a1 bigint not null;", tmysql.WarnDataTruncated)
}

func (s *testDBSuite) TestModifyColumnRollBack(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t1")
s.mustExec(c, "create table t1 (c1 int, c2 int, c3 int default 1, index (c1));")

var c2 *table.Column
var checkErr error
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 10 * time.Millisecond
hook := &ddl.TestDDLCallback{}
hook.OnJobUpdatedExported = func(job *model.Job) {
if checkErr != nil {
return
}

t := s.testGetTable(c, "t1")
for _, col := range t.Cols() {
if col.Name.L == "c2" {
c2 = col
}
}
if mysql.HasPreventNullInsertFlag(c2.Flag) {
s.testErrorCode(c, "insert into t1(c2) values (null);", tmysql.ErrBadNull)
}

hookCtx := mock.NewContext()
hookCtx.Store = s.store
var err error
err = hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}

jobIDs := []int64{job.ID}
errs, err := admin.CancelJobs(hookCtx.Txn(), jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
// It only tests cancel one DDL job.
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}

err = hookCtx.Txn().Commit(context.Background())
if err != nil {
checkErr = errors.Trace(err)
}
}

s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
go backgroundExec(s.store, "alter table t1 change c2 c2 bigint not null;", done)
ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
LOOP:
for {
select {
case err := <-done:
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
break LOOP
case <-ticker.C:
s.mustExec(c, "insert into t1(c2) values (null);")
}
}

t := s.testGetTable(c, "t1")
for _, col := range t.Cols() {
if col.Name.L == "c2" {
c2 = col
}
}
c.Assert(mysql.HasNotNullFlag(c2.Flag), IsFalse)
s.mustExec(c, "drop table t1")
ddl.ReorgWaitTimeout = oldReorgWaitTimeout
}

func (s *testDBSuite) TestPartitionAddIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ var (
// ErrUniqueKeyNeedAllFieldsInPf returns must include all columns in the table's partitioning function.
ErrUniqueKeyNeedAllFieldsInPf = terror.ClassDDL.New(codeUniqueKeyNeedAllFieldsInPf, mysql.MySQLErrName[mysql.ErrUniqueKeyNeedAllFieldsInPf])
errWrongExprInPartitionFunc = terror.ClassDDL.New(codeWrongExprInPartitionFunc, mysql.MySQLErrName[mysql.ErrWrongExprInPartitionFunc])
// ErrWarnDataTruncated returns data truncated error.
ErrWarnDataTruncated = terror.ClassDDL.New(codeWarnDataTruncated, mysql.MySQLErrName[mysql.WarnDataTruncated])
)

// DDL is responsible for updating schema in data store and maintaining in-memory InfoSchema cache.
Expand Down Expand Up @@ -620,6 +622,7 @@ const (
codeUniqueKeyNeedAllFieldsInPf = terror.ErrCode(mysql.ErrUniqueKeyNeedAllFieldsInPf)
codePrimaryCantHaveNull = terror.ErrCode(mysql.ErrPrimaryCantHaveNull)
codeWrongExprInPartitionFunc = terror.ErrCode(mysql.ErrWrongExprInPartitionFunc)
codeWarnDataTruncated = terror.ErrCode(mysql.WarnDataTruncated)
)

func init() {
Expand Down Expand Up @@ -667,6 +670,7 @@ func init() {
codeUniqueKeyNeedAllFieldsInPf: mysql.ErrUniqueKeyNeedAllFieldsInPf,
codePrimaryCantHaveNull: mysql.ErrPrimaryCantHaveNull,
codeWrongExprInPartitionFunc: mysql.ErrWrongExprInPartitionFunc,
codeWarnDataTruncated: mysql.WarnDataTruncated,
}
terror.ErrClassToMySQLCodes[terror.ClassDDL] = ddlMySQLErrCodes
}
11 changes: 8 additions & 3 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,9 +1652,14 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
return nil, errUnsupportedModifyColumn.GenWithStackByArgs("set auto_increment")
}

// We don't support modifying the type definitions from 'null' to 'not null' now.
// We support modifying the type definitions of 'null' to 'not null' now.
var modifyColumnTp byte
if !mysql.HasNotNullFlag(col.Flag) && mysql.HasNotNullFlag(newCol.Flag) {
return nil, errUnsupportedModifyColumn.GenWithStackByArgs("null to not null")
if err = checkForNullValue(ctx, col.Tp == newCol.Tp, ident.Schema, ident.Name, col.Name, newCol.Name); err != nil {
return nil, errors.Trace(err)
}
// `modifyColumnTp` indicates that there is a type modification.
modifyColumnTp = mysql.TypeNull
}
// As same with MySQL, we don't support modifying the stored status for generated columns.
if err = checkModifyGeneratedColumn(t.Cols(), col, newCol); err != nil {
Expand All @@ -1666,7 +1671,7 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
TableID: t.Meta().ID,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{&newCol, originalColName, spec.Position},
Args: []interface{}{&newCol, originalColName, spec.Position, modifyColumnTp},
}
return job, nil
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
case model.ActionDropColumn:
ver, err = onDropColumn(t, job)
case model.ActionModifyColumn:
ver, err = onModifyColumn(t, job)
ver, err = w.onModifyColumn(t, job)
case model.ActionSetDefaultValue:
ver, err = onSetDefaultValue(t, job)
case model.ActionAddIndex:
Expand Down
Loading

0 comments on commit e544882

Please sign in to comment.