Skip to content

Commit

Permalink
ddl: validate table info before doing ddl job to avoid load infoschem…
Browse files Browse the repository at this point in the history
…a error (pingcap#10464)
  • Loading branch information
crazycs520 authored and db-storage committed May 29, 2019
1 parent ee8baef commit 6c6b295
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 7 deletions.
8 changes: 4 additions & 4 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
// none -> delete only
job.SchemaState = model.StateDeleteOnly
columnInfo.State = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != columnInfo.State)
case model.StateDeleteOnly:
// delete only -> write only
job.SchemaState = model.StateWriteOnly
Expand Down Expand Up @@ -257,7 +257,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}
}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != colInfo.State)
case model.StateWriteOnly:
// write only -> delete only
job.SchemaState = model.StateDeleteOnly
Expand Down Expand Up @@ -452,7 +452,7 @@ func (w *worker) doModifyColumn(t *meta.Meta, job *model.Job, newCol *model.Colu
}
}

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, true)
if err != nil {
// Modified the type definition of 'null' to 'not null' before this, so rollBack the job when an error occurs.
job.State = model.JobStateRollingback
Expand Down Expand Up @@ -561,7 +561,7 @@ func modifyColumnFromNull2NotNull(w *worker, t *meta.Meta, dbInfo *model.DBInfo,

// Prevent this field from inserting null values.
tblInfo.Columns[oldCol.Offset].Flag |= mysql.PreventNullInsertFlag
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, true)
return ver, errors.Trace(err)
}

Expand Down
38 changes: 38 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,13 @@ func (s *testDBSuite2) TestDropColumn(c *C) {
}
}

// Test for drop partition table column.
s.tk.MustExec("drop table if exists t1")
s.tk.MustExec("create table t1 (a int,b int) partition by hash(a) partitions 4;")
_, err := s.tk.Exec("alter table t1 drop column a")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[expression:1054]Unknown column 'a' in 'expression'")

s.tk.MustExec("drop database drop_col_db")
}

Expand Down Expand Up @@ -2750,3 +2757,34 @@ func (s *testDBSuite2) TestAlterShardRowIDBits(c *C) {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[autoid:1467]Failed to read auto-increment value from storage engine")
}

func (s *testDBSuite2) TestDDLWithInvalidTableInfo(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
tk := s.tk

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
// Test create with invalid expression.
_, err := s.tk.Exec(`CREATE TABLE t (
c0 int(11) ,
c1 int(11),
c2 decimal(16,4) GENERATED ALWAYS AS ((case when (c0 = 0) then 0 when (c0 > 0) then (c1 / c0) end))
);`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 55 near \"THEN (`c1` / `c0`) END)\" ")

tk.MustExec("create table t (a bigint, b int, c int generated always as (b+1)) partition by hash(a) partitions 4;")
// Test drop partition column.
_, err = tk.Exec("alter table t drop column a;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[expression:1054]Unknown column 'a' in 'expression'")
// Test modify column with invalid expression.
_, err = tk.Exec("alter table t modify column c int GENERATED ALWAYS AS ((case when (a = 0) then 0 when (a > 0) then (b / a) end));")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 53 near \"THEN (`b` / `a`) END)\" ")
// Test add column with invalid expression.
_, err = tk.Exec("alter table t add column d int GENERATED ALWAYS AS ((case when (a = 0) then 0 when (a > 0) then (b / a) end));")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 53 near \"THEN (`b` / `a`) END)\" ")
}
12 changes: 12 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,12 @@ func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.
return errors.Trace(err)
}

// checkTableInfoValid uses to check table info valid. This is used to validate table info.
func checkTableInfoValid(tblInfo *model.TableInfo) error {
_, err := tables.TableFromMeta(nil, tblInfo)
return err
}

func buildTableInfoWithLike(ident ast.Ident, referTblInfo *model.TableInfo) model.TableInfo {
tblInfo := *referTblInfo
// Check non-public column and adjust column offset.
Expand Down Expand Up @@ -1248,6 +1254,12 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
if err != nil {
return errors.Trace(err)
}
tbInfo.State = model.StatePublic
err = checkTableInfoValid(tbInfo)
if err != nil {
return err
}
tbInfo.State = model.StateNone

job := &model.Job{
SchemaID: schema.ID,
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int
// none -> delete only
job.SchemaState = model.StateDeleteOnly
indexInfo.State = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != indexInfo.State)
case model.StateDeleteOnly:
// delete only -> write only
job.SchemaState = model.StateWriteOnly
Expand Down
25 changes: 23 additions & 2 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
// none -> public
tbInfo.State = model.StatePublic
tbInfo.UpdateTS = t.StartTS
err = t.CreateTableOrView(schemaID, tbInfo)
err = createTableOrViewWithCheck(t, job, schemaID, tbInfo)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -82,6 +82,15 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
}
}

func createTableOrViewWithCheck(t *meta.Meta, job *model.Job, schemaID int64, tbInfo *model.TableInfo) error {
err := checkTableInfoValid(tbInfo)
if err != nil {
job.State = model.JobStateCancelled
return errors.Trace(err)
}
return t.CreateTableOrView(schemaID, tbInfo)
}

func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
Expand Down Expand Up @@ -122,7 +131,7 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}
}
err = t.CreateTableOrView(schemaID, tbInfo)
err = createTableOrViewWithCheck(t, job, schemaID, tbInfo)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -802,6 +811,18 @@ func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string
return nil
}

// updateVersionAndTableInfoWithCheck checks table info validate and updates the schema version and the table information
func updateVersionAndTableInfoWithCheck(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool) (
ver int64, err error) {
err = checkTableInfoValid(tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
return updateVersionAndTableInfo(t, job, tblInfo, shouldUpdateVer)

}

// updateVersionAndTableInfo updates the schema version and the table information.
func updateVersionAndTableInfo(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool) (
ver int64, err error) {
Expand Down

0 comments on commit 6c6b295

Please sign in to comment.