diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index f24102e713d12..f96c1723dac62 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -312,15 +312,34 @@ func typesNeedCharset(tp byte) bool { return false } -func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCharset string) error { +func setCharsetCollationFlenDecimal(tp *types.FieldType, specifiedCollates []string, tblCharset string, dbCharset string) error { tp.Charset = strings.ToLower(tp.Charset) tp.Collate = strings.ToLower(tp.Collate) if len(tp.Charset) == 0 { if typesNeedCharset(tp.Tp) { - var err error - tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset) - if err != nil { - return errors.Trace(err) + if len(specifiedCollates) == 0 { + // Both the charset and collate are not specified. + var err error + tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset) + if err != nil { + return errors.Trace(err) + } + } else { + // The charset is not specified but the collate is. + // We should derive charset from it's collate specified rather than getting from table and db. + // It is handled like mysql's logic, use derived charset to judge conflict with next collate. + for _, spc := range specifiedCollates { + derivedCollation, err := charset.GetCollationByName(spc) + if err != nil { + return errors.Trace(err) + } + if len(tp.Charset) == 0 { + tp.Charset = derivedCollation.CharsetName + } else if tp.Charset != derivedCollation.CharsetName { + return ErrCollationCharsetMismatch.GenWithStackByArgs(derivedCollation.Name, tp.Charset) + } + tp.Collate = derivedCollation.Name + } } } else { tp.Charset = charset.CharsetBin @@ -331,10 +350,25 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCh return errUnsupportedCharset.GenWithStackByArgs(tp.Charset, tp.Collate) } if len(tp.Collate) == 0 { - var err error - tp.Collate, err = charset.GetDefaultCollation(tp.Charset) - if err != nil { - return errors.Trace(err) + if len(specifiedCollates) == 0 { + // The charset is specified, but the collate is not. + var err error + tp.Collate, err = charset.GetDefaultCollation(tp.Charset) + if err != nil { + return errors.Trace(err) + } + } else { + // Both the charset and collate are specified. + for _, spc := range specifiedCollates { + derivedCollation, err := charset.GetCollationByName(spc) + if err != nil { + return errors.Trace(err) + } + if tp.Charset != derivedCollation.CharsetName { + return ErrCollationCharsetMismatch.GenWithStackByArgs(derivedCollation.Name, tp.Charset) + } + tp.Collate = derivedCollation.Name + } } } } @@ -358,7 +392,10 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCh // outPriKeyConstraint is the primary key constraint out of column definition. such as: create table t1 (id int , age int, primary key(id)); func buildColumnAndConstraint(ctx sessionctx.Context, offset int, colDef *ast.ColumnDef, outPriKeyConstraint *ast.Constraint, tblCharset, dbCharset string) (*table.Column, []*ast.Constraint, error) { - if err := setCharsetCollationFlenDecimal(colDef.Tp, tblCharset, dbCharset); err != nil { + // specifiedCollates refers to collates in colDef.Options, should handle them together. + specifiedCollates := extractCollateFromOption(colDef) + + if err := setCharsetCollationFlenDecimal(colDef.Tp, specifiedCollates, tblCharset, dbCharset); err != nil { return nil, nil, errors.Trace(err) } col, cts, err := columnDefToCol(ctx, offset, colDef, outPriKeyConstraint) @@ -2596,7 +2633,11 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or newCol.FieldType.Charset = col.FieldType.Charset newCol.FieldType.Collate = col.FieldType.Collate } - err = setCharsetCollationFlenDecimal(&newCol.FieldType, t.Meta().Charset, schema.Charset) + // specifiedCollates refers to collates in colDef.Option. When setting charset and collate here we + // should take the collate in colDef.Option into consideration rather than handling it separately + specifiedCollates := extractCollateFromOption(specNewColumn) + + err = setCharsetCollationFlenDecimal(&newCol.FieldType, specifiedCollates, t.Meta().Charset, schema.Charset) if err != nil { return nil, errors.Trace(err) } @@ -3582,3 +3623,19 @@ type lockTablesArg struct { SessionInfo model.SessionInfo IsCleanup bool } + +// extractCollateFromOption take collates(may multiple) in option into consideration +// when handle charset and collate of a column, rather than handling it separately. +func extractCollateFromOption(def *ast.ColumnDef) []string { + specifiedCollates := make([]string, 0, 0) + for i := 0; i < len(def.Options); i++ { + op := def.Options[i] + if op.Tp == ast.ColumnOptionCollate { + specifiedCollates = append(specifiedCollates, op.StrValue) + def.Options = append(def.Options[:i], def.Options[i+1:]...) + // maintain the correct index + i-- + } + } + return specifiedCollates +} diff --git a/executor/ddl_test.go b/executor/ddl_test.go index c29dce7a8acc8..2590b72af428d 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -121,6 +121,36 @@ func (s *testSuite3) TestCreateTable(c *C) { } } + // test multiple collate specified in column when create. + tk.MustExec("drop table if exists test_multiple_column_collate;") + tk.MustExec("create table test_multiple_column_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin") + t, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("test_multiple_column_collate")) + c.Assert(err, IsNil) + c.Assert(t.Cols()[0].Charset, Equals, "utf8") + c.Assert(t.Cols()[0].Collate, Equals, "utf8_general_ci") + c.Assert(t.Meta().Charset, Equals, "utf8mb4") + c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin") + + tk.MustExec("drop table if exists test_multiple_column_collate;") + tk.MustExec("create table test_multiple_column_collate (a char(1) charset utf8 collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin") + t, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("test_multiple_column_collate")) + c.Assert(err, IsNil) + c.Assert(t.Cols()[0].Charset, Equals, "utf8") + c.Assert(t.Cols()[0].Collate, Equals, "utf8_general_ci") + c.Assert(t.Meta().Charset, Equals, "utf8mb4") + c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin") + + // test Err case for multiple collate specified in column when create. + tk.MustExec("drop table if exists test_err_multiple_collate;") + _, err = tk.Exec("create table test_err_multiple_collate (a char(1) charset utf8mb4 collate utf8_unicode_ci collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8_unicode_ci", "utf8mb4").Error()) + + tk.MustExec("drop table if exists test_err_multiple_collate;") + _, err = tk.Exec("create table test_err_multiple_collate (a char(1) collate utf8_unicode_ci collate utf8mb4_general_ci) charset utf8mb4 collate utf8mb4_bin") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8mb4_general_ci", "utf8").Error()) + // table option is auto-increment tk.MustExec("drop table if exists create_auto_increment_test;") tk.MustExec("create table create_auto_increment_test (id int not null auto_increment, name varchar(255), primary key(id)) auto_increment = 999;") @@ -322,6 +352,43 @@ func (s *testSuite3) TestAlterTableModifyColumn(c *C) { _, err = tk.Exec("alter table alter_view modify column c2 text") c.Assert(err.Error(), Equals, ddl.ErrWrongObject.GenWithStackByArgs("test", "alter_view", "BASE TABLE").Error()) tk.MustExec("drop view alter_view") + + // test multiple collate modification in column. + tk.MustExec("drop table if exists modify_column_multiple_collate") + tk.MustExec("create table modify_column_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin") + _, err = tk.Exec("alter table modify_column_multiple_collate modify column a char(1) collate utf8mb4_bin;") + c.Assert(err, IsNil) + t, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("modify_column_multiple_collate")) + c.Assert(err, IsNil) + c.Assert(t.Cols()[0].Charset, Equals, "utf8mb4") + c.Assert(t.Cols()[0].Collate, Equals, "utf8mb4_bin") + c.Assert(t.Meta().Charset, Equals, "utf8mb4") + c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin") + + tk.MustExec("drop table if exists modify_column_multiple_collate;") + tk.MustExec("create table modify_column_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin") + _, err = tk.Exec("alter table modify_column_multiple_collate modify column a char(1) charset utf8mb4 collate utf8mb4_bin;") + c.Assert(err, IsNil) + t, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("modify_column_multiple_collate")) + c.Assert(err, IsNil) + c.Assert(t.Cols()[0].Charset, Equals, "utf8mb4") + c.Assert(t.Cols()[0].Collate, Equals, "utf8mb4_bin") + c.Assert(t.Meta().Charset, Equals, "utf8mb4") + c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin") + + // test Err case for multiple collate modification in column. + tk.MustExec("drop table if exists err_modify_multiple_collate;") + tk.MustExec("create table err_modify_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin") + _, err = tk.Exec("alter table err_modify_multiple_collate modify column a char(1) charset utf8mb4 collate utf8_bin;") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8_bin", "utf8mb4").Error()) + + tk.MustExec("drop table if exists err_modify_multiple_collate;") + tk.MustExec("create table err_modify_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin") + _, err = tk.Exec("alter table err_modify_multiple_collate modify column a char(1) collate utf8_bin collate utf8mb4_bin;") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8mb4_bin", "utf8").Error()) + } func (s *testSuite3) TestDefaultDBAfterDropCurDB(c *C) {