Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix unexpect fail when create expression index (#39822) #39857

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,7 @@ func loadDDLReorgVars(w *worker) error {
return ddlutil.LoadDDLReorgVars(w.ddlJobCtx, ctx)
}

func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) {
dbName := model.NewCIStr(sessCtx.GetSessionVars().CurrentDB)
func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.Table) (map[int64]decoder.Column, error) {
writableColInfos := make([]*model.ColumnInfo, 0, len(t.WritableCols()))
for _, col := range t.WritableCols() {
writableColInfos = append(writableColInfos, col.ColumnInfo)
Expand Down Expand Up @@ -579,7 +578,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba

startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t)
decodeColMap, err := makeupDecodeColMap(sessCtx, reorgInfo.dbInfo.Name, t)
if err != nil {
return errors.Trace(err)
}
Expand Down
10 changes: 10 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,17 @@ func (w *worker) doModifyColumnTypeWithData(

func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
<<<<<<< HEAD
reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
=======
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down
8 changes: 8 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,14 @@ func (s *stateChangeSuite) TestCreateExpressionIndex() {
s.Require().NoError(checkErr)
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "0 11", "0 11", "1 7", "2 7", "5 7", "8 8", "10 10", "10 10"))

// https://github.com/pingcap/tidb/issues/39784
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(name varchar(20))")
tk.MustExec("insert into t values ('Abc'), ('Bcd'), ('abc')")
tk.MustExec("create index idx on test.t((lower(test.t.name)))")
tk.MustExec("admin check table t")
}

func (s *stateChangeSuite) TestCreateUniqueExpressionIndex() {
Expand Down
9 changes: 9 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,16 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
<<<<<<< HEAD
reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, elements)
=======
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx)
>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down
9 changes: 9 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,10 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
if err != nil {
return ver, errors.Trace(err)
}
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
// If table has global indexes, we need reorg to clean up them.
if pt, ok := tbl.(table.PartitionedTable); ok && hasGlobalIndex(tblInfo) {
// Build elements for compatible with modify column type. elements will not be used when reorganizing.
Expand All @@ -1162,7 +1166,12 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey})
}
}
<<<<<<< HEAD
reorgInfo, err := getReorgInfoFromPartitions(w.JobContext, d, t, job, tbl, physicalTableIDs, elements)
=======
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, tbl, physicalTableIDs, elements)
>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))

if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
Expand Down
73 changes: 73 additions & 0 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ type reorgInfo struct {
// PhysicalTableID is used to trace the current partition we are handling.
// If the table is not partitioned, PhysicalTableID would be TableID.
PhysicalTableID int64
dbInfo *model.DBInfo
elements []*meta.Element
currElement *meta.Element
}
Expand Down Expand Up @@ -590,7 +591,12 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
return ver, nil
}

<<<<<<< HEAD
func getReorgInfo(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
=======
func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo,
tbl table.Table, elements []*meta.Element, mergingTmpIdx bool) (*reorgInfo, error) {
>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))
var (
element *meta.Element
start kv.Key
Expand Down Expand Up @@ -659,7 +665,73 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl
})

var err error
<<<<<<< HEAD
element, start, end, pid, err = t.GetDDLReorgHandle(job)
=======
element, start, end, pid, err = rh.GetDDLReorgHandle(job)
if err != nil {
// If the reorg element doesn't exist, this reorg info should be saved by the older TiDB versions.
// It's compatible with the older TiDB versions.
// We'll try to remove it in the next major TiDB version.
if meta.ErrDDLReorgElementNotExist.Equal(err) {
job.SnapshotVer = 0
logutil.BgLogger().Warn("[ddl] get reorg info, the element does not exist", zap.String("job", job.String()), zap.Bool("enableConcurrentDDL", rh.enableConcurrentDDL))
}
return &info, errors.Trace(err)
}
}
info.Job = job
info.d = d
info.StartKey = start
info.EndKey = end
info.PhysicalTableID = pid
info.currElement = element
info.elements = elements
info.mergingTmpIdx = mergingTmpIdx
info.dbInfo = dbInfo

return &info, nil
}

func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
var (
element *meta.Element
start kv.Key
end kv.Key
pid int64
info reorgInfo
)
if job.SnapshotVer == 0 {
info.first = true
if d.lease > 0 { // Only delay when it's not in test.
delayForAsyncCommit()
}
ver, err := getValidCurrentVersion(d.store)
if err != nil {
return nil, errors.Trace(err)
}
pid = partitionIDs[0]
tb := tbl.(table.PartitionedTable).GetPartition(pid)
start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
if err != nil {
return nil, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] job get table range",
zap.Int64("job ID", job.ID), zap.Int64("physical table ID", pid),
zap.String("start key", hex.EncodeToString(start)),
zap.String("end key", hex.EncodeToString(end)))

err = rh.InitDDLReorgHandle(job, start, end, pid, elements[0])
if err != nil {
return &info, errors.Trace(err)
}
// Update info should after data persistent.
job.SnapshotVer = ver.Ver
element = elements[0]
} else {
var err error
element, start, end, pid, err = rh.GetDDLReorgHandle(job)
>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))
if err != nil {
// If the reorg element doesn't exist, this reorg info should be saved by the older TiDB versions.
// It's compatible with the older TiDB versions.
Expand All @@ -678,6 +750,7 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl
info.PhysicalTableID = pid
info.currElement = element
info.elements = elements
info.dbInfo = dbInfo

return &info, nil
}
Expand Down