Skip to content

Commit

Permalink
Merge branch 'master' into cleanup-sysvars
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo authored Jun 10, 2021
2 parents 7ee7b51 + faf139e commit fdbdccd
Show file tree
Hide file tree
Showing 95 changed files with 1,101 additions and 851 deletions.
23 changes: 23 additions & 0 deletions cmd/explaintest/r/cte.result
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,26 @@ create table t1(c1 bigint unsigned);
insert into t1 values(0);
with recursive cte1 as (select c1 - 1 c1 from t1 union all select c1 - 1 c1 from cte1 where c1 != 0) select * from cte1 dt1, cte1 dt2;
Error 1690: BIGINT UNSIGNED value is out of range in '(test.t1.c1 - 1)'
drop table if exists t;
create table t(a int, b int, key (b));
desc with cte as (select * from t) select * from cte;
id estRows task access object operator info
CTEFullScan_9 1.00 root CTE:cte data:CTE_0
CTE_0 1.00 root Non-Recursive CTE
└─TableReader_8(Seed Part) 10000.00 root data:TableFullScan_7
└─TableFullScan_7 10000.00 cop[tikv] table:t keep order:false, stats:pseudo
create SESSION binding for with cte as (select * from t) select * from cte using with cte as (select * from t use index(b)) select * from cte;
desc with cte as (select * from t) select * from cte;
id estRows task access object operator info
CTEFullScan_10 1.00 root CTE:cte data:CTE_0
CTE_0 1.00 root Non-Recursive CTE
└─IndexLookUp_9(Seed Part) 10000.00 root
├─IndexFullScan_7(Build) 10000.00 cop[tikv] table:t, index:b(b) keep order:false, stats:pseudo
└─TableRowIDScan_8(Probe) 10000.00 cop[tikv] table:t keep order:false, stats:pseudo
desc with cte as (select * from t use index()) select * from cte;
id estRows task access object operator info
CTEFullScan_10 1.00 root CTE:cte data:CTE_0
CTE_0 1.00 root Non-Recursive CTE
└─IndexLookUp_9(Seed Part) 10000.00 root
├─IndexFullScan_7(Build) 10000.00 cop[tikv] table:t, index:b(b) keep order:false, stats:pseudo
└─TableRowIDScan_8(Probe) 10000.00 cop[tikv] table:t keep order:false, stats:pseudo
7 changes: 7 additions & 0 deletions cmd/explaintest/t/cte.test
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,10 @@ create table t1(c1 bigint unsigned);
insert into t1 values(0);
--error 1690
with recursive cte1 as (select c1 - 1 c1 from t1 union all select c1 - 1 c1 from cte1 where c1 != 0) select * from cte1 dt1, cte1 dt2;
# case 32
drop table if exists t;
create table t(a int, b int, key (b));
desc with cte as (select * from t) select * from cte;
create SESSION binding for with cte as (select * from t) select * from cte using with cte as (select * from t use index(b)) select * from cte;
desc with cte as (select * from t) select * from cte;
desc with cte as (select * from t use index()) select * from cte;
3 changes: 3 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ cluster-ssl-key = ""
# "plaintext" means encryption is disabled.
spilled-file-encryption-method = "plaintext"

# Security Enhanced Mode (SEM) restricts the "SUPER" privilege and requires fine-grained privileges instead.
enable-sem = false

[status]
# If enable status report HTTP service.
report-status = true
Expand Down
24 changes: 23 additions & 1 deletion ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (s *serialTestStateChangeSuite) TestWriteReorgForModifyColumnWithUniqIdx(c

// TestWriteReorgForModifyColumnWithPKIsHandle tests whether the correct columns is used in PhysicalIndexScan's ToPB function.
func (s *serialTestStateChangeSuite) TestWriteReorgForModifyColumnWithPKIsHandle(c *C) {
modifyColumnSQL := "alter table tt change column c cc tinyint unsigned not null default 1 first"
modifyColumnSQL := "alter table tt change column c cc tinyint not null default 1 first"

_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
Expand Down Expand Up @@ -867,6 +867,7 @@ func (s *testStateChangeSuiteBase) runTestInSchemaState(c *C, state model.Schema
originalCallback := d.GetHook()
d.(ddl.DDLForTest).SetHook(callback)
_, err = s.se.Execute(context.Background(), alterTableSQL)
fmt.Println(alterTableSQL)
c.Assert(err, IsNil)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
d.(ddl.DDLForTest).SetHook(originalCallback)
Expand Down Expand Up @@ -1754,3 +1755,24 @@ func (s *serialTestStateChangeSuite) TestModifyColumnTypeArgs(c *C) {
c.Assert(changingCol, IsNil)
c.Assert(changingIdxs, IsNil)
}

func (s *testStateChangeSuite) TestWriteReorgForColumnTypeChange(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test_db_state")
tk.MustExec(`CREATE TABLE t_ctc (
a DOUBLE NULL DEFAULT '1.732088511183121',
c char(30) NOT NULL,
KEY idx (a,c)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin COMMENT='…comment';
`)
defer func() {
tk.MustExec("drop table t_ctc")
}()

sqls := make([]sqlWithErr, 2)
sqls[0] = sqlWithErr{"INSERT INTO t_ctc SET c = 'zr36f7ywjquj1curxh9gyrwnx', a = '1.9897043136824033';", nil}
sqls[1] = sqlWithErr{"DELETE FROM t_ctc;", nil}
dropColumnsSQL := "alter table t_ctc change column a ddd TIME NULL DEFAULT '18:21:32' AFTER c;"
query := &expectQuery{sql: "admin check table t_ctc;", rows: nil}
s.runTestInSchemaState(c, model.StateWriteReorganization, false, dropColumnsSQL, sqls, query)
}
8 changes: 4 additions & 4 deletions ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ PARTITION BY RANGE (c) (

for _, testcase := range testCases {
c.Log(testcase.name)
failpoint.Enable("github.com/pingcap/tidb/store/tikv/config/injectTxnScope",
failpoint.Enable("tikvclient/injectTxnScope",
fmt.Sprintf(`return("%v")`, testcase.zone))
se, err := session.CreateSession4Test(s.store)
c.Check(err, IsNil)
Expand All @@ -518,7 +518,7 @@ PARTITION BY RANGE (c) (
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, testcase.err.Error())
}
failpoint.Disable("github.com/pingcap/tidb/store/tikv/config/injectTxnScope")
failpoint.Disable("tikvclient/injectTxnScope")
}
}

Expand Down Expand Up @@ -629,8 +629,8 @@ PARTITION BY RANGE (c) (
},
},
}
failpoint.Enable("github.com/pingcap/tidb/store/tikv/config/injectTxnScope", `return("bj")`)
defer failpoint.Disable("github.com/pingcap/tidb/store/tikv/config/injectTxnScope")
failpoint.Enable("tikvclient/injectTxnScope", `return("bj")`)
defer failpoint.Disable("tikvclient/injectTxnScope")
dbInfo := testGetSchemaByName(c, tk.Se, "test")
tk2 := testkit.NewTestKit(c, s.store)
var chkErr error
Expand Down
8 changes: 4 additions & 4 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ func (s *testSerialSuite) TestRecoverTableByJobIDFail(c *C) {
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionRecoverTable {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockCommitError", `return(true)`), IsNil)
c.Assert(failpoint.Enable("tikvclient/mockCommitError", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr", `return(true)`), IsNil)
}
}
Expand All @@ -774,7 +774,7 @@ func (s *testSerialSuite) TestRecoverTableByJobIDFail(c *C) {

// do recover table.
tk.MustExec(fmt.Sprintf("recover table by job %d", jobID))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockCommitError"), IsNil)
c.Assert(failpoint.Disable("tikvclient/mockCommitError"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr"), IsNil)

// make sure enable GC after recover table.
Expand Down Expand Up @@ -824,7 +824,7 @@ func (s *testSerialSuite) TestRecoverTableByTableNameFail(c *C) {
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionRecoverTable {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockCommitError", `return(true)`), IsNil)
c.Assert(failpoint.Enable("tikvclient/mockCommitError", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr", `return(true)`), IsNil)
}
}
Expand All @@ -834,7 +834,7 @@ func (s *testSerialSuite) TestRecoverTableByTableNameFail(c *C) {

// do recover table.
tk.MustExec("recover table t_recover")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockCommitError"), IsNil)
c.Assert(failpoint.Disable("tikvclient/mockCommitError"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockRecoverTableCommitErr"), IsNil)

// make sure enable GC after recover table.
Expand Down
8 changes: 8 additions & 0 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sli"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -341,6 +342,13 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
}

if copStats.ScanDetail != nil {
readKeys := copStats.ScanDetail.ProcessedKeys
readTime := copStats.TimeDetail.KvReadWallTimeMs.Seconds()
sli.ObserveReadSLI(uint64(readKeys), readTime)
}

if r.stats == nil {
id := r.rootPlanID
r.stats = &selectResultRuntimeStats{
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,11 @@ error = '''
Unknown column '%-.192s' in '%-.192s'
'''

["table:1114"]
error = '''
The table '%-.192s' is full
'''

["table:1192"]
error = '''
Can't execute the given command because you have active locked tables or an active transaction
Expand Down
10 changes: 5 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,7 @@ func FormatSQL(sql string) stringutil.StringerFunc {
var (
sessionExecuteRunDurationInternal = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblInternal)
sessionExecuteRunDurationGeneral = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblGeneral)
totalTiFlashQueryFailCounter = metrics.TiFlashQueryTotalCounter.WithLabelValues(metrics.LblError)
totalTiFlashQuerySuccCounter = metrics.TiFlashQueryTotalCounter.WithLabelValues(metrics.LblOK)
totalTiFlashQuerySuccCounter = metrics.TiFlashQueryTotalCounter.WithLabelValues("", metrics.LblOK)
)

// FinishExecuteStmt is used to record some information after `ExecStmt` execution finished:
Expand All @@ -866,7 +865,7 @@ var (
// 3. record execute duration metric.
// 4. update the `PrevStmt` in session variable.
// 5. reset `DurationParse` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults bool) {
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults bool) {
sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
// Attach commit/lockKeys runtime stats to executor runtime stats.
Expand All @@ -885,14 +884,15 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults boo
// Only record the read keys in write statement which affect row more than 0.
a.Ctx.GetTxnWriteThroughputSLI().AddReadKeys(execDetail.ScanDetail.ProcessedKeys)
}
succ := err == nil
// `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`.
a.LogSlowQuery(txnTS, succ, hasMoreResults)
a.SummaryStmt(succ)
if sessVars.StmtCtx.IsTiFlash.Load() {
if succ {
totalTiFlashQuerySuccCounter.Inc()
} else {
totalTiFlashQueryFailCounter.Inc()
metrics.TiFlashQueryTotalCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err), metrics.LblError).Inc()
}
}
sessVars.PrevStmt = FormatSQL(a.GetTextToLog())
Expand All @@ -909,7 +909,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults boo

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
a.FinishExecuteStmt(txnStartTS, lastErr == nil, false)
a.FinishExecuteStmt(txnStartTS, lastErr, false)
a.logAudit()
// Detach the Memory and disk tracker for the previous stmtCtx from GlobalMemoryUsageTracker and GlobalDiskUsageTracker
if stmtCtx := a.Ctx.GetSessionVars().StmtCtx; stmtCtx != nil {
Expand Down
3 changes: 2 additions & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func (s *testSerialSuite2) TestFastAnalyze4GlobalStats(c *C) {
tk.MustExec("create table test_fast_gstats(a int, b int) PARTITION BY HASH(a) PARTITIONS 2;")
tk.MustExec("insert into test_fast_gstats values(1,1),(3,3),(4,4),(2,2),(5,5);")
err := tk.ExecToErr("analyze table test_fast_gstats;")
c.Assert(err.Error(), Equals, "Fast analyze hasn't reached General Availability and only support analyze version 1 currently.")
c.Assert(err, ErrorMatches, ".*Fast analyze hasn't reached General Availability and only support analyze version 1 currently.*")
}

func (s *testSuite1) TestIssue15993(c *C) {
Expand Down Expand Up @@ -983,6 +983,7 @@ func (s *testSuite1) TestAnalyzeClusteredIndexPrimary(c *C) {
}

func (s *testSuite1) TestAnalyzeFullSamplingOnIndexWithVirtualColumnOrPrefixColumn(c *C) {
c.Skip("unstable, skip it and fix it before 20210624")
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists sampling_index_virtual_col")
Expand Down
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4151,8 +4151,8 @@ func fullRangePartition(idxArr []int) bool {
}

func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *TableSampleExecutor {
if err := b.validCanReadTemporaryTable(v.TableInfo.Meta()); err != nil {
b.err = err
if v.TableInfo.Meta().TempTableType != model.TempTableNone {
b.err = errors.New("TABLESAMPLE clause can not be applied to temporary tables")
return nil
}

Expand Down
42 changes: 38 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1491,6 +1491,12 @@ type UnionExec struct {
results []*chunk.Chunk
wg sync.WaitGroup
initialized bool
mu struct {
*sync.Mutex
maxOpenedChildID int
}

childInFlightForTest int32
}

// unionWorkerResult stores the result for a union worker.
Expand All @@ -1510,12 +1516,11 @@ func (e *UnionExec) waitAllFinished() {

// Open implements the Executor Open interface.
func (e *UnionExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
e.stopFetchData.Store(false)
e.initialized = false
e.finished = make(chan struct{})
e.mu.Mutex = &sync.Mutex{}
e.mu.maxOpenedChildID = -1
return nil
}

Expand Down Expand Up @@ -1561,6 +1566,19 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) {
e.wg.Done()
}()
for childID := range e.childIDChan {
e.mu.Lock()
if childID > e.mu.maxOpenedChildID {
e.mu.maxOpenedChildID = childID
}
e.mu.Unlock()
if err := e.children[childID].Open(ctx); err != nil {
result.err = err
e.stopFetchData.Store(true)
e.resultPool <- result
}
failpoint.Inject("issue21441", func() {
atomic.AddInt32(&e.childInFlightForTest, 1)
})
for {
if e.stopFetchData.Load().(bool) {
return
Expand All @@ -1575,12 +1593,20 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) {
e.resourcePools[workerID] <- result.chk
break
}
failpoint.Inject("issue21441", func() {
if int(atomic.LoadInt32(&e.childInFlightForTest)) > e.concurrency {
panic("the count of child in flight is larger than e.concurrency unexpectedly")
}
})
e.resultPool <- result
if result.err != nil {
e.stopFetchData.Store(true)
return
}
}
failpoint.Inject("issue21441", func() {
atomic.AddInt32(&e.childInFlightForTest, -1)
})
}
}

Expand Down Expand Up @@ -1623,7 +1649,15 @@ func (e *UnionExec) Close() error {
for range e.childIDChan {
}
}
return e.baseExecutor.Close()
// We do not need to acquire the e.mu.Lock since all the resultPuller can be
// promised to exit when reaching here (e.childIDChan been closed).
var firstErr error
for i := 0; i <= e.mu.maxOpenedChildID; i++ {
if err := e.children[i].Close(); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}

// ResetContextOfStmt resets the StmtContext and session variables.
Expand Down
Loading

0 comments on commit fdbdccd

Please sign in to comment.