Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DDL: fix a bug in column charset and collate when create table and modify column (#11300) #11424

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 68 additions & 11 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,34 @@ func typesNeedCharset(tp byte) bool {
return false
}

func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCharset string) error {
func setCharsetCollationFlenDecimal(tp *types.FieldType, specifiedCollates []string, tblCharset string, dbCharset string) error {
tp.Charset = strings.ToLower(tp.Charset)
tp.Collate = strings.ToLower(tp.Collate)
if len(tp.Charset) == 0 {
if typesNeedCharset(tp.Tp) {
var err error
tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset)
if err != nil {
return errors.Trace(err)
if len(specifiedCollates) == 0 {
// Both the charset and collate are not specified.
var err error
tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset)
if err != nil {
return errors.Trace(err)
}
} else {
// The charset is not specified but the collate is.
// We should derive charset from it's collate specified rather than getting from table and db.
// It is handled like mysql's logic, use derived charset to judge conflict with next collate.
for _, spc := range specifiedCollates {
derivedCollation, err := charset.GetCollationByName(spc)
if err != nil {
return errors.Trace(err)
}
if len(tp.Charset) == 0 {
tp.Charset = derivedCollation.CharsetName
} else if tp.Charset != derivedCollation.CharsetName {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it dead code in this else if clause? The condition len(tp.Charset) in line 319 is always to be true, because line 301 checks the same thing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it dead code in this else if clause? The condition len(tp.Charset) in line 319 is always to be true, because line 301 checks the same thing.

The condition len(tp.Charset) in line 319 not always to be true. For the first time, it is. After we assigned charset to it, it should be judged in following loop.

return ErrCollationCharsetMismatch.GenWithStackByArgs(derivedCollation.Name, tp.Charset)
}
tp.Collate = derivedCollation.Name
}
}
} else {
tp.Charset = charset.CharsetBin
Expand All @@ -314,10 +333,25 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCh
return errUnsupportedCharset.GenWithStackByArgs(tp.Charset, tp.Collate)
}
if len(tp.Collate) == 0 {
var err error
tp.Collate, err = charset.GetDefaultCollation(tp.Charset)
if err != nil {
return errors.Trace(err)
if len(specifiedCollates) == 0 {
// The charset is specified, but the collate is not.
var err error
tp.Collate, err = charset.GetDefaultCollation(tp.Charset)
if err != nil {
return errors.Trace(err)
}
} else {
// Both the charset and collate are specified.
for _, spc := range specifiedCollates {
derivedCollation, err := charset.GetCollationByName(spc)
if err != nil {
return errors.Trace(err)
}
if tp.Charset != derivedCollation.CharsetName {
return ErrCollationCharsetMismatch.GenWithStackByArgs(derivedCollation.Name, tp.Charset)
}
tp.Collate = derivedCollation.Name
}
}
}
}
Expand All @@ -341,7 +375,10 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCh
// outPriKeyConstraint is the primary key constraint out of column definition. such as: create table t1 (id int , age int, primary key(id));
func buildColumnAndConstraint(ctx sessionctx.Context, offset int,
colDef *ast.ColumnDef, outPriKeyConstraint *ast.Constraint, tblCharset, dbCharset string) (*table.Column, []*ast.Constraint, error) {
if err := setCharsetCollationFlenDecimal(colDef.Tp, tblCharset, dbCharset); err != nil {
// specifiedCollates refers to collates in colDef.Options, should handle them together.
specifiedCollates := extractCollateFromOption(colDef)

if err := setCharsetCollationFlenDecimal(colDef.Tp, specifiedCollates, tblCharset, dbCharset); err != nil {
return nil, nil, errors.Trace(err)
}
col, cts, err := columnDefToCol(ctx, offset, colDef, outPriKeyConstraint)
Expand Down Expand Up @@ -2516,7 +2553,11 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
newCol.FieldType.Charset = col.FieldType.Charset
newCol.FieldType.Collate = col.FieldType.Collate
}
err = setCharsetCollationFlenDecimal(&newCol.FieldType, t.Meta().Charset, schema.Charset)
// specifiedCollates refers to collates in colDef.Option. When setting charset and collate here we
// should take the collate in colDef.Option into consideration rather than handling it separately
specifiedCollates := extractCollateFromOption(specNewColumn)

err = setCharsetCollationFlenDecimal(&newCol.FieldType, specifiedCollates, t.Meta().Charset, schema.Charset)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -3266,3 +3307,19 @@ func buildPartitionInfo(meta *model.TableInfo, d *ddl, spec *ast.AlterTableSpec)
}
return part, nil
}

// extractCollateFromOption take collates(may multiple) in option into consideration
// when handle charset and collate of a column, rather than handling it separately.
func extractCollateFromOption(def *ast.ColumnDef) []string {
specifiedCollates := make([]string, 0, 0)
for i := 0; i < len(def.Options); i++ {
op := def.Options[i]
if op.Tp == ast.ColumnOptionCollate {
specifiedCollates = append(specifiedCollates, op.StrValue)
def.Options = append(def.Options[:i], def.Options[i+1:]...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if we don't delete the ColumnOptionCollate option in ColumnDef?

Copy link
Contributor Author

@AilinKid AilinKid Jul 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if we don't delete the ColumnOptionCollate option in ColumnDef?

Here if we don't remove it in Option, it is ok for the subsequent logic, not a big deal.
But for ast.Restore func, it will cause duplicate collate in restored SQL string.

// maintain the correct index
i--
}
}
return specifiedCollates
}
67 changes: 67 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,36 @@ func (s *testSuite3) TestCreateTable(c *C) {
}
}

// test multiple collate specified in column when create.
tk.MustExec("drop table if exists test_multiple_column_collate;")
tk.MustExec("create table test_multiple_column_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
t, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("test_multiple_column_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8")
c.Assert(t.Cols()[0].Collate, Equals, "utf8_general_ci")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

tk.MustExec("drop table if exists test_multiple_column_collate;")
tk.MustExec("create table test_multiple_column_collate (a char(1) charset utf8 collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
t, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("test_multiple_column_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8")
c.Assert(t.Cols()[0].Collate, Equals, "utf8_general_ci")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

// test Err case for multiple collate specified in column when create.
tk.MustExec("drop table if exists test_err_multiple_collate;")
_, err = tk.Exec("create table test_err_multiple_collate (a char(1) charset utf8mb4 collate utf8_unicode_ci collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8_unicode_ci", "utf8mb4").Error())

tk.MustExec("drop table if exists test_err_multiple_collate;")
_, err = tk.Exec("create table test_err_multiple_collate (a char(1) collate utf8_unicode_ci collate utf8mb4_general_ci) charset utf8mb4 collate utf8mb4_bin")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8mb4_general_ci", "utf8").Error())

// table option is auto-increment
tk.MustExec("drop table if exists create_auto_increment_test;")
tk.MustExec("create table create_auto_increment_test (id int not null auto_increment, name varchar(255), primary key(id)) auto_increment = 999;")
Expand Down Expand Up @@ -322,6 +352,43 @@ func (s *testSuite3) TestAlterTableModifyColumn(c *C) {
_, err = tk.Exec("alter table alter_view modify column c2 text")
c.Assert(err.Error(), Equals, ddl.ErrWrongObject.GenWithStackByArgs("test", "alter_view", "BASE TABLE").Error())
tk.MustExec("drop view alter_view")

// test multiple collate modification in column.
tk.MustExec("drop table if exists modify_column_multiple_collate")
tk.MustExec("create table modify_column_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table modify_column_multiple_collate modify column a char(1) collate utf8mb4_bin;")
c.Assert(err, IsNil)
t, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("modify_column_multiple_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8mb4")
c.Assert(t.Cols()[0].Collate, Equals, "utf8mb4_bin")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

tk.MustExec("drop table if exists modify_column_multiple_collate;")
tk.MustExec("create table modify_column_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table modify_column_multiple_collate modify column a char(1) charset utf8mb4 collate utf8mb4_bin;")
c.Assert(err, IsNil)
t, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("modify_column_multiple_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8mb4")
c.Assert(t.Cols()[0].Collate, Equals, "utf8mb4_bin")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

// test Err case for multiple collate modification in column.
tk.MustExec("drop table if exists err_modify_multiple_collate;")
tk.MustExec("create table err_modify_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table err_modify_multiple_collate modify column a char(1) charset utf8mb4 collate utf8_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8_bin", "utf8mb4").Error())

tk.MustExec("drop table if exists err_modify_multiple_collate;")
tk.MustExec("create table err_modify_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table err_modify_multiple_collate modify column a char(1) collate utf8_bin collate utf8mb4_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8mb4_bin", "utf8").Error())

}

func (s *testSuite3) TestDefaultDBAfterDropCurDB(c *C) {
Expand Down