Skip to content

Commit

Permalink
ddl: for schema-level DDL method parameter is now XXXStmt (#35722)
Browse files Browse the repository at this point in the history
ref #35665, close #35734
  • Loading branch information
lance6716 authored Jun 27, 2022
1 parent 383d1c8 commit f53e3c7
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 95 deletions.
4 changes: 2 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ var (

// DDL is responsible for updating schema in data store and maintaining in-memory InfoSchema cache.
type DDL interface {
CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt, placementPolicyRef *model.PolicyRefInfo) error
CreateSchema(ctx sessionctx.Context, stmt *ast.CreateDatabaseStmt) error
AlterSchema(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error
DropSchema(ctx sessionctx.Context, schema model.CIStr) error
DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
Expand Down
98 changes: 84 additions & 14 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,76 @@ const (
tiflashCheckPendingTablesRetry = 7
)

func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt, placementPolicyRef *model.PolicyRefInfo) (err error) {
dbInfo := &model.DBInfo{Name: schema}
if charsetInfo != nil {
chs, coll, err := ResolveCharsetCollation(ast.CharsetOpt{Chs: charsetInfo.Chs, Col: charsetInfo.Col})
func (d *ddl) CreateSchema(ctx sessionctx.Context, stmt *ast.CreateDatabaseStmt) (err error) {
var placementPolicyRef *model.PolicyRefInfo
sessionVars := ctx.GetSessionVars()

// If no charset and/or collation is specified use collation_server and character_set_server
charsetOpt := &ast.CharsetOpt{}
if sessionVars.GlobalVarsAccessor != nil {
charsetOpt.Col, err = variable.GetSessionOrGlobalSystemVar(sessionVars, variable.CollationServer)
if err != nil {
return errors.Trace(err)
return err
}
charsetOpt.Chs, err = variable.GetSessionOrGlobalSystemVar(sessionVars, variable.CharacterSetServer)
if err != nil {
return err
}
}

explicitCharset := false
explicitCollation := false
if len(stmt.Options) != 0 {
for _, val := range stmt.Options {
switch val.Tp {
case ast.DatabaseOptionCharset:
charsetOpt.Chs = val.Value
explicitCharset = true
case ast.DatabaseOptionCollate:
charsetOpt.Col = val.Value
explicitCollation = true
case ast.DatabaseOptionPlacementPolicy:
placementPolicyRef = &model.PolicyRefInfo{
Name: model.NewCIStr(val.Value),
}
}
}
dbInfo.Charset = chs
dbInfo.Collate = coll
} else {
dbInfo.Charset, dbInfo.Collate = charset.GetDefaultCharsetAndCollate()
}

if charsetOpt.Col != "" {
coll, err := collate.GetCollationByName(charsetOpt.Col)
if err != nil {
return err
}

// The collation is not valid for the specified character set.
// Try to remove any of them, but not if they are explicitly defined.
if coll.CharsetName != charsetOpt.Chs {
if explicitCollation && !explicitCharset {
// Use the explicitly set collation, not the implicit charset.
charsetOpt.Chs = ""
}
if !explicitCollation && explicitCharset {
// Use the explicitly set charset, not the (session) collation.
charsetOpt.Col = ""
}
}

}
dbInfo := &model.DBInfo{Name: stmt.Name}
chs, coll, err := ResolveCharsetCollation(ast.CharsetOpt{Chs: charsetOpt.Chs, Col: charsetOpt.Col})
if err != nil {
return errors.Trace(err)
}
dbInfo.Charset = chs
dbInfo.Collate = coll
dbInfo.PlacementPolicyRef = placementPolicyRef
return d.CreateSchemaWithInfo(ctx, dbInfo, OnExistError)

onExist := OnExistError
if stmt.IfNotExists {
onExist = OnExistIgnore
}
return d.CreateSchemaWithInfo(ctx, dbInfo, onExist)
}

func (d *ddl) CreateSchemaWithInfo(
Expand Down Expand Up @@ -147,6 +202,12 @@ func (d *ddl) CreateSchemaWithInfo(

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)

if infoschema.ErrDatabaseExists.Equal(err) && onExist == OnExistIgnore {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}

return errors.Trace(err)
}

Expand Down Expand Up @@ -520,11 +581,14 @@ func (d *ddl) AlterSchema(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt)
return nil
}

func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error) {
func (d *ddl) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
old, ok := is.SchemaByName(schema)
old, ok := is.SchemaByName(stmt.Name)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
if stmt.IfExists {
return nil
}
return infoschema.ErrDatabaseDropExists.GenWithStackByArgs(stmt.Name)
}
job := &model.Job{
SchemaID: old.ID,
Expand All @@ -537,13 +601,19 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error)
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
if err != nil {
if infoschema.ErrDatabaseNotExists.Equal(err) {
if stmt.IfExists {
return nil
}
return infoschema.ErrDatabaseDropExists.GenWithStackByArgs(stmt.Name)
}
return errors.Trace(err)
}
if !config.TableLockEnabled() {
return nil
}
// Clear table locks hold by the session.
tbs := is.SchemaTables(schema)
tbs := is.SchemaTables(stmt.Name)
lockTableIDs := make([]int64, 0)
for _, tb := range tbs {
if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok {
Expand Down
19 changes: 14 additions & 5 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,22 @@ func TestInfo(t *testing.T) {
}
require.True(t, syncerStarted)

// Make sure loading schema is normal.
cs := &ast.CharsetOpt{
Chs: "utf8",
Col: "utf8_bin",
stmt := &ast.CreateDatabaseStmt{
Name: model.NewCIStr("aaa"),
// Make sure loading schema is normal.
Options: []*ast.DatabaseOption{
{
Tp: ast.DatabaseOptionCharset,
Value: "utf8",
},
{
Tp: ast.DatabaseOptionCollate,
Value: "utf8_bin",
},
},
}
ctx := mock.NewContext()
require.NoError(t, dom.ddl.CreateSchema(ctx, model.NewCIStr("aaa"), cs, nil))
require.NoError(t, dom.ddl.CreateSchema(ctx, stmt))
require.NoError(t, dom.Reload())
require.Equal(t, int64(1), dom.InfoSchema().SchemaMetaVersion())

Expand Down
75 changes: 2 additions & 73 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -248,70 +247,7 @@ func (e *DDLExec) executeRenameTable(s *ast.RenameTableStmt) error {
}

func (e *DDLExec) executeCreateDatabase(s *ast.CreateDatabaseStmt) error {
var opt *ast.CharsetOpt
var placementPolicyRef *model.PolicyRefInfo
var err error
sessionVars := e.ctx.GetSessionVars()

// If no charset and/or collation is specified use collation_server and character_set_server
opt = &ast.CharsetOpt{}
if sessionVars.GlobalVarsAccessor != nil {
opt.Col, err = variable.GetSessionOrGlobalSystemVar(sessionVars, variable.CollationServer)
if err != nil {
return err
}
opt.Chs, err = variable.GetSessionOrGlobalSystemVar(sessionVars, variable.CharacterSetServer)
if err != nil {
return err
}
}

explicitCharset := false
explicitCollation := false
if len(s.Options) != 0 {
for _, val := range s.Options {
switch val.Tp {
case ast.DatabaseOptionCharset:
opt.Chs = val.Value
explicitCharset = true
case ast.DatabaseOptionCollate:
opt.Col = val.Value
explicitCollation = true
case ast.DatabaseOptionPlacementPolicy:
placementPolicyRef = &model.PolicyRefInfo{
Name: model.NewCIStr(val.Value),
}
}
}
}

if opt.Col != "" {
coll, err := collate.GetCollationByName(opt.Col)
if err != nil {
return err
}

// The collation is not valid for the specified character set.
// Try to remove any of them, but not if they are explicitly defined.
if coll.CharsetName != opt.Chs {
if explicitCollation && !explicitCharset {
// Use the explicitly set collation, not the implicit charset.
opt.Chs = ""
}
if !explicitCollation && explicitCharset {
// Use the explicitly set charset, not the (session) collation.
opt.Col = ""
}
}

}

err = domain.GetDomain(e.ctx).DDL().CreateSchema(e.ctx, s.Name, opt, placementPolicyRef)
if err != nil {
if infoschema.ErrDatabaseExists.Equal(err) && s.IfNotExists {
err = nil
}
}
err := domain.GetDomain(e.ctx).DDL().CreateSchema(e.ctx, s)
return err
}

Expand Down Expand Up @@ -383,14 +319,7 @@ func (e *DDLExec) executeDropDatabase(s *ast.DropDatabaseStmt) error {
return errors.New("Drop 'mysql' database is forbidden")
}

err := domain.GetDomain(e.ctx).DDL().DropSchema(e.ctx, dbName)
if infoschema.ErrDatabaseNotExists.Equal(err) {
if s.IfExists {
err = nil
} else {
err = infoschema.ErrDatabaseDropExists.GenWithStackByArgs(s.Name)
}
}
err := domain.GetDomain(e.ctx).DDL().DropSchema(e.ctx, s)
sessionVars := e.ctx.GetSessionVars()
if err == nil && strings.ToLower(sessionVars.CurrentDB) == dbName.L {
sessionVars.CurrentDB = ""
Expand Down
3 changes: 2 additions & 1 deletion tests/realtikvtest/sessiontest/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/format"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -1363,7 +1364,7 @@ func TestDoDDLJobQuit(t *testing.T) {
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/storeCloseInLoop")) }()

// this DDL call will enter deadloop before this fix
err = dom.DDL().CreateSchema(se, model.NewCIStr("testschema"), nil, nil)
err = dom.DDL().CreateSchema(se, &ast.CreateDatabaseStmt{Name: model.NewCIStr("testschema")})
require.Equal(t, "context canceled", err.Error())
}

Expand Down

0 comments on commit f53e3c7

Please sign in to comment.