-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ddl:support the definition of null
change to not null
using alter table
#7771
Changes from all commits
f16f213
e9d24dd
7e2cea2
478e2d4
63ebfe5
96d7210
10aa28f
41b003e
e4485ce
8be9eec
f5910fd
14f8559
936d81f
78c29f5
fab2c06
afc1f01
7de5a88
14326b4
0cec56e
99cf471
bd66996
286c61e
f273c6c
d17e77d
86b7200
0f2c134
b3706a8
53321b9
3eff6bb
eb8ac2d
a682f9c
d7b7785
613fe46
3b638bf
64c83b6
6b1beef
d8f5133
84c3c2e
f87f096
869f9b8
44a592e
e16ed3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,11 +14,16 @@ | |
package ddl | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/pingcap/tidb/ast" | ||
"github.com/pingcap/tidb/ddl/util" | ||
"github.com/pingcap/tidb/infoschema" | ||
"github.com/pingcap/tidb/meta" | ||
"github.com/pingcap/tidb/model" | ||
"github.com/pingcap/tidb/mysql" | ||
"github.com/pingcap/tidb/sessionctx" | ||
"github.com/pingcap/tidb/util/sqlexec" | ||
"github.com/pkg/errors" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
@@ -267,27 +272,42 @@ func onSetDefaultValue(t *meta.Meta, job *model.Job) (ver int64, _ error) { | |
return updateColumn(t, job, newCol, &newCol.Name) | ||
} | ||
|
||
func onModifyColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) { | ||
func (w *worker) onModifyColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) { | ||
newCol := &model.ColumnInfo{} | ||
oldColName := &model.CIStr{} | ||
pos := &ast.ColumnPosition{} | ||
err := job.DecodeArgs(newCol, oldColName, pos) | ||
var modifyColumnTp byte | ||
err := job.DecodeArgs(newCol, oldColName, pos, &modifyColumnTp) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
return doModifyColumn(t, job, newCol, oldColName, pos) | ||
return w.doModifyColumn(t, job, newCol, oldColName, pos, modifyColumnTp) | ||
} | ||
|
||
// doModifyColumn updates the column information and reorders all columns. | ||
func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldName *model.CIStr, pos *ast.ColumnPosition) (ver int64, _ error) { | ||
func (w *worker) doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldName *model.CIStr, pos *ast.ColumnPosition, modifyColumnTp byte) (ver int64, _ error) { | ||
dbInfo, err := t.GetDatabase(job.SchemaID) | ||
if err != nil { | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
tblInfo, err := getTableInfo(t, job, job.SchemaID) | ||
if err != nil { | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
oldCol := model.FindColumnInfo(tblInfo.Columns, oldName.L) | ||
if job.IsRollingback() { | ||
ver, err = rollbackModifyColumnJob(t, tblInfo, job, oldCol, modifyColumnTp) | ||
if err != nil { | ||
return ver, errors.Trace(err) | ||
} | ||
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo) | ||
return ver, nil | ||
crazycs520 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
if oldCol == nil || oldCol.State != model.StatePublic { | ||
job.State = model.JobStateCancelled | ||
return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(oldName, tblInfo.Name) | ||
|
@@ -308,6 +328,17 @@ func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldN | |
// } | ||
// } | ||
|
||
if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) { | ||
ver, err = modifyColumnFromNull2NotNull(w, t, dbInfo, tblInfo, job, oldCol, newCol) | ||
if err != nil { | ||
return ver, errors.Trace(err) | ||
} | ||
// Introduce the `mysql.HasPreventNullInsertFlag` flag to prevent users from inserting or updating null values. | ||
if !mysql.HasPreventNullInsertFlag(oldCol.Flag) { | ||
return ver, nil | ||
} | ||
} | ||
|
||
// We need the latest column's offset and state. This information can be obtained from the store. | ||
newCol.Offset = oldCol.Offset | ||
newCol.State = oldCol.State | ||
|
@@ -316,13 +347,14 @@ func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldN | |
if pos.Tp == ast.ColumnPositionAfter { | ||
if oldName.L == pos.RelativeColumn.Name.L { | ||
// `alter table tableName modify column b int after b` will return ver,ErrColumnNotExists. | ||
job.State = model.JobStateCancelled | ||
// Modified the type definition of 'null' to 'not null' before this, so rollback the job when an error occurs. | ||
job.State = model.JobStateRollingback | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why change it to JobStateRollingback? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If an error occurs here, it will not be a rollback. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add some comment to describe why we use model.JobStateRollingback state. |
||
return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(oldName, tblInfo.Name) | ||
} | ||
|
||
relative := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L) | ||
if relative == nil || relative.State != model.StatePublic { | ||
job.State = model.JobStateCancelled | ||
job.State = model.JobStateRollingback | ||
return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name) | ||
} | ||
|
||
|
@@ -371,14 +403,33 @@ func doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldN | |
|
||
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) | ||
if err != nil { | ||
job.State = model.JobStateCancelled | ||
// Modified the type definition of 'null' to 'not null' before this, so rollBack the job when an error occurs. | ||
job.State = model.JobStateRollingback | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) | ||
return ver, nil | ||
} | ||
|
||
// checkForNullValue ensure there are no null values of the column of this table. | ||
// `isDataTruncated` indicates whether the new field and the old field type are the same, in order to be compatible with mysql. | ||
func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, table, oldCol, newCol model.CIStr) error { | ||
sql := fmt.Sprintf("select count(*) from `%s`.`%s` where `%s` is null limit 1;", schema.L, table.L, oldCol.L) | ||
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
rowCount := rows[0].GetInt64(0) | ||
if rowCount != 0 { | ||
if isDataTruncated { | ||
return errInvalidUseOfNull | ||
} | ||
return ErrWarnDataTruncated.GenWithStackByArgs(newCol.L, rowCount) | ||
} | ||
return nil | ||
} | ||
|
||
func updateColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldColName *model.CIStr) (ver int64, _ error) { | ||
tblInfo, err := getTableInfo(t, job, job.SchemaID) | ||
if err != nil { | ||
|
@@ -423,3 +474,42 @@ func checkAddColumnTooManyColumns(oldCols int) error { | |
} | ||
return nil | ||
} | ||
|
||
// rollbackModifyColumnJob rollbacks the job when an error occurs. | ||
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) { | ||
var err error | ||
if modifyColumnTp == mysql.TypeNull { | ||
// field NotNullFlag flag reset. | ||
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.NotNullFlag | ||
// field PreventNullInsertFlag flag reset. | ||
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.PreventNullInsertFlag | ||
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) | ||
if err != nil { | ||
return ver, errors.Trace(err) | ||
} | ||
} | ||
return ver, nil | ||
} | ||
|
||
// modifyColumnFromNull2NotNull modifies the type definitions of 'null' to 'not null'. | ||
func modifyColumnFromNull2NotNull(w *worker, t *meta.Meta, dbInfo *model.DBInfo, tblInfo *model.TableInfo, job *model.Job, oldCol, newCol *model.ColumnInfo) (ver int64, _ error) { | ||
// Get sessionctx from context resource pool. | ||
var ctx sessionctx.Context | ||
ctx, err := w.sessPool.get() | ||
if err != nil { | ||
return ver, errors.Trace(err) | ||
} | ||
defer w.sessPool.put(ctx) | ||
|
||
// If there is a null value inserted, it cannot be modified and needs to be rollback. | ||
err = checkForNullValue(ctx, oldCol.Tp == newCol.Tp, dbInfo.Name, tblInfo.Name, oldCol.Name, newCol.Name) | ||
if err != nil { | ||
job.State = model.JobStateRollingback | ||
return ver, errors.Trace(err) | ||
} | ||
|
||
// Prevent this field from inserting null values. | ||
tblInfo.Columns[oldCol.Offset].Flag |= mysql.PreventNullInsertFlag | ||
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) | ||
return ver, errors.Trace(err) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1226,8 +1226,12 @@ func (s *testDBSuite) TestChangeColumn(c *C) { | |
s.testErrorCode(c, sql, tmysql.ErrWrongDBName) | ||
sql = "alter table t3 change t.a aa bigint" | ||
s.testErrorCode(c, sql, tmysql.ErrWrongTableName) | ||
sql = "alter table t3 change aa a bigint not null" | ||
s.testErrorCode(c, sql, tmysql.ErrUnknown) | ||
s.mustExec(c, "create table t4 (c1 int, c2 int, c3 int default 1, index (c1));") | ||
s.tk.MustExec("insert into t4(c2) values (null);") | ||
sql = "alter table t4 change c1 a1 int not null;" | ||
s.testErrorCode(c, sql, tmysql.ErrInvalidUseOfNull) | ||
sql = "alter table t4 change c2 a bigint not null;" | ||
s.testErrorCode(c, sql, tmysql.WarnDataTruncated) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this the same with mysql? In my environment:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lamxTyler Done mysql mysql> create table test2 (c1 int, c2 int, c3 int default 1, index (c1));
Query OK, 0 rows affected (0.03 sec)
mysql> insert into test2(c2) values (null);
Query OK, 1 row affected (0.00 sec)
mysql> alter table test2 change c2 a bigint not null;
ERROR 1265 (01000): Data truncated for column 'a' at row 1 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Your test also changes the column type, but if you use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
FYI @ciscoxll |
||
sql = "alter table t3 modify en enum('a', 'z', 'b', 'c') not null default 'a'" | ||
s.testErrorCode(c, sql, tmysql.ErrUnknown) | ||
// Rename to an existing column. | ||
|
@@ -3416,6 +3420,112 @@ func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLC | |
return hook.OnJobUpdatedExported, c3IdxInfo | ||
} | ||
|
||
func (s *testDBSuite) TestColumnModifyingDefinition(c *C) { | ||
s.tk = testkit.NewTestKit(c, s.store) | ||
s.tk.MustExec("use test") | ||
s.tk.MustExec("drop table if exists test2;") | ||
s.tk.MustExec("create table test2 (c1 int, c2 int, c3 int default 1, index (c1));") | ||
s.tk.MustExec("alter table test2 change c2 a int not null;") | ||
ctx := s.tk.Se.(sessionctx.Context) | ||
is := domain.GetDomain(ctx).InfoSchema() | ||
t, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("test2")) | ||
c.Assert(err, IsNil) | ||
var c2 *table.Column | ||
for _, col := range t.Cols() { | ||
if col.Name.L == "a" { | ||
c2 = col | ||
} | ||
} | ||
c.Assert(mysql.HasNotNullFlag(c2.Flag), IsTrue) | ||
|
||
s.tk.MustExec("drop table if exists test2;") | ||
s.tk.MustExec("create table test2 (c1 int, c2 int, c3 int default 1, index (c1));") | ||
s.tk.MustExec("insert into test2(c2) values (null);") | ||
s.testErrorCode(c, "alter table test2 change c2 a int not null", tmysql.ErrInvalidUseOfNull) | ||
s.testErrorCode(c, "alter table test2 change c1 a1 bigint not null;", tmysql.WarnDataTruncated) | ||
} | ||
|
||
func (s *testDBSuite) TestModifyColumnRollBack(c *C) { | ||
s.tk = testkit.NewTestKit(c, s.store) | ||
s.mustExec(c, "use test_db") | ||
s.mustExec(c, "drop table if exists t1") | ||
s.mustExec(c, "create table t1 (c1 int, c2 int, c3 int default 1, index (c1));") | ||
|
||
var c2 *table.Column | ||
var checkErr error | ||
oldReorgWaitTimeout := ddl.ReorgWaitTimeout | ||
ddl.ReorgWaitTimeout = 10 * time.Millisecond | ||
hook := &ddl.TestDDLCallback{} | ||
hook.OnJobUpdatedExported = func(job *model.Job) { | ||
if checkErr != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a test here, when the PreventNullInsertFlag is set, insert a null value into this table, it must fail. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @winkyao Done. |
||
return | ||
} | ||
|
||
t := s.testGetTable(c, "t1") | ||
for _, col := range t.Cols() { | ||
if col.Name.L == "c2" { | ||
c2 = col | ||
} | ||
} | ||
if mysql.HasPreventNullInsertFlag(c2.Flag) { | ||
s.testErrorCode(c, "insert into t1(c2) values (null);", tmysql.ErrBadNull) | ||
} | ||
|
||
hookCtx := mock.NewContext() | ||
hookCtx.Store = s.store | ||
var err error | ||
err = hookCtx.NewTxn() | ||
if err != nil { | ||
checkErr = errors.Trace(err) | ||
return | ||
} | ||
|
||
jobIDs := []int64{job.ID} | ||
errs, err := admin.CancelJobs(hookCtx.Txn(), jobIDs) | ||
if err != nil { | ||
checkErr = errors.Trace(err) | ||
return | ||
} | ||
// It only tests cancel one DDL job. | ||
if errs[0] != nil { | ||
checkErr = errors.Trace(errs[0]) | ||
return | ||
} | ||
|
||
err = hookCtx.Txn().Commit(context.Background()) | ||
if err != nil { | ||
checkErr = errors.Trace(err) | ||
} | ||
} | ||
|
||
s.dom.DDL().(ddl.DDLForTest).SetHook(hook) | ||
done := make(chan error, 1) | ||
go backgroundExec(s.store, "alter table t1 change c2 c2 bigint not null;", done) | ||
ticker := time.NewTicker(s.lease / 2) | ||
defer ticker.Stop() | ||
LOOP: | ||
for { | ||
select { | ||
case err := <-done: | ||
c.Assert(err, NotNil) | ||
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job") | ||
break LOOP | ||
case <-ticker.C: | ||
s.mustExec(c, "insert into t1(c2) values (null);") | ||
} | ||
} | ||
|
||
t := s.testGetTable(c, "t1") | ||
for _, col := range t.Cols() { | ||
if col.Name.L == "c2" { | ||
c2 = col | ||
} | ||
} | ||
c.Assert(mysql.HasNotNullFlag(c2.Flag), IsFalse) | ||
s.mustExec(c, "drop table t1") | ||
ddl.ReorgWaitTimeout = oldReorgWaitTimeout | ||
} | ||
|
||
func (s *testDBSuite) TestPartitionAddIndex(c *C) { | ||
tk := testkit.NewTestKit(c, s.store) | ||
tk.MustExec("use test") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need to change this to member function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@winkyao I need to get
sessionctx
from context resource pool.