From 183671bfa1675586d1e6c3579a7dc29feafb0b45 Mon Sep 17 00:00:00 2001 From: winkyao Date: Wed, 25 Jul 2018 20:26:16 +0800 Subject: [PATCH] ddl: fix a bug that we miss adding last handle index in some case. (#7142) (#7156) --- ddl/index.go | 82 +++++++++++++++++++++++++++++------------ ddl/integration_test.go | 14 +++++++ 2 files changed, 72 insertions(+), 24 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 4228d1ec956e8..adb768b00867d 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -396,8 +396,9 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) { } const ( - defaultTaskHandleCnt = 128 - defaultWorkers = 16 + defaultWorkers = 16 + // DefaultTaskHandleCnt is default batch size of adding indices. + DefaultTaskHandleCnt = 128 ) // indexRecord is the record information of an index. @@ -444,7 +445,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, d *ddl, id int, t table.Table return &addIndexWorker{ id: id, d: d, - batchCnt: defaultTaskHandleCnt, + batchCnt: DefaultTaskHandleCnt, sessCtx: sessCtx, taskCh: make(chan *reorgIndexTask, 1), resultCh: make(chan *addIndexResult, 1), @@ -501,11 +502,37 @@ func (w *addIndexWorker) getIndexRecord(handle int64, recordKey []byte, rawRecor return idxRecord, nil } -func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgIndexTask) ([]*indexRecord, bool, error) { +// getNextHandle gets next handle of entry that we are going to process. +func (w *addIndexWorker) getNextHandle(taskRange reorgIndexTask, taskDone bool) (nextHandle int64) { + if !taskDone { + // The task is not done. So we need to pick the last processed entry's handle and add one. + return w.idxRecords[len(w.idxRecords)-1].handle + 1 + } + + // The task is done. So we need to choose a handle outside this range. + // Some corner cases should be considered: + // - The end of task range is MaxInt64. + // - The end of the task is excluded in the range. + if taskRange.endHandle == math.MaxInt64 || !taskRange.endIncluded { + return taskRange.endHandle + } + + return taskRange.endHandle + 1 +} + +// fetchRowColVals fetch w.batchCnt count rows that need to backfill indices, and build the corresponding indexRecord slice. +// fetchRowColVals returns: +// 1. The corresponding indexRecord slice. +// 2. Next handle of entry that we need to process. +// 3. Boolean indicates whether the task is done. +// 4. error occurs in fetchRowColVals. nil if no error occurs. +func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgIndexTask) ([]*indexRecord, int64, bool, error) { // TODO: use tableScan to prune columns. w.idxRecords = w.idxRecords[:0] startTime := time.Now() - handleOutOfRange := false + + // taskDone means that the added handle is out of taskRange.endHandle. + taskDone := false oprStartTime := time.Now() err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle, func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) { @@ -514,12 +541,12 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde oprStartTime = oprEndTime if !taskRange.endIncluded { - handleOutOfRange = handle >= taskRange.endHandle + taskDone = handle >= taskRange.endHandle } else { - handleOutOfRange = handle > taskRange.endHandle + taskDone = handle > taskRange.endHandle } - if handleOutOfRange || len(w.idxRecords) >= w.batchCnt { + if taskDone || len(w.idxRecords) >= w.batchCnt { return false, nil } @@ -530,15 +557,19 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde w.idxRecords = append(w.idxRecords, idxRecord) if handle == taskRange.endHandle { - // If !taskRange.endIncluded, we will not reach here when handle == taskRange.endHandle - handleOutOfRange = true + // If taskRange.endIncluded == false, we will not reach here when handle == taskRange.endHandle + taskDone = true return false, nil } return true, nil }) + if len(w.idxRecords) == 0 { + taskDone = true + } + log.Debugf("[ddl] txn %v fetches handle info %v, takes time %v", txn.StartTS(), taskRange, time.Since(startTime)) - return w.idxRecords, handleOutOfRange, errors.Trace(err) + return w.idxRecords, w.getNextHandle(taskRange, taskDone), taskDone, errors.Trace(err) } func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) { @@ -555,13 +586,17 @@ func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string // indicate that index columns values may changed, index is not allowed to be added, so the txn will rollback and retry. // backfillIndexInTxn will add w.batchCnt indices once, default value of w.batchCnt is 128. // TODO: make w.batchCnt can be modified by system variable. -func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHandle int64, addedCount, scanCount int, errInTxn error) { - addedCount = 0 - scanCount = 0 +func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHandle int64, taskDone bool, addedCount, scanCount int, errInTxn error) { oprStartTime := time.Now() errInTxn = kv.RunInNewTxn(w.sessCtx.GetStore(), true, func(txn kv.Transaction) error { + addedCount = 0 + scanCount = 0 txn.SetOption(kv.Priority, kv.PriorityLow) - idxRecords, handleOutOfRange, err := w.fetchRowColVals(txn, handleRange) + var ( + idxRecords []*indexRecord + err error + ) + idxRecords, nextHandle, taskDone, err = w.fetchRowColVals(txn, handleRange) if err != nil { return errors.Trace(err) } @@ -587,11 +622,6 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHan addedCount++ } - if handleOutOfRange || len(idxRecords) == 0 { - nextHandle = handleRange.endHandle - } else { - nextHandle = idxRecords[len(idxRecords)-1].handle + 1 - } return nil }) w.logSlowOperations(time.Since(oprStartTime), "backfillIndexInTxn", 3000) @@ -607,7 +637,7 @@ func (w *addIndexWorker) handleBackfillTask(task *reorgIndexTask) *addIndexResul startTime := time.Now() for { addedCount := 0 - nextHandle, addedCount, scanCount, err := w.backfillIndexInTxn(handleRange) + nextHandle, taskDone, addedCount, scanCount, err := w.backfillIndexInTxn(handleRange) if err == nil { // Because reorgIndexTask may run a long time, // we should check whether this ddl job is still runnable. @@ -630,12 +660,16 @@ func (w *addIndexWorker) handleBackfillTask(task *reorgIndexTask) *addIndexResul } handleRange.startHandle = nextHandle - if handleRange.startHandle >= handleRange.endHandle { + if taskDone { break } } - log.Infof("[ddl-reorg] worker(%v), finish region ranges [%v,%v) addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v", - w.id, task.startHandle, task.endHandle, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds()) + rightParenthesis := ")" + if task.endIncluded { + rightParenthesis = "]" + } + log.Infof("[ddl-reorg] worker(%v), finish region ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v", + w.id, task.startHandle, task.endHandle, rightParenthesis, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds()) return result } diff --git a/ddl/integration_test.go b/ddl/integration_test.go index e37f2f3d2aa5b..2cacebb284be9 100644 --- a/ddl/integration_test.go +++ b/ddl/integration_test.go @@ -18,6 +18,7 @@ import ( "github.com/juju/errors" . "github.com/pingcap/check" + "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -88,6 +89,19 @@ func (s *testIntegrationSuite) TestCreateTableIfNotExists(c *C) { c.Assert(terror.ErrorEqual(infoschema.ErrTableExists, lastWarn.Err), IsTrue) } +func (s *testIntegrationSuite) TestEndIncluded(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("USE test") + tk.MustExec("create table t(a int, b int)") + for i := 0; i < ddl.DefaultTaskHandleCnt+1; i++ { + tk.MustExec("insert into t values(1, 1)") + } + tk.MustExec("alter table t add index b(b);") + tk.MustExec("admin check index t b") + tk.MustExec("admin check table t") +} + func newStoreWithBootstrap() (kv.Storage, *domain.Domain, error) { store, err := mockstore.NewMockTikvStore() if err != nil {