Skip to content

Commit

Permalink
ddl: remove onDropColumns/onAddColumns implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Jun 30, 2022
1 parent 4887126 commit e4d94f8
Show file tree
Hide file tree
Showing 12 changed files with 7 additions and 379 deletions.
151 changes: 0 additions & 151 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,50 +250,6 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}

func checkAddColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.ColumnInfo, []*model.ColumnInfo, []*ast.ColumnPosition, []int, []bool, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}
columns := []*model.ColumnInfo{}
positions := []*ast.ColumnPosition{}
offsets := []int{}
ifNotExists := []bool{}
err = job.DecodeArgs(&columns, &positions, &offsets, &ifNotExists)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}

columnInfos := make([]*model.ColumnInfo, 0, len(columns))
newColumns := make([]*model.ColumnInfo, 0, len(columns))
newPositions := make([]*ast.ColumnPosition, 0, len(columns))
newOffsets := make([]int, 0, len(columns))
newIfNotExists := make([]bool, 0, len(columns))
for i, col := range columns {
columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
if ifNotExists[i] {
// TODO: Should return a warning.
logutil.BgLogger().Warn("[ddl] check add columns, duplicate column", zap.Stringer("col", col.Name))
continue
}
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}
columnInfos = append(columnInfos, columnInfo)
}
newColumns = append(newColumns, columns[i])
newPositions = append(newPositions, positions[i])
newOffsets = append(newOffsets, offsets[i])
newIfNotExists = append(newIfNotExists, ifNotExists[i])
}
return tblInfo, columnInfos, newColumns, newPositions, newOffsets, newIfNotExists, nil
}

func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) {
for i := range columnInfos {
columnInfos[i].State = state
Expand All @@ -318,113 +274,6 @@ func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) {
}
}

func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumns(d, t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ver, errors.New("occur an error before decode args"))
}
})

tblInfo, columnInfos, columns, positions, offsets, ifNotExists, err := checkAddColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if len(columnInfos) == 0 {
if len(columns) == 0 {
job.State = model.JobStateCancelled
return ver, nil
}
for i := range columns {
columnInfo, pos, offset, err := createColumnInfoWithPosCheck(tblInfo, columns[i], positions[i])
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] run add columns job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo), zap.Int("offset", offset))
positions[i] = pos
offsets[i] = offset
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
columnInfos = append(columnInfos, columnInfo)
}
// Set arg to job.
job.Args = []interface{}{columnInfos, positions, offsets, ifNotExists}
}

originalState := columnInfos[0].State
switch columnInfos[0].State {
case model.StateNone:
// none -> delete only
setColumnsState(columnInfos, model.StateDeleteOnly)
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> write only
setColumnsState(columnInfos, model.StateWriteOnly)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// write only -> reorganization
setColumnsState(columnInfos, model.StateWriteReorganization)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offsets.
oldCols := tblInfo.Columns[:len(tblInfo.Columns)-len(offsets)]
newCols := tblInfo.Columns[len(tblInfo.Columns)-len(offsets):]
tblInfo.Columns = oldCols
for i := range offsets {
// For multiple columns with after position, should adjust offsets.
// e.g. create table t(a int);
// alter table t add column b int after a, add column c int after a;
// alter table t add column a1 int after a, add column b1 int after b, add column c1 int after c;
// alter table t add column a1 int after a, add column b1 int first;
if positions[i].Tp == ast.ColumnPositionAfter {
for j := 0; j < i; j++ {
if (positions[j].Tp == ast.ColumnPositionAfter && offsets[j] < offsets[i]) || positions[j].Tp == ast.ColumnPositionFirst {
offsets[i]++
}
}
}
tblInfo.Columns = append(tblInfo.Columns, newCols[i])
tblInfo.MoveColumnInfo(len(tblInfo.Columns)-1, offsets[i])
}
setColumnsState(columnInfos, model.StatePublic)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
asyncNotifyEvent(d, &ddlutil.Event{Tp: model.ActionAddColumns, TableInfo: tblInfo, ColumnInfos: columnInfos})
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfos[0].State)
}

return ver, errors.Trace(err)
}

func onDropColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, colInfos, delCount, idxInfos, err := checkDropColumns(t, job)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
return errors.Trace(historyJob.Error)
}
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns || historyJob.Type == model.ActionDropIndexes) {
if historyJob.IsCancelled() && (historyJob.Type == model.ActionDropIndexes) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
Expand Down
151 changes: 0 additions & 151 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3613,81 +3613,6 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
return errors.Trace(err)
}

// AddColumns will add multi new columns to the table.
func (d *ddl) AddColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.AlterTableSpec) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}

// Check all the columns at once.
addingColumnNames := make(map[string]bool)
dupColumnNames := make(map[string]bool)
for _, spec := range specs {
for _, specNewColumn := range spec.NewColumns {
if !addingColumnNames[specNewColumn.Name.Name.L] {
addingColumnNames[specNewColumn.Name.Name.L] = true
continue
}
if !spec.IfNotExists {
return errors.Trace(infoschema.ErrColumnExists.GenWithStackByArgs(specNewColumn.Name.Name.O))
}
dupColumnNames[specNewColumn.Name.Name.L] = true
}
}
columns := make([]*table.Column, 0, len(addingColumnNames))
positions := make([]*ast.ColumnPosition, 0, len(addingColumnNames))
offsets := make([]int, 0, len(addingColumnNames))
ifNotExists := make([]bool, 0, len(addingColumnNames))
newColumnsCount := 0
// Check the columns one by one.
for _, spec := range specs {
for _, specNewColumn := range spec.NewColumns {
if spec.IfNotExists && dupColumnNames[specNewColumn.Name.Name.L] {
err = infoschema.ErrColumnExists.GenWithStackByArgs(specNewColumn.Name.Name.O)
ctx.GetSessionVars().StmtCtx.AppendNote(err)
continue
}
col, err := checkAndCreateNewColumn(ctx, ti, schema, spec, t, specNewColumn)
if err != nil {
return errors.Trace(err)
}
// Added column has existed and if_not_exists flag is true.
if col == nil && spec.IfNotExists {
continue
}
columns = append(columns, col)
positions = append(positions, spec.Position)
offsets = append(offsets, 0)
ifNotExists = append(ifNotExists, spec.IfNotExists)
newColumnsCount++
}
}
if newColumnsCount == 0 {
return nil
}
if err = checkAddColumnTooManyColumns(len(t.Cols()) + newColumnsCount); err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionAddColumns,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{columns, positions, offsets, ifNotExists},
}

err = d.DoDDLJob(ctx, job)
if err != nil {
return errors.Trace(err)
}
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

// AddTablePartitions will add a new partition to the table.
func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoCache.GetLatest()
Expand Down Expand Up @@ -4130,82 +4055,6 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTa
return errors.Trace(err)
}

// DropColumns will drop multi-columns from the table, now we don't support drop the column with index covered.
func (d *ddl) DropColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.AlterTableSpec) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}
tblInfo := t.Meta()

dropingColumnNames := make(map[string]bool)
dupColumnNames := make(map[string]bool)
for _, spec := range specs {
if !dropingColumnNames[spec.OldColumnName.Name.L] {
dropingColumnNames[spec.OldColumnName.Name.L] = true
} else {
if spec.IfExists {
dupColumnNames[spec.OldColumnName.Name.L] = true
continue
}
return errors.Trace(dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", spec.OldColumnName.Name.O))
}
}

ifExists := make([]bool, 0, len(specs))
colNames := make([]model.CIStr, 0, len(specs))
for _, spec := range specs {
if spec.IfExists && dupColumnNames[spec.OldColumnName.Name.L] {
err = dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", spec.OldColumnName.Name.L)
ctx.GetSessionVars().StmtCtx.AppendNote(err)
continue
}
isDropable, err := checkIsDroppableColumn(ctx, t, spec)
if err != nil {
return err
}
// Column can't drop and if_exists flag is true.
if !isDropable && spec.IfExists {
continue
}
colNames = append(colNames, spec.OldColumnName.Name)
ifExists = append(ifExists, spec.IfExists)
}
if len(colNames) == 0 {
return nil
}
if len(tblInfo.Columns) == len(colNames) {
return dbterror.ErrCantRemoveAllFields.GenWithStack("can't drop all columns in table %s",
tblInfo.Name)
}
err = checkVisibleColumnCnt(t, 0, len(colNames))
if err != nil {
return err
}
var multiSchemaInfo *model.MultiSchemaInfo
if variable.EnableChangeMultiSchema.Load() {
multiSchemaInfo = &model.MultiSchemaInfo{}
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionDropColumns,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: multiSchemaInfo,
Args: []interface{}{colNames, ifExists},
}

err = d.DoDDLJob(ctx, job)
if err != nil {
return errors.Trace(err)
}
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

func checkIsDroppableColumn(ctx sessionctx.Context, t table.Table, spec *ast.AlterTableSpec) (isDrapable bool, err error) {
tblInfo := t.Meta()
// Check whether dropped column has existed.
Expand Down
1 change: 0 additions & 1 deletion ddl/ddl_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func TestIsJobRollbackable(t *testing.T) {
{model.ActionDropIndex, model.StateDeleteOnly, false},
{model.ActionDropSchema, model.StateDeleteOnly, false},
{model.ActionDropColumn, model.StateDeleteOnly, false},
{model.ActionDropColumns, model.StateDeleteOnly, false},
{model.ActionDropIndexes, model.StateDeleteOnly, false},
}
job := &model.Job{}
Expand Down
8 changes: 2 additions & 6 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func jobNeedGC(job *model.Job) bool {
// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
return true
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionModifyColumn, model.ActionDropIndexes:
return true
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
Expand Down Expand Up @@ -747,7 +747,7 @@ func writeBinlog(binlogCli *pumpcli.PumpsClient, txn kv.Transaction, job *model.
// When this column is in the "delete only" and "delete reorg" states, the binlog of "drop column" has not been written yet,
// but the column has been removed from the binlog of the write operation.
// So we add this binlog to enable downstream components to handle DML correctly in this schema state.
((job.Type == model.ActionDropColumn || job.Type == model.ActionDropColumns) && job.SchemaState == model.StateDeleteOnly) {
((job.Type == model.ActionDropColumn) && job.SchemaState == model.StateDeleteOnly) {
if skipWriteBinlog(job) {
return
}
Expand Down Expand Up @@ -890,12 +890,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = w.onExchangeTablePartition(d, t, job)
case model.ActionAddColumn:
ver, err = onAddColumn(d, t, job)
case model.ActionAddColumns:
ver, err = onAddColumns(d, t, job)
case model.ActionDropColumn:
ver, err = onDropColumn(d, t, job)
case model.ActionDropColumns:
ver, err = onDropColumns(d, t, job)
case model.ActionModifyColumn:
ver, err = w.onModifyColumn(d, t, job)
case model.ActionSetDefaultValue:
Expand Down
Loading

0 comments on commit e4d94f8

Please sign in to comment.