From 5944432c0d3327df2bb01d6eeb245be0b7c5852d Mon Sep 17 00:00:00 2001 From: Shen Li Date: Wed, 1 Aug 2018 11:17:49 +0800 Subject: [PATCH 01/13] ddl: Refactor add index reorganization related code (#7184) --- ddl/db_test.go | 2 +- ddl/ddl.go | 3 +- ddl/index.go | 90 +++++++++++++++++++++++++++++--------------------- ddl/reorg.go | 35 +++++++++++--------- 4 files changed, 75 insertions(+), 55 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index 3a662005c80eb..eab38babeebef 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -2229,7 +2229,7 @@ func (s *testDBSuite) getMaxTableRowID(ctx *testMaxTableRowIDContext) (int64, bo tbl := ctx.tbl curVer, err := s.store.CurrentVersion() c.Assert(err, IsNil) - maxID, emptyTable, err := d.GetTableMaxRowID(curVer.Ver, tbl.Meta(), tbl.Meta().ID) + maxID, emptyTable, err := d.GetTableMaxRowID(curVer.Ver, tbl) c.Assert(err, IsNil) return maxID, emptyTable } diff --git a/ddl/ddl.go b/ddl/ddl.go index 2943df74376c4..0661b40d4605b 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/terror" log "github.com/sirupsen/logrus" "github.com/twinj/uuid" @@ -216,7 +217,7 @@ type DDL interface { // OwnerManager gets the owner manager. OwnerManager() owner.Manager // GetTableMaxRowID gets the max row ID of a normal table or a partition. - GetTableMaxRowID(startTS uint64, tblInfo *model.TableInfo, id int64) (int64, bool, error) + GetTableMaxRowID(startTS uint64, tbl table.Table) (int64, bool, error) // SetBinlogClient sets the binlog client for DDL worker. It's exported for testing. SetBinlogClient(interface{}) } diff --git a/ddl/index.go b/ddl/index.go index 38f4794836807..ff7b3ad47150e 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -463,7 +463,7 @@ type addIndexWorker struct { sessCtx sessionctx.Context taskCh chan *reorgIndexTask resultCh chan *addIndexResult - indexInfo *model.IndexInfo + index table.Index table table.Table colFieldMap map[int64]*types.FieldType closed bool @@ -477,6 +477,7 @@ type reorgIndexTask struct { partitionID int64 startHandle int64 endHandle int64 + // endIncluded indicates whether the range include the endHandle. // When the last handle is math.MaxInt64, set endIncluded to true to // tell worker backfilling index of endHandle. endIncluded bool @@ -490,6 +491,7 @@ type addIndexResult struct { } func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.Table, indexInfo *model.IndexInfo, colFieldMap map[int64]*types.FieldType) *addIndexWorker { + index := tables.NewIndex(t.GetID(), t.Meta(), indexInfo) return &addIndexWorker{ id: id, ddlWorker: worker, @@ -497,7 +499,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab sessCtx: sessCtx, taskCh: make(chan *reorgIndexTask, 1), resultCh: make(chan *addIndexResult, 1), - indexInfo: indexInfo, + index: index, table: t, colFieldMap: colFieldMap, @@ -514,10 +516,10 @@ func (w *addIndexWorker) close() { } // getIndexRecord gets index columns values from raw binary value row. 
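+// For example, for an index on columns (b, c), a row with handle 1 whose
+// decoded values are {a: "x", b: 10, c: "y"} yields an indexRecord with
+// handle 1 and vals [10, "y"] (values here are illustrative).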
-func (w *addIndexWorker) getIndexRecord(index table.Index, handle int64, recordKey []byte, rawRecord []byte) (*indexRecord, error) { +func (w *addIndexWorker) getIndexRecord(handle int64, recordKey []byte, rawRecord []byte) (*indexRecord, error) { t := w.table cols := t.Cols() - idxInfo := index.Meta() + idxInfo := w.index.Meta() _, err := tablecodec.DecodeRowWithMap(rawRecord, w.colFieldMap, time.UTC, w.rowMap) if err != nil { return nil, errors.Trace(err) @@ -574,7 +576,7 @@ func (w *addIndexWorker) getNextHandle(taskRange reorgIndexTask, taskDone bool) // 2. Next handle of entry that we need to process. // 3. Boolean indicates whether the task is done. // 4. error occurs in fetchRowColVals. nil if no error occurs. -func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, index table.Index, taskRange reorgIndexTask) ([]*indexRecord, int64, bool, error) { +func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgIndexTask) ([]*indexRecord, int64, bool, error) { // TODO: use tableScan to prune columns. w.idxRecords = w.idxRecords[:0] startTime := time.Now() @@ -582,7 +584,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, index table.Index, // taskDone means that the added handle is out of taskRange.endHandle. taskDone := false oprStartTime := startTime - err := iterateSnapshotRows(w.sessCtx.GetStore(), taskRange.partitionID, txn.StartTS(), taskRange.startHandle, + err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle, func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) { oprEndTime := time.Now() w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0) @@ -598,7 +600,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, index table.Index, return false, nil } - idxRecord, err1 := w.getIndexRecord(index, handle, recordKey, rawRow) + idxRecord, err1 := w.getIndexRecord(handle, recordKey, rawRow) if err1 != nil { return false, errors.Trace(err1) } @@ -634,7 +636,7 @@ func (w *addIndexWorker) logSlowOperations(elapsed time.Duration, slowMsg string // indicate that index columns values may changed, index is not allowed to be added, so the txn will rollback and retry. // backfillIndexInTxn will add w.batchCnt indices once, default value of w.batchCnt is 128. // TODO: make w.batchCnt can be modified by system variable. -func (w *addIndexWorker) backfillIndexInTxn(index table.Index, handleRange reorgIndexTask) (nextHandle int64, taskDone bool, addedCount, scanCount int, errInTxn error) { +func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (nextHandle int64, taskDone bool, addedCount, scanCount int, errInTxn error) { oprStartTime := time.Now() errInTxn = kv.RunInNewTxn(w.sessCtx.GetStore(), true, func(txn kv.Transaction) error { addedCount = 0 @@ -644,7 +646,7 @@ func (w *addIndexWorker) backfillIndexInTxn(index table.Index, handleRange reorg idxRecords []*indexRecord err error ) - idxRecords, nextHandle, taskDone, err = w.fetchRowColVals(txn, index, handleRange) + idxRecords, nextHandle, taskDone, err = w.fetchRowColVals(txn, handleRange) if err != nil { return errors.Trace(err) } @@ -658,7 +660,7 @@ func (w *addIndexWorker) backfillIndexInTxn(index table.Index, handleRange reorg // Create the index. // TODO: backfill unique-key will check constraint every row, we can speed up this case by using batch check. 
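+			// Create returns the handle of the conflicting index entry when it
+			// hits kv.ErrKeyExists; a conflict on idxRecord.handle itself means
+			// this entry was already backfilled, so it can be skipped safely.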
- handle, err := index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle) + handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle) if err != nil { if kv.ErrKeyExists.Equal(err) && idxRecord.handle == handle { // Index already exists, skip it. @@ -683,11 +685,10 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad result := &addIndexResult{addedCount: 0, nextHandle: handleRange.startHandle, err: nil} lastLogCount := 0 startTime := time.Now() - index := tables.NewIndex(task.partitionID, w.table.Meta(), w.indexInfo) for { addedCount := 0 - nextHandle, taskDone, addedCount, scanCount, err := w.backfillIndexInTxn(index, handleRange) + nextHandle, taskDone, addedCount, scanCount, err := w.backfillIndexInTxn(handleRange) if err == nil { // Because reorgIndexTask may run a long time, // we should check whether this ddl job is still runnable. @@ -770,11 +771,12 @@ func makeupIndexColFieldMap(t table.Table, indexInfo *model.IndexInfo) map[int64 // splitTableRanges uses PD region's key ranges to split the backfilling table key range space, // to speed up adding index in table with disperse handle. -func splitTableRanges(t table.Table, store kv.Storage, startHandle, endHandle, partitionID int64) ([]kv.KeyRange, error) { - startRecordKey := tablecodec.EncodeRowKeyWithHandle(partitionID, startHandle) - endRecordKey := tablecodec.EncodeRowKeyWithHandle(partitionID, endHandle).Next() +// The `t` should be a non-partitioned table or a partition. +func splitTableRanges(t table.Table, store kv.Storage, startHandle, endHandle int64) ([]kv.KeyRange, error) { + startRecordKey := t.RecordKey(startHandle) + endRecordKey := t.RecordKey(endHandle).Next() - log.Infof("[ddl-reorg] split partition %v range [%v, %v] from PD", partitionID, startHandle, endHandle) + log.Infof("[ddl-reorg] split partition %v range [%v, %v] from PD", t.GetID(), startHandle, endHandle) kvRange := kv.KeyRange{StartKey: startRecordKey, EndKey: endRecordKey} s, ok := store.(tikv.Storage) if !ok { @@ -928,7 +930,7 @@ func (w *worker) buildIndexForReorgInfo(t table.Table, workers []*addIndexWorker startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle for { - kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle, reorgInfo.PartitionID) + kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle) if err != nil { return errors.Trace(err) } @@ -950,7 +952,9 @@ func (w *worker) buildIndexForReorgInfo(t table.Table, workers []*addIndexWorker return nil } -// addTableIndex adds index into table. +// addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition. +// For a partitioned table, it should be handled partition by partition. +// // How to add index in reorganization state? // Concurrently process the defaultTaskHandleCnt tasks. Each task deals with a handle range of the index record. // The handle range is split from PD regions now. Each worker deal with a region table key range one time. @@ -962,7 +966,7 @@ func (w *worker) buildIndexForReorgInfo(t table.Table, workers []*addIndexWorker // 4. Wait all these running tasks finished, then continue to step 3, until all tasks is done. // The above operations are completed in a transaction. // Finally, update the concurrent processing of the total number of rows, and store the completed handle value. 
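+//
+// For example, if PD splits the handle range into ten region key ranges and
+// there are four workers, the ranges are wrapped into reorgIndexTasks and
+// dispatched batch by batch; a batch must finish, and its progress be
+// persisted, before the next batch is sent.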
-func (w *worker) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error { +func (w *worker) addPhysicalTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error { job := reorgInfo.Job log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo) colFieldMap := makeupIndexColFieldMap(t, indexInfo) @@ -976,26 +980,39 @@ func (w *worker) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgI go idxWorkers[i].run(reorgInfo.d) } defer closeAddIndexWorkers(idxWorkers) + err := w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo) + return errors.Trace(err) +} - finish := false - for !finish { - err := w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo) - if err != nil { - return errors.Trace(err) - } - - finish, err = w.updateReorgInfo(t, reorgInfo) - if err != nil { - return errors.Trace(err) +// addTableIndex handles the add index reorganization state for a table. +func (w *worker) addTableIndex(t table.Table, idx *model.IndexInfo, reorgInfo *reorgInfo) error { + var err error + if tbl, ok := t.(table.PartitionedTable); ok { + var finish bool + for !finish { + p := tbl.GetPartition(reorgInfo.PartitionID) + if p == nil { + return errors.Errorf("Can not find partition id %d for table %d", reorgInfo.PartitionID, t.Meta().ID) + } + err = w.addPhysicalTableIndex(p, idx, reorgInfo) + if err != nil { + break + } + finish, err = w.updateReorgInfo(tbl, reorgInfo) + if err != nil { + return errors.Trace(err) + } } + } else { + err = w.addPhysicalTableIndex(t, idx, reorgInfo) } - return nil + return errors.Trace(err) } // updateReorgInfo will find the next partition according to current reorgInfo. // If no more partitions, or table t is not a partitioned table, returns true to // indicate that the reorganize work is finished. -func (w *worker) updateReorgInfo(t table.Table, reorg *reorgInfo) (bool, error) { +func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bool, error) { pi := t.Meta().GetPartitionInfo() if pi == nil { return true, nil @@ -1012,7 +1029,7 @@ func (w *worker) updateReorgInfo(t table.Table, reorg *reorgInfo) (bool, error) return true, nil } - start, end, err := getTableRange(reorg.d, t.Meta(), pid, reorg.Job.SnapshotVer) + start, end, err := getTableRange(reorg.d, t.GetPartition(pid), reorg.Job.SnapshotVer) if err != nil { return false, errors.Trace(err) } @@ -1057,7 +1074,7 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 { // recordIterFunc is used for low-level record iteration. 
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error) -func iterateSnapshotRows(store kv.Storage, partitionID int64, version uint64, seekHandle int64, fn recordIterFunc) error { +func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error { ver := kv.Version{Ver: version} snap, err := store.GetSnapshot(ver) @@ -1065,16 +1082,15 @@ func iterateSnapshotRows(store kv.Storage, partitionID int64, version uint64, se if err != nil { return errors.Trace(err) } - firstKey := tablecodec.EncodeRowKeyWithHandle(partitionID, seekHandle) + firstKey := t.RecordKey(seekHandle) it, err := snap.Seek(firstKey) if err != nil { return errors.Trace(err) } defer it.Close() - recordPrefix := tablecodec.GenTableRecordPrefix(partitionID) for it.Valid() { - if !it.Key().HasPrefix(recordPrefix) { + if !it.Key().HasPrefix(t.RecordPrefix()) { break } @@ -1083,7 +1099,7 @@ func iterateSnapshotRows(store kv.Storage, partitionID int64, version uint64, se if err != nil { return errors.Trace(err) } - rk := tablecodec.EncodeRecordKey(recordPrefix, handle) + rk := t.RecordKey(handle) more, err := fn(handle, rk, it.Value()) if !more || err != nil { diff --git a/ddl/reorg.go b/ddl/reorg.go index 021743e403f22..e4b3116e8f32c 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -184,6 +184,7 @@ type reorgInfo struct { EndHandle int64 d *ddlCtx first bool + // PartitionID is used for partitioned table. // DDL reorganize for a partitioned table will handle partitions one by one, // PartitionID is used to trace the current partition we are handling. // If the table is not partitioned, PartitionID would be TableID. @@ -207,7 +208,7 @@ func constructLimitPB(count uint64) *tipb.Executor { return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec} } -func buildDescTableScanDAG(startTS uint64, tblInfo *model.TableInfo, partitionID int64, columns []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) { +func buildDescTableScanDAG(startTS uint64, tbl table.Table, columns []*model.ColumnInfo, limit uint64) (*tipb.DAGRequest, error) { dagReq := &tipb.DAGRequest{} dagReq.StartTs = startTS _, timeZoneOffset := time.Now().In(time.UTC).Zone() @@ -217,8 +218,8 @@ func buildDescTableScanDAG(startTS uint64, tblInfo *model.TableInfo, partitionID } dagReq.Flags |= model.FlagInSelectStmt - pbColumnInfos := model.ColumnsToProto(columns, tblInfo.PKIsHandle) - tblScanExec := constructDescTableScanPB(partitionID, pbColumnInfos) + pbColumnInfos := model.ColumnsToProto(columns, tbl.Meta().PKIsHandle) + tblScanExec := constructDescTableScanPB(tbl.GetID(), pbColumnInfos) dagReq.Executors = append(dagReq.Executors, tblScanExec) dagReq.Executors = append(dagReq.Executors, constructLimitPB(limit)) return dagReq, nil @@ -232,15 +233,15 @@ func getColumnsTypes(columns []*model.ColumnInfo) []*types.FieldType { return colTypes } -// builds a desc table scan upon tblInfo. -func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tblInfo *model.TableInfo, partitionID int64, columns []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) { - dagPB, err := buildDescTableScanDAG(startTS, tblInfo, partitionID, columns, limit) +// buildDescTableScan builds a desc table scan upon tblInfo. 
+func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl table.Table, columns []*model.ColumnInfo, limit uint64) (distsql.SelectResult, error) { + dagPB, err := buildDescTableScanDAG(startTS, tbl, columns, limit) if err != nil { return nil, errors.Trace(err) } ranges := ranger.FullIntRange(false) var builder distsql.RequestBuilder - builder.SetTableRanges(partitionID, ranges, nil). + builder.SetTableRanges(tbl.GetID(), ranges, nil). SetDAGRequest(dagPB). SetKeepOrder(true). SetConcurrency(1).SetDesc(true) @@ -260,11 +261,11 @@ func (d *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tblInfo } // GetTableMaxRowID gets the last row id of the table partition. -func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tblInfo *model.TableInfo, partitionID int64) (maxRowID int64, emptyTable bool, err error) { +func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tbl table.Table) (maxRowID int64, emptyTable bool, err error) { maxRowID = int64(math.MaxInt64) var columns []*model.ColumnInfo - if tblInfo.PKIsHandle { - for _, col := range tblInfo.Columns { + if tbl.Meta().PKIsHandle { + for _, col := range tbl.Meta().Columns { if mysql.HasPriKeyFlag(col.Flag) { columns = []*model.ColumnInfo{col} break @@ -276,7 +277,7 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tblInfo *model.TableInfo, part ctx := context.Background() // build a desc scan of tblInfo, which limit is 1, we can use it to retrive the last handle of the table. - result, err := d.buildDescTableScan(ctx, startTS, tblInfo, partitionID, columns, 1) + result, err := d.buildDescTableScan(ctx, startTS, tbl, columns, 1) if err != nil { return maxRowID, false, errors.Trace(err) } @@ -300,11 +301,11 @@ func (d *ddlCtx) GetTableMaxRowID(startTS uint64, tblInfo *model.TableInfo, part var gofailOnceGuard bool // getTableRange gets the start and end handle of a table (or partition). -func getTableRange(d *ddlCtx, tblInfo *model.TableInfo, partitionID int64, snapshotVer uint64) (startHandle, endHandle int64, err error) { +func getTableRange(d *ddlCtx, tbl table.Table, snapshotVer uint64) (startHandle, endHandle int64, err error) { startHandle = math.MinInt64 endHandle = math.MaxInt64 // Get the start handle of this partition. - err = iterateSnapshotRows(d.store, partitionID, snapshotVer, math.MinInt64, + err = iterateSnapshotRows(d.store, tbl, snapshotVer, math.MinInt64, func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) { startHandle = h return false, nil @@ -314,12 +315,12 @@ func getTableRange(d *ddlCtx, tblInfo *model.TableInfo, partitionID int64, snaps } var emptyTable bool // Get the end handle of this partition. 
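+	// GetTableMaxRowID runs a descending table scan with LIMIT 1, so the
+	// first row it returns carries the largest handle in this snapshot.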
- endHandle, emptyTable, err = d.GetTableMaxRowID(snapshotVer, tblInfo, partitionID) + endHandle, emptyTable, err = d.GetTableMaxRowID(snapshotVer, tbl) if err != nil { return 0, 0, errors.Trace(err) } if endHandle < startHandle || emptyTable { - log.Infof("[ddl-reorg] get table range %v endHandle < startHandle partition %d [%d %d]", tblInfo, partitionID, endHandle, startHandle) + log.Infof("[ddl-reorg] get table range %v endHandle < startHandle partition %d [%d %d]", tbl.Meta(), tbl.GetID(), endHandle, startHandle) endHandle = startHandle } return @@ -346,10 +347,12 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table) (*re } tblInfo := tbl.Meta() pid = tblInfo.ID + tp := tbl if pi := tblInfo.GetPartitionInfo(); pi != nil { pid = pi.Definitions[0].ID + tp = tbl.(table.PartitionedTable).GetPartition(pid) } - start, end, err = getTableRange(d, tblInfo, pid, ver.Ver) + start, end, err = getTableRange(d, tp, ver.Ver) if err != nil { return nil, errors.Trace(err) } From 581edd4cac4ca5fbb9234a0d8ad50b2ed2f1e7ac Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 1 Aug 2018 11:36:22 +0800 Subject: [PATCH 02/13] executor: set the correct handle in DirtyDB when executing update statements (#7209) --- executor/write.go | 6 +++++- executor/write_test.go | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/executor/write.go b/executor/write.go index 323bca5f110d5..ab20f07ab0db1 100644 --- a/executor/write.go +++ b/executor/write.go @@ -167,7 +167,11 @@ func updateRecord(ctx sessionctx.Context, h int64, oldData, newData []types.Datu tid := t.Meta().ID ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, tid, h, nil) - ctx.StmtAddDirtyTableOP(DirtyTableAddRow, tid, h, newData) + if handleChanged { + ctx.StmtAddDirtyTableOP(DirtyTableAddRow, tid, newHandle, newData) + } else { + ctx.StmtAddDirtyTableOP(DirtyTableAddRow, tid, h, newData) + } if onDup { sc.AddAffectedRows(2) diff --git a/executor/write_test.go b/executor/write_test.go index 0039b5e231ae3..d39e9cfce8309 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1718,6 +1718,24 @@ func (s *testSuite) TestUpdateSelect(c *C) { tk.MustExec("admin check table msg") } +func (s *testSuite) TestUpdateDelete(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("CREATE TABLE ttt (id bigint(20) NOT NULL, host varchar(30) NOT NULL, PRIMARY KEY (id), UNIQUE KEY i_host (host));") + tk.MustExec("insert into ttt values (8,8),(9,9);") + + tk.MustExec("begin") + tk.MustExec("update ttt set id = 0, host='9' where id = 9 limit 1;") + tk.MustExec("delete from ttt where id = 0 limit 1;") + tk.MustQuery("select * from ttt use index (i_host) order by host;").Check(testkit.Rows("8 8")) + tk.MustExec("update ttt set id = 0, host='8' where id = 8 limit 1;") + tk.MustExec("delete from ttt where id = 0 limit 1;") + tk.MustQuery("select * from ttt use index (i_host) order by host;").Check(testkit.Rows()) + tk.MustExec("commit") + tk.MustExec("admin check table ttt;") + tk.MustExec("drop table ttt") +} + func (s *testSuite) TestUpdateAffectRowCnt(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") From d410159e064799df41fe3570f1f2bafc2baf92e7 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 1 Aug 2018 12:22:11 +0800 Subject: [PATCH 03/13] util/chunk: fix chunk truncate (#7216) Fix the wrong nullBitmap index after truncate. 
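
The old expression (col.length>>3)+1 keeps one extra, stale bitmap byte
whenever col.length is a multiple of 8, while (col.length+7)/8 is the
exact ceiling division and yields zero bytes for zero rows. A rough
sketch of how the stale byte surfaced, mirroring the new case in
chunk_test.go:

    chk := NewChunkWithCapacity(fieldTypes[:1], 1)
    chk.AppendFloat32(0, 1.0) // bit 0 of nullBitmap[0] is set: row 0 is not null
    chk.TruncateTo(0)         // old code kept nullBitmap[:1], preserving that bit
    chk.AppendNull(0)         // row 0's bit still lives in the stale byte, so
                              // the appended NULL wrongly reads back as not null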
--- util/chunk/chunk.go | 2 +- util/chunk/chunk_test.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index 7be6ed4c817b5..2706d5dc098a8 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -198,7 +198,7 @@ func (c *Chunk) TruncateTo(numRows int) { } } col.length = numRows - col.nullBitmap = col.nullBitmap[:(col.length>>3)+1] + col.nullBitmap = col.nullBitmap[:(col.length+7)/8] } c.numVirtualRows = numRows } diff --git a/util/chunk/chunk_test.go b/util/chunk/chunk_test.go index 208234cbe570d..a6b8571be4fb4 100644 --- a/util/chunk/chunk_test.go +++ b/util/chunk/chunk_test.go @@ -234,6 +234,11 @@ func (s *testChunkSuite) TestTruncateTo(c *check.C) { cmpRes := json.CompareBinary(jsonElem, jsonObj) c.Assert(cmpRes, check.Equals, 0) } + chk := NewChunkWithCapacity(fieldTypes[:1], 1) + chk.AppendFloat32(0, 1.0) + chk.TruncateTo(0) + chk.AppendNull(0) + c.Assert(chk.GetRow(0).IsNull(0), check.IsTrue) } // newChunk creates a new chunk and initialize columns with element length. From d5c5115220351bbe7ca11ebcb86c005f386f28a1 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Wed, 1 Aug 2018 12:59:40 +0800 Subject: [PATCH 04/13] executor: fix firstrow/max/min(bit col) error (#7206) --- executor/aggfuncs/builder.go | 6 ++++++ executor/aggregate_test.go | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/executor/aggfuncs/builder.go b/executor/aggfuncs/builder.go index 1a47a50fa1822..3035cfb059ecb 100644 --- a/executor/aggfuncs/builder.go +++ b/executor/aggfuncs/builder.go @@ -176,6 +176,9 @@ func buildFirstRow(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { } evalType, fieldType := aggFuncDesc.RetTp.EvalType(), aggFuncDesc.RetTp + if fieldType.Tp == mysql.TypeBit { + evalType = types.ETString + } switch aggFuncDesc.Mode { case aggregation.DedupMode: default: @@ -215,6 +218,9 @@ func buildMaxMin(aggFuncDesc *aggregation.AggFuncDesc, ordinal int, isMax bool) } evalType, fieldType := aggFuncDesc.RetTp.EvalType(), aggFuncDesc.RetTp + if fieldType.Tp == mysql.TypeBit { + evalType = types.ETString + } switch aggFuncDesc.Mode { case aggregation.DedupMode: default: diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index d5973745c55d3..5dde78e136595 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -309,6 +309,17 @@ func (s *testSuite) TestAggregation(c *C) { result = tk.MustQuery("select t.id, count(95), sum(95), avg(95), bit_or(95), bit_and(95), bit_or(95), max(95), min(95), group_concat(95) from t left join s on t.id = s.id") result.Check(testkit.Rows("1 1 95 95.0000 95 95 95 95 95 95")) tk.MustExec("set @@tidb_hash_join_concurrency=5") + + // test agg bit col + tk.MustExec("drop table t") + tk.MustExec("CREATE TABLE `t` (`a` bit(1) NOT NULL, PRIMARY KEY (`a`))") + tk.MustExec("insert into t value(1), (0)") + tk.MustQuery("select a from t group by 1") + // This result is compatible with MySQL, the readable result is shown in the next case. 
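+	// A bit(1) value is returned in its raw binary form, so max(a) yields
+	// the bytes 0x00 and 0x01 rather than the readable integers 0 and 1.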
+ result = tk.MustQuery("select max(a) from t group by a") + result.Check(testkit.Rows(string([]byte{0x0}), string([]byte{0x1}))) + result = tk.MustQuery("select cast(a as signed) as idx, cast(max(a) as signed), cast(min(a) as signed) from t group by 1 order by idx") + result.Check(testkit.Rows("0 0 0", "1 1 1")) } func (s *testSuite) TestStreamAggPushDown(c *C) { From 606bef7be6d1ec49e159c4ef2932e16bff5e8007 Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Wed, 1 Aug 2018 13:37:15 +0800 Subject: [PATCH 05/13] stats: calculate the scalar info for histogram after local update (#7215) --- statistics/update.go | 1 + statistics/update_test.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/statistics/update.go b/statistics/update.go index 3fe1349a0aa4f..6d8631d6d1903 100644 --- a/statistics/update.go +++ b/statistics/update.go @@ -414,6 +414,7 @@ func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) { eqFB, ranFB := splitFeedbackByQueryType(fb.feedback) newIdx.CMSketch = UpdateCMSketch(idx.CMSketch, eqFB) newIdx.Histogram = *UpdateHistogram(&idx.Histogram, &QueryFeedback{feedback: ranFB}) + newIdx.Histogram.PreCalculateScalar() newTblStats.Indices[fb.hist.ID] = &newIdx } else { col, ok := tblStats.Columns[fb.hist.ID] diff --git a/statistics/update_test.go b/statistics/update_test.go index 855e0231e7271..651d1e698268f 100644 --- a/statistics/update_test.go +++ b/statistics/update_test.go @@ -723,4 +723,7 @@ func (s *testStatsUpdateSuite) TestUpdateStatsByLocalFeedback(c *C) { c.Assert(tbl.Indices[tblInfo.Indices[0].ID].ToString(1), Equals, "index:1 ndv:2\n"+ "num: 2\tlower_bound: \tupper_bound: 2\trepeats: 0\n"+ "num: 4\tlower_bound: 3\tupper_bound: 6\trepeats: 0") + + // Test that it won't cause panic after update. + testKit.MustQuery("select * from t use index(idx) where b > 0") } From 44a2ad7f0f65e5c5ea3dab0f20a34fe5a357e58a Mon Sep 17 00:00:00 2001 From: Yiding Cui Date: Wed, 1 Aug 2018 15:44:21 +0800 Subject: [PATCH 06/13] expression, plan: rename `Column.Position` to `Column.UniqueID` (#7218) --- expression/column.go | 14 ++++++++------ expression/column_test.go | 10 +++++----- expression/constant_test.go | 2 +- expression/expression.go | 2 +- expression/expression_test.go | 2 +- expression/scalar_function_test.go | 4 ++-- expression/schema.go | 2 +- expression/schema_test.go | 14 +++++++------- expression/simple_rewriter.go | 2 +- plan/expression_rewriter.go | 16 ++++++++-------- plan/find_best_task.go | 4 ++-- plan/logical_plan_builder.go | 16 ++++++++-------- plan/planbuilder.go | 12 ++++++------ plan/point_get_plan.go | 2 +- plan/rule_aggregation_push_down.go | 2 +- plan/rule_build_key_info.go | 2 +- plan/rule_join_reorder.go | 2 +- plan/rule_predicate_push_down.go | 2 +- plan/task.go | 6 +++--- 19 files changed, 59 insertions(+), 57 deletions(-) diff --git a/expression/column.go b/expression/column.go index ab647c1f6d3c7..52557377c34d0 100644 --- a/expression/column.go +++ b/expression/column.go @@ -147,14 +147,16 @@ type Column struct { OrigTblName model.CIStr TblName model.CIStr RetType *types.FieldType - ID int64 - // Position is the unique id of this column. - Position int + // This id is used to specify whether this column is ExtraHandleColumn or to access histogram. + // We'll try to remove it in the future. + ID int64 + // UniqueID is the unique id of this column. + UniqueID int // IsAggOrSubq means if this column is referenced to a Aggregation column or a Subquery column. // If so, this column's name will be the plain sql text. 
IsAggOrSubq bool - // Index is only used for execution. + // Index is used for execution, to tell the column's position in the given row. Index int hashcode []byte @@ -163,7 +165,7 @@ type Column struct { // Equal implements Expression interface. func (col *Column) Equal(_ sessionctx.Context, expr Expression) bool { if newCol, ok := expr.(*Column); ok { - return newCol.Position == col.Position + return newCol.UniqueID == col.UniqueID } return false } @@ -305,7 +307,7 @@ func (col *Column) HashCode(_ *stmtctx.StatementContext) []byte { } col.hashcode = make([]byte, 0, 9) col.hashcode = append(col.hashcode, columnFlag) - col.hashcode = codec.EncodeInt(col.hashcode, int64(col.Position)) + col.hashcode = codec.EncodeInt(col.hashcode, int64(col.UniqueID)) return col.hashcode } diff --git a/expression/column_test.go b/expression/column_test.go index ccaef1491ddb7..4093528f36ab2 100644 --- a/expression/column_test.go +++ b/expression/column_test.go @@ -25,7 +25,7 @@ import ( func (s *testEvaluatorSuite) TestColumn(c *C) { defer testleak.AfterTest(c)() - col := &Column{RetType: types.NewFieldType(mysql.TypeLonglong), Position: 1} + col := &Column{RetType: types.NewFieldType(mysql.TypeLonglong), UniqueID: 1} c.Assert(col.Equal(nil, col), IsTrue) c.Assert(col.Equal(nil, &Column{}), IsFalse) @@ -39,7 +39,7 @@ func (s *testEvaluatorSuite) TestColumn(c *C) { intDatum := types.NewIntDatum(1) corCol := &CorrelatedColumn{Column: *col, Data: &intDatum} invalidCorCol := &CorrelatedColumn{Column: Column{}} - schema := NewSchema(&Column{Position: 1}) + schema := NewSchema(&Column{UniqueID: 1}) c.Assert(corCol.Equal(nil, corCol), IsTrue) c.Assert(corCol.Equal(nil, invalidCorCol), IsFalse) c.Assert(corCol.IsCorrelated(), IsTrue) @@ -97,12 +97,12 @@ func (s *testEvaluatorSuite) TestColumnHashCode(c *C) { defer testleak.AfterTest(c)() col1 := &Column{ - Position: 12, + UniqueID: 12, } c.Assert(col1.HashCode(nil), DeepEquals, []byte{0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc}) col2 := &Column{ - Position: 2, + UniqueID: 2, } c.Assert(col2.HashCode(nil), DeepEquals, []byte{0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2}) } @@ -112,7 +112,7 @@ func (s *testEvaluatorSuite) TestColumn2Expr(c *C) { cols := make([]*Column, 0, 5) for i := 0; i < 5; i++ { - cols = append(cols, &Column{Position: i}) + cols = append(cols, &Column{UniqueID: i}) } exprs := Column2Exprs(cols) diff --git a/expression/constant_test.go b/expression/constant_test.go index f687c6ac5f792..c36c2d39e2411 100644 --- a/expression/constant_test.go +++ b/expression/constant_test.go @@ -33,7 +33,7 @@ type testExpressionSuite struct{} func newColumn(id int) *Column { return &Column{ - Position: id, + UniqueID: id, ColName: model.NewCIStr(fmt.Sprint(id)), TblName: model.NewCIStr("t"), DBName: model.NewCIStr("test"), diff --git a/expression/expression.go b/expression/expression.go index 5e9ecdb4a57fc..33bed2842bce8 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -312,7 +312,7 @@ func ColumnInfos2ColumnsWithDBName(ctx sessionctx.Context, dbName, tblName model TblName: tblName, DBName: dbName, RetType: &col.FieldType, - Position: ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: ctx.GetSessionVars().AllocPlanColumnID(), Index: col.Offset, } columns = append(columns, newCol) diff --git a/expression/expression_test.go b/expression/expression_test.go index 0024d59521677..d6db7f23e1e28 100644 --- a/expression/expression_test.go +++ b/expression/expression_test.go @@ -128,7 +128,7 @@ func tableInfoToSchemaForTest(tableInfo 
*model.TableInfo) *Schema { schema := NewSchema(make([]*Column, 0, len(columns))...) for i, col := range columns { schema.Append(&Column{ - Position: i, + UniqueID: i, TblName: tableInfo.Name, ColName: col.Name, ID: col.ID, diff --git a/expression/scalar_function_test.go b/expression/scalar_function_test.go index c3cc7e734d255..f4088c2b010c1 100644 --- a/expression/scalar_function_test.go +++ b/expression/scalar_function_test.go @@ -29,7 +29,7 @@ func (s *testEvaluatorSuite) TestScalarFunction(c *C) { defer testleak.AfterTest(c)() a := &Column{ - Position: 1, + UniqueID: 1, TblName: model.NewCIStr("fei"), ColName: model.NewCIStr("han"), RetType: types.NewFieldType(mysql.TypeDouble), @@ -55,7 +55,7 @@ func (s *testEvaluatorSuite) TestScalarFunction(c *C) { func (s *testEvaluatorSuite) TestScalarFuncs2Exprs(c *C) { defer testleak.AfterTest(c)() a := &Column{ - Position: 1, + UniqueID: 1, RetType: types.NewFieldType(mysql.TypeDouble), } sf0, _ := newFunction(ast.LT, a, Zero).(*ScalarFunction) diff --git a/expression/schema.go b/expression/schema.go index 56518c3bf751b..e6a5955d9f198 100644 --- a/expression/schema.go +++ b/expression/schema.go @@ -153,7 +153,7 @@ func (s *Schema) IsUniqueKey(col *Column) bool { // ColumnIndex finds the index for a column. func (s *Schema) ColumnIndex(col *Column) int { for i, c := range s.Columns { - if c.Position == col.Position { + if c.UniqueID == col.UniqueID { return i } } diff --git a/expression/schema_test.go b/expression/schema_test.go index d43d3d2328c20..d28a41d7263f7 100644 --- a/expression/schema_test.go +++ b/expression/schema_test.go @@ -35,7 +35,7 @@ func (s *testEvalSuite) generateSchema(colCount int, dbName, tblName string) *Sc cols := make([]*Column, 0, colCount) for i := 0; i < colCount; i++ { cols = append(cols, &Column{ - Position: s.allocColID(), + UniqueID: s.allocColID(), DBName: model.NewCIStr(dbName), TblName: model.NewCIStr(tblName), ColName: model.NewCIStr(fmt.Sprintf("C%v", i)), @@ -60,7 +60,7 @@ func (s *testEvalSuite) TestSchemaString(c *C) { func (s *testEvalSuite) TestSchemaRetrieveColumn(c *C) { schema := s.generateSchema(5, "T", "B") colOutSchema := &Column{ - Position: 100, + UniqueID: 100, } for _, col := range schema.Columns { c.Assert(schema.RetrieveColumn(col), Equals, col) @@ -72,7 +72,7 @@ func (s *testEvalSuite) TestSchemaIsUniqueKey(c *C) { schema := s.generateSchema(5, "T", "B") generateKeys4Schema(schema) colOutSchema := &Column{ - Position: 100, + UniqueID: 100, } for i, col := range schema.Columns { if i < len(schema.Columns)-1 { @@ -87,7 +87,7 @@ func (s *testEvalSuite) TestSchemaIsUniqueKey(c *C) { func (s *testEvalSuite) TestSchemaContains(c *C) { schema := s.generateSchema(5, "T", "B") colOutSchema := &Column{ - Position: 100, + UniqueID: 100, } for _, col := range schema.Columns { c.Assert(schema.Contains(col), Equals, true) @@ -98,7 +98,7 @@ func (s *testEvalSuite) TestSchemaContains(c *C) { func (s *testEvalSuite) TestSchemaColumnsIndices(c *C) { schema := s.generateSchema(5, "T", "B") colOutSchema := &Column{ - Position: 100, + UniqueID: 100, } for i := 0; i < len(schema.Columns)-1; i++ { colIndices := schema.ColumnsIndices([]*Column{schema.Columns[i], schema.Columns[i+1]}) @@ -131,13 +131,13 @@ func (s *testEvalSuite) TestSchemaMergeSchema(c *C) { schema := MergeSchema(lSchema, rSchema) for i := 0; i < len(lSchema.Columns); i++ { - c.Assert(schema.Columns[i].Position, Equals, lSchema.Columns[i].Position) + c.Assert(schema.Columns[i].UniqueID, Equals, lSchema.Columns[i].UniqueID) 
c.Assert(schema.Columns[i].DBName, Equals, lSchema.Columns[i].DBName) c.Assert(schema.Columns[i].TblName, Equals, lSchema.Columns[i].TblName) c.Assert(schema.Columns[i].ColName, Equals, lSchema.Columns[i].ColName) } for i := 0; i < len(rSchema.Columns); i++ { - c.Assert(schema.Columns[i+len(lSchema.Columns)].Position, Equals, rSchema.Columns[i].Position) + c.Assert(schema.Columns[i+len(lSchema.Columns)].UniqueID, Equals, rSchema.Columns[i].UniqueID) c.Assert(schema.Columns[i+len(lSchema.Columns)].DBName, Equals, rSchema.Columns[i].DBName) c.Assert(schema.Columns[i+len(lSchema.Columns)].TblName, Equals, rSchema.Columns[i].TblName) c.Assert(schema.Columns[i+len(lSchema.Columns)].ColName, Equals, rSchema.Columns[i].ColName) diff --git a/expression/simple_rewriter.go b/expression/simple_rewriter.go index 1701805a725e9..7d83049fa15fc 100644 --- a/expression/simple_rewriter.go +++ b/expression/simple_rewriter.go @@ -65,7 +65,7 @@ func (sr *simpleRewriter) rewriteColumn(nodeColName *ast.ColumnNameExpr) (*Colum TblName: sr.tbl.Name, RetType: &col.FieldType, ID: col.ID, - Position: sr.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: sr.ctx.GetSessionVars().AllocPlanColumnID(), Index: i, }, nil } diff --git a/plan/expression_rewriter.go b/plan/expression_rewriter.go index 423d4e4b26430..b123f0e37ba01 100644 --- a/plan/expression_rewriter.go +++ b/plan/expression_rewriter.go @@ -406,7 +406,7 @@ func (er *expressionRewriter) handleOtherComparableSubq(lexpr, rexpr expression. // Create a column and append it to the schema of that aggregation. colMaxOrMin := &expression.Column{ ColName: model.NewCIStr("agg_Col_0"), - Position: er.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: er.ctx.GetSessionVars().AllocPlanColumnID(), RetType: funcMaxOrMin.RetTp, } schema := expression.NewSchema(colMaxOrMin) @@ -425,7 +425,7 @@ func (er *expressionRewriter) buildQuantifierPlan(plan4Agg *LogicalAggregation, funcSum := aggregation.NewAggFuncDesc(er.ctx, ast.AggFuncSum, []expression.Expression{funcIsNull}, false) colSum := &expression.Column{ ColName: model.NewCIStr("agg_col_sum"), - Position: er.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: er.ctx.GetSessionVars().AllocPlanColumnID(), RetType: funcSum.RetTp, } plan4Agg.AggFuncs = append(plan4Agg.AggFuncs, funcSum) @@ -435,7 +435,7 @@ func (er *expressionRewriter) buildQuantifierPlan(plan4Agg *LogicalAggregation, funcCount := aggregation.NewAggFuncDesc(er.ctx, ast.AggFuncCount, []expression.Expression{funcIsNull}, false) colCount := &expression.Column{ ColName: model.NewCIStr("agg_col_cnt"), - Position: er.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: er.ctx.GetSessionVars().AllocPlanColumnID(), RetType: funcCount.RetTp, } plan4Agg.AggFuncs = append(plan4Agg.AggFuncs, funcCount) @@ -473,7 +473,7 @@ func (er *expressionRewriter) buildQuantifierPlan(plan4Agg *LogicalAggregation, proj.Exprs = append(proj.Exprs, cond) proj.schema.Append(&expression.Column{ ColName: model.NewCIStr("aux_col"), - Position: er.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: er.ctx.GetSessionVars().AllocPlanColumnID(), IsAggOrSubq: true, RetType: cond.GetType(), }) @@ -493,12 +493,12 @@ func (er *expressionRewriter) handleNEAny(lexpr, rexpr expression.Expression, np plan4Agg.SetChildren(np) firstRowResultCol := &expression.Column{ ColName: model.NewCIStr("col_firstRow"), - Position: er.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: er.ctx.GetSessionVars().AllocPlanColumnID(), RetType: firstRowFunc.RetTp, } count := &expression.Column{ ColName: 
model.NewCIStr("col_count"), - Position: er.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: er.ctx.GetSessionVars().AllocPlanColumnID(), RetType: countFunc.RetTp, } plan4Agg.SetSchema(expression.NewSchema(firstRowResultCol, count)) @@ -519,12 +519,12 @@ func (er *expressionRewriter) handleEQAll(lexpr, rexpr expression.Expression, np plan4Agg.SetChildren(np) firstRowResultCol := &expression.Column{ ColName: model.NewCIStr("col_firstRow"), - Position: er.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: er.ctx.GetSessionVars().AllocPlanColumnID(), RetType: firstRowFunc.RetTp, } count := &expression.Column{ ColName: model.NewCIStr("col_count"), - Position: er.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: er.ctx.GetSessionVars().AllocPlanColumnID(), RetType: countFunc.RetTp, } plan4Agg.SetSchema(expression.NewSchema(firstRowResultCol, count)) diff --git a/plan/find_best_task.go b/plan/find_best_task.go index 661a607363757..87e8e8e46589a 100644 --- a/plan/find_best_task.go +++ b/plan/find_best_task.go @@ -415,7 +415,7 @@ func (is *PhysicalIndexScan) initSchema(id int, idx *model.IndexInfo, isDoubleRe for _, col := range idx.Columns { colFound := is.dataSourceSchema.FindColumnByName(col.Name.L) if colFound == nil { - colFound = &expression.Column{ColName: col.Name, Position: is.ctx.GetSessionVars().AllocPlanColumnID()} + colFound = &expression.Column{ColName: col.Name, UniqueID: is.ctx.GetSessionVars().AllocPlanColumnID()} } else { colFound = colFound.Clone().(*expression.Column) } @@ -432,7 +432,7 @@ func (is *PhysicalIndexScan) initSchema(id int, idx *model.IndexInfo, isDoubleRe // If it's double read case, the first index must return handle. So we should add extra handle column // if there isn't a handle column. if isDoubleRead && !setHandle { - indexCols = append(indexCols, &expression.Column{ID: model.ExtraHandleID, ColName: model.ExtraHandleName, Position: is.ctx.GetSessionVars().AllocPlanColumnID()}) + indexCols = append(indexCols, &expression.Column{ID: model.ExtraHandleID, ColName: model.ExtraHandleName, UniqueID: is.ctx.GetSessionVars().AllocPlanColumnID()}) } is.SetSchema(expression.NewSchema(indexCols...)) } diff --git a/plan/logical_plan_builder.go b/plan/logical_plan_builder.go index 306f43f641542..03377951fe61b 100644 --- a/plan/logical_plan_builder.go +++ b/plan/logical_plan_builder.go @@ -106,7 +106,7 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega plan4Agg.AggFuncs = append(plan4Agg.AggFuncs, newFunc) schema4Agg.Append(&expression.Column{ ColName: model.NewCIStr(fmt.Sprintf("%d_col_%d", plan4Agg.id, position)), - Position: b.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), IsAggOrSubq: true, RetType: newFunc.RetTp, }) @@ -554,7 +554,7 @@ func (b *planBuilder) buildProjectionField(id, position int, field *ast.SelectFi colName = b.buildProjectionFieldNameFromExpressions(field) } return &expression.Column{ - Position: b.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), TblName: tblName, OrigTblName: origTblName, ColName: colName, @@ -668,7 +668,7 @@ func (b *planBuilder) buildProjection4Union(u *LogicalUnionAll) { proj := LogicalProjection{Exprs: exprs}.init(b.ctx) if childID == 0 { for _, col := range unionSchema.Columns { - col.Position = b.ctx.GetSessionVars().AllocPlanColumnID() + col.UniqueID = b.ctx.GetSessionVars().AllocPlanColumnID() } } proj.SetChildren(child) @@ -1717,7 +1717,7 @@ func (b *planBuilder) buildSelect(sel 
*ast.SelectStmt) LogicalPlan { proj.SetChildren(p) schema := expression.NewSchema(p.Schema().Clone().Columns[:oldLen]...) for _, col := range schema.Columns { - col.Position = b.ctx.GetSessionVars().AllocPlanColumnID() + col.UniqueID = b.ctx.GetSessionVars().AllocPlanColumnID() } proj.SetSchema(schema) return proj @@ -1737,7 +1737,7 @@ func (ds *DataSource) newExtraHandleSchemaCol() *expression.Column { TblName: ds.tableInfo.Name, ColName: model.ExtraHandleName, RetType: types.NewFieldType(mysql.TypeLonglong), - Position: ds.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: ds.ctx.GetSessionVars().AllocPlanColumnID(), ID: model.ExtraHandleID, } } @@ -1819,7 +1819,7 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan { for _, col := range columns { ds.Columns = append(ds.Columns, col.ToInfo()) newCol := &expression.Column{ - Position: b.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), DBName: dbName, TblName: tableInfo.Name, ColName: col.Name, @@ -1963,7 +1963,7 @@ out: newCol := &expression.Column{ RetType: types.NewFieldType(mysql.TypeTiny), ColName: model.NewCIStr("exists_col"), - Position: b.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), } exists.SetSchema(expression.NewSchema(newCol)) return exists @@ -1988,7 +1988,7 @@ func (b *planBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onConditio ColName: model.NewCIStr(fmt.Sprintf("%d_aux_0", joinPlan.id)), RetType: types.NewFieldType(mysql.TypeTiny), IsAggOrSubq: true, - Position: b.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), }) joinPlan.SetSchema(newSchema) if not { diff --git a/plan/planbuilder.go b/plan/planbuilder.go index 90343b4382967..f0c3af15935e7 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -201,7 +201,7 @@ func (b *planBuilder) buildDo(v *ast.DoStmt) Plan { } p.Exprs = append(p.Exprs, expr) schema.Append(&expression.Column{ - Position: b.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), RetType: expr.GetType(), }) } @@ -402,7 +402,7 @@ func (b *planBuilder) buildCheckIndex(dbName model.CIStr, as *ast.AdminStmt) Pla if idxCol.Name.L == col.Name.L { columns = append(columns, col) schema.Append(&expression.Column{ - Position: b.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), RetType: &col.FieldType, }) } @@ -508,7 +508,7 @@ func (b *planBuilder) buildCheckIndexSchema(tn *ast.TableName, indexName string) TblName: tn.Name, DBName: tn.Schema, RetType: &col.FieldType, - Position: b.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), ID: col.ID}) } schema.Append(&expression.Column{ @@ -516,7 +516,7 @@ func (b *planBuilder) buildCheckIndexSchema(tn *ast.TableName, indexName string) TblName: tn.Name, DBName: tn.Schema, RetType: types.NewFieldType(mysql.TypeLonglong), - Position: b.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(), ID: -1, }) } @@ -742,7 +742,7 @@ func (b *planBuilder) buildShow(show *ast.ShowStmt) Plan { p.SetSchema(buildShowSchema(show)) } for _, col := range p.schema.Columns { - col.Position = b.ctx.GetSessionVars().AllocPlanColumnID() + col.UniqueID = b.ctx.GetSessionVars().AllocPlanColumnID() } mockTablePlan := LogicalTableDual{}.init(b.ctx) mockTablePlan.SetSchema(p.schema) @@ -1479,7 +1479,7 @@ func buildShowSchema(s 
*ast.ShowStmt) (schema *expression.Schema) {
 		names = []string{"Query_ID", "Duration", "Query"}
 		ftypes = []byte{mysql.TypeLong, mysql.TypeDouble, mysql.TypeVarchar}
 	case ast.ShowMasterStatus:
 		names = []string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set"}
 		ftypes = []byte{mysql.TypeVarchar, mysql.TypeLonglong, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar}
 	case ast.ShowPrivileges:
 		names = []string{"Privilege", "Context", "Comment"}
diff --git a/plan/point_get_plan.go b/plan/point_get_plan.go
index d668dc4b1f6e9..0b4066798663a 100644
--- a/plan/point_get_plan.go
+++ b/plan/point_get_plan.go
@@ -417,7 +417,7 @@ func colInfoToColumn(db model.CIStr, tblName model.CIStr, asName model.CIStr, co
 		TblName:  tblName,
 		RetType:  &col.FieldType,
 		ID:       col.ID,
-		Position: col.Offset,
+		UniqueID: col.Offset,
 		Index:    idx,
 	}
 }
diff --git a/plan/rule_aggregation_push_down.go b/plan/rule_aggregation_push_down.go
index bba1e3142dabc..3089f1d5c9cfd 100644
--- a/plan/rule_aggregation_push_down.go
+++ b/plan/rule_aggregation_push_down.go
@@ -175,7 +175,7 @@ func (a *aggregationOptimizer) decompose(ctx sessionctx.Context, aggFunc *aggreg
 	for _, aggFunc := range result {
 		schema.Append(&expression.Column{
 			ColName:  model.NewCIStr(fmt.Sprintf("join_agg_%d", schema.Len())), // useless but for debug
-			Position: ctx.GetSessionVars().AllocPlanColumnID(),
+			UniqueID: ctx.GetSessionVars().AllocPlanColumnID(),
 			RetType:  aggFunc.RetTp,
 		})
 	}
diff --git a/plan/rule_build_key_info.go b/plan/rule_build_key_info.go
index 00b5037af80a4..6a84e035a3e0e 100644
--- a/plan/rule_build_key_info.go
+++ b/plan/rule_build_key_info.go
@@ -102,7 +102,7 @@ func (p *LogicalProjection) buildSchemaByExprs() *expression.Schema {
 		} else {
 			// If the expression is not a column, we add a column to occupy the position.
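+			// The occupying column gets a fresh UniqueID from AllocPlanColumnID,
+			// so it never compares equal to any real column in the schema.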
schema.Append(&expression.Column{ - Position: p.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: p.ctx.GetSessionVars().AllocPlanColumnID(), RetType: expr.GetType(), }) } diff --git a/plan/rule_join_reorder.go b/plan/rule_join_reorder.go index 538d7988c9be9..8d97a7afa24de 100644 --- a/plan/rule_join_reorder.go +++ b/plan/rule_join_reorder.go @@ -78,7 +78,7 @@ func findColumnIndexByGroup(groups []LogicalPlan, col *expression.Column) int { return i } } - log.Errorf("Unknown columns %s, position %d", col, col.Position) + log.Errorf("Unknown columns %s, position %d", col, col.UniqueID) return -1 } diff --git a/plan/rule_predicate_push_down.go b/plan/rule_predicate_push_down.go index a070f84dbe675..9d8994209e1f2 100644 --- a/plan/rule_predicate_push_down.go +++ b/plan/rule_predicate_push_down.go @@ -194,7 +194,7 @@ func (p *LogicalProjection) appendExpr(expr expression.Expression) *expression.C p.Exprs = append(p.Exprs, expr) col := &expression.Column{ - Position: p.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: p.ctx.GetSessionVars().AllocPlanColumnID(), ColName: model.NewCIStr(expr.String()), RetType: expr.GetType(), } diff --git a/plan/task.go b/plan/task.go index dd4159c2b80e9..06f07207cec70 100644 --- a/plan/task.go +++ b/plan/task.go @@ -407,7 +407,7 @@ func (p *basePhysicalAgg) newPartialAggregate() (partial, final PhysicalPlan) { ft := types.NewFieldType(mysql.TypeLonglong) ft.Flen, ft.Charset, ft.Collate = 21, charset.CharsetBin, charset.CollationBin partialSchema.Append(&expression.Column{ - Position: p.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: p.ctx.GetSessionVars().AllocPlanColumnID(), ColName: model.NewCIStr(fmt.Sprintf("col_%d", partialCursor)), RetType: ft, }) @@ -416,7 +416,7 @@ func (p *basePhysicalAgg) newPartialAggregate() (partial, final PhysicalPlan) { } if aggregation.NeedValue(finalAggFunc.Name) { partialSchema.Append(&expression.Column{ - Position: p.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: p.ctx.GetSessionVars().AllocPlanColumnID(), ColName: model.NewCIStr(fmt.Sprintf("col_%d", partialCursor)), RetType: finalSchema.Columns[i].GetType(), }) @@ -433,7 +433,7 @@ func (p *basePhysicalAgg) newPartialAggregate() (partial, final PhysicalPlan) { groupByItems := make([]expression.Expression, 0, len(p.GroupByItems)) for i, gbyExpr := range p.GroupByItems { gbyCol := &expression.Column{ - Position: p.ctx.GetSessionVars().AllocPlanColumnID(), + UniqueID: p.ctx.GetSessionVars().AllocPlanColumnID(), ColName: model.NewCIStr(fmt.Sprintf("col_%d", partialCursor+i)), RetType: gbyExpr.GetType(), } From a37acaf0d4562332306231e0a500810868c09509 Mon Sep 17 00:00:00 2001 From: Caitin <34535727+CaitinChen@users.noreply.github.com> Date: Wed, 1 Aug 2018 16:16:38 +0800 Subject: [PATCH 07/13] expression: update wording in comments (#7220) --- expression/builtin_compare.go | 20 ++++++++++---------- expression/builtin_compare_test.go | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/expression/builtin_compare.go b/expression/builtin_compare.go index b570d6a00b1ed..cbe93f9caf320 100644 --- a/expression/builtin_compare.go +++ b/expression/builtin_compare.go @@ -1090,17 +1090,17 @@ func RefineComparedConstant(ctx sessionctx.Context, isUnsigned bool, con *Consta } case opcode.NullEQ, opcode.EQ: switch con.RetType.EvalType() { - // An integer value equals or null-equals to a float value contains - // non-zero decimal digits is definite false. - // e.g: - // 1. "integer = 1.1" is definite false. - // 2. "integer <=> 1.1" is definite false. 
+ // An integer value equal or NULL-safe equal to a float value which contains + // non-zero decimal digits is definitely false. + // e.g., + // 1. "integer = 1.1" is definitely false. + // 2. "integer <=> 1.1" is definitely false. case types.ETReal, types.ETDecimal: return con, true case types.ETString: - // We try to convert the string constant to double, - // if the double result equals to the int result, we can return the int result, - // otherwise, the compare function must be false. + // We try to convert the string constant to double. + // If the double result equals the int result, we can return the int result; + // otherwise, the compare function will be false. var doubleDatum types.Datum doubleDatum, err = dt.ConvertTo(sc, types.NewFieldType(mysql.TypeDouble)) if err != nil { @@ -1122,8 +1122,8 @@ func RefineComparedConstant(ctx sessionctx.Context, isUnsigned bool, con *Consta return con, false } -// refineArgs rewrite the arguments if the compare expression is `int column non-int constant` or -// `non-int constant int column`. e.g. `a < 1.1` will be rewritten to `a < 2`. +// refineArgs will rewrite the arguments if the compare expression is `int column non-int constant` or +// `non-int constant int column`. E.g., `a < 1.1` will be rewritten to `a < 2`. func (c *compareFunctionClass) refineArgs(ctx sessionctx.Context, args []Expression) []Expression { arg0Type, arg1Type := args[0].GetType(), args[1].GetType() arg0IsInt := arg0Type.EvalType() == types.ETInt diff --git a/expression/builtin_compare_test.go b/expression/builtin_compare_test.go index 812eb1bfc3fb1..f2867b9dc4f33 100644 --- a/expression/builtin_compare_test.go +++ b/expression/builtin_compare_test.go @@ -64,7 +64,7 @@ func (s *testEvaluatorSuite) TestCompareFunctionWithRefine(c *C) { {"'123456789123456711111189' = a", "0"}, {"123456789123456789.12345 = a", "0"}, // This cast can not be eliminated, - // since convert "aaaa" to int will cause DataTruncate error. + // since converting "aaaa" to an int will cause DataTruncate error. 
{"'aaaa'=a", "eq(cast(aaaa), cast(a))"}, } From 12850881274b0bcf3a41d4821747462d40f7267d Mon Sep 17 00:00:00 2001 From: Haibin Xie Date: Wed, 1 Aug 2018 16:39:08 +0800 Subject: [PATCH 08/13] executor, plan: support analyze partition table (#7190) --- executor/analyze.go | 24 ++++++++-------- executor/analyze_test.go | 60 ++++++++++++++++++++++++++++++++++++++++ executor/builder.go | 6 ++-- plan/cbo_test.go | 4 +-- plan/common_plans.go | 10 +++---- plan/planbuilder.go | 30 +++++++++++++++++--- plan/stringer.go | 6 ++-- statistics/builder.go | 12 ++++---- 8 files changed, 117 insertions(+), 35 deletions(-) create mode 100644 executor/analyze_test.go diff --git a/executor/analyze.go b/executor/analyze.go index 92981466f3518..c334bca633614 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -74,7 +74,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, chk *chunk.Chunk) error { continue } for i, hg := range result.Hist { - err1 := statsHandle.SaveStatsToStorage(result.TableID, result.Count, result.IsIndex, hg, result.Cms[i], 1) + err1 := statsHandle.SaveStatsToStorage(result.PhysicalID, result.Count, result.IsIndex, hg, result.Cms[i], 1) if err1 != nil { err = err1 log.Error(errors.ErrorStack(err)) @@ -128,10 +128,10 @@ func analyzeIndexPushdown(idxExec *AnalyzeIndexExec) statistics.AnalyzeResult { return statistics.AnalyzeResult{Err: err} } result := statistics.AnalyzeResult{ - TableID: idxExec.tblInfo.ID, - Hist: []*statistics.Histogram{hist}, - Cms: []*statistics.CMSketch{cms}, - IsIndex: 1, + PhysicalID: idxExec.physicalID, + Hist: []*statistics.Histogram{hist}, + Cms: []*statistics.CMSketch{cms}, + IsIndex: 1, } if hist.Len() > 0 { result.Count = hist.Buckets[hist.Len()-1].Count @@ -142,7 +142,7 @@ func analyzeIndexPushdown(idxExec *AnalyzeIndexExec) statistics.AnalyzeResult { // AnalyzeIndexExec represents analyze index push down executor. type AnalyzeIndexExec struct { ctx sessionctx.Context - tblInfo *model.TableInfo + physicalID int64 // physicalID is the partition id for a partitioned table, otherwise, it is the table id. idxInfo *model.IndexInfo concurrency int priority int @@ -152,7 +152,7 @@ type AnalyzeIndexExec struct { func (e *AnalyzeIndexExec) open() error { var builder distsql.RequestBuilder - kvReq, err := builder.SetIndexRanges(e.ctx.GetSessionVars().StmtCtx, e.tblInfo.ID, e.idxInfo.ID, ranger.FullRange()). + kvReq, err := builder.SetIndexRanges(e.ctx.GetSessionVars().StmtCtx, e.physicalID, e.idxInfo.ID, ranger.FullRange()). SetAnalyzeRequest(e.analyzePB). SetKeepOrder(true). Build() @@ -214,9 +214,9 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) statistics.AnalyzeResul return statistics.AnalyzeResult{Err: err} } result := statistics.AnalyzeResult{ - TableID: colExec.tblInfo.ID, - Hist: hists, - Cms: cms, + PhysicalID: colExec.physicalID, + Hist: hists, + Cms: cms, } hist := hists[0] result.Count = hist.NullCount @@ -229,7 +229,7 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) statistics.AnalyzeResul // AnalyzeColumnsExec represents Analyze columns push down executor. type AnalyzeColumnsExec struct { ctx sessionctx.Context - tblInfo *model.TableInfo + physicalID int64 // physicalID is the partition id for a partitioned table, otherwise, it is the table id. 
colsInfo []*model.ColumnInfo pkInfo *model.ColumnInfo concurrency int @@ -268,7 +268,7 @@ func (e *AnalyzeColumnsExec) open() error { func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) { var builder distsql.RequestBuilder - kvReq, err := builder.SetTableRanges(e.tblInfo.ID, ranges, nil). + kvReq, err := builder.SetTableRanges(e.physicalID, ranges, nil). SetAnalyzeRequest(e.analyzePB). SetKeepOrder(e.keepOrder). Build() diff --git a/executor/analyze_test.go b/executor/analyze_test.go new file mode 100644 index 0000000000000..05d4f7368935f --- /dev/null +++ b/executor/analyze_test.go @@ -0,0 +1,60 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "fmt" + "strings" + + . "github.com/pingcap/check" + "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/testkit" +) + +func (s *testSuite) TestAnalyzePartition(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set @@session.tidb_enable_table_partition=1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + createTable := `CREATE TABLE t (a int, b int, c varchar(10), primary key(a), index idx(b)) +PARTITION BY RANGE ( a ) ( + PARTITION p0 VALUES LESS THAN (6), + PARTITION p1 VALUES LESS THAN (11), + PARTITION p2 VALUES LESS THAN (16), + PARTITION p3 VALUES LESS THAN (21) +)` + tk.MustExec(createTable) + for i := 1; i < 21; i++ { + tk.MustExec(fmt.Sprintf(`insert into t values (%d, %d, "hello")`, i, i)) + } + tk.MustExec("analyze table t") + + is := executor.GetInfoSchema(tk.Se.(sessionctx.Context)) + table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + pi := table.Meta().GetPartitionInfo() + c.Assert(pi, NotNil) + ids := make([]string, 0, len(pi.Definitions)) + for _, def := range pi.Definitions { + ids = append(ids, fmt.Sprintf("%d", def.ID)) + } + result := tk.MustQuery(fmt.Sprintf("select count(distinct(table_id)) from mysql.stats_meta where table_id in (%s)", strings.Join(ids, ","))) + result.Check(testkit.Rows(fmt.Sprintf("%d", len(ids)))) + result = tk.MustQuery(fmt.Sprintf("select count(distinct(table_id)) from mysql.stats_histograms where table_id in (%s)", strings.Join(ids, ","))) + result.Check(testkit.Rows(fmt.Sprintf("%d", len(ids)))) + result = tk.MustQuery(fmt.Sprintf("select count(distinct(table_id)) from mysql.stats_buckets where table_id in (%s)", strings.Join(ids, ","))) + result.Check(testkit.Rows(fmt.Sprintf("%d", len(ids)))) +} diff --git a/executor/builder.go b/executor/builder.go index 3bd91329e8a26..523323f5b95e2 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1269,7 +1269,7 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plan.AnalyzeIndexTask) _, offset := zone(b.ctx) e := &AnalyzeIndexExec{ ctx: b.ctx, - tblInfo: task.TableInfo, + physicalID: task.PhysicalID, idxInfo: task.IndexInfo, concurrency: b.ctx.GetSessionVars().IndexSerialScanConcurrency, analyzePB: &tipb.AnalyzeReq{ @@ -1301,7 +1301,7 
@@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plan.AnalyzeColumnsTa _, offset := zone(b.ctx) e := &AnalyzeColumnsExec{ ctx: b.ctx, - tblInfo: task.TableInfo, + physicalID: task.PhysicalID, colsInfo: task.ColsInfo, pkInfo: task.PKInfo, concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency, @@ -1319,7 +1319,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plan.AnalyzeColumnsTa BucketSize: maxBucketSize, SampleSize: maxRegionSampleSize, SketchSize: maxSketchSize, - ColumnsInfo: model.ColumnsToProto(cols, task.TableInfo.PKIsHandle), + ColumnsInfo: model.ColumnsToProto(cols, task.PKInfo != nil), CmsketchDepth: &depth, CmsketchWidth: &width, } diff --git a/plan/cbo_test.go b/plan/cbo_test.go index 6e90a8a5ec863..5960960e12c15 100644 --- a/plan/cbo_test.go +++ b/plan/cbo_test.go @@ -416,7 +416,7 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) { }{ { sql: "analyze table t3", - best: "Analyze{Index(t3.a),Table(t3.b)}", + best: "Analyze{Index(a),Table(b)}", }, // Test analyze full table. { @@ -448,7 +448,7 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) { // Test analyze all index. { sql: "analyze table t2 index", - best: "Analyze{Index(t2.a),Index(t2.b)}", + best: "Analyze{Index(a),Index(b)}", }, // TODO: Refine these tests in the future. //{ diff --git a/plan/common_plans.go b/plan/common_plans.go index ca91a0e0b69ce..a2f9f6f873e64 100644 --- a/plan/common_plans.go +++ b/plan/common_plans.go @@ -355,15 +355,15 @@ type Delete struct { // AnalyzeColumnsTask is used for analyze columns. type AnalyzeColumnsTask struct { - TableInfo *model.TableInfo - PKInfo *model.ColumnInfo - ColsInfo []*model.ColumnInfo + PhysicalID int64 // PhysicalID is the partition id for a partitioned table, otherwise, it is the table id. + PKInfo *model.ColumnInfo + ColsInfo []*model.ColumnInfo } // AnalyzeIndexTask is used for analyze index. type AnalyzeIndexTask struct { - TableInfo *model.TableInfo - IndexInfo *model.IndexInfo + PhysicalID int64 // PhysicalID is the partition id for a partitioned table, otherwise, it is the table id. 
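The reason the analyze tasks carry a physical ID is that each partition owns its own key range in TiKV, so a distsql request built from the logical table ID would miss the partitions. A simplified sketch of the record-key shape; the real encoding lives in tablecodec and is memory-comparable binary, not a decimal string:

package main

import "fmt"

// recordKeyPrefix mimics only the shape of TiDB record keys, t<ID>_r...;
// it is an illustration, not the tablecodec implementation.
func recordKeyPrefix(physicalID int64) string {
	return fmt.Sprintf("t%d_r", physicalID)
}

func main() {
	// A logical table with three partitions owns three disjoint key ranges,
	// so each partition needs its own scan/analyze request.
	for _, id := range []int64{100, 101, 102} {
		fmt.Println(recordKeyPrefix(id))
	}
}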
+ IndexInfo *model.IndexInfo } // Analyze represents an analyze plan diff --git a/plan/planbuilder.go b/plan/planbuilder.go index f0c3af15935e7..dc2edec690113 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -564,15 +564,31 @@ func getColsInfo(tn *ast.TableName) (indicesInfo []*model.IndexInfo, colsInfo [] return } +func getPhysicalIDs(tblInfo *model.TableInfo) []int64 { + if pi := tblInfo.GetPartitionInfo(); pi != nil { + ids := make([]int64, 0, len(pi.Definitions)) + for _, def := range pi.Definitions { + ids = append(ids, def.ID) + } + return ids + } + return []int64{tblInfo.ID} +} + func (b *planBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) Plan { p := &Analyze{} for _, tbl := range as.TableNames { idxInfo, colInfo, pkInfo := getColsInfo(tbl) + physicalIDs := getPhysicalIDs(tbl.TableInfo) for _, idx := range idxInfo { - p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{TableInfo: tbl.TableInfo, IndexInfo: idx}) + for _, id := range physicalIDs { + p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{PhysicalID: id, IndexInfo: idx}) + } } if len(colInfo) > 0 || pkInfo != nil { - p.ColTasks = append(p.ColTasks, AnalyzeColumnsTask{TableInfo: tbl.TableInfo, PKInfo: pkInfo, ColsInfo: colInfo}) + for _, id := range physicalIDs { + p.ColTasks = append(p.ColTasks, AnalyzeColumnsTask{PhysicalID: id, PKInfo: pkInfo, ColsInfo: colInfo}) + } } } return p @@ -581,13 +597,16 @@ func (b *planBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) Plan { func (b *planBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt) Plan { p := &Analyze{} tblInfo := as.TableNames[0].TableInfo + physicalIDs := getPhysicalIDs(tblInfo) for _, idxName := range as.IndexNames { idx := findIndexByName(tblInfo.Indices, idxName) if idx == nil || idx.State != model.StatePublic { b.err = ErrAnalyzeMissIndex.GenByArgs(idxName.O, tblInfo.Name.O) break } - p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{TableInfo: tblInfo, IndexInfo: idx}) + for _, id := range physicalIDs { + p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{PhysicalID: id, IndexInfo: idx}) + } } return p } @@ -595,9 +614,12 @@ func (b *planBuilder) buildAnalyzeIndex(as *ast.AnalyzeTableStmt) Plan { func (b *planBuilder) buildAnalyzeAllIndex(as *ast.AnalyzeTableStmt) Plan { p := &Analyze{} tblInfo := as.TableNames[0].TableInfo + physicalIDs := getPhysicalIDs(tblInfo) for _, idx := range tblInfo.Indices { if idx.State == model.StatePublic { - p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{TableInfo: tblInfo, IndexInfo: idx}) + for _, id := range physicalIDs { + p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{PhysicalID: id, IndexInfo: idx}) + } } } return p diff --git a/plan/stringer.go b/plan/stringer.go index 3d13c7b60daf5..fdaf15c5eaac1 100644 --- a/plan/stringer.go +++ b/plan/stringer.go @@ -200,15 +200,15 @@ func toString(in Plan, strs []string, idxs []int) ([]string, []int) { str = "Analyze{" var children []string for _, idx := range x.IdxTasks { - children = append(children, fmt.Sprintf("Index(%s.%s)", idx.TableInfo.Name.O, idx.IndexInfo.Name.O)) + children = append(children, fmt.Sprintf("Index(%s)", idx.IndexInfo.Name.O)) } for _, col := range x.ColTasks { var colNames []string if col.PKInfo != nil { - colNames = append(colNames, fmt.Sprintf("%s.%s", col.TableInfo.Name.O, col.PKInfo.Name.O)) + colNames = append(colNames, fmt.Sprintf("%s", col.PKInfo.Name.O)) } for _, c := range col.ColsInfo { - colNames = append(colNames, fmt.Sprintf("%s.%s", col.TableInfo.Name.O, c.Name.O)) + colNames = append(colNames, fmt.Sprintf("%s", c.Name.O)) } 
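The buildAnalyze* functions above now emit one task per (physical ID, index) pair rather than one per table. A condensed standalone restatement of that fan-out, with a hypothetical task type standing in for plan.AnalyzeIndexTask:

package main

import "fmt"

// idxTask is a toy stand-in for plan.AnalyzeIndexTask.
type idxTask struct {
	physicalID int64
	indexName  string
}

// buildIndexTasks fans one logical index analysis out to every physical ID,
// mirroring the nested loops in buildAnalyzeTable and buildAnalyzeIndex.
func buildIndexTasks(physicalIDs []int64, indexes []string) []idxTask {
	tasks := make([]idxTask, 0, len(physicalIDs)*len(indexes))
	for _, idx := range indexes {
		for _, id := range physicalIDs {
			tasks = append(tasks, idxTask{physicalID: id, indexName: idx})
		}
	}
	return tasks
}

func main() {
	tasks := buildIndexTasks([]int64{100, 101}, []string{"idx"})
	fmt.Println(len(tasks)) // 2: one task per (partition, index) pair
}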
children = append(children, fmt.Sprintf("Table(%s)", strings.Join(colNames, ", "))) } diff --git a/statistics/builder.go b/statistics/builder.go index 84fce7e6ca96a..079431b53f3fd 100644 --- a/statistics/builder.go +++ b/statistics/builder.go @@ -156,10 +156,10 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample // AnalyzeResult is used to represent analyze result. type AnalyzeResult struct { - TableID int64 - Hist []*Histogram - Cms []*CMSketch - Count int64 - IsIndex int - Err error + PhysicalID int64 // PhysicalID is the partition id for a partitioned table, otherwise, it is the table id. + Hist []*Histogram + Cms []*CMSketch + Count int64 + IsIndex int + Err error } From fd7e5274b6ad5a7341b24c94217614186dcd0ff2 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 1 Aug 2018 16:57:20 +0800 Subject: [PATCH 09/13] executor: panic recover for analyze worker (#7214) --- executor/analyze.go | 21 ++++++++++++++++++++- metrics/metrics.go | 1 + 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/executor/analyze.go b/executor/analyze.go index c334bca633614..40d78666c105b 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -14,12 +14,14 @@ package executor import ( + "runtime" "strconv" "github.com/juju/errors" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx" @@ -66,10 +68,13 @@ func (e *AnalyzeExec) Next(ctx context.Context, chk *chunk.Chunk) error { } close(taskCh) statsHandle := domain.GetDomain(e.ctx).StatsHandle() - for i := 0; i < len(e.tasks); i++ { + for i, panicCnt := 0, 0; i < len(e.tasks) && panicCnt < concurrency; i++ { result := <-resultCh if result.Err != nil { err = result.Err + if errors.Trace(err) == analyzeWorkerPanic { + panicCnt++ + } log.Error(errors.ErrorStack(err)) continue } @@ -111,7 +116,21 @@ type analyzeTask struct { colExec *AnalyzeColumnsExec } +var analyzeWorkerPanic = errors.New("analyze worker panic") + func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<- statistics.AnalyzeResult) { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + log.Errorf("analyzeWorker panic stack is:\n%s", buf) + metrics.PanicCounter.WithLabelValues(metrics.LabelAnalyze).Inc() + resultCh <- statistics.AnalyzeResult{ + Err: analyzeWorkerPanic, + } + } + }() for task := range taskCh { switch task.taskType { case colTask: diff --git a/metrics/metrics.go b/metrics/metrics.go index 7fa8ec24c385e..f3b2db94b6c7a 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -33,6 +33,7 @@ const ( LabelDDLOwner = "ddl-owner" LabelDDL = "ddl" LabelGCWorker = "gcworker" + LabelAnalyze = "analyze" opSucc = "ok" opFailed = "err" From 791d6909da7a33afc2f9d77b7c95c818f3c2978b Mon Sep 17 00:00:00 2001 From: liukun <451564319@qq.com> Date: Wed, 1 Aug 2018 17:17:15 +0800 Subject: [PATCH 10/13] tikvclient,mocktikv: add batch put API for raw kv (#7135) --- store/mockstore/mocktikv/mvcc.go | 1 + store/mockstore/mocktikv/mvcc_leveldb.go | 16 +++ store/mockstore/mocktikv/rpc.go | 24 +++++ store/tikv/rawkv.go | 131 +++++++++++++++++++++++ store/tikv/rawkv_test.go | 29 +++++ store/tikv/tikvrpc/tikvrpc.go | 15 +++ 6 files changed, 216 insertions(+) diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index 980b9ec1a6b4d..626d1d85475d0 
100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -439,6 +439,7 @@ type RawKV interface { RawGet(key []byte) []byte RawScan(startKey, endKey []byte, limit int) []Pair RawPut(key, value []byte) + RawBatchPut(keys, values [][]byte) RawDelete(key []byte) RawDeleteRange(startKey, endKey []byte) } diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 1ceb29e496d7c..e773dd144c98e 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -893,6 +893,22 @@ func (mvcc *MVCCLevelDB) RawPut(key, value []byte) { terror.Log(mvcc.db.Put(key, value, nil)) } +// RawBatchPut implements the RawKV interface +func (mvcc *MVCCLevelDB) RawBatchPut(keys, values [][]byte) { + mvcc.mu.Lock() + defer mvcc.mu.Unlock() + + batch := &leveldb.Batch{} + for i, key := range keys { + value := values[i] + if value == nil { + value = []byte{} + } + batch.Put(key, value) + } + terror.Log(mvcc.db.Write(batch, nil)) +} + // RawGet implements the RawKV interface. func (mvcc *MVCCLevelDB) RawGet(key []byte) []byte { mvcc.mu.Lock() diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index a05ee2e473d73..1bbafa40e535c 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -388,6 +388,23 @@ func (h *rpcHandler) handleKvRawPut(req *kvrpcpb.RawPutRequest) *kvrpcpb.RawPutR return &kvrpcpb.RawPutResponse{} } +func (h *rpcHandler) handleKvRawBatchPut(req *kvrpcpb.RawBatchPutRequest) *kvrpcpb.RawBatchPutResponse { + rawKV, ok := h.mvccStore.(RawKV) + if !ok { + return &kvrpcpb.RawBatchPutResponse{ + Error: "not implemented", + } + } + keys := make([][]byte, 0, len(req.Pairs)) + values := make([][]byte, 0, len(req.Pairs)) + for _, pair := range req.Pairs { + keys = append(keys, pair.Key) + values = append(values, pair.Value) + } + rawKV.RawBatchPut(keys, values) + return &kvrpcpb.RawBatchPutResponse{} +} + func (h *rpcHandler) handleKvRawDelete(req *kvrpcpb.RawDeleteRequest) *kvrpcpb.RawDeleteResponse { rawKV, ok := h.mvccStore.(RawKV) if !ok { @@ -615,6 +632,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R return resp, nil } resp.RawPut = handler.handleKvRawPut(r) + case tikvrpc.CmdRawBatchPut: + r := req.RawBatchPut + if err := handler.checkRequest(reqCtx, r.Size()); err != nil { + resp.RawBatchPut = &kvrpcpb.RawBatchPutResponse{RegionError: err} + return resp, nil + } + resp.RawBatchPut = handler.handleKvRawBatchPut(r) case tikvrpc.CmdRawDelete: r := req.RawDelete if err := handler.checkRequest(reqCtx, r.Size()); err != nil { diff --git a/store/tikv/rawkv.go b/store/tikv/rawkv.go index dad5426d3d91c..322bbf4518af3 100644 --- a/store/tikv/rawkv.go +++ b/store/tikv/rawkv.go @@ -23,16 +23,20 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/util/goroutine_pool" "golang.org/x/net/context" ) var ( + rawKVClientGP = gp.New(3 * time.Minute) // MaxRawKVScanLimit is the maximum scan limit for rawkv Scan. MaxRawKVScanLimit = 10240 // ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large. ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit") ) +const rawBatchPutSize = 16 * 1024 + // RawKVClient is a client of TiKV server which is used as a key-value storage, // only GET/PUT/DELETE commands are supported. 
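BatchPut first groups keys by region and then cuts each group into batches whose accumulated key+value size stays near rawBatchPutSize. A standalone sketch of just the size-based splitting, the same shape as appendBatches further down but without the region handling (names here are illustrative only):

package main

import "fmt"

type kvBatch struct{ keys, values [][]byte }

// splitBySize cuts parallel key/value slices into batches whose accumulated
// key+value size stays under limit; every batch holds at least one pair, so
// a single oversized pair still goes through as its own batch.
func splitBySize(keys, values [][]byte, limit int) []kvBatch {
	var out []kvBatch
	var cur kvBatch
	size := 0
	for i := range keys {
		if size >= limit && len(cur.keys) > 0 {
			out = append(out, cur)
			cur, size = kvBatch{}, 0
		}
		cur.keys = append(cur.keys, keys[i])
		cur.values = append(cur.values, values[i])
		size += len(keys[i]) + len(values[i])
	}
	if len(cur.keys) > 0 {
		out = append(out, cur)
	}
	return out
}

func main() {
	keys := [][]byte{[]byte("k1"), []byte("k2"), []byte("k3")}
	vals := [][]byte{[]byte("v1"), []byte("v2"), []byte("v3")}
	// With a 4-byte limit each 4-byte pair lands in its own batch.
	fmt.Println(len(splitBySize(keys, vals, 4))) // 3
}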
type RawKVClient struct { @@ -131,6 +135,26 @@ func (c *RawKVClient) Put(key, value []byte) error { return nil } +// BatchPut stores key-value pairs to TiKV. +func (c *RawKVClient) BatchPut(keys, values [][]byte) error { + start := time.Now() + defer func() { + metrics.TiKVRawkvCmdHistogram.WithLabelValues("batch_put").Observe(time.Since(start).Seconds()) + }() + + if len(keys) != len(values) { + return errors.New("the len of keys is not equal to the len of values") + } + for _, value := range values { + if len(value) == 0 { + return errors.New("empty value is not supported") + } + } + bo := NewBackoffer(context.Background(), rawkvMaxBackoff) + err := c.sendBatchPut(bo, keys, values) + return errors.Trace(err) +} + // Delete deletes a key-value pair from TiKV. func (c *RawKVClient) Delete(key []byte) error { start := time.Now() @@ -298,3 +322,110 @@ func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*tikvr return resp, actualEndKey, nil } } + +func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error { + keyToValue := make(map[string][]byte) + for i, key := range keys { + keyToValue[string(key)] = values[i] + } + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys) + if err != nil { + return errors.Trace(err) + } + var batches []batch + // split the keys by size and RegionVerID + for regionID, groupKeys := range groups { + batches = appendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize) + } + bo, cancel := bo.Fork() + ch := make(chan error, len(batches)) + for _, batch := range batches { + batch1 := batch + rawKVClientGP.Go(func() { + singleBatchBackoffer, singleBatchCancel := bo.Fork() + defer singleBatchCancel() + ch <- c.doBatchPut(singleBatchBackoffer, batch1) + }) + } + + for i := 0; i < len(batches); i++ { + if e := <-ch; e != nil { + cancel() + // catch the first error + if err == nil { + err = e + } + } + } + return errors.Trace(err) +} + +func appendBatches(batches []batch, regionID RegionVerID, groupKeys [][]byte, keyToValue map[string][]byte, limit int) []batch { + var start, size int + var keys, values [][]byte + for start = 0; start < len(groupKeys); start++ { + if size >= limit { + batches = append(batches, batch{regionID: regionID, keys: keys, values: values}) + keys = make([][]byte, 0) + values = make([][]byte, 0) + size = 0 + } + key := groupKeys[start] + value := keyToValue[string(key)] + keys = append(keys, key) + values = append(values, value) + size += len(key) + size += len(value) + } + if len(keys) != 0 { + batches = append(batches, batch{regionID: regionID, keys: keys, values: values}) + } + return batches +} + +func (c *RawKVClient) doBatchPut(bo *Backoffer, batch batch) error { + kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.keys)) + for i, key := range batch.keys { + kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.values[i]}) + } + + req := &tikvrpc.Request{ + Type: tikvrpc.CmdRawBatchPut, + RawBatchPut: &kvrpcpb.RawBatchPutRequest{ + Pairs: kvPair, + }, + } + + sender := NewRegionRequestSender(c.regionCache, c.rpcClient) + resp, err := sender.SendReq(bo, req, batch.regionID, readTimeoutShort) + if err != nil { + return errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return errors.Trace(err) + } + if regionErr != nil { + err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return errors.Trace(err) + } + // recursive call + return c.sendBatchPut(bo, batch.keys, batch.values) + } + + cmdResp := resp.RawBatchPut + if 
cmdResp == nil { + return errors.Trace(ErrBodyMissing) + } + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + return nil +} + +type batch struct { + regionID RegionVerID + keys [][]byte + values [][]byte +} diff --git a/store/tikv/rawkv_test.go b/store/tikv/rawkv_test.go index 58a1113d8031c..638a396af07a7 100644 --- a/store/tikv/rawkv_test.go +++ b/store/tikv/rawkv_test.go @@ -15,6 +15,7 @@ package tikv import ( "bytes" + "fmt" . "github.com/pingcap/check" "github.com/pingcap/tidb/store/mockstore/mocktikv" @@ -66,6 +67,11 @@ func (s *testRawKVSuite) mustPut(c *C, key, value []byte) { c.Assert(err, IsNil) } +func (s *testRawKVSuite) mustBatchPut(c *C, keys, values [][]byte) { + err := s.client.BatchPut(keys, values) + c.Assert(err, IsNil) +} + func (s *testRawKVSuite) mustDelete(c *C, key []byte) { err := s.client.Delete(key) c.Assert(err, IsNil) @@ -126,6 +132,29 @@ func (s *testRawKVSuite) TestSimple(c *C) { c.Assert(err, NotNil) } +func (s *testRawKVSuite) TestBatchPut(c *C) { + testNum := 0 + size := 0 + var testKeys [][]byte + var testValues [][]byte + for i := 0; size/rawBatchPutSize < 4; i++ { + key := fmt.Sprint("key", i) + size += len(key) + testKeys = append(testKeys, []byte(key)) + value := fmt.Sprint("value", i) + size += len(value) + testValues = append(testValues, []byte(value)) + s.mustNotExist(c, []byte(key)) + testNum = i + } + err := s.split(c, "", fmt.Sprint("key", testNum/2)) + c.Assert(err, IsNil) + s.mustBatchPut(c, testKeys, testValues) + for i := 0; i < testNum; i++ { + s.mustGet(c, testKeys[i], testValues[i]) + } +} + func (s *testRawKVSuite) TestSplit(c *C) { s.mustPut(c, []byte("k1"), []byte("v1")) s.mustPut(c, []byte("k3"), []byte("v3")) diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 6862c4b0f892e..3e12dfd58c3a9 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -46,6 +46,7 @@ const ( CmdRawGet CmdType = 256 + iota CmdRawPut + CmdRawBatchPut CmdRawDelete CmdRawDeleteRange CmdRawScan @@ -86,6 +87,8 @@ func (t CmdType) String() string { return "RawGet" case CmdRawPut: return "RawPut" + case CmdRawBatchPut: + return "RawBatchPut" case CmdRawDelete: return "RawDelete" case CmdRawDeleteRange: @@ -123,6 +126,7 @@ type Request struct { DeleteRange *kvrpcpb.DeleteRangeRequest RawGet *kvrpcpb.RawGetRequest RawPut *kvrpcpb.RawPutRequest + RawBatchPut *kvrpcpb.RawBatchPutRequest RawDelete *kvrpcpb.RawDeleteRequest RawDeleteRange *kvrpcpb.RawDeleteRangeRequest RawScan *kvrpcpb.RawScanRequest @@ -148,6 +152,7 @@ type Response struct { DeleteRange *kvrpcpb.DeleteRangeResponse RawGet *kvrpcpb.RawGetResponse RawPut *kvrpcpb.RawPutResponse + RawBatchPut *kvrpcpb.RawBatchPutResponse RawDelete *kvrpcpb.RawDeleteResponse RawDeleteRange *kvrpcpb.RawDeleteRangeResponse RawScan *kvrpcpb.RawScanResponse @@ -202,6 +207,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.RawGet.Context = ctx case CmdRawPut: req.RawPut.Context = ctx + case CmdRawBatchPut: + req.RawBatchPut.Context = ctx case CmdRawDelete: req.RawDelete.Context = ctx case CmdRawDeleteRange: @@ -282,6 +289,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { resp.RawPut = &kvrpcpb.RawPutResponse{ RegionError: e, } + case CmdRawBatchPut: + resp.RawBatchPut = &kvrpcpb.RawBatchPutResponse{ + RegionError: e, + } case CmdRawDelete: resp.RawDelete = &kvrpcpb.RawDeleteResponse{ RegionError: e, @@ -352,6 +363,8 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) 
{ e = resp.RawGet.GetRegionError() case CmdRawPut: e = resp.RawPut.GetRegionError() + case CmdRawBatchPut: + e = resp.RawBatchPut.GetRegionError() case CmdRawDelete: e = resp.RawDelete.GetRegionError() case CmdRawDeleteRange: @@ -408,6 +421,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.RawGet, err = client.RawGet(ctx, req.RawGet) case CmdRawPut: resp.RawPut, err = client.RawPut(ctx, req.RawPut) + case CmdRawBatchPut: + resp.RawBatchPut, err = client.RawBatchPut(ctx, req.RawBatchPut) case CmdRawDelete: resp.RawDelete, err = client.RawDelete(ctx, req.RawDelete) case CmdRawDeleteRange: From 62461a77b74ec408ce4bebc408231c509f1fe7c0 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 1 Aug 2018 21:53:18 +0800 Subject: [PATCH 11/13] util/chunk: fix TruncateTo again (#7234) When we append null, we simply increment the nullCount, so we need to unset the unused bits in the last bitmap byte. --- util/chunk/chunk.go | 12 +++++++++++- util/chunk/chunk_test.go | 11 ++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index 2706d5dc098a8..ccdcdc7219d8c 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -198,7 +198,17 @@ func (c *Chunk) TruncateTo(numRows int) { } } col.length = numRows - col.nullBitmap = col.nullBitmap[:(col.length+7)/8] + bitmapLen := (col.length + 7) / 8 + col.nullBitmap = col.nullBitmap[:bitmapLen] + if col.length%8 != 0 { + // When we append null, we simply increment the nullCount, + // so we need to clear the unused bits in the last bitmap byte. + lastByte := col.nullBitmap[bitmapLen-1] + unusedBitsLen := 8 - uint(col.length%8) + lastByte <<= unusedBitsLen + lastByte >>= unusedBitsLen + col.nullBitmap[bitmapLen-1] = lastByte + } } c.numVirtualRows = numRows } diff --git a/util/chunk/chunk_test.go b/util/chunk/chunk_test.go index a6b8571be4fb4..7c0feea0e18fc 100644 --- a/util/chunk/chunk_test.go +++ b/util/chunk/chunk_test.go @@ -210,21 +210,21 @@ func (s *testChunkSuite) TestTruncateTo(c *check.C) { c.Assert(src.columns[0].length, check.Equals, 12) c.Assert(src.columns[0].nullCount, check.Equals, 6) - c.Assert(string(src.columns[0].nullBitmap), check.Equals, string([]byte{0x55, 0x55})) + c.Assert(string(src.columns[0].nullBitmap), check.Equals, string([]byte{0x55, 0x05})) c.Assert(len(src.columns[0].offsets), check.Equals, 0) c.Assert(len(src.columns[0].data), check.Equals, 4*12) c.Assert(len(src.columns[0].elemBuf), check.Equals, 4) c.Assert(src.columns[1].length, check.Equals, 12) c.Assert(src.columns[1].nullCount, check.Equals, 6) - c.Assert(string(src.columns[0].nullBitmap), check.Equals, string([]byte{0x55, 0x55})) + c.Assert(string(src.columns[0].nullBitmap), check.Equals, string([]byte{0x55, 0x05})) c.Assert(string(src.columns[1].offsets), check.Equals, string([]int32{0, 3, 3, 6, 6, 9, 9, 12, 12, 15, 15, 18, 18})) c.Assert(string(src.columns[1].data), check.Equals, "abcabcabcabcabcabc") c.Assert(len(src.columns[1].elemBuf), check.Equals, 0) c.Assert(src.columns[2].length, check.Equals, 12) c.Assert(src.columns[2].nullCount, check.Equals, 6) - c.Assert(string(src.columns[0].nullBitmap), check.Equals, string([]byte{0x55, 0x55})) + c.Assert(string(src.columns[0].nullBitmap), check.Equals, string([]byte{0x55, 0x05})) c.Assert(len(src.columns[2].offsets), check.Equals, 13) c.Assert(len(src.columns[2].data), check.Equals, 150) c.Assert(len(src.columns[2].elemBuf), check.Equals, 0) @@ -236,9 +236,10 @@ func (s *testChunkSuite) TestTruncateTo(c *check.C) { } chk := 
NewChunkWithCapacity(fieldTypes[:1], 1) chk.AppendFloat32(0, 1.0) - chk.TruncateTo(0) + chk.AppendFloat32(0, 1.0) + chk.TruncateTo(1) chk.AppendNull(0) - c.Assert(chk.GetRow(0).IsNull(0), check.IsTrue) + c.Assert(chk.GetRow(1).IsNull(0), check.IsTrue) } // newChunk creates a new chunk and initialize columns with element length. From 5e7aa1d97d6459843949ae3587c9b5ac6661bb37 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 1 Aug 2018 22:48:33 +0800 Subject: [PATCH 12/13] infoschema,executor: add PROCESSLIST table to INFORMMATION_SCHEMA database (#7236) --- executor/aggregate_test.go | 2 +- infoschema/infoschema_test.go | 1 + infoschema/tables.go | 44 +++++++++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go index 5dde78e136595..b85914532ab50 100644 --- a/executor/aggregate_test.go +++ b/executor/aggregate_test.go @@ -239,7 +239,7 @@ func (s *testSuite) TestAggregation(c *C) { result = tk.MustQuery("select count(*) from information_schema.columns") // When adding new memory columns in information_schema, please update this variable. - columnCountOfAllInformationSchemaTables := "749" + columnCountOfAllInformationSchemaTables := "757" result.Check(testkit.Rows(columnCountOfAllInformationSchemaTables)) tk.MustExec("drop table if exists t1") diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 5961dc1791264..5952181911302 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -262,6 +262,7 @@ func (*testSuite) TestInfoTables(c *C) { "OPTIMIZER_TRACE", "TABLESPACES", "COLLATION_CHARACTER_SET_APPLICABILITY", + "PROCESSLIST", } for _, t := range info_tables { tb, err1 := is.TableByName(model.NewCIStr(infoschema.Name), model.NewCIStr(t)) diff --git a/infoschema/tables.go b/infoschema/tables.go index 1475208006ca3..cb6fa7bc4d491 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -16,6 +16,7 @@ package infoschema import ( "fmt" "sort" + "time" "github.com/juju/errors" "github.com/pingcap/tidb/kv" @@ -63,6 +64,7 @@ const ( tableOptimizerTrace = "OPTIMIZER_TRACE" tableTableSpaces = "TABLESPACES" tableCollationCharacterSetApplicability = "COLLATION_CHARACTER_SET_APPLICABILITY" + tableProcesslist = "PROCESSLIST" ) type columnInfo struct { @@ -516,6 +518,17 @@ var tableCollationCharacterSetApplicabilityCols = []columnInfo{ {"CHARACTER_SET_NAME", mysql.TypeVarchar, 32, mysql.NotNullFlag, nil, nil}, } +var tableProcesslistCols = []columnInfo{ + {"ID", mysql.TypeLonglong, 21, mysql.NotNullFlag, 0, nil}, + {"USER", mysql.TypeVarchar, 16, mysql.NotNullFlag, "", nil}, + {"HOST", mysql.TypeVarchar, 64, mysql.NotNullFlag, "", nil}, + {"DB", mysql.TypeVarchar, 64, mysql.NotNullFlag, "", nil}, + {"COMMAND", mysql.TypeVarchar, 16, mysql.NotNullFlag, "", nil}, + {"TIME", mysql.TypeLong, 7, mysql.NotNullFlag, 0, nil}, + {"STATE", mysql.TypeVarchar, 7, 0, nil, nil}, + {"Info", mysql.TypeString, 512, 0, nil, nil}, +} + func dataForCharacterSets() (records [][]types.Datum) { records = append(records, types.MakeDatums("ascii", "ascii_general_ci", "US ASCII", 1), @@ -569,6 +582,34 @@ func dataForUserPrivileges(ctx sessionctx.Context) [][]types.Datum { return pm.UserPrivilegesTable() } +func dataForProcesslist(ctx sessionctx.Context) [][]types.Datum { + sm := ctx.GetSessionManager() + if sm == nil { + return nil + } + + var records [][]types.Datum + pl := sm.ShowProcessList() + for _, pi := range pl { + var t uint64 + if len(pi.Info) != 0 { + t = 
uint64(time.Since(pi.Time) / time.Second) + } + record := types.MakeDatums( + pi.ID, + pi.User, + pi.Host, + pi.DB, + pi.Command, + t, + fmt.Sprintf("%d", pi.State), + pi.Info, + ) + records = append(records, record) + } + return records +} + func dataForEngines() (records [][]types.Datum) { records = append(records, types.MakeDatums("InnoDB", "DEFAULT", "Supports transactions, row-level locking, and foreign keys", "YES", "YES", "YES"), @@ -1079,6 +1120,7 @@ var tableNameToColumns = map[string][]columnInfo{ tableOptimizerTrace: tableOptimizerTraceCols, tableTableSpaces: tableTableSpacesCols, tableCollationCharacterSetApplicability: tableCollationCharacterSetApplicabilityCols, + tableProcesslist: tableProcesslistCols, } func createInfoSchemaTable(handle *Handle, meta *model.TableInfo) *infoschemaTable { @@ -1165,6 +1207,8 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column) case tableTableSpaces: case tableCollationCharacterSetApplicability: fullRows = dataForCollationCharacterSetApplicability() + case tableProcesslist: + fullRows = dataForProcesslist(ctx) } if err != nil { return nil, errors.Trace(err) From cc1c3be98316e563a0a76d9ea635f3e0ccb580c5 Mon Sep 17 00:00:00 2001 From: Caitin <34535727+CaitinChen@users.noreply.github.com> Date: Thu, 2 Aug 2018 10:10:40 +0800 Subject: [PATCH 13/13] plan: update comment wording (#7238) --- plan/planbuilder.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plan/planbuilder.go b/plan/planbuilder.go index dc2edec690113..9afb77d83a04f 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -1110,10 +1110,10 @@ func (b *planBuilder) buildValuesListOfInsert(insert *ast.InsertStmt, insertPlan return } - // If the value_list and col_list is empty and we have generated column, we can still write to this table. - // i.e. insert into t values(); can be executed successfully if t have generated column. + // If value_list and col_list are empty and we have a generated column, we can still write data to this table. + // For example, insert into t values(); can be executed successfully if t has a generated column. if len(insert.Columns) > 0 || len(insert.Lists[0]) > 0 { - // If value_list is not empty or the col_list is not empty, length of value_list should be the same with col_list's. + // If value_list or col_list is not empty, the length of value_list should be the same with that of col_list. if len(insert.Lists[0]) != len(affectedValuesCols) { b.err = ErrWrongValueCountOnRow.GenByArgs(1) return @@ -1129,7 +1129,7 @@ func (b *planBuilder) buildValuesListOfInsert(insert *ast.InsertStmt, insertPlan totalTableCols := insertPlan.Table.Cols() for i, valuesItem := range insert.Lists { - // The length of the all the value_list should be the same. + // The length of all the value_list should be the same. // "insert into t values (), ()" is valid. // "insert into t values (), (1)" is not valid. // "insert into t values (1), ()" is not valid. @@ -1178,7 +1178,7 @@ func (b *planBuilder) buildSelectPlanOfInsert(insert *ast.InsertStmt, insertPlan return } - // Check that the length of select' row is equal to the col list. + // Check to guarantee that the length of the row returned by select is equal to that of affectedValuesCols. if selectPlan.Schema().Len() != len(affectedValuesCols) { b.err = ErrWrongValueCountOnRow.GenByArgs(1) return
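For reference, a toy restatement of the rule these reworded comments describe (a hypothetical helper, not the planner code): an empty column list with an empty value list is accepted so generated columns can fill the row, while a non-empty value list must match the affected columns in length.

package main

import (
	"errors"
	"fmt"
)

// checkValueCount mimics the check in buildValuesListOfInsert: both lists
// empty is fine ("insert into t values()" on a table with a generated
// column); otherwise the value count must equal the affected column count.
func checkValueCount(numAffectedCols, numListedCols, numValues int) error {
	if numListedCols == 0 && numValues == 0 {
		return nil
	}
	if numValues != numAffectedCols {
		return errors.New("wrong value count on row 1")
	}
	return nil
}

func main() {
	fmt.Println(checkValueCount(2, 0, 0)) // <nil>: empty row is allowed
	fmt.Println(checkValueCount(2, 2, 2)) // <nil>
	fmt.Println(checkValueCount(2, 2, 1)) // error: wrong value count
}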