Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/opt: add implicit SELECT FOR UPDATE support for UPSERT statements #53132

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,14 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
return checkSupportForPlanNode(n.plan)

case *lookupJoinNode:
if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
// Lookup joins that are performing row-level locking cannot
// currently be distributed because their locks would not be
// propagated back to the root transaction coordinator.
// TODO(nvanbenschoten): lift this restriction.
return cannotDistribute, cannotDistributeRowLevelLockingErr
}

if err := checkExpr(n.onCond); err != nil {
return cannotDistribute, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin(
lookupCols exec.TableColumnOrdinalSet,
onCond tree.TypedExpr,
reqOrdering exec.OutputOrdering,
locking *tree.LockingItem,
) (exec.Node, error) {
// TODO (rohany): Implement production of system columns by the underlying scan here.
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: lookup join")
Expand Down
31 changes: 30 additions & 1 deletion pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,36 @@ func (b *Builder) shouldApplyImplicitLockingToUpdateInput(upd *memo.UpdateExpr)
// TODO(nvanbenschoten): implement this method to match on appropriate Upsert
// expression trees and apply a row-level locking mode.
func (b *Builder) shouldApplyImplicitLockingToUpsertInput(ups *memo.UpsertExpr) bool {
return false
if !b.evalCtx.SessionData.ImplicitSelectForUpdate {
return false
}

// Try to match the Upsert's input expression against the pattern:
//
// [Project] (LeftJoin Scan | LookupJoin) [Project] Values
//
input := ups.Input
if proj, ok := input.(*memo.ProjectExpr); ok {
input = proj.Input
}
switch join := input.(type) {
case *memo.LeftJoinExpr:
if _, ok := join.Right.(*memo.ScanExpr); !ok {
return false
}
input = join.Left

case *memo.LookupJoinExpr:
input = join.Input

default:
return false
}
if proj, ok := input.(*memo.ProjectExpr); ok {
input = proj.Input
}
_, ok := input.(*memo.ValuesExpr)
return ok
}

// tryApplyImplicitLockingToDeleteInput determines whether or not the builder
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,11 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {
tab := md.Table(join.Table)
idx := tab.Index(join.Index)

var locking *tree.LockingItem
if b.forceForUpdateLocking {
locking = forUpdateLocking
}

res.root, err = b.factory.ConstructLookupJoin(
joinOpToJoinType(join.JoinType),
input.root,
Expand All @@ -1570,6 +1575,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {
lookupOrdinals,
onExpr,
res.reqOrdering(join),
locking,
)
if err != nil {
return execPlan{}, err
Expand Down
77 changes: 44 additions & 33 deletions pkg/sql/opt/exec/execbuilder/testdata/fk
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ root · ·
│ missing stats ·
│ table child@primary
│ spans FULL SCAN
│ locking strength for update
└── fk-check · ·
└── error if rows · ·
└── hash join (anti) · ·
Expand Down Expand Up @@ -337,6 +338,7 @@ root · ·
│ missing stats ·
│ table child@primary
│ spans [/10 - /10]
│ locking strength for update
└── fk-check · ·
└── error if rows · ·
└── lookup join (anti) · ·
Expand All @@ -362,6 +364,7 @@ root · ·
│ missing stats ·
│ table parent@primary
│ spans FULL SCAN
│ locking strength for update
├── fk-check · ·
│ └── error if rows · ·
│ └── hash join · ·
Expand Down Expand Up @@ -402,39 +405,40 @@ root · ·
query TTT
EXPLAIN UPDATE parent SET p = p+1 WHERE other = 10
----
· distribution local
· vectorized false
root · ·
├── update · ·
│ │ table parent
│ │ set p
│ └── buffer · ·
│ │ label buffer 1
│ └── render · ·
│ └── scan · ·
│ missing stats ·
│ table parent@parent_other_key
│ spans [/10 - /10]
├── fk-check · ·
│ └── error if rows · ·
│ └── lookup join (semi) · ·
│ │ table child@child_p_idx
│ │ equality (p) = (p)
│ └── except · ·
│ ├── scan buffer · ·
│ │ label buffer 1
│ └── scan buffer · ·
│ label buffer 1
└── fk-check · ·
└── error if rows · ·
└── lookup join (semi) · ·
│ table child_nullable@child_nullable_p_idx
│ equality (p) = (p)
└── except · ·
├── scan buffer · ·
│ label buffer 1
└── scan buffer · ·
· label buffer 1
· distribution local
· vectorized false
root · ·
├── update · ·
│ │ table parent
│ │ set p
│ └── buffer · ·
│ │ label buffer 1
│ └── render · ·
│ └── scan · ·
│ missing stats ·
│ table parent@parent_other_key
│ spans [/10 - /10]
│ locking strength for update
├── fk-check · ·
│ └── error if rows · ·
│ └── lookup join (semi) · ·
│ │ table child@child_p_idx
│ │ equality (p) = (p)
│ └── except · ·
│ ├── scan buffer · ·
│ │ label buffer 1
│ └── scan buffer · ·
│ label buffer 1
└── fk-check · ·
└── error if rows · ·
└── lookup join (semi) · ·
│ table child_nullable@child_nullable_p_idx
│ equality (p) = (p)
└── except · ·
├── scan buffer · ·
│ label buffer 1
└── scan buffer · ·
· label buffer 1

statement ok
CREATE TABLE grandchild (g INT PRIMARY KEY, c INT NOT NULL REFERENCES child(c))
Expand All @@ -455,6 +459,7 @@ root · ·
│ missing stats ·
│ table child@primary
│ spans FULL SCAN
│ locking strength for update
└── fk-check · ·
└── error if rows · ·
└── hash join · ·
Expand Down Expand Up @@ -490,6 +495,7 @@ root · ·
│ missing stats ·
│ table child@primary
│ spans FULL SCAN
│ locking strength for update
└── fk-check · ·
└── error if rows · ·
└── hash join (anti) · ·
Expand Down Expand Up @@ -517,6 +523,7 @@ root · ·
│ missing stats ·
│ table child@primary
│ spans FULL SCAN
│ locking strength for update
└── fk-check · ·
└── error if rows · ·
└── hash join (anti) · ·
Expand Down Expand Up @@ -545,6 +552,7 @@ root · ·
│ missing stats ·
│ table child@primary
│ spans FULL SCAN
│ locking strength for update
├── fk-check · ·
│ └── error if rows · ·
│ └── hash join (anti) · ·
Expand Down Expand Up @@ -594,6 +602,7 @@ root · ·
│ missing stats ·
│ table child@primary
│ spans FULL SCAN
│ locking strength for update
└── fk-check · ·
└── error if rows · ·
└── hash join (anti) · ·
Expand Down Expand Up @@ -625,6 +634,7 @@ root · ·
│ missing stats ·
│ table self@primary
│ spans FULL SCAN
│ locking strength for update
└── fk-check · ·
└── error if rows · ·
└── hash join (anti) · ·
Expand Down Expand Up @@ -653,6 +663,7 @@ root · ·
│ missing stats ·
│ table self@primary
│ spans FULL SCAN
│ locking strength for update
└── fk-check · ·
└── error if rows · ·
└── hash join · ·
Expand Down
23 changes: 12 additions & 11 deletions pkg/sql/opt/exec/execbuilder/testdata/select_index
Original file line number Diff line number Diff line change
Expand Up @@ -1613,17 +1613,18 @@ scan · ·
query TTT
EXPLAIN UPDATE t4 SET c = 30 WHERE a = 10 and b = 20
----
· distribution local
· vectorized false
update · ·
│ table t4
│ set c
│ auto commit ·
└── render · ·
└── scan · ·
· missing stats ·
· table t4@primary
· spans [/10/20 - /10/20]
· distribution local
· vectorized false
update · ·
│ table t4
│ set c
│ auto commit ·
└── render · ·
└── scan · ·
· missing stats ·
· table t4@primary
· spans [/10/20 - /10/20]
· locking strength for update

# Optimization should not be applied for deletes.
query TTT
Expand Down
86 changes: 44 additions & 42 deletions pkg/sql/opt/exec/execbuilder/testdata/spool
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,28 @@ query TTT
EXPLAIN WITH a AS (UPDATE t SET x = x + 1 RETURNING x)
SELECT * FROM a LIMIT 1
----
· distribution local
· vectorized false
root · ·
├── limit · ·
│ │ count 1
│ └── scan buffer · ·
│ label buffer 1 (a)
└── subquery · ·
│ id @S1
│ original sql UPDATE t SET x = x + 1 RETURNING x
│ exec mode all rows
└── buffer · ·
│ label buffer 1 (a)
└── update · ·
│ table t
│ set x
└── render · ·
└── scan · ·
· missing stats ·
· table t@primary
· spans FULL SCAN
· distribution local
· vectorized false
root · ·
├── limit · ·
│ │ count 1
│ └── scan buffer · ·
│ label buffer 1 (a)
└── subquery · ·
│ id @S1
│ original sql UPDATE t SET x = x + 1 RETURNING x
│ exec mode all rows
└── buffer · ·
│ label buffer 1 (a)
└── update · ·
│ table t
│ set x
└── render · ·
└── scan · ·
· missing stats ·
· table t@primary
· spans FULL SCAN
· locking strength for update

query TTT
EXPLAIN WITH a AS (UPSERT INTO t VALUES (2), (3) RETURNING x)
Expand Down Expand Up @@ -154,27 +155,28 @@ root · ·
query TTT
EXPLAIN SELECT * FROM [UPDATE t SET x = x + 1 RETURNING x] LIMIT 1
----
· distribution local
· vectorized false
root · ·
├── limit · ·
│ │ count 1
│ └── scan buffer · ·
│ label buffer 1
└── subquery · ·
│ id @S1
│ original sql UPDATE t SET x = x + 1 RETURNING x
│ exec mode all rows
└── buffer · ·
│ label buffer 1
└── update · ·
│ table t
│ set x
└── render · ·
└── scan · ·
· missing stats ·
· table t@primary
· spans FULL SCAN
· distribution local
· vectorized false
root · ·
├── limit · ·
│ │ count 1
│ └── scan buffer · ·
│ label buffer 1
└── subquery · ·
│ id @S1
│ original sql UPDATE t SET x = x + 1 RETURNING x
│ exec mode all rows
└── buffer · ·
│ label buffer 1
└── update · ·
│ table t
│ set x
└── render · ·
└── scan · ·
· missing stats ·
· table t@primary
· spans FULL SCAN
· locking strength for update

query TTT
EXPLAIN SELECT * FROM [UPSERT INTO t VALUES (2), (3) RETURNING x] LIMIT 1
Expand Down
Loading