
Commit

Merge remote-tracking branch 'origin/master' into gregwebs/retool
gregwebs committed Aug 2, 2018
2 parents c9fedb6 + cc1c3be commit 11e4444
Showing 49 changed files with 614 additions and 170 deletions.
2 changes: 1 addition & 1 deletion ddl/db_test.go
@@ -2229,7 +2229,7 @@ func (s *testDBSuite) getMaxTableRowID(ctx *testMaxTableRowIDContext) (int64, bo
tbl := ctx.tbl
curVer, err := s.store.CurrentVersion()
c.Assert(err, IsNil)
maxID, emptyTable, err := d.GetTableMaxRowID(curVer.Ver, tbl.Meta(), tbl.Meta().ID)
maxID, emptyTable, err := d.GetTableMaxRowID(curVer.Ver, tbl)
c.Assert(err, IsNil)
return maxID, emptyTable
}
3 changes: 2 additions & 1 deletion ddl/ddl.go
@@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/terror"
log "github.com/sirupsen/logrus"
"github.com/twinj/uuid"
@@ -216,7 +217,7 @@ type DDL interface {
// OwnerManager gets the owner manager.
OwnerManager() owner.Manager
// GetTableMaxRowID gets the max row ID of a normal table or a partition.
GetTableMaxRowID(startTS uint64, tblInfo *model.TableInfo, id int64) (int64, bool, error)
GetTableMaxRowID(startTS uint64, tbl table.Table) (int64, bool, error)
// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
SetBinlogClient(interface{})
}
90 changes: 53 additions & 37 deletions ddl/index.go
@@ -463,7 +463,7 @@ type addIndexWorker struct {
sessCtx sessionctx.Context
taskCh chan *reorgIndexTask
resultCh chan *addIndexResult
indexInfo *model.IndexInfo
index table.Index
table table.Table
colFieldMap map[int64]*types.FieldType
closed bool
@@ -477,6 +477,7 @@ type reorgIndexTask struct {
partitionID int64
startHandle int64
endHandle int64
// endIncluded indicates whether the range includes the endHandle.
// When the last handle is math.MaxInt64, set endIncluded to true to
// tell the worker to also backfill the index entry for endHandle.
endIncluded bool
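A minimal standalone sketch of how an inclusive end bound can be handled. scanHandles is a hypothetical helper, not TiDB code; the overflow guard is the assumed reason an inclusive bound is needed when the last handle is math.MaxInt64:

package main

import (
	"fmt"
	"math"
)

// scanHandles visits handles in [start, end) or [start, end] depending on
// endIncluded, mirroring how a backfill task bounds its handle range.
func scanHandles(start, end int64, endIncluded bool, visit func(h int64)) {
	for h := start; ; h++ {
		if endIncluded {
			if h > end {
				return
			}
		} else if h >= end {
			return
		}
		visit(h)
		if h == math.MaxInt64 { // avoid overflow on h++
			return
		}
	}
}

func main() {
	scanHandles(10, 12, false, func(h int64) { fmt.Println(h) }) // 10 11
	scanHandles(10, 12, true, func(h int64) { fmt.Println(h) })  // 10 11 12
}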
@@ -490,14 +491,15 @@ type addIndexResult struct {
}

func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.Table, indexInfo *model.IndexInfo, colFieldMap map[int64]*types.FieldType) *addIndexWorker {
index := tables.NewIndex(t.GetID(), t.Meta(), indexInfo)
return &addIndexWorker{
id: id,
ddlWorker: worker,
batchCnt: DefaultTaskHandleCnt,
sessCtx: sessCtx,
taskCh: make(chan *reorgIndexTask, 1),
resultCh: make(chan *addIndexResult, 1),
indexInfo: indexInfo,
index: index,
table: t,
colFieldMap: colFieldMap,

@@ -514,10 +516,10 @@ func (w *addIndexWorker) close() {
}

// getIndexRecord gets the index column values from the raw binary row value.
func (w *addIndexWorker) getIndexRecord(index table.Index, handle int64, recordKey []byte, rawRecord []byte) (*indexRecord, error) {
func (w *addIndexWorker) getIndexRecord(handle int64, recordKey []byte, rawRecord []byte) (*indexRecord, error) {
t := w.table
cols := t.Cols()
idxInfo := index.Meta()
idxInfo := w.index.Meta()
_, err := tablecodec.DecodeRowWithMap(rawRecord, w.colFieldMap, time.UTC, w.rowMap)
if err != nil {
return nil, errors.Trace(err)
@@ -574,15 +576,15 @@ func (w *addIndexWorker) getNextHandle(taskRange reorgIndexTask, taskDone bool)
// 2. Next handle of entry that we need to process.
// 3. Boolean indicates whether the task is done.
// 4. error occurs in fetchRowColVals. nil if no error occurs.
func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, index table.Index, taskRange reorgIndexTask) ([]*indexRecord, int64, bool, error) {
func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgIndexTask) ([]*indexRecord, int64, bool, error) {
// TODO: use tableScan to prune columns.
w.idxRecords = w.idxRecords[:0]
startTime := time.Now()

// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := startTime
err := iterateSnapshotRows(w.sessCtx.GetStore(), taskRange.partitionID, txn.StartTS(), taskRange.startHandle,
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle,
func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0)
@@ -598,7 +600,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, index table.Index,
return false, nil
}

idxRecord, err1 := w.getIndexRecord(index, handle, recordKey, rawRow)
idxRecord, err1 := w.getIndexRecord(handle, recordKey, rawRow)
if err1 != nil {
return false, errors.Trace(err1)
}
@@ -634,7 +636,7 @@ func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string
// indicate that index column values may have changed; the index entry is not allowed to be added, so the txn will roll back and retry.
// backfillIndexInTxn will add w.batchCnt index entries at once; the default value of w.batchCnt is 128.
// TODO: make w.batchCnt modifiable by a system variable.
func (w *addIndexWorker) backfillIndexInTxn(index table.Index, handleRange reorgIndexTask) (nextHandle int64, taskDone bool, addedCount, scanCount int, errInTxn error) {
func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHandle int64, taskDone bool, addedCount, scanCount int, errInTxn error) {
oprStartTime := time.Now()
errInTxn = kv.RunInNewTxn(w.sessCtx.GetStore(), true, func(txn kv.Transaction) error {
addedCount = 0
@@ -644,7 +646,7 @@ func (w *addIndexWorker) backfillIndexInTxn(index table.Index, handleRange reorg
idxRecords []*indexRecord
err error
)
idxRecords, nextHandle, taskDone, err = w.fetchRowColVals(txn, index, handleRange)
idxRecords, nextHandle, taskDone, err = w.fetchRowColVals(txn, handleRange)
if err != nil {
return errors.Trace(err)
}
@@ -658,7 +660,7 @@ func (w *addIndexWorker) backfillIndexInTxn(index table.Index, handleRange reorg

// Create the index.
// TODO: backfilling a unique key checks the constraint for every row; we can speed up this case by using a batch check.
handle, err := index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle)
handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle)
if err != nil {
if kv.ErrKeyExists.Equal(err) && idxRecord.handle == handle {
// Index already exists, skip it.
@@ -683,11 +685,10 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
result := &addIndexResult{addedCount: 0, nextHandle: handleRange.startHandle, err: nil}
lastLogCount := 0
startTime := time.Now()
index := tables.NewIndex(task.partitionID, w.table.Meta(), w.indexInfo)

for {
addedCount := 0
nextHandle, taskDone, addedCount, scanCount, err := w.backfillIndexInTxn(index, handleRange)
nextHandle, taskDone, addedCount, scanCount, err := w.backfillIndexInTxn(handleRange)
if err == nil {
// Because reorgIndexTask may run a long time,
// we should check whether this ddl job is still runnable.
@@ -770,11 +771,12 @@ func makeupIndexColFieldMap(t table.Table, indexInfo *model.IndexInfo) map[int64

// splitTableRanges uses the PD regions' key ranges to split the backfilling table key range space,
// to speed up adding an index to a table with dispersed handles.
func splitTableRanges(t table.Table, store kv.Storage, startHandle, endHandle, partitionID int64) ([]kv.KeyRange, error) {
startRecordKey := tablecodec.EncodeRowKeyWithHandle(partitionID, startHandle)
endRecordKey := tablecodec.EncodeRowKeyWithHandle(partitionID, endHandle).Next()
// The `t` should be a non-partitioned table or a partition.
func splitTableRanges(t table.Table, store kv.Storage, startHandle, endHandle int64) ([]kv.KeyRange, error) {
startRecordKey := t.RecordKey(startHandle)
endRecordKey := t.RecordKey(endHandle).Next()

log.Infof("[ddl-reorg] split partition %v range [%v, %v] from PD", partitionID, startHandle, endHandle)
log.Infof("[ddl-reorg] split partition %v range [%v, %v] from PD", t.GetID(), startHandle, endHandle)
kvRange := kv.KeyRange{StartKey: startRecordKey, EndKey: endRecordKey}
s, ok := store.(tikv.Storage)
if !ok {
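A standalone sketch of the range-splitting idea splitTableRanges implements. The region boundary keys would come from PD; splitByRegions and keyRange are made up for illustration:

package main

import (
	"bytes"
	"fmt"
)

type keyRange struct{ start, end []byte }

// splitByRegions cuts [start, end) at every region boundary that falls inside
// it, so each sub-range can be backfilled by a separate worker task.
func splitByRegions(start, end []byte, regionBounds [][]byte) []keyRange {
	ranges := []keyRange{}
	cur := start
	for _, b := range regionBounds {
		if bytes.Compare(b, cur) > 0 && bytes.Compare(b, end) < 0 {
			ranges = append(ranges, keyRange{cur, b})
			cur = b
		}
	}
	return append(ranges, keyRange{cur, end})
}

func main() {
	bounds := [][]byte{[]byte("k05"), []byte("k10"), []byte("k20")}
	for _, r := range splitByRegions([]byte("k03"), []byte("k15"), bounds) {
		fmt.Printf("[%s, %s)\n", r.start, r.end)
	}
	// [k03, k05) [k05, k10) [k10, k15)
}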
@@ -928,7 +930,7 @@ func (w *worker) buildIndexForReorgInfo(t table.Table, workers []*addIndexWorker

startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle, reorgInfo.PartitionID)
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
if err != nil {
return errors.Trace(err)
}
@@ -950,7 +952,9 @@ func (w *worker) buildIndexForReorgInfo(t table.Table, workers []*addIndexWorker
return nil
}

// addTableIndex adds index into table.
// addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
//
// How to add index in reorganization state?
// Concurrently process the defaultTaskHandleCnt tasks. Each task deals with a handle range of the index record.
// The handle range is split from PD regions now. Each worker deals with one region's table key range at a time.
@@ -962,7 +966,7 @@ func (w *worker) buildIndexForReorgInfo(t table.Table, workers []*addIndexWorker
// 4. Wait for all of these running tasks to finish, then continue to step 3 until all tasks are done.
// The above operations are completed in a transaction.
// Finally, update the total row count handled by the concurrent processing, and store the completed handle value.
func (w *worker) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
func (w *worker) addPhysicalTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
colFieldMap := makeupIndexColFieldMap(t, indexInfo)
Expand All @@ -976,26 +980,39 @@ func (w *worker) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgI
go idxWorkers[i].run(reorgInfo.d)
}
defer closeAddIndexWorkers(idxWorkers)
err := w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
return errors.Trace(err)
}

finish := false
for !finish {
err := w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
if err != nil {
return errors.Trace(err)
}

finish, err = w.updateReorgInfo(t, reorgInfo)
if err != nil {
return errors.Trace(err)
// addTableIndex handles the add index reorganization state for a table.
func (w *worker) addTableIndex(t table.Table, idx *model.IndexInfo, reorgInfo *reorgInfo) error {
var err error
if tbl, ok := t.(table.PartitionedTable); ok {
var finish bool
for !finish {
p := tbl.GetPartition(reorgInfo.PartitionID)
if p == nil {
return errors.Errorf("Can not find partition id %d for table %d", reorgInfo.PartitionID, t.Meta().ID)
}
err = w.addPhysicalTableIndex(p, idx, reorgInfo)
if err != nil {
break
}
finish, err = w.updateReorgInfo(tbl, reorgInfo)
if err != nil {
return errors.Trace(err)
}
}
} else {
err = w.addPhysicalTableIndex(t, idx, reorgInfo)
}
return nil
return errors.Trace(err)
}
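A simplified, self-contained sketch of the split-and-dispatch flow described in the addPhysicalTableIndex comment above: build tasks from handle ranges, hand one task to each worker, wait for the batch, repeat. All names here (task, result, dispatch, backfill) are illustrative, not TiDB's types:

package main

import (
	"fmt"
	"sync"
)

type task struct{ start, end int64 } // a handle range split from region boundaries

type result struct {
	added int
	next  int64
	err   error
}

// dispatch mirrors the flow in the comment above: take up to `workers` tasks,
// run them concurrently, wait for all results, then move on to the next batch
// until the whole range is done.
func dispatch(tasks []task, workers int, backfill func(task) result) (total int, err error) {
	for len(tasks) > 0 {
		batch := tasks
		if len(batch) > workers {
			batch = tasks[:workers]
		}
		tasks = tasks[len(batch):]

		results := make([]result, len(batch))
		var wg sync.WaitGroup
		for i, t := range batch {
			wg.Add(1)
			go func(i int, t task) {
				defer wg.Done()
				results[i] = backfill(t)
			}(i, t)
		}
		wg.Wait()

		for _, r := range results {
			if r.err != nil {
				return total, r.err // a real implementation would persist r.next here
			}
			total += r.added
		}
	}
	return total, nil
}

func main() {
	tasks := []task{{0, 100}, {100, 200}, {200, 300}}
	n, err := dispatch(tasks, 2, func(t task) result {
		return result{added: int(t.end - t.start), next: t.end}
	})
	fmt.Println(n, err) // 300 <nil>
}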

// updateReorgInfo will find the next partition according to the current reorgInfo.
// If there are no more partitions, or table t is not a partitioned table, it returns true to
// indicate that the reorganization work is finished.
func (w *worker) updateReorgInfo(t table.Table, reorg *reorgInfo) (bool, error) {
func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bool, error) {
pi := t.Meta().GetPartitionInfo()
if pi == nil {
return true, nil
@@ -1012,7 +1029,7 @@ func (w *worker) updateReorgInfo(t table.Table, reorg *reorgInfo) (bool, error)
return true, nil
}

start, end, err := getTableRange(reorg.d, t.Meta(), pid, reorg.Job.SnapshotVer)
start, end, err := getTableRange(reorg.d, t.GetPartition(pid), reorg.Job.SnapshotVer)
if err != nil {
return false, errors.Trace(err)
}
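A minimal sketch of the next-partition step updateReorgInfo performs, assuming the partition IDs are read from the table's PartitionInfo definitions in order (nextPartition is a made-up helper):

package main

import "fmt"

// nextPartition returns the partition to reorganize after curr, or done=true
// when curr is the last one, mirroring how updateReorgInfo walks the
// partition definitions one by one.
func nextPartition(partitionIDs []int64, curr int64) (next int64, done bool) {
	for i, pid := range partitionIDs {
		if pid == curr {
			if i+1 == len(partitionIDs) {
				return 0, true
			}
			return partitionIDs[i+1], false
		}
	}
	return 0, true // curr not found: treat as finished
}

func main() {
	pids := []int64{101, 102, 103}
	fmt.Println(nextPartition(pids, 101)) // 102 false
	fmt.Println(nextPartition(pids, 103)) // 0 true
}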
@@ -1057,24 +1074,23 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 {
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func iterateSnapshotRows(store kv.Storage, partitionID int64, version uint64, seekHandle int64, fn recordIterFunc) error {
func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
ver := kv.Version{Ver: version}

snap, err := store.GetSnapshot(ver)
snap.SetPriority(kv.PriorityLow)
if err != nil {
return errors.Trace(err)
}
firstKey := tablecodec.EncodeRowKeyWithHandle(partitionID, seekHandle)
firstKey := t.RecordKey(seekHandle)
it, err := snap.Seek(firstKey)
if err != nil {
return errors.Trace(err)
}
defer it.Close()

recordPrefix := tablecodec.GenTableRecordPrefix(partitionID)
for it.Valid() {
if !it.Key().HasPrefix(recordPrefix) {
if !it.Key().HasPrefix(t.RecordPrefix()) {
break
}

@@ -1083,7 +1099,7 @@ func iterateSnapshotRows(store kv.Storage, partitionID int64, version uint64, se
if err != nil {
return errors.Trace(err)
}
rk := tablecodec.EncodeRecordKey(recordPrefix, handle)
rk := t.RecordKey(handle)

more, err := fn(handle, rk, it.Value())
if !more || err != nil {
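A standalone sketch of the prefix-bounded, callback-driven iteration that iterateSnapshotRows performs, with a sorted in-memory map standing in for the kv snapshot (iterate and its key layout are illustrative only):

package main

import (
	"fmt"
	"sort"
	"strings"
)

// iterate seeks to the first key >= seekKey and feeds each record whose key
// still carries the table's record prefix to fn, stopping when fn returns false.
func iterate(kvs map[string]string, recordPrefix, seekKey string, fn func(k, v string) bool) {
	keys := make([]string, 0, len(kvs))
	for k := range kvs {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	i := sort.SearchStrings(keys, seekKey)
	for ; i < len(keys); i++ {
		if !strings.HasPrefix(keys[i], recordPrefix) {
			break
		}
		if !fn(keys[i], kvs[keys[i]]) {
			return
		}
	}
}

func main() {
	kvs := map[string]string{
		"t45_r1": "row1", "t45_r2": "row2", "t46_r1": "other table",
	}
	iterate(kvs, "t45_r", "t45_r1", func(k, v string) bool {
		fmt.Println(k, v)
		return true
	})
	// t45_r1 row1
	// t45_r2 row2
}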
35 changes: 19 additions & 16 deletions ddl/reorg.go
@@ -184,6 +184,7 @@ type reorgInfo struct {
EndHandle int64
d *ddlCtx
first bool
// PartitionID is used for partitioned tables.
// DDL reorganization for a partitioned table handles partitions one by one;
// PartitionID is used to trace the partition currently being handled.
// If the table is not partitioned, PartitionID is the same as TableID.
@@ -207,7 +208,7 @@ func constructLimitPB(count uint64) *tipb.Executor {
return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec}
}

func buildDescTableScanDAG(startTS uint64, tblInfo *model.TableInfo, partitionID int64, columns []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) {
func buildDescTableScanDAG(startTS uint64, tbl table.Table, columns []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.StartTs = startTS
_, timeZoneOffset := time.Now().In(time.UTC).Zone()
@@ -217,8 +218,8 @@ func buildDescTableScanDAG(startTS uint64, tblInfo *model.TableInfo, partitionID
}
dagReq.Flags |= model.FlagInSelectStmt

pbColumnInfos := model.ColumnsToProto(columns, tblInfo.PKIsHandle)
tblScanExec := constructDescTableScanPB(partitionID, pbColumnInfos)
pbColumnInfos := model.ColumnsToProto(columns, tbl.Meta().PKIsHandle)
tblScanExec := constructDescTableScanPB(tbl.GetID(), pbColumnInfos)
dagReq.Executors = append(dagReq.Executors, tblScanExec)
dagReq.Executors = append(dagReq.Executors, constructLimitPB(limit))
return dagReq, nil
@@ -232,15 +233,15 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType {
return colTypes
}

// builds a desc table scan upon tblInfo.
func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tblInfo *model.TableInfo, partitionID int64, columns []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) {
dagPB, err := buildDescTableScanDAG(startTS, tblInfo, partitionID, columns, limit)
// buildDescTableScan builds a desc table scan upon tblInfo.
func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl table.Table, columns []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) {
dagPB, err := buildDescTableScanDAG(startTS, tbl, columns, limit)
if err != nil {
return nil, errors.Trace(err)
}
ranges := ranger.FullIntRange(false)
var builder distsql.RequestBuilder
builder.SetTableRanges(partitionID, ranges, nil).
builder.SetTableRanges(tbl.GetID(), ranges, nil).
SetDAGRequest(dagPB).
SetKeepOrder(true).
SetConcurrency(1).SetDesc(true)
@@ -260,11 +261,11 @@ func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tblInfo
}

// GetTableMaxRowID gets the last row id of the table partition.
func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tblInfo *model.TableInfo, partitionID int64) (maxRowID int64, emptyTable bool, err error) {
func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.Table) (maxRowID int64, emptyTable bool, err error) {
maxRowID = int64(math.MaxInt64)
var columns []*model.ColumnInfo
if tblInfo.PKIsHandle {
for _, col := range tblInfo.Columns {
if tbl.Meta().PKIsHandle {
for _, col := range tbl.Meta().Columns {
if mysql.HasPriKeyFlag(col.Flag) {
columns = []*model.ColumnInfo{col}
break
@@ -276,7 +277,7 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tblInfo *model.TableInfo, part

ctx := context.Background()
// Build a desc scan of the table with limit 1; we can use it to retrieve the last handle of the table.
result, err := d.buildDescTableScan(ctx, startTS, tblInfo, partitionID, columns, 1)
result, err := d.buildDescTableScan(ctx, startTS, tbl, columns, 1)
if err != nil {
return maxRowID, false, errors.Trace(err)
}
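As a rough illustration of the limit-1 descending scan above: the first row such a scan returns is the maximum handle, or nothing when the table is empty. A toy stand-in (maxHandle is not a real TiDB function):

package main

import (
	"fmt"
	"sort"
)

// maxHandle models the desc scan with limit 1: with handles kept in ascending
// order, the scan's single result is the last handle; emptyTable reports
// whether there was no row at all.
func maxHandle(handles []int64) (max int64, emptyTable bool) {
	if len(handles) == 0 {
		return 0, true
	}
	sort.Slice(handles, func(i, j int) bool { return handles[i] < handles[j] })
	return handles[len(handles)-1], false
}

func main() {
	fmt.Println(maxHandle([]int64{7, 3, 42})) // 42 false
	fmt.Println(maxHandle(nil))               // 0 true
}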
@@ -300,11 +301,11 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tblInfo *model.TableInfo, part
var gofailOnceGuard bool

// getTableRange gets the start and end handle of a table (or partition).
func getTableRange(d *ddlCtx, tblInfo *model.TableInfo, partitionID int64, snapshotVer uint64) (startHandle, endHandle int64, err error) {
func getTableRange(d *ddlCtx, tbl table.Table, snapshotVer uint64) (startHandle, endHandle int64, err error) {
startHandle = math.MinInt64
endHandle = math.MaxInt64
// Get the start handle of this partition.
err = iterateSnapshotRows(d.store, partitionID, snapshotVer, math.MinInt64,
err = iterateSnapshotRows(d.store, tbl, snapshotVer, math.MinInt64,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
startHandle = h
return false, nil
@@ -314,12 +315,12 @@ func getTableRange(d *ddlCtx, tblInfo *model.TableInfo, partitionID int64, snaps
}
var emptyTable bool
// Get the end handle of this partition.
endHandle, emptyTable, err = d.GetTableMaxRowID(snapshotVer, tblInfo, partitionID)
endHandle, emptyTable, err = d.GetTableMaxRowID(snapshotVer, tbl)
if err != nil {
return 0, 0, errors.Trace(err)
}
if endHandle < startHandle || emptyTable {
log.Infof("[ddl-reorg] get table range %v endHandle < startHandle partition %d [%d %d]", tblInfo, partitionID, endHandle, startHandle)
log.Infof("[ddl-reorg] get table range %v endHandle < startHandle partition %d [%d %d]", tbl.Meta(), tbl.GetID(), endHandle, startHandle)
endHandle = startHandle
}
return
@@ -346,10 +347,12 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table) (*re
}
tblInfo := tbl.Meta()
pid = tblInfo.ID
tp := tbl
if pi := tblInfo.GetPartitionInfo(); pi != nil {
pid = pi.Definitions[0].ID
tp = tbl.(table.PartitionedTable).GetPartition(pid)
}
start, end, err = getTableRange(d, tblInfo, pid, ver.Ver)
start, end, err = getTableRange(d, tp, ver.Ver)
if err != nil {
return nil, errors.Trace(err)
}