Skip to content

Commit

Permalink
*: data race at the SessionVars.PlanID
Browse files Browse the repository at this point in the history
Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com>
  • Loading branch information
hawkingrei committed Mar 19, 2023
1 parent 56412f5 commit a7881bb
Show file tree
Hide file tree
Showing 13 changed files with 25 additions and 29 deletions.
2 changes: 1 addition & 1 deletion executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (e *AnalyzeColumnsExec) toV2() *AnalyzeColumnsExecV2 {
}

func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {
e.memTracker = memory.NewTracker(e.ctx.GetSessionVars().PlanID, -1)
e.memTracker = memory.NewTracker(int(e.ctx.GetSessionVars().PlanID.Load()), -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.resultHandler = &tableResultHandler{}
firstPartRanges, secondPartRanges := distsql.SplitRangesAcrossInt64Boundary(ranges, true, false, !hasPkHist(e.handleCols))
Expand Down
2 changes: 1 addition & 1 deletion executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
topsql.AttachAndRegisterSQLInfo(ctx, stmt.NormalizedSQL, stmt.SQLDigest, vars.InRestrictedSQL)
}

e.ctx.GetSessionVars().PlanID = 0
e.ctx.GetSessionVars().PlanID.Store(0)
e.ctx.GetSessionVars().PlanColumnID.Store(0)
e.ctx.GetSessionVars().MapHashCode2UniqueID4ExtendedCol = nil
// In MySQL prepare protocol, the server need to tell the client how many column the prepared statement would return when executing it.
Expand Down
6 changes: 3 additions & 3 deletions planner/core/casetest/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestPreferRangeScan(t *testing.T) {
} else if i == 1 {
tk.MustExec("set session tidb_opt_prefer_range_scan=1")
}
tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
tk.MustExec(tt)
info := tk.Session().ShowProcess()
require.NotNil(t, info)
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestNormalizedPlan(t *testing.T) {
planNormalizedSuiteData := GetPlanNormalizedSuiteData()
planNormalizedSuiteData.LoadTestCases(t, &input, &output)
for i, tt := range input {
tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
tk.MustExec(tt)
info := tk.Session().ShowProcess()
require.NotNil(t, info)
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestNormalizedPlanForDiffStore(t *testing.T) {
planNormalizedSuiteData.LoadTestCases(t, &input, &output)
lastDigest := ""
for i, tt := range input {
tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
tk.MustExec(tt)
info := tk.Session().ShowProcess()
require.NotNil(t, info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestTiFlashLateMaterialization(t *testing.T) {
planNormalizedSuiteData := GetPlanNormalizedSuiteData()
planNormalizedSuiteData.LoadTestCases(t, &input, &output)
for i, tt := range input {
tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
tk.MustExec(tt)
info := tk.Session().ShowProcess()
require.NotNil(t, info)
Expand Down
4 changes: 2 additions & 2 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPl
// The curCntPlan records the number of possible plans for pp
curCntPlan = 1
timeStampNow := p.GetLogicalTS4TaskMap()
savedPlanID := p.ctx.GetSessionVars().PlanID
savedPlanID := p.ctx.GetSessionVars().PlanID.Load()
for j, child := range p.children {
childProp := pp.GetChildReqProps(j)
childTask, cnt, err := child.findBestTask(childProp, &PlanCounterDisabled, opt)
Expand All @@ -240,7 +240,7 @@ func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPl

// If the target plan can be found in this physicalPlan(pp), rebuild childTasks to build the corresponding combination.
if planCounter.IsForce() && int64(*planCounter) <= curCntPlan {
p.ctx.GetSessionVars().PlanID = savedPlanID
p.ctx.GetSessionVars().PlanID.Store(savedPlanID)
curCntPlan = int64(*planCounter)
err := p.rebuildChildTasks(&childTasks, pp, childCnts, int64(*planCounter), timeStampNow, opt)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions planner/core/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,19 +503,17 @@ func (p PhysicalIndexJoin) Init(ctx sessionctx.Context, stats *property.StatsInf

// Init initializes PhysicalIndexMergeJoin.
func (p PhysicalIndexMergeJoin) Init(ctx sessionctx.Context) *PhysicalIndexMergeJoin {
ctx.GetSessionVars().PlanID++
p.tp = plancodec.TypeIndexMergeJoin
p.id = ctx.GetSessionVars().PlanID
p.id = int(ctx.GetSessionVars().PlanID.Add(1))
p.ctx = ctx
p.self = &p
return &p
}

// Init initializes PhysicalIndexHashJoin.
func (p PhysicalIndexHashJoin) Init(ctx sessionctx.Context) *PhysicalIndexHashJoin {
ctx.GetSessionVars().PlanID++
p.tp = plancodec.TypeIndexHashJoin
p.id = ctx.GetSessionVars().PlanID
p.id = int(ctx.GetSessionVars().PlanID.Add(1))
p.ctx = ctx
p.self = &p
return &p
Expand Down
2 changes: 1 addition & 1 deletion planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ type logicalOptRule interface {

// BuildLogicalPlanForTest builds a logical plan for testing purpose from ast.Node.
func BuildLogicalPlanForTest(ctx context.Context, sctx sessionctx.Context, node ast.Node, infoSchema infoschema.InfoSchema) (Plan, types.NameSlice, error) {
sctx.GetSessionVars().PlanID = 0
sctx.GetSessionVars().PlanID.Store(0)
sctx.GetSessionVars().PlanColumnID.Store(0)
builder, _ := NewPlanBuilder().Init(sctx, infoSchema, &utilhint.BlockHintProcessor{})
p, err := builder.Build(ctx, node)
Expand Down
5 changes: 2 additions & 3 deletions planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,11 +725,10 @@ func (p *logicalSchemaProducer) BuildKeyInfo(selfSchema *expression.Schema, chil
}

func newBasePlan(ctx sessionctx.Context, tp string, offset int) basePlan {
ctx.GetSessionVars().PlanID++
id := ctx.GetSessionVars().PlanID
id := ctx.GetSessionVars().PlanID.Add(1)
return basePlan{
tp: tp,
id: id,
id: int(id),
ctx: ctx,
blockOffset: offset,
}
Expand Down
8 changes: 4 additions & 4 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestEncodeDecodePlan(t *testing.T) {
tk.MustExec("set tidb_enable_collect_execution_info=1;")
tk.MustExec("set tidb_partition_prune_mode='static';")

tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
getPlanTree := func() (str1, str2 string) {
info := tk.Session().ShowProcess()
require.NotNil(t, info)
Expand Down Expand Up @@ -480,7 +480,7 @@ func BenchmarkDecodePlan(b *testing.B) {
buf.WriteString(fmt.Sprintf("select count(1) as num,a from t where a='%v' group by a", i))
}
query := buf.String()
tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
tk.MustExec(query)
info := tk.Session().ShowProcess()
require.NotNil(b, info)
Expand All @@ -507,7 +507,7 @@ func BenchmarkEncodePlan(b *testing.B) {
tk.MustExec("set @@tidb_slow_log_threshold=200000")

query := "select count(*) from th t1 join th t2 join th t3 join th t4 join th t5 join th t6 where t1.i=t2.a and t1.i=t3.i and t3.i=t4.i and t4.i=t5.i and t5.i=t6.i"
tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
tk.MustExec(query)
info := tk.Session().ShowProcess()
require.NotNil(b, info)
Expand All @@ -531,7 +531,7 @@ func BenchmarkEncodeFlatPlan(b *testing.B) {
tk.MustExec("set @@tidb_slow_log_threshold=200000")

query := "select count(*) from th t1 join th t2 join th t3 join th t4 join th t5 join th t6 where t1.i=t2.a and t1.i=t3.i and t3.i=t4.i and t4.i=t5.i and t5.i=t6.i"
tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
tk.MustExec(query)
info := tk.Session().ShowProcess()
require.NotNil(b, info)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
return nil
}

ctx.GetSessionVars().PlanID = 0
ctx.GetSessionVars().PlanID.Store(0)
ctx.GetSessionVars().PlanColumnID.Store(0)
switch x := node.(type) {
case *ast.SelectStmt:
Expand Down
6 changes: 3 additions & 3 deletions planner/funcdep/extract_fd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestFDSet_ExtractFD(t *testing.T) {
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).OnStmtStart(context.TODO(), nil))
stmt, err := par.ParseOneStmt(tt.sql, "", "")
require.NoError(t, err, comment)
tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
tk.Session().GetSessionVars().PlanColumnID.Store(0)
err = plannercore.Preprocess(context.Background(), tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err)
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestFDSet_ExtractFDForApply(t *testing.T) {
comment := fmt.Sprintf("case:%v sql:%s", i, tt.sql)
stmt, err := par.ParseOneStmt(tt.sql, "", "")
require.NoError(t, err, comment)
tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
tk.Session().GetSessionVars().PlanColumnID.Store(0)
err = plannercore.Preprocess(context.Background(), tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
Expand Down Expand Up @@ -364,7 +364,7 @@ func TestFDSet_MakeOuterJoin(t *testing.T) {
comment := fmt.Sprintf("case:%v sql:%s", i, tt.sql)
stmt, err := par.ParseOneStmt(tt.sql, "", "")
require.NoError(t, err, comment)
tk.Session().GetSessionVars().PlanID = 0
tk.Session().GetSessionVars().PlanID.Store(0)
tk.Session().GetSessionVars().PlanColumnID.Store(0)
err = plannercore.Preprocess(context.Background(), tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
Expand Down
4 changes: 2 additions & 2 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,12 +492,12 @@ func OptimizeExecStmt(ctx context.Context, sctx sessionctx.Context,
}

func buildLogicalPlan(ctx context.Context, sctx sessionctx.Context, node ast.Node, builder *core.PlanBuilder) (core.Plan, error) {
sctx.GetSessionVars().PlanID = 0
sctx.GetSessionVars().PlanID.Store(0)
sctx.GetSessionVars().PlanColumnID.Store(0)
sctx.GetSessionVars().MapHashCode2UniqueID4ExtendedCol = nil

failpoint.Inject("mockRandomPlanID", func() {
sctx.GetSessionVars().PlanID = rand.Intn(1000) // nolint:gosec
sctx.GetSessionVars().PlanID.Store(rand.Int31n(1000)) // nolint:gosec
})

// reset fields about rewrite
Expand Down
5 changes: 2 additions & 3 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ type SessionVars struct {
ConnectionID uint64

// PlanID is the unique id of logical and physical plan.
PlanID int
PlanID atomic.Int32

// PlanColumnID is the unique id for column when building plan.
PlanColumnID atomic.Int64
Expand Down Expand Up @@ -1603,8 +1603,7 @@ func (s *SessionVars) BuildParserConfig() parser.ParserConfig {

// AllocNewPlanID alloc new ID
func (s *SessionVars) AllocNewPlanID() int {
s.PlanID++
return s.PlanID
return int(s.PlanID.Add(1))
}

const (
Expand Down

0 comments on commit a7881bb

Please sign in to comment.