Skip to content

Commit

Permalink
ddl(ticdc): Add error handling for mismatched Job.Query and `Job.Bi…
Browse files Browse the repository at this point in the history
…nlogInfo.MultipleTableInfos` in TiCDC (#11580)

close #11507
  • Loading branch information
wlwilliamx authored Sep 12, 2024
1 parent e1d56c3 commit 5df797c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 2 deletions.
13 changes: 11 additions & 2 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,15 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
case timodel.ActionCreateTables:
// we only use multiTableInfos and Querys when we generate job event
// So if some table should be discard, we just need to delete the info from multiTableInfos and Querys
if strings.Count(job.Query, ";") != len(job.BinlogInfo.MultipleTableInfos) {
log.Error("the number of queries in `Job.Query` is not equal to "+
"the number of `TableInfo` in `Job.BinlogInfo.MultipleTableInfos`",
zap.String("Job.Query", job.Query),
zap.Any("Job.BinlogInfo.MultipleTableInfos", job.BinlogInfo.MultipleTableInfos),
zap.Error(cerror.ErrTiDBUnexpectedJobMeta.GenWithStackByArgs()))
return false, cerror.ErrTiDBUnexpectedJobMeta.GenWithStackByArgs()
}

var newMultiTableInfos []*timodel.TableInfo
var newQuerys []string

Expand All @@ -407,13 +416,13 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
continue
}
newMultiTableInfos = append(newMultiTableInfos, multiTableInfos[index])
newQuerys = append(newQuerys, querys[index])
newQuerys = append(newQuerys, querys[index]+";")
}

skip = len(newMultiTableInfos) == 0

job.BinlogInfo.MultipleTableInfos = newMultiTableInfos
job.Query = strings.Join(newQuerys, ";")
job.Query = strings.Join(newQuerys, "")
case timodel.ActionRenameTable:
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,11 @@ error = '''
fail to create changefeed because target-ts %d is earlier than start-ts %d
'''

["CDC:ErrTiDBUnexpectedJobMeta"]
error = '''
unexpected `job_meta` from tidb
'''

["CDC:ErrTiKVEventFeed"]
error = '''
tikv event feed failed
Expand Down
6 changes: 6 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ var (
errors.RFCCodeText("CDC:ErrCodeNilFunction"),
)

// Errors caused by unexpected behavior from external systems
ErrTiDBUnexpectedJobMeta = errors.Normalize(
"unexpected `job_meta` from tidb",
errors.RFCCodeText("CDC:ErrTiDBUnexpectedJobMeta"),
)

// ErrVersionIncompatible is an error for running CDC on an incompatible Cluster.
ErrVersionIncompatible = errors.Normalize(
"version is incompatible: %s",
Expand Down

0 comments on commit 5df797c

Please sign in to comment.