Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#39822
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Defined2014 authored and ti-chi-bot committed Dec 13, 2022
1 parent 115e63d commit 243e7b6
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 3 deletions.
5 changes: 2 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,7 @@ func loadDDLReorgVars(w *worker) error {
return ddlutil.LoadDDLReorgVars(w.ddlJobCtx, ctx)
}

func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) {
dbName := model.NewCIStr(sessCtx.GetSessionVars().CurrentDB)
func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.Table) (map[int64]decoder.Column, error) {
writableColInfos := make([]*model.ColumnInfo, 0, len(t.WritableCols()))
for _, col := range t.WritableCols() {
writableColInfos = append(writableColInfos, col.ColumnInfo)
Expand Down Expand Up @@ -579,7 +578,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba

startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t)
decodeColMap, err := makeupDecodeColMap(sessCtx, reorgInfo.dbInfo.Name, t)
if err != nil {
return errors.Trace(err)
}
Expand Down
10 changes: 10 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,17 @@ func (w *worker) doModifyColumnTypeWithData(

func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
<<<<<<< HEAD
reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
=======
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down
8 changes: 8 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,14 @@ func (s *stateChangeSuite) TestCreateExpressionIndex() {
s.Require().NoError(checkErr)
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "0 11", "0 11", "1 7", "2 7", "5 7", "8 8", "10 10", "10 10"))

// https://github.com/pingcap/tidb/issues/39784
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(name varchar(20))")
tk.MustExec("insert into t values ('Abc'), ('Bcd'), ('abc')")
tk.MustExec("create index idx on test.t((lower(test.t.name)))")
tk.MustExec("admin check table t")
}

func (s *stateChangeSuite) TestCreateUniqueExpressionIndex() {
Expand Down
9 changes: 9 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,16 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
<<<<<<< HEAD
reorgInfo, err := getReorgInfo(w.JobContext, d, t, job, tbl, elements)
=======
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx)
>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down
9 changes: 9 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,10 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
if err != nil {
return ver, errors.Trace(err)
}
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
// If table has global indexes, we need reorg to clean up them.
if pt, ok := tbl.(table.PartitionedTable); ok && hasGlobalIndex(tblInfo) {
// Build elements for compatible with modify column type. elements will not be used when reorganizing.
Expand All @@ -1162,7 +1166,12 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey})
}
}
<<<<<<< HEAD
reorgInfo, err := getReorgInfoFromPartitions(w.JobContext, d, t, job, tbl, physicalTableIDs, elements)
=======
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, tbl, physicalTableIDs, elements)
>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))

if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
Expand Down
73 changes: 73 additions & 0 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ type reorgInfo struct {
// PhysicalTableID is used to trace the current partition we are handling.
// If the table is not partitioned, PhysicalTableID would be TableID.
PhysicalTableID int64
dbInfo *model.DBInfo
elements []*meta.Element
currElement *meta.Element
}
Expand Down Expand Up @@ -590,7 +591,12 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
return ver, nil
}

<<<<<<< HEAD
func getReorgInfo(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) {
=======
func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo,
tbl table.Table, elements []*meta.Element, mergingTmpIdx bool) (*reorgInfo, error) {
>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))
var (
element *meta.Element
start kv.Key
Expand Down Expand Up @@ -659,7 +665,73 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl
})

var err error
<<<<<<< HEAD
element, start, end, pid, err = t.GetDDLReorgHandle(job)
=======
element, start, end, pid, err = rh.GetDDLReorgHandle(job)
if err != nil {
// If the reorg element doesn't exist, this reorg info should be saved by the older TiDB versions.
// It's compatible with the older TiDB versions.
// We'll try to remove it in the next major TiDB version.
if meta.ErrDDLReorgElementNotExist.Equal(err) {
job.SnapshotVer = 0
logutil.BgLogger().Warn("[ddl] get reorg info, the element does not exist", zap.String("job", job.String()), zap.Bool("enableConcurrentDDL", rh.enableConcurrentDDL))
}
return &info, errors.Trace(err)
}
}
info.Job = job
info.d = d
info.StartKey = start
info.EndKey = end
info.PhysicalTableID = pid
info.currElement = element
info.elements = elements
info.mergingTmpIdx = mergingTmpIdx
info.dbInfo = dbInfo

return &info, nil
}

func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) {
var (
element *meta.Element
start kv.Key
end kv.Key
pid int64
info reorgInfo
)
if job.SnapshotVer == 0 {
info.first = true
if d.lease > 0 { // Only delay when it's not in test.
delayForAsyncCommit()
}
ver, err := getValidCurrentVersion(d.store)
if err != nil {
return nil, errors.Trace(err)
}
pid = partitionIDs[0]
tb := tbl.(table.PartitionedTable).GetPartition(pid)
start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
if err != nil {
return nil, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] job get table range",
zap.Int64("job ID", job.ID), zap.Int64("physical table ID", pid),
zap.String("start key", hex.EncodeToString(start)),
zap.String("end key", hex.EncodeToString(end)))

err = rh.InitDDLReorgHandle(job, start, end, pid, elements[0])
if err != nil {
return &info, errors.Trace(err)
}
// Update info should after data persistent.
job.SnapshotVer = ver.Ver
element = elements[0]
} else {
var err error
element, start, end, pid, err = rh.GetDDLReorgHandle(job)
>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))
if err != nil {
// If the reorg element doesn't exist, this reorg info should be saved by the older TiDB versions.
// It's compatible with the older TiDB versions.
Expand All @@ -678,6 +750,7 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, t *meta.Meta, job *model.Job, tbl
info.PhysicalTableID = pid
info.currElement = element
info.elements = elements
info.dbInfo = dbInfo

return &info, nil
}
Expand Down

0 comments on commit 243e7b6

Please sign in to comment.