Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: implement ALTER DATABASE to alter charset/collation #10393

Merged
merged 7 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1638,3 +1638,106 @@ func (s *testIntegrationSuite3) TestDefaultValueIsString(c *C) {
tbl := testGetTableByName(c, s.ctx, "test", "t")
c.Assert(tbl.Meta().Columns[0].DefaultValue, Equals, "1")
}

func (s *testIntegrationSuite11) TestChangingDBCharset(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("DROP DATABASE IF EXISTS alterdb1")
tk.MustExec("CREATE DATABASE alterdb1 CHARSET=utf8 COLLATE=utf8_unicode_ci")

// No default DB errors.
noDBFailedCases := []struct {
stmt string
errMsg string
}{
{
"ALTER DATABASE CHARACTER SET = 'utf8'",
"[planner:1046]No database selected",
},
{
"ALTER SCHEMA `` CHARACTER SET = 'utf8'",
"[ddl:1102]Incorrect database name ''",
},
}
for _, fc := range noDBFailedCases {
c.Assert(tk.ExecToErr(fc.stmt).Error(), Equals, fc.errMsg, Commentf("%v", fc.stmt))
}

verifyDBCharsetAndCollate := func(dbName, chs string, coll string) {
// check `SHOW CREATE SCHEMA`.
r := tk.MustQuery("SHOW CREATE SCHEMA " + dbName).Rows()[0][1].(string)
c.Assert(strings.Contains(r, "CHARACTER SET "+chs), IsTrue)

template := `SELECT
DEFAULT_CHARACTER_SET_NAME,
DEFAULT_COLLATION_NAME
FROM INFORMATION_SCHEMA.SCHEMATA
WHERE SCHEMA_NAME = '%s'`
sql := fmt.Sprintf(template, dbName)
tk.MustQuery(sql).Check(testkit.Rows(fmt.Sprintf("%s %s", chs, coll)))

dom := domain.GetDomain(s.ctx)
// Make sure the table schema is the new schema.
err := dom.Reload()
c.Assert(err, IsNil)
dbInfo, ok := dom.InfoSchema().SchemaByName(model.NewCIStr(dbName))
c.Assert(ok, Equals, true)
c.Assert(dbInfo.Charset, Equals, chs)
c.Assert(dbInfo.Collate, Equals, coll)
}

tk.MustExec("ALTER SCHEMA alterdb1 COLLATE = utf8mb4_general_ci")
verifyDBCharsetAndCollate("alterdb1", "utf8mb4", "utf8mb4_general_ci")

tk.MustExec("DROP DATABASE IF EXISTS alterdb2")
tk.MustExec("CREATE DATABASE alterdb2 CHARSET=utf8 COLLATE=utf8_unicode_ci")
tk.MustExec("USE alterdb2")

failedCases := []struct {
stmt string
errMsg string
}{
{
"ALTER SCHEMA `` CHARACTER SET = 'utf8'",
"[ddl:1102]Incorrect database name ''",
},
{
"ALTER DATABASE CHARACTER SET = ''",
"[parser:1115]Unknown character set: ''",
},
{
"ALTER DATABASE CHARACTER SET = 'INVALID_CHARSET'",
"[parser:1115]Unknown character set: 'INVALID_CHARSET'",
},
{
"ALTER SCHEMA COLLATE = ''",
"[ddl:1273]Unknown collation: ''",
},
{
"ALTER DATABASE COLLATE = 'INVALID_COLLATION'",
"[ddl:1273]Unknown collation: 'INVALID_COLLATION'",
},
{
"ALTER DATABASE CHARACTER SET = 'utf8' DEFAULT CHARSET = 'utf8mb4'",
"[ddl:1302]Conflicting declarations: 'CHARACTER SET utf8' and 'CHARACTER SET utf8mb4'",
},
{
"ALTER SCHEMA CHARACTER SET = 'utf8' COLLATE = 'utf8mb4_bin'",
"[ddl:1302]Conflicting declarations: 'CHARACTER SET utf8' and 'CHARACTER SET utf8mb4'",
},
{
"ALTER DATABASE COLLATE = 'utf8mb4_bin' COLLATE = 'utf8_bin'",
"[ddl:1302]Conflicting declarations: 'CHARACTER SET utf8mb4' and 'CHARACTER SET utf8'",
},
}

for _, fc := range failedCases {
c.Assert(tk.ExecToErr(fc.stmt).Error(), Equals, fc.errMsg, Commentf("%v", fc.stmt))
}

tk.MustExec("ALTER SCHEMA CHARACTER SET = 'utf8mb4'")
verifyDBCharsetAndCollate("alterdb2", "utf8mb4", "utf8mb4_bin")

err := tk.ExecToErr("ALTER SCHEMA CHARACTER SET = 'utf8mb4' COLLATE = 'utf8mb4_general_ci'")
c.Assert(err.Error(), Equals, "[ddl:210]unsupported modify collate from utf8mb4_bin to utf8mb4_general_ci")
}
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ var (
// DDL is responsible for updating schema in data store and maintaining in-memory InfoSchema cache.
type DDL interface {
CreateSchema(ctx sessionctx.Context, name model.CIStr, charsetInfo *ast.CharsetOpt) error
AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error
DropSchema(ctx sessionctx.Context, schema model.CIStr) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
Expand Down
57 changes: 57 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,63 @@ func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetIn
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
func (d *ddl) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (err error) {
// Resolve target charset and collation from options.
var toCharset, toCollate string
for _, val := range stmt.Options {
switch val.Tp {
case ast.DatabaseOptionCharset:
if toCharset == "" {
toCharset = val.Value
} else if toCharset != val.Value {
return ErrConflictingDeclarations.GenWithStackByArgs(toCharset, val.Value)
}
case ast.DatabaseOptionCollate:
info, err := charset.GetCollationByName(val.Value)
if err != nil {
return errors.Trace(err)
}
if toCharset == "" {
toCharset = info.CharsetName
} else if toCharset != info.CharsetName {
return ErrConflictingDeclarations.GenWithStackByArgs(toCharset, info.CharsetName)
}
toCollate = info.Name
}
}
if toCollate == "" {
if toCollate, err = charset.GetDefaultCollation(toCharset); err != nil {
return errors.Trace(err)
}
}

// Check if need to change charset/collation.
dbName := model.NewCIStr(stmt.Name)
is := d.GetInfoSchemaWithInterceptor(ctx)
dbInfo, ok := is.SchemaByName(dbName)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(dbName.O)
}
if dbInfo.Charset == toCharset && dbInfo.Charset == toCollate {
return nil
}

// Check the current TiDB limitations.
if err = modifiableCharsetAndCollation(toCharset, toCollate, dbInfo.Charset, dbInfo.Collate); err != nil {
return errors.Trace(err)
}

// Do the DDL job.
job := &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionModifySchemaCharsetAndCollate,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{toCharset, toCollate},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
switch job.Type {
case model.ActionCreateSchema:
ver, err = onCreateSchema(d, t, job)
case model.ActionModifySchemaCharsetAndCollate:
ver, err = onModifySchemaCharsetAndCollate(t, job)
case model.ActionDropSchema:
ver, err = onDropSchema(t, job)
case model.ActionCreateTable:
Expand Down
3 changes: 2 additions & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
case model.ActionRebaseAutoID, model.ActionShardRowID,
model.ActionModifyColumn, model.ActionAddForeignKey,
model.ActionDropForeignKey, model.ActionRenameTable,
model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition:
model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition,
model.ActionModifySchemaCharsetAndCollate:
ver, err = cancelOnlyNotHandledJob(job)
default:
job.State = model.JobStateCancelled
Expand Down
31 changes: 31 additions & 0 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,37 @@ func checkSchemaNotExistsFromStore(t *meta.Meta, schemaID int64, dbInfo *model.D
return nil
}

func onModifySchemaCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var toCharset, toCollate string
if err := job.DecodeArgs(&toCharset, &toCollate); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we need to add job.State = model.JobStateCancelled to make sure the job will be done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, if we don't cancel the job here, the job will never be finished.

job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can check "if dbInfo.Charset == toCharset && dbInfo.Charset == toCollate {return nil}" here。

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, then

if dbInfo.Charset == toCharset && dbInfo.Collate == toCollate {
job.State = model.JobStateCancelled
return ver, nil
}

dbInfo.Charset = toCharset
dbInfo.Collate = toCollate

if err = t.UpdateDatabase(dbInfo); err != nil {
return ver, errors.Trace(err)
}
if ver, err = updateSchemaVersion(t, job); err != nil {
return ver, errors.Trace(err)
}
job.FinishDBJob(model.JobStateDone, model.StatePublic, ver, dbInfo)
return ver, nil
}

func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job)
if err != nil {
Expand Down
27 changes: 17 additions & 10 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,28 +85,30 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.RecordBatch) (err error)
defer func() { e.ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = false }()

switch x := e.stmt.(type) {
case *ast.TruncateTableStmt:
err = e.executeTruncateTable(x)
case *ast.AlterDatabaseStmt:
err = e.executeAlterDatabase(x)
case *ast.AlterTableStmt:
err = e.executeAlterTable(x)
case *ast.CreateIndexStmt:
err = e.executeCreateIndex(x)
case *ast.CreateDatabaseStmt:
err = e.executeCreateDatabase(x)
case *ast.CreateTableStmt:
err = e.executeCreateTable(x)
case *ast.CreateViewStmt:
err = e.executeCreateView(x)
case *ast.CreateIndexStmt:
err = e.executeCreateIndex(x)
case *ast.DropIndexStmt:
err = e.executeDropIndex(x)
case *ast.DropDatabaseStmt:
err = e.executeDropDatabase(x)
case *ast.DropTableStmt:
err = e.executeDropTableOrView(x)
case *ast.DropIndexStmt:
err = e.executeDropIndex(x)
case *ast.AlterTableStmt:
err = e.executeAlterTable(x)
case *ast.RenameTableStmt:
err = e.executeRenameTable(x)
case *ast.RecoverTableStmt:
err = e.executeRecoverTable(x)
case *ast.RenameTableStmt:
err = e.executeRenameTable(x)
case *ast.TruncateTableStmt:
err = e.executeTruncateTable(x)
}
if err != nil {
return e.toErr(err)
Expand Down Expand Up @@ -163,6 +165,11 @@ func (e *DDLExec) executeCreateDatabase(s *ast.CreateDatabaseStmt) error {
return err
}

func (e *DDLExec) executeAlterDatabase(s *ast.AlterDatabaseStmt) error {
err := domain.GetDomain(e.ctx).DDL().AlterSchema(e.ctx, s)
return err
}

func (e *DDLExec) executeCreateTable(s *ast.CreateTableStmt) error {
err := domain.GetDomain(e.ctx).DDL().CreateTable(e.ctx, s)
return err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190425131531-4ed0aa16f7ea
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825
github.com/pingcap/parser v0.0.0-20190509110453-7a8657e6052b
github.com/pingcap/pd v0.0.0-20190424024702-bd1e2496a669
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ github.com/pingcap/kvproto v0.0.0-20190425131531-4ed0aa16f7ea/go.mod h1:QMdbTAXC
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825 h1:U9Kdnknj4n2v76Mg7wazevZ5N9U1OIaMwSNRVLEcLX0=
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20190509110453-7a8657e6052b h1:IC1S/ZfKdQRccGJG7C3IkWB6S+d0OkYj8vnmNrMCu2c=
github.com/pingcap/parser v0.0.0-20190509110453-7a8657e6052b/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v0.0.0-20190424024702-bd1e2496a669 h1:ZoKjndm/Ig7Ru/wojrQkc/YLUttUdQXoH77gtuWCvL4=
github.com/pingcap/pd v0.0.0-20190424024702-bd1e2496a669/go.mod h1:MUCxRzOkYiWZtlyi4MhxjCIj9PgQQ/j+BLNGm7aUsnM=
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU=
Expand Down
19 changes: 19 additions & 0 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
} else if diff.Type == model.ActionDropSchema {
tblIDs := b.applyDropSchema(diff.SchemaID)
return tblIDs, nil
} else if diff.Type == model.ActionModifySchemaCharsetAndCollate {
return nil, b.applyModifySchemaCharsetAndCollate(m, diff)
}

roDBInfo, ok := b.is.SchemaByID(diff.SchemaID)
Expand Down Expand Up @@ -127,6 +129,23 @@ func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error
return nil
}

func (b *Builder) applyModifySchemaCharsetAndCollate(m *meta.Meta, diff *model.SchemaDiff) error {
di, err := m.GetDatabase(diff.SchemaID)
if err != nil {
return errors.Trace(err)
}
if di == nil {
// This should never happen.
return ErrDatabaseNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", diff.SchemaID),
)
}
newDbInfo := b.copySchemaTables(di.Name.O)
newDbInfo.Charset = di.Charset
newDbInfo.Collate = di.Collate
return nil
}

func (b *Builder) applyDropSchema(schemaID int64) []int64 {
di, ok := b.is.SchemaByID(schemaID)
if !ok {
Expand Down
12 changes: 12 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,18 @@ func (b *PlanBuilder) buildSplitIndexRegion(node *ast.SplitIndexRegionStmt) (Pla
func (b *PlanBuilder) buildDDL(node ast.DDLNode) (Plan, error) {
var authErr error
switch v := node.(type) {
case *ast.AlterDatabaseStmt:
if v.AlterDefaultDatabase {
v.Name = b.ctx.GetSessionVars().CurrentDB
}
if v.Name == "" {
return nil, ErrNoDB
}
if b.ctx.GetSessionVars().User != nil {
authErr = ErrDBaccessDenied.GenWithStackByArgs("ALTER", b.ctx.GetSessionVars().User.Hostname,
b.ctx.GetSessionVars().User.Username, v.Name)
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.AlterPriv, v.Name, "", "", authErr)
case *ast.AlterTableStmt:
if b.ctx.GetSessionVars().User != nil {
authErr = ErrTableaccessDenied.GenWithStackByArgs("ALTER", b.ctx.GetSessionVars().User.Hostname,
Expand Down
9 changes: 9 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
p.checkAlterTableGrammar(node)
case *ast.CreateDatabaseStmt:
p.checkCreateDatabaseGrammar(node)
case *ast.AlterDatabaseStmt:
p.checkAlterDatabaseGrammar(node)
case *ast.DropDatabaseStmt:
p.checkDropDatabaseGrammar(node)
case *ast.ShowStmt:
Expand Down Expand Up @@ -324,6 +326,13 @@ func (p *preprocessor) checkCreateDatabaseGrammar(stmt *ast.CreateDatabaseStmt)
}
}

func (p *preprocessor) checkAlterDatabaseGrammar(stmt *ast.AlterDatabaseStmt) {
// for 'ALTER DATABASE' statement, database name can be empty to alter default database.
if isIncorrectName(stmt.Name) && !stmt.AlterDefaultDatabase {
p.err = ddl.ErrWrongDBName.GenWithStackByArgs(stmt.Name)
}
}

func (p *preprocessor) checkDropDatabaseGrammar(stmt *ast.DropDatabaseStmt) {
if isIncorrectName(stmt.Name) {
p.err = ddl.ErrWrongDBName.GenWithStackByArgs(stmt.Name)
Expand Down
3 changes: 3 additions & 0 deletions planner/core/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ func (s *testValidatorSuite) TestValidator(c *C) {
{"drop table `t `", true, errors.New("[ddl:1103]Incorrect table name 't '")},
{"create database ``", true, errors.New("[ddl:1102]Incorrect database name ''")},
{"create database `test `", true, errors.New("[ddl:1102]Incorrect database name 'test '")},
{"alter database collate = 'utf8mb4_bin'", true, nil},
{"alter database `` collate = 'utf8mb4_bin'", true, errors.New("[ddl:1102]Incorrect database name ''")},
{"alter database `test ` collate = 'utf8mb4_bin'", true, errors.New("[ddl:1102]Incorrect database name 'test '")},
{"drop database ``", true, errors.New("[ddl:1102]Incorrect database name ''")},
{"drop database `test `", true, errors.New("[ddl:1102]Incorrect database name 'test '")},
{"alter table `t ` add column c int", true, errors.New("[ddl:1103]Incorrect table name 't '")},
Expand Down
Loading