Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add several sql hint related to session variables #11809

Merged
merged 15 commits into from
Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.ReplicaRead
builder.Request.ReplicaRead = sv.GetReplicaRead()
return builder
}

Expand Down
2 changes: 1 addition & 1 deletion distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (s *testSuite) TestRequestBuilder6(c *C) {

func (s *testSuite) TestRequestBuilder7(c *C) {
vars := variable.NewSessionVars()
vars.ReplicaRead = kv.ReplicaReadFollower
vars.SetReplicaRead(kv.ReplicaReadFollower)

concurrency := 10

Expand Down
6 changes: 3 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
var resp *tikvrpc.Response
var rpcCtx *tikv.RPCContext
// we always use the first follower when follower read is enabled
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0)
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().GetReplicaRead(), 0)
if *err != nil {
return
}
Expand Down Expand Up @@ -925,7 +925,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
if err != nil {
return 0, err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
for _, t := range e.scanTasks {
Expand All @@ -949,7 +949,7 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
if *err != nil {
return
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
ctx := tk.Se.(sessionctx.Context)
ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower
ctx.GetSessionVars().SetReplicaRead(kv.ReplicaReadFollower)
tk.MustExec("analyze table t")
}

Expand Down
93 changes: 92 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,13 +1366,101 @@ func (e *UnionExec) Close() error {
return e.baseExecutor.Close()
}

func extractStmtHintsFromStmtNode(stmtNode ast.StmtNode) []*ast.TableOptimizerHint {
switch x := stmtNode.(type) {
case *ast.SelectStmt:
return x.TableHints
case *ast.UpdateStmt:
return x.TableHints
case *ast.DeleteStmt:
return x.TableHints
// TODO: support hint for InsertStmt
case *ast.ExplainStmt:
return extractStmtHintsFromStmtNode(x.Stmt)
default:
return nil
}
}

func handleStmtHints(hints []*ast.TableOptimizerHint) (stmtHints stmtctx.StmtHints, warns []error) {
var memoryQuotaHintList, noIndexMergeHintList, useToJAHintList, readReplicaHintList []*ast.TableOptimizerHint
for _, hint := range hints {
switch hint.HintName.L {
case "memory_quota":
memoryQuotaHintList = append(memoryQuotaHintList, hint)
case "no_index_merge":
noIndexMergeHintList = append(noIndexMergeHintList, hint)
case "use_toja":
useToJAHintList = append(useToJAHintList, hint)
case "read_consistent_replica":
readReplicaHintList = append(readReplicaHintList, hint)
}
}
// Handle MEMORY_QUOTA
if len(memoryQuotaHintList) != 0 {
if len(memoryQuotaHintList) > 1 {
warn := errors.New("There are multiple MEMORY_QUOTA hints, only the last one will take effect")
foreyes marked this conversation as resolved.
Show resolved Hide resolved
warns = append(warns, warn)
}
hint := memoryQuotaHintList[len(memoryQuotaHintList)-1]
// Executor use MemoryQuota <= 0 to indicate no memory limit, here use < 0 to handle hint syntax error.
if hint.MemoryQuota < 0 {
foreyes marked this conversation as resolved.
Show resolved Hide resolved
warn := errors.New("The use of MEMORY_QUOTA hint is invalid, valid usage: MEMORY_QUOTA(10 MB) or MEMORY_QUOTA(10 GB)")
warns = append(warns, warn)
} else {
stmtHints.HasMemQuotaHint = true
stmtHints.MemQuotaQuery = hint.MemoryQuota
if hint.MemoryQuota == 0 {
warn := errors.New("Setting the MEMORY_QUOTA to 0 means no memory limit")
warns = append(warns, warn)
}
}
}
// Handle USE_TOJA
if len(useToJAHintList) != 0 {
if len(useToJAHintList) > 1 {
warn := errors.New("There are multiple USE_TOJA hints, only the last one will take effect")
warns = append(warns, warn)
}
hint := useToJAHintList[len(useToJAHintList)-1]
stmtHints.HasAllowInSubqToJoinAndAggHint = true
stmtHints.AllowInSubqToJoinAndAgg = hint.HintFlag
}
// Handle NO_INDEX_MERGE
if len(noIndexMergeHintList) != 0 {
if len(noIndexMergeHintList) > 1 {
warn := errors.New("There are multiple NO_INDEX_MERGE hints, only the last one will take effect")
warns = append(warns, warn)
}
stmtHints.HasEnableIndexMergeHint = true
stmtHints.EnableIndexMerge = false
}
// Handle READ_CONSISTENT_REPLICA
if len(readReplicaHintList) != 0 {
if len(readReplicaHintList) > 1 {
warn := errors.New("There are multiple READ_CONSISTENT_REPLICA hints, only the last one will take effect")
warns = append(warns, warn)
}
stmtHints.HasReplicaReadHint = true
stmtHints.ReplicaRead = byte(kv.ReplicaReadFollower)
}
return
}

// ResetContextOfStmt resets the StmtContext and session variables.
// Before every execution, we must clear statement context.
func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
hints := extractStmtHintsFromStmtNode(s)
stmtHints, hintWarns := handleStmtHints(hints)
vars := ctx.GetSessionVars()
memQuota := vars.MemQuotaQuery
if stmtHints.HasMemQuotaHint {
memQuota = stmtHints.MemQuotaQuery
}
sc := &stmtctx.StatementContext{
StmtHints: stmtHints,
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), vars.MemQuotaQuery),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota),
}
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
Expand Down Expand Up @@ -1504,5 +1592,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
return err
}
vars.StmtCtx = sc
for _, warn := range hintWarns {
vars.StmtCtx.AppendWarning(warn)
}
return
}
2 changes: 1 addition & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
if e.idxInfo != nil {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func (er *expressionRewriter) handleInSubquery(ctx context.Context, v *ast.Patte
// and has no correlated column from the current level plan(if the correlated column is from upper level,
// we can treat it as constant, because the upper LogicalApply cannot be eliminated since current node is a join node),
// and don't need to append a scalar value, we can rewrite it to inner join.
if er.sctx.GetSessionVars().AllowInSubqToJoinAndAgg && !v.Not && !asScalar && len(extractCorColumnsBySchema(np, er.p.Schema())) == 0 {
if er.sctx.GetSessionVars().GetAllowInSubqToJoinAndAgg() && !v.Not && !asScalar && len(extractCorColumnsBySchema(np, er.p.Schema())) == 0 {
// We need to try to eliminate the agg and the projection produced by this operation.
er.b.optFlag |= flagEliminateAgg
er.b.optFlag |= flagEliminateProjection
Expand Down
2 changes: 1 addition & 1 deletion planner/core/indexmerge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (s *testIndexMergeSuite) TestIndexMergePathGenerateion(c *C) {
lp = lp.Children()[0]
}
}
ds.ctx.GetSessionVars().EnableIndexMerge = true
ds.ctx.GetSessionVars().SetEnableIndexMerge(true)
idxMergeStartIndex := len(ds.possibleAccessPaths)
_, err = lp.recursiveDeriveStats()
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo) (*property.S
}
}
// Consider the IndexMergePath. Now, we just generate `IndexMergePath` in DNF case.
if len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 && ds.ctx.GetSessionVars().EnableIndexMerge {
if len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 && ds.ctx.GetSessionVars().GetEnableIndexMerge() {
needConsiderIndexMerge := true
for i := 1; i < len(ds.possibleAccessPaths); i++ {
if len(ds.possibleAccessPaths[i].accessConds) != 0 {
Expand Down
4 changes: 2 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
if s.sessionVars.ReplicaRead.IsFollowerRead() {
if s.sessionVars.GetReplicaRead().IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
}
Expand Down Expand Up @@ -1275,7 +1275,7 @@ func (s *session) NewTxn(ctx context.Context) error {
}
txn.SetCap(s.getMembufCap())
txn.SetVars(s.sessionVars.KVVars)
if s.GetSessionVars().ReplicaRead.IsFollowerRead() {
if s.GetSessionVars().GetReplicaRead().IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
s.txn.changeInvalidToValid(txn)
Expand Down
56 changes: 53 additions & 3 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2808,9 +2808,59 @@ func (s *testSessionSuite) TestReplicaRead(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
tk.MustExec("set @@tidb_replica_read = 'follower';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
tk.MustExec("set @@tidb_replica_read = 'leader';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader)
}

func (s *testSessionSuite) TestStmtHints(c *C) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add some test case with hints in subqueries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unlikely to support use these hints in subqueries.
These hints need to handle before sql execute and set statement variables, handle them in subquries is too expensive.
Maybe discuss with PM team later.

var err error
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)

// Test MEMORY_QUOTA hint
tk.MustExec("select /*+ MEMORY_QUOTA(1 MB) */ 1;")
val := int64(1) * 1024 * 1024
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue)
tk.MustExec("select /*+ MEMORY_QUOTA(1 GB) */ 1;")
val = int64(1) * 1024 * 1024 * 1024
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue)
tk.MustExec("select /*+ MEMORY_QUOTA(1 GB), MEMORY_QUOTA(1 MB) */ 1;")
val = int64(1) * 1024 * 1024
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we check the warning msg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warning messages need to be dealt with specially later, and there is no need to care too much about them at this stage.

c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue)
tk.MustExec("select /*+ MEMORY_QUOTA(0 GB) */ 1;")
val = int64(0)
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue)

// Test NO_INDEX_MERGE hint
tk.Se.GetSessionVars().SetEnableIndexMerge(true)
tk.MustExec("select /*+ NO_INDEX_MERGE() */ 1;")
c.Assert(tk.Se.GetSessionVars().GetEnableIndexMerge(), IsFalse)
tk.MustExec("select /*+ NO_INDEX_MERGE(), NO_INDEX_MERGE() */ 1;")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetEnableIndexMerge(), IsFalse)

// Test USE_TOJA hint
tk.Se.GetSessionVars().SetAllowInSubqToJoinAndAgg(true)
tk.MustExec("select /*+ USE_TOJA(false) */ 1;")
c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsFalse)
tk.Se.GetSessionVars().SetAllowInSubqToJoinAndAgg(false)
tk.MustExec("select /*+ USE_TOJA(true) */ 1;")
c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsTrue)
tk.MustExec("select /*+ USE_TOJA(false), USE_TOJA(true) */ 1;")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsTrue)

// Test READ_CONSISTENT_REPLICA hint
tk.Se.GetSessionVars().SetReplicaRead(kv.ReplicaReadLeader)
tk.MustExec("select /*+ READ_CONSISTENT_REPLICA() */ 1;")
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
tk.MustExec("select /*+ READ_CONSISTENT_REPLICA(), READ_CONSISTENT_REPLICA() */ 1;")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
}
16 changes: 16 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type SQLWarn struct {
// It should be reset before executing a statement.
type StatementContext struct {
// Set the following variables before execution
StmtHints

// IsDDLJobInQueue is used to mark whether the DDL job is put into the queue.
// If IsDDLJobInQueue is true, it means the DDL job is in the queue of storage, and it can be handled by the DDL worker.
Expand Down Expand Up @@ -137,6 +138,21 @@ type StatementContext struct {
Tables []TableEntry
}

// StmtHints are SessionVars related sql hints.
type StmtHints struct {
// Hint flags
HasAllowInSubqToJoinAndAggHint bool
HasEnableIndexMergeHint bool
HasMemQuotaHint bool
HasReplicaReadHint bool

// Hint Information
AllowInSubqToJoinAndAgg bool
EnableIndexMerge bool
MemQuotaQuery int64
ReplicaRead byte
}

// GetNowTsCached getter for nowTs, if not set get now time and cache it
func (sc *StatementContext) GetNowTsCached() time.Time {
if !sc.stmtTimeCached {
Expand Down
Loading