Skip to content

Commit

Permalink
Merge branch 'master' of github.com:pingcap/tidb
Browse files Browse the repository at this point in the history
  • Loading branch information
SunRunAway committed Jun 20, 2019
2 parents f1f47c4 + 711582a commit 1f503ff
Show file tree
Hide file tree
Showing 39 changed files with 287 additions and 204 deletions.
4 changes: 4 additions & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -1494,6 +1495,9 @@ func (s *testIntegrationSuite5) TestFulltextIndexIgnore(c *C) {
}

func (s *testIntegrationSuite1) TestTreatOldVersionUTF8AsUTF8MB4(c *C) {
if israce.RaceEnabled {
c.Skip("skip race test")
}
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use test")
s.tk.MustExec("drop table if exists t")
Expand Down
7 changes: 7 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -2764,6 +2765,9 @@ func (s *testDBSuite4) TestAlterShardRowIDBits(c *C) {
}

func (s *testDBSuite2) TestLockTables(c *C) {
if israce.RaceEnabled {
c.Skip("skip race test")
}
s.tk = testkit.NewTestKit(c, s.store)
tk := s.tk
tk.MustExec("use test")
Expand Down Expand Up @@ -2960,6 +2964,9 @@ func (s *testDBSuite2) TestLockTables(c *C) {

// TestConcurrentLockTables test concurrent lock/unlock tables.
func (s *testDBSuite2) TestConcurrentLockTables(c *C) {
if israce.RaceEnabled {
c.Skip("skip race test")
}
s.tk = testkit.NewTestKit(c, s.store)
tk2 := testkit.NewTestKit(c, s.store)
tk := s.tk
Expand Down
6 changes: 3 additions & 3 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {

// NewRecordBatch create a recordBatch base on top-level executor's newFirstChunk().
func (a *recordSet) NewRecordBatch() *chunk.RecordBatch {
return chunk.NewRecordBatch(a.executor.newFirstChunk())
return chunk.NewRecordBatch(newFirstChunk(a.executor))
}

func (a *recordSet) Close() error {
Expand Down Expand Up @@ -307,7 +307,7 @@ func (c *chunkRowRecordSet) Next(ctx context.Context, req *chunk.RecordBatch) er
}

func (c *chunkRowRecordSet) NewRecordBatch() *chunk.RecordBatch {
return chunk.NewRecordBatch(c.e.newFirstChunk())
return chunk.NewRecordBatch(newFirstChunk(c.e))
}

func (c *chunkRowRecordSet) Close() error {
Expand Down Expand Up @@ -385,7 +385,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
a.logAudit()
}()

err = e.Next(ctx, chunk.NewRecordBatch(e.newFirstChunk()))
err = e.Next(ctx, chunk.NewRecordBatch(newFirstChunk(e)))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error {
FieldType: *colTypeForHandle,
})

e.srcChunk = e.newFirstChunk()
e.srcChunk = newFirstChunk(e)
dagPB, err := e.buildDAGPB()
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (e *HashAggExec) initForUnparallelExec() {
e.partialResultMap = make(aggPartialResultMapper)
e.groupKeyBuffer = make([]byte, 0, 8)
e.groupValDatums = make([]types.Datum, 0, len(e.groupKeyBuffer))
e.childResult = e.children[0].newFirstChunk()
e.childResult = newFirstChunk(e.children[0])
}

func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
Expand Down Expand Up @@ -275,12 +275,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
partialResultsMap: make(aggPartialResultMapper),
groupByItems: e.GroupByItems,
groupValDatums: make([]types.Datum, 0, len(e.GroupByItems)),
chk: e.children[0].newFirstChunk(),
chk: newFirstChunk(e.children[0]),
}

e.partialWorkers[i] = w
e.inputCh <- &HashAggInput{
chk: e.children[0].newFirstChunk(),
chk: newFirstChunk(e.children[0]),
giveBackCh: w.inputCh,
}
}
Expand All @@ -295,7 +295,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
outputCh: e.finalOutputCh,
finalResultHolderCh: e.finalInputCh,
rowBuffer: make([]types.Datum, 0, e.Schema().Len()),
mutableRow: chunk.MutRowFromTypes(e.retTypes()),
mutableRow: chunk.MutRowFromTypes(retTypes(e)),
}
}
}
Expand Down Expand Up @@ -772,7 +772,7 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
e.childResult = e.children[0].newFirstChunk()
e.childResult = newFirstChunk(e.children[0])
e.executed = false
e.isChildReturnEmpty = true
e.inputIter = chunk.NewIterator4Chunk(e.childResult)
Expand Down
9 changes: 6 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,11 +990,14 @@ func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollec
}
data = append(data, bytes)
}
stats := domain.GetDomain(e.ctx).StatsHandle()
handle := domain.GetDomain(e.ctx).StatsHandle()
tblStats := handle.GetTableStats(e.tblInfo)
rowCount := int64(e.rowCount)
if stats.Lease() > 0 {
rowCount = mathutil.MinInt64(stats.GetTableStats(e.tblInfo).Count, rowCount)
if handle.Lease() > 0 && !tblStats.Pseudo {
rowCount = mathutil.MinInt64(tblStats.Count, rowCount)
}
// Adjust the row count in case the count of `tblStats` is not accurate and too small.
rowCount = mathutil.MaxInt64(rowCount, int64(len(collector.Samples)))
// build CMSketch
var ndv, scaleRatio uint64
collector.CMSketch, ndv, scaleRatio = statistics.NewCMSketchWithTopN(defaultCMSketchDepth, defaultCMSketchWidth, data, 20, uint64(rowCount))
Expand Down
1 change: 1 addition & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ func (s *testFastAnalyze) TestFastAnalyzeRetryRowCount(c *C) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key)")
c.Assert(s.dom.StatsHandle().Update(s.dom.InfoSchema()), IsNil)
tk.MustExec("set @@session.tidb_enable_fast_analyze=1")
tk.MustExec("set @@session.tidb_build_stats_concurrency=1")
tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
Expand Down
8 changes: 4 additions & 4 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,20 @@ func (mds *mockDataSource) Next(ctx context.Context, req *chunk.RecordBatch) err
func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
baseExec := newBaseExecutor(opt.ctx, opt.schema, nil)
m := &mockDataSource{baseExec, opt, nil, nil, 0}
types := m.retTypes()
types := retTypes(m)
colData := make([][]interface{}, len(types))
for i := 0; i < len(types); i++ {
colData[i] = m.genColDatums(i)
}

m.genData = make([]*chunk.Chunk, (m.p.rows+m.initCap-1)/m.initCap)
for i := range m.genData {
m.genData[i] = chunk.NewChunkWithCapacity(m.retTypes(), m.ctx.GetSessionVars().MaxChunkSize)
m.genData[i] = chunk.NewChunkWithCapacity(retTypes(m), m.ctx.GetSessionVars().MaxChunkSize)
}

for i := 0; i < m.p.rows; i++ {
idx := i / m.maxChunkSize
retTypes := m.retTypes()
retTypes := retTypes(m)
for colIdx := 0; colIdx < len(types); colIdx++ {
switch retTypes[colIdx].Tp {
case mysql.TypeLong, mysql.TypeLonglong:
Expand Down Expand Up @@ -259,7 +259,7 @@ func benchmarkAggExecWithCase(b *testing.B, casTest *aggTestCase) {
b.StopTimer() // prepare a new agg-executor
aggExec := buildAggExecutor(b, casTest, dataSource)
tmpCtx := context.Background()
chk := aggExec.newFirstChunk()
chk := newFirstChunk(aggExec)
dataSource.prepareChunks()

b.StartTimer()
Expand Down
20 changes: 10 additions & 10 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,8 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
v.JoinType == plannercore.RightOuterJoin,
defaultValues,
v.OtherConditions,
leftExec.retTypes(),
rightExec.retTypes(),
retTypes(leftExec),
retTypes(rightExec),
),
isOuterJoin: v.JoinType.IsOuterJoin(),
}
Expand Down Expand Up @@ -946,7 +946,7 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
}

defaultValues := v.DefaultValues
lhsTypes, rhsTypes := leftExec.retTypes(), rightExec.retTypes()
lhsTypes, rhsTypes := retTypes(leftExec), retTypes(rightExec)
if v.InnerChildIdx == 0 {
if len(v.LeftConditions) > 0 {
b.err = errors.Annotate(ErrBuildExecutor, "join's inner condition should be empty")
Expand Down Expand Up @@ -1020,7 +1020,7 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
e.defaultVal = nil
} else {
e.defaultVal = chunk.NewChunkWithCapacity(e.retTypes(), 1)
e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
}
for _, aggDesc := range v.AggFuncs {
if aggDesc.HasDistinct {
Expand Down Expand Up @@ -1079,7 +1079,7 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu
if len(v.GroupByItems) != 0 || aggregation.IsAllFirstRow(v.AggFuncs) {
e.defaultVal = nil
} else {
e.defaultVal = chunk.NewChunkWithCapacity(e.retTypes(), 1)
e.defaultVal = chunk.NewChunkWithCapacity(retTypes(e), 1)
}
for i, aggDesc := range v.AggFuncs {
aggFunc := aggfuncs.Build(b.ctx, aggDesc, i)
Expand Down Expand Up @@ -1220,7 +1220,7 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) *NestedLoopAp
defaultValues = make([]types.Datum, v.Children()[v.InnerChildIdx].Schema().Len())
}
tupleJoiner := newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0,
defaultValues, otherConditions, leftChild.retTypes(), rightChild.retTypes())
defaultValues, otherConditions, retTypes(leftChild), retTypes(rightChild))
outerExec, innerExec := leftChild, rightChild
outerFilter, innerFilter := v.LeftConditions, v.RightConditions
if v.InnerChildIdx == 0 {
Expand Down Expand Up @@ -1703,7 +1703,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
if b.err != nil {
return nil
}
outerTypes := outerExec.retTypes()
outerTypes := retTypes(outerExec)
innerPlan := v.Children()[1-v.OuterIndex]
innerTypes := make([]*types.FieldType, innerPlan.Schema().Len())
for i, col := range innerPlan.Schema().Columns {
Expand Down Expand Up @@ -1761,7 +1761,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
innerKeyCols[i] = v.InnerJoinKeys[i].Index
}
e.innerCtx.keyCols = innerKeyCols
e.joinResult = e.newFirstChunk()
e.joinResult = newFirstChunk(e)
executorCounterIndexLookUpJoin.Inc()
return e
}
Expand Down Expand Up @@ -2015,7 +2015,7 @@ func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context
return nil, err
}
us := e.(*UnionScanExec)
us.snapshotChunkBuffer = us.newFirstChunk()
us.snapshotChunkBuffer = newFirstChunk(us)
return us, nil
}

Expand Down Expand Up @@ -2050,7 +2050,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
return nil, err
}
e.resultHandler = &tableResultHandler{}
result, err := builder.SelectResult(ctx, builder.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
// If tidb_batch_delete is ON and not in a transaction, we could use BatchDelete mode.
batchDelete := e.ctx.GetSessionVars().BatchDelete && !e.ctx.GetSessionVars().InTxn()
batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize
fields := e.children[0].retTypes()
chk := e.children[0].newFirstChunk()
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
for {
iter := chunk.NewIterator4Chunk(chk)

Expand Down Expand Up @@ -183,8 +183,8 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
e.initialMultiTableTblMap()
colPosInfos := e.getColPosInfos(e.children[0].Schema())
tblRowMap := make(tableRowMapType)
fields := e.children[0].retTypes()
chk := e.children[0].newFirstChunk()
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
for {
iter := chunk.NewIterator4Chunk(chk)
err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk))
Expand Down
15 changes: 13 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.feedback.Invalidate()
return err
}
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, e.retTypes(), e.feedback, getPhysicalPlanIDs(e.plans))
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
if err != nil {
e.feedback.Invalidate()
return err
Expand Down Expand Up @@ -794,7 +794,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
handleCnt := len(task.handles)
task.rows = make([]chunk.Row, 0, handleCnt)
for {
chk := tableReader.newFirstChunk()
chk := newFirstChunk(tableReader)
err = tableReader.Next(ctx, chunk.NewRecordBatch(chk))
if err != nil {
logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err))
Expand Down Expand Up @@ -839,6 +839,17 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
}

if len(w.idxLookup.tblPlans) == 1 {
obtainedHandlesMap := make(map[int64]struct{}, len(task.rows))
for _, row := range task.rows {
handle := row.GetInt64(w.handleIdx)
obtainedHandlesMap[handle] = struct{}{}
}

logutil.Logger(ctx).Error("inconsistent index handles", zap.String("index", w.idxLookup.index.Name.O),
zap.Int("index_cnt", handleCnt), zap.Int("table_cnt", len(task.rows)),
zap.Int64s("missing_handles", GetLackHandles(task.handles, obtainedHandlesMap)),
zap.Int64s("total_handles", task.handles))

// table scan in double read can never has conditions according to convertToIndexScan.
// if this table scan has no condition, the number of rows it returns must equal to the length of handles.
return errors.Errorf("inconsistent index %s handle count %d isn't equal to value count %d",
Expand Down
Loading

0 comments on commit 1f503ff

Please sign in to comment.