ddl: remove onDropColumns and onAddColumns #35862

Merged
merged 4 commits on Jun 30, 2022
Changes from 1 commit
151 changes: 0 additions & 151 deletions ddl/column.go
@@ -250,50 +250,6 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}

func checkAddColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.ColumnInfo, []*model.ColumnInfo, []*ast.ColumnPosition, []int, []bool, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}
columns := []*model.ColumnInfo{}
positions := []*ast.ColumnPosition{}
offsets := []int{}
ifNotExists := []bool{}
err = job.DecodeArgs(&columns, &positions, &offsets, &ifNotExists)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}

columnInfos := make([]*model.ColumnInfo, 0, len(columns))
newColumns := make([]*model.ColumnInfo, 0, len(columns))
newPositions := make([]*ast.ColumnPosition, 0, len(columns))
newOffsets := make([]int, 0, len(columns))
newIfNotExists := make([]bool, 0, len(columns))
for i, col := range columns {
columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
if ifNotExists[i] {
// TODO: Should return a warning.
logutil.BgLogger().Warn("[ddl] check add columns, duplicate column", zap.Stringer("col", col.Name))
continue
}
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}
columnInfos = append(columnInfos, columnInfo)
}
newColumns = append(newColumns, columns[i])
newPositions = append(newPositions, positions[i])
newOffsets = append(newOffsets, offsets[i])
newIfNotExists = append(newIfNotExists, ifNotExists[i])
}
return tblInfo, columnInfos, newColumns, newPositions, newOffsets, newIfNotExists, nil
}

func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) {
for i := range columnInfos {
columnInfos[i].State = state
@@ -318,113 +274,6 @@ func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) {
}
}

func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumns(d, t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ver, errors.New("occur an error before decode args"))
}
})

tblInfo, columnInfos, columns, positions, offsets, ifNotExists, err := checkAddColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if len(columnInfos) == 0 {
if len(columns) == 0 {
job.State = model.JobStateCancelled
return ver, nil
}
for i := range columns {
columnInfo, pos, offset, err := createColumnInfoWithPosCheck(tblInfo, columns[i], positions[i])
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] run add columns job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo), zap.Int("offset", offset))
positions[i] = pos
offsets[i] = offset
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
columnInfos = append(columnInfos, columnInfo)
}
// Set arg to job.
job.Args = []interface{}{columnInfos, positions, offsets, ifNotExists}
}

originalState := columnInfos[0].State
switch columnInfos[0].State {
case model.StateNone:
// none -> delete only
setColumnsState(columnInfos, model.StateDeleteOnly)
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> write only
setColumnsState(columnInfos, model.StateWriteOnly)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// write only -> reorganization
setColumnsState(columnInfos, model.StateWriteReorganization)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offsets.
oldCols := tblInfo.Columns[:len(tblInfo.Columns)-len(offsets)]
newCols := tblInfo.Columns[len(tblInfo.Columns)-len(offsets):]
tblInfo.Columns = oldCols
for i := range offsets {
// For multiple columns with after position, should adjust offsets.
// e.g. create table t(a int);
// alter table t add column b int after a, add column c int after a;
// alter table t add column a1 int after a, add column b1 int after b, add column c1 int after c;
// alter table t add column a1 int after a, add column b1 int first;
if positions[i].Tp == ast.ColumnPositionAfter {
for j := 0; j < i; j++ {
if (positions[j].Tp == ast.ColumnPositionAfter && offsets[j] < offsets[i]) || positions[j].Tp == ast.ColumnPositionFirst {
offsets[i]++
}
}
}
tblInfo.Columns = append(tblInfo.Columns, newCols[i])
tblInfo.MoveColumnInfo(len(tblInfo.Columns)-1, offsets[i])
}
setColumnsState(columnInfos, model.StatePublic)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
asyncNotifyEvent(d, &ddlutil.Event{Tp: model.ActionAddColumns, TableInfo: tblInfo, ColumnInfos: columnInfos})
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfos[0].State)
}

return ver, errors.Trace(err)
}

func onDropColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, colInfos, delCount, idxInfos, err := checkDropColumns(t, job)
if err != nil {
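The trickiest part of the removed onAddColumns above is the offset fix-up in the write-reorganization branch: when one ALTER adds several columns with AFTER/FIRST positions, an earlier insertion shifts the target offset of every later column that lands to its right. Below is a minimal, self-contained sketch of that rule; posKind, adjustOffsets, and the sample offsets are illustrative stand-ins, not TiDB APIs.

```go
package main

import "fmt"

// posKind stands in for ast.ColumnPosition.Tp; only FIRST and AFTER matter here.
type posKind int

const (
	posFirst posKind = iota
	posAfter
)

// adjustOffsets mirrors the fix-up loop in the removed onAddColumns: a column
// placed with AFTER has its target offset bumped once for every earlier column
// in the same batch that was inserted at a smaller offset or placed FIRST,
// because that earlier insertion pushes it one position to the right.
func adjustOffsets(kinds []posKind, offsets []int) []int {
	out := append([]int(nil), offsets...)
	for i := range out {
		if kinds[i] != posAfter {
			continue
		}
		for j := 0; j < i; j++ {
			if (kinds[j] == posAfter && out[j] < out[i]) || kinds[j] == posFirst {
				out[i]++
			}
		}
	}
	return out
}

func main() {
	// Three AFTER-positioned columns with hypothetical raw offsets 1, 3 and 5.
	fmt.Println(adjustOffsets([]posKind{posAfter, posAfter, posAfter}, []int{1, 3, 5})) // [1 4 7]
}
```

After the adjustment, each new column is appended to tblInfo.Columns and moved into place with MoveColumnInfo, as the write-reorganization case above shows.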
2 changes: 1 addition & 1 deletion ddl/ddl.go
@@ -818,7 +818,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
return errors.Trace(historyJob.Error)
}
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
- if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns || historyJob.Type == model.ActionDropIndexes) {
+ if historyJob.IsCancelled() && (historyJob.Type == model.ActionDropIndexes) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
151 changes: 0 additions & 151 deletions ddl/ddl_api.go
@@ -3613,81 +3613,6 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
return errors.Trace(err)
}

// AddColumns will add multi new columns to the table.
func (d *ddl) AddColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.AlterTableSpec) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}

// Check all the columns at once.
addingColumnNames := make(map[string]bool)
dupColumnNames := make(map[string]bool)
for _, spec := range specs {
for _, specNewColumn := range spec.NewColumns {
if !addingColumnNames[specNewColumn.Name.Name.L] {
addingColumnNames[specNewColumn.Name.Name.L] = true
continue
}
if !spec.IfNotExists {
return errors.Trace(infoschema.ErrColumnExists.GenWithStackByArgs(specNewColumn.Name.Name.O))
}
dupColumnNames[specNewColumn.Name.Name.L] = true
}
}
columns := make([]*table.Column, 0, len(addingColumnNames))
positions := make([]*ast.ColumnPosition, 0, len(addingColumnNames))
offsets := make([]int, 0, len(addingColumnNames))
ifNotExists := make([]bool, 0, len(addingColumnNames))
newColumnsCount := 0
// Check the columns one by one.
for _, spec := range specs {
for _, specNewColumn := range spec.NewColumns {
if spec.IfNotExists && dupColumnNames[specNewColumn.Name.Name.L] {
err = infoschema.ErrColumnExists.GenWithStackByArgs(specNewColumn.Name.Name.O)
ctx.GetSessionVars().StmtCtx.AppendNote(err)
continue
}
col, err := checkAndCreateNewColumn(ctx, ti, schema, spec, t, specNewColumn)
if err != nil {
return errors.Trace(err)
}
// Added column has existed and if_not_exists flag is true.
if col == nil && spec.IfNotExists {
continue
}
columns = append(columns, col)
positions = append(positions, spec.Position)
offsets = append(offsets, 0)
ifNotExists = append(ifNotExists, spec.IfNotExists)
newColumnsCount++
}
}
if newColumnsCount == 0 {
return nil
}
if err = checkAddColumnTooManyColumns(len(t.Cols()) + newColumnsCount); err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionAddColumns,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{columns, positions, offsets, ifNotExists},
}

err = d.DoDDLJob(ctx, job)
if err != nil {
return errors.Trace(err)
}
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

// AddTablePartitions will add a new partition to the table.
func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
is := d.infoCache.GetLatest()
@@ -4130,82 +4055,6 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTa
return errors.Trace(err)
}

// DropColumns will drop multi-columns from the table, now we don't support drop the column with index covered.
func (d *ddl) DropColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.AlterTableSpec) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return errors.Trace(err)
}
tblInfo := t.Meta()

dropingColumnNames := make(map[string]bool)
dupColumnNames := make(map[string]bool)
for _, spec := range specs {
if !dropingColumnNames[spec.OldColumnName.Name.L] {
dropingColumnNames[spec.OldColumnName.Name.L] = true
} else {
if spec.IfExists {
dupColumnNames[spec.OldColumnName.Name.L] = true
continue
}
return errors.Trace(dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", spec.OldColumnName.Name.O))
}
}

ifExists := make([]bool, 0, len(specs))
colNames := make([]model.CIStr, 0, len(specs))
for _, spec := range specs {
if spec.IfExists && dupColumnNames[spec.OldColumnName.Name.L] {
err = dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", spec.OldColumnName.Name.L)
ctx.GetSessionVars().StmtCtx.AppendNote(err)
continue
}
isDropable, err := checkIsDroppableColumn(ctx, t, spec)
if err != nil {
return err
}
// Column can't drop and if_exists flag is true.
if !isDropable && spec.IfExists {
continue
}
colNames = append(colNames, spec.OldColumnName.Name)
ifExists = append(ifExists, spec.IfExists)
}
if len(colNames) == 0 {
return nil
}
if len(tblInfo.Columns) == len(colNames) {
return dbterror.ErrCantRemoveAllFields.GenWithStack("can't drop all columns in table %s",
tblInfo.Name)
}
err = checkVisibleColumnCnt(t, 0, len(colNames))
if err != nil {
return err
}
var multiSchemaInfo *model.MultiSchemaInfo
if variable.EnableChangeMultiSchema.Load() {
multiSchemaInfo = &model.MultiSchemaInfo{}
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionDropColumns,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: multiSchemaInfo,
Args: []interface{}{colNames, ifExists},
}

err = d.DoDDLJob(ctx, job)
if err != nil {
return errors.Trace(err)
}
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}

func checkIsDroppableColumn(ctx sessionctx.Context, t table.Table, spec *ast.AlterTableSpec) (isDrapable bool, err error) {
tblInfo := t.Meta()
// Check whether dropped column has existed.
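For reference, the duplicate handling that the removed AddColumns performed before building its batched job can be summarized with a small standalone sketch. The spec type and splitDuplicates below are simplified stand-ins for ast.AlterTableSpec and the real first pass, not TiDB APIs: the first occurrence of each name is accepted, a repeat is tolerated only when its clause carries IF NOT EXISTS, and anything else fails the whole statement.

```go
package main

import (
	"fmt"
	"strings"
)

// spec stands in for ast.AlterTableSpec: the new column's name plus the
// IF NOT EXISTS flag on its ADD COLUMN clause.
type spec struct {
	name        string
	ifNotExists bool
}

// splitDuplicates mirrors the first pass of the removed AddColumns: track every
// lower-cased name, record tolerated repeats in dup, and reject hard duplicates.
func splitDuplicates(specs []spec) (adding, dup map[string]bool, err error) {
	adding = make(map[string]bool)
	dup = make(map[string]bool)
	for _, s := range specs {
		name := strings.ToLower(s.name)
		if !adding[name] {
			adding[name] = true
			continue
		}
		if !s.ifNotExists {
			return nil, nil, fmt.Errorf("column %s already exists", s.name)
		}
		dup[name] = true
	}
	return adding, dup, nil
}

func main() {
	// ALTER TABLE t ADD COLUMN c1 INT, ADD COLUMN IF NOT EXISTS c1 INT;
	adding, dup, err := splitDuplicates([]spec{{name: "c1"}, {name: "c1", ifNotExists: true}})
	fmt.Println(adding, dup, err) // map[c1:true] map[c1:true] <nil>
}
```

In the removed code, names that ended up in the duplicate set were later skipped with a note-level ErrColumnExists rather than failing the statement, and the removed DropColumns used the same two-map pattern for IF EXISTS.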
1 change: 0 additions & 1 deletion ddl/ddl_api_test.go
@@ -31,7 +31,6 @@ func TestIsJobRollbackable(t *testing.T) {
{model.ActionDropIndex, model.StateDeleteOnly, false},
{model.ActionDropSchema, model.StateDeleteOnly, false},
{model.ActionDropColumn, model.StateDeleteOnly, false},
- {model.ActionDropColumns, model.StateDeleteOnly, false},
{model.ActionDropIndexes, model.StateDeleteOnly, false},
}
job := &model.Job{}
8 changes: 2 additions & 6 deletions ddl/ddl_worker.go
@@ -447,7 +447,7 @@ func jobNeedGC(job *model.Job) bool {
// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
return true
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
- model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
+ model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionModifyColumn, model.ActionDropIndexes:
return true
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
@@ -747,7 +747,7 @@ func writeBinlog(binlogCli *pumpcli.PumpsClient, txn kv.Transaction, job *model.
// When this column is in the "delete only" and "delete reorg" states, the binlog of "drop column" has not been written yet,
// but the column has been removed from the binlog of the write operation.
// So we add this binlog to enable downstream components to handle DML correctly in this schema state.
- ((job.Type == model.ActionDropColumn || job.Type == model.ActionDropColumns) && job.SchemaState == model.StateDeleteOnly) {
+ ((job.Type == model.ActionDropColumn) && job.SchemaState == model.StateDeleteOnly) {
if skipWriteBinlog(job) {
return
}
@@ -890,12 +890,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = w.onExchangeTablePartition(d, t, job)
case model.ActionAddColumn:
ver, err = onAddColumn(d, t, job)
- case model.ActionAddColumns:
- ver, err = onAddColumns(d, t, job)
case model.ActionDropColumn:
ver, err = onDropColumn(d, t, job)
- case model.ActionDropColumns:
- ver, err = onDropColumns(d, t, job)
case model.ActionModifyColumn:
ver, err = w.onModifyColumn(d, t, job)
case model.ActionSetDefaultValue: