Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add several sql hint related to session variables #11809

Merged
merged 15 commits into from
Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.ReplicaRead
builder.Request.ReplicaRead = sv.GetReplicaRead()
return builder
}

Expand Down
2 changes: 1 addition & 1 deletion distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (s *testSuite) TestRequestBuilder6(c *C) {

func (s *testSuite) TestRequestBuilder7(c *C) {
vars := variable.NewSessionVars()
vars.ReplicaRead = kv.ReplicaReadFollower
vars.SetReplicaRead(kv.ReplicaReadFollower)

concurrency := 10

Expand Down
6 changes: 3 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
var resp *tikvrpc.Response
var rpcCtx *tikv.RPCContext
// we always use the first follower when follower read is enabled
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0)
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().GetReplicaRead(), 0)
if *err != nil {
return
}
Expand Down Expand Up @@ -925,7 +925,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
if err != nil {
return 0, err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
for _, t := range e.scanTasks {
Expand All @@ -949,7 +949,7 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
if *err != nil {
return
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
ctx := tk.Se.(sessionctx.Context)
ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower
ctx.GetSessionVars().SetReplicaRead(kv.ReplicaReadFollower)
tk.MustExec("analyze table t")
}

Expand Down
84 changes: 83 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,13 +1366,92 @@ func (e *UnionExec) Close() error {
return e.baseExecutor.Close()
}

func extractStmtHintsFromStmtNode(stmtNode ast.StmtNode) (stmtHints stmtctx.StmtHints, warns []error) {
var hints []*ast.TableOptimizerHint
switch x := stmtNode.(type) {
case *ast.SelectStmt:
hints = x.TableHints
case *ast.UpdateStmt:
hints = x.TableHints
case *ast.DeleteStmt:
hints = x.TableHints
// TODO: support hint for InsertStmt
default:
return
}
var memoryQuotaHintList, noIndexMergeHintList, useToJAHintList, readReplicaHintList []*ast.TableOptimizerHint
for _, hint := range hints {
switch hint.HintName.L {
case "memory_quota":
memoryQuotaHintList = append(memoryQuotaHintList, hint)
case "no_index_merge":
noIndexMergeHintList = append(noIndexMergeHintList, hint)
case "use_toja":
useToJAHintList = append(useToJAHintList, hint)
case "read_consistent_replica":
readReplicaHintList = append(readReplicaHintList, hint)
}
}
// Handle MEMORY_QUOTA
if len(memoryQuotaHintList) != 0 {
if len(memoryQuotaHintList) > 1 {
warn := errors.New("There are multiple MEMORY_QUOTA hints, only the last one will take effect")
foreyes marked this conversation as resolved.
Show resolved Hide resolved
warns = append(warns, warn)
}
hint := memoryQuotaHintList[len(memoryQuotaHintList)-1]
// Executor use MemoryQuota < 0 to indicate no memory limit, here use it to handle hint syntax error.
if hint.MemoryQuota < 0 {
foreyes marked this conversation as resolved.
Show resolved Hide resolved
warn := errors.New("There are some syntax error in MEMORY_QUOTA hint, correct usage: MEMORY_QUOTA(10 M) or MEMORY_QUOTA(10 G)")
warns = append(warns, warn)
} else {
stmtHints.HasMemQuotaHint = true
stmtHints.MemQuotaQuery = int64(hint.MemoryQuota)
}
}
// Handle USE_TOJA
if len(useToJAHintList) != 0 {
if len(useToJAHintList) > 1 {
warn := errors.New("There are multiple USE_TOJA hints, only the last one will take effect")
warns = append(warns, warn)
}
hint := useToJAHintList[len(useToJAHintList)-1]
stmtHints.HasAllowInSubqToJoinAndAggHint = true
stmtHints.AllowInSubqToJoinAndAgg = hint.HintFlag
}
// Handle NO_INDEX_MERGE
if len(noIndexMergeHintList) != 0 {
if len(noIndexMergeHintList) > 1 {
warn := errors.New("There are multiple NO_INDEX_MERGE hints, only the last one will take effect")
warns = append(warns, warn)
}
stmtHints.HasEnableIndexMergeHint = true
stmtHints.EnableIndexMerge = false
}
// Handle READ_CONSISTENT_REPLICA
if len(readReplicaHintList) != 0 {
if len(readReplicaHintList) > 1 {
warn := errors.New("There are multiple READ_CONSISTENT_REPLICA hints, only the last one will take effect")
warns = append(warns, warn)
}
stmtHints.HasReplicaReadHint = true
stmtHints.ReplicaRead = byte(kv.ReplicaReadFollower)
}
return
}

// ResetContextOfStmt resets the StmtContext and session variables.
// Before every execution, we must clear statement context.
func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
stmtHints, hintWarns := extractStmtHintsFromStmtNode(s)
vars := ctx.GetSessionVars()
memQuota := vars.MemQuotaQuery
if stmtHints.HasMemQuotaHint {
memQuota = stmtHints.MemQuotaQuery
}
sc := &stmtctx.StatementContext{
StmtHints: stmtHints,
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), vars.MemQuotaQuery),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota),
}
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
Expand Down Expand Up @@ -1504,5 +1583,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
return err
}
vars.StmtCtx = sc
for _, warn := range hintWarns {
vars.StmtCtx.AppendWarning(warn)
}
return
}
2 changes: 1 addition & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
if e.idxInfo != nil {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func (er *expressionRewriter) handleInSubquery(ctx context.Context, v *ast.Patte
// and has no correlated column from the current level plan(if the correlated column is from upper level,
// we can treat it as constant, because the upper LogicalApply cannot be eliminated since current node is a join node),
// and don't need to append a scalar value, we can rewrite it to inner join.
if er.sctx.GetSessionVars().AllowInSubqToJoinAndAgg && !v.Not && !asScalar && len(extractCorColumnsBySchema(np, er.p.Schema())) == 0 {
if er.sctx.GetSessionVars().GetAllowInSubqToJoinAndAgg() && !v.Not && !asScalar && len(extractCorColumnsBySchema(np, er.p.Schema())) == 0 {
// We need to try to eliminate the agg and the projection produced by this operation.
er.b.optFlag |= flagEliminateAgg
er.b.optFlag |= flagEliminateProjection
Expand Down
2 changes: 1 addition & 1 deletion planner/core/indexmerge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (s *testIndexMergeSuite) TestIndexMergePathGenerateion(c *C) {
lp = lp.Children()[0]
}
}
ds.ctx.GetSessionVars().EnableIndexMerge = true
ds.ctx.GetSessionVars().SetEnableIndexMerge(true)
idxMergeStartIndex := len(ds.possibleAccessPaths)
_, err = lp.recursiveDeriveStats()
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo) (*property.S
}
}
// Consider the IndexMergePath. Now, we just generate `IndexMergePath` in DNF case.
if len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 && ds.ctx.GetSessionVars().EnableIndexMerge {
if len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 && ds.ctx.GetSessionVars().GetEnableIndexMerge() {
needConsiderIndexMerge := true
for i := 1; i < len(ds.possibleAccessPaths); i++ {
if len(ds.possibleAccessPaths[i].accessConds) != 0 {
Expand Down
4 changes: 2 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
if s.sessionVars.ReplicaRead.IsFollowerRead() {
if s.sessionVars.GetReplicaRead().IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
}
Expand Down Expand Up @@ -1265,7 +1265,7 @@ func (s *session) NewTxn(ctx context.Context) error {
}
txn.SetCap(s.getMembufCap())
txn.SetVars(s.sessionVars.KVVars)
if s.GetSessionVars().ReplicaRead.IsFollowerRead() {
if s.GetSessionVars().GetReplicaRead().IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
s.txn.changeInvalidToValid(txn)
Expand Down
6 changes: 3 additions & 3 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2802,9 +2802,9 @@ func (s *testSessionSuite) TestReplicaRead(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
tk.MustExec("set @@tidb_replica_read = 'follower';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
tk.MustExec("set @@tidb_replica_read = 'leader';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
}
16 changes: 16 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type SQLWarn struct {
// It should be reset before executing a statement.
type StatementContext struct {
// Set the following variables before execution
StmtHints

// IsDDLJobInQueue is used to mark whether the DDL job is put into the queue.
// If IsDDLJobInQueue is true, it means the DDL job is in the queue of storage, and it can be handled by the DDL worker.
Expand Down Expand Up @@ -137,6 +138,21 @@ type StatementContext struct {
Tables []TableEntry
}

// StmtHints are SessionVars related sql hints.
type StmtHints struct {
// Hint flags
HasAllowInSubqToJoinAndAggHint bool
HasEnableIndexMergeHint bool
HasMemQuotaHint bool
HasReplicaReadHint bool

// Hint Information
AllowInSubqToJoinAndAgg bool
EnableIndexMerge bool
MemQuotaQuery int64
ReplicaRead byte
}

// GetNowTsCached getter for nowTs, if not set get now time and cache it
func (sc *StatementContext) GetNowTsCached() time.Time {
if !sc.stmtTimeCached {
Expand Down
73 changes: 57 additions & 16 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,6 @@ type SessionVars struct {
// This variable is currently not recommended to be turned on.
AllowWriteRowID bool

// AllowInSubqToJoinAndAgg can be set to false to forbid rewriting the semi join to inner join with agg.
AllowInSubqToJoinAndAgg bool

// CorrelationThreshold is the guard to enable row count estimation using column order correlation.
CorrelationThreshold float64

Expand Down Expand Up @@ -342,9 +339,6 @@ type SessionVars struct {
// EnableVectorizedExpression enables the vectorized expression evaluation.
EnableVectorizedExpression bool

// EnableIndexMerge enables the generation of IndexMergePath.
EnableIndexMerge bool

// DDLReorgPriority is the operation priority of adding indices.
DDLReorgPriority int

Expand Down Expand Up @@ -403,9 +397,6 @@ type SessionVars struct {
// use noop funcs or not
EnableNoopFuncs bool

// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead kv.ReplicaReadType

// StartTime is the start time of the last query.
StartTime time.Time

Expand All @@ -417,6 +408,17 @@ type SessionVars struct {

// PrevStmt is used to store the previous executed statement in the current session.
PrevStmt string

// Unexported fields should be accessed and set through interfaces like GetReplicaRead() and SetReplicaRead().

// allowInSubqToJoinAndAgg can be set to false to forbid rewriting the semi join to inner join with agg.
allowInSubqToJoinAndAgg bool

// EnableIndexMerge enables the generation of IndexMergePath.
enableIndexMerge bool

// replicaRead is used for reading data from replicas, only follower is supported at this time.
replicaRead kv.ReplicaReadType
}

// ConnectionInfo present connection used by audit.
Expand Down Expand Up @@ -459,7 +461,7 @@ func NewSessionVars() *SessionVars {
RetryLimit: DefTiDBRetryLimit,
DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry,
DDLReorgPriority: kv.PriorityLow,
AllowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg,
allowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg,
CorrelationThreshold: DefOptCorrelationThreshold,
CorrelationExpFactor: DefOptCorrelationExpFactor,
EnableRadixJoin: false,
Expand All @@ -470,9 +472,9 @@ func NewSessionVars() *SessionVars {
SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile,
WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish,
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
EnableIndexMerge: false,
enableIndexMerge: false,
EnableNoopFuncs: DefTiDBEnableNoopFuncs,
ReplicaRead: kv.ReplicaReadLeader,
replicaRead: kv.ReplicaReadLeader,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -512,6 +514,45 @@ func NewSessionVars() *SessionVars {
return vars
}

// GetAllowInSubqToJoinAndAgg get AllowInSubqToJoinAndAgg from sql hints and SessionVars.allowInSubqToJoinAndAgg.
func (s *SessionVars) GetAllowInSubqToJoinAndAgg() bool {
if s.StmtCtx.HasAllowInSubqToJoinAndAggHint {
return s.StmtCtx.AllowInSubqToJoinAndAgg
}
return s.allowInSubqToJoinAndAgg
}

// SetAllowInSubqToJoinAndAgg set SessionVars.allowInSubqToJoinAndAgg.
func (s *SessionVars) SetAllowInSubqToJoinAndAgg(val bool) {
s.allowInSubqToJoinAndAgg = val
}

// GetEnableIndexMerge get EnableIndexMerge from sql hints and SessionVars.enableIndexMerge.
func (s *SessionVars) GetEnableIndexMerge() bool {
if s.StmtCtx.HasEnableIndexMergeHint {
return s.StmtCtx.EnableIndexMerge
}
return s.enableIndexMerge
}

// SetEnableIndexMerge set SessionVars.enableIndexMerge.
func (s *SessionVars) SetEnableIndexMerge(val bool) {
s.enableIndexMerge = val
}

// GetReplicaRead get ReplicaRead from sql hints and SessionVars.replicaRead.
func (s *SessionVars) GetReplicaRead() kv.ReplicaReadType {
if s.StmtCtx.HasReplicaReadHint {
return kv.ReplicaReadType(s.StmtCtx.ReplicaRead)
}
return s.replicaRead
}

// SetReplicaRead set SessionVars.replicaRead.
func (s *SessionVars) SetReplicaRead(val kv.ReplicaReadType) {
s.replicaRead = val
}

// GetWriteStmtBufs get pointer of SessionVars.writeStmtBufs.
func (s *SessionVars) GetWriteStmtBufs() *WriteStmtBufs {
return &s.writeStmtBufs
Expand Down Expand Up @@ -738,7 +779,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
case TiDBOptWriteRowID:
s.AllowWriteRowID = TiDBOptOn(val)
case TiDBOptInSubqToJoinAndAgg:
s.AllowInSubqToJoinAndAgg = TiDBOptOn(val)
s.allowInSubqToJoinAndAgg = TiDBOptOn(val)
foreyes marked this conversation as resolved.
Show resolved Hide resolved
case TiDBOptCorrelationThreshold:
s.CorrelationThreshold = tidbOptFloat64(val, DefOptCorrelationThreshold)
case TiDBOptCorrelationExpFactor:
Expand Down Expand Up @@ -848,14 +889,14 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
case TiDBLowResolutionTSO:
s.LowResolutionTSO = TiDBOptOn(val)
case TiDBEnableIndexMerge:
s.EnableIndexMerge = TiDBOptOn(val)
s.enableIndexMerge = TiDBOptOn(val)
foreyes marked this conversation as resolved.
Show resolved Hide resolved
case TiDBEnableNoopFuncs:
s.EnableNoopFuncs = TiDBOptOn(val)
case TiDBReplicaRead:
if strings.EqualFold(val, "follower") {
s.ReplicaRead = kv.ReplicaReadFollower
foreyes marked this conversation as resolved.
Show resolved Hide resolved
s.replicaRead = kv.ReplicaReadFollower
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.ReplicaRead = kv.ReplicaReadLeader
foreyes marked this conversation as resolved.
Show resolved Hide resolved
s.replicaRead = kv.ReplicaReadLeader
}
}
s.systems[name] = val
Expand Down
Loading