Skip to content

Commit

Permalink
Merge #33681
Browse files Browse the repository at this point in the history
33681: opt: Project merged Upsert columns r=andy-kimball a=andy-kimball

Currently, the Upsert operator's insert and update columns are
separately created and maintained. For example:

  CREATE TABLE abc (a INT PRIMARY KEY, b INT)
  INSERT INTO abc VALUES (1) ON CONFLICT (a) DO UPDATE SET b=5

Here are the projected columns:

  SELECT
    a AS fetch_a, b AS fetch_b,
    1 AS ins_a, NULL AS ins_b,
    5 AS upd_b
  FROM ...

This commit constructs CASE statements that merge the insert and
update columns, like this:

  SELECT
    a AS fetch_a, b AS fetch_b,
    CASE WHEN fetch_a IS NULL THEN 1 ELSE fetch_a END AS ups_a,
    CASE WHEN fetch_a IS NULL THEN NULL::INT ELSE 5 END AS ups_b
  FROM ...

The advantages of making this change are:

1. This sets us up to implement check constraints, since they will now
   be able to operate on the combined columns.
2. This often avoids evaluating the update expression when an insert
   needs to occur for a given row.
3. This simplifies the property derivation, making Upsert act just like
   Update, Insert, and Delete. It's now possible to cleanly map each
   mutation output column to a single input column.


Co-authored-by: Andrew Kimball <andyk@cockroachlabs.com>
  • Loading branch information
craig[bot] and andy-kimball committed Jan 17, 2019
2 parents 4705ef3 + 829125c commit 1c552ab
Show file tree
Hide file tree
Showing 30 changed files with 1,804 additions and 980 deletions.
4 changes: 2 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func (sc *SchemaChanger) distBackfill(
if backfillType == columnBackfill {
fkTables, err := row.TablesNeededForFKs(
ctx,
*tableDesc,
tableDesc,
row.CheckUpdates,
row.NoLookup,
row.NoCheckPrivilege,
Expand Down Expand Up @@ -665,7 +665,7 @@ func columnBackfillInTxn(
var otherTableDescs []*sqlbase.ImmutableTableDescriptor
fkTables, err := row.TablesNeededForFKs(
ctx,
*tableDesc,
tableDesc,
row.CheckUpdates,
row.NoLookup,
row.NoCheckPrivilege,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
) (roachpb.Key, error) {
fkTables, _ := row.TablesNeededForFKs(
ctx,
*tableDesc,
tableDesc,
row.CheckUpdates,
row.NoLookup,
row.NoCheckPrivilege,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (p *planner) Delete(
// Determine what are the foreign key tables that are involved in the deletion.
fkTables, err := row.TablesNeededForFKs(
ctx,
*desc,
desc,
row.CheckDeletes,
p.LookupTableByID,
p.CheckPrivilege,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ CREATE TABLE IF NOT EXISTS child_with_index(

fkTables, err := row.TablesNeededForFKs(
context.TODO(),
*pd,
pd,
row.CheckDeletes,
lookup,
row.NoCheckPrivilege,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (p *planner) Insert(
}
fkTables, err := row.TablesNeededForFKs(
ctx,
*desc,
desc,
fkCheckType,
p.LookupTableByID,
p.CheckPrivilege,
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/opt/exec/execbuilder/relational_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,10 +1217,11 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (execPlan, error) {
//
// TODO(andyk): Using ensureColumns here can result in an extra Render.
// Upgrade execution engine to not require this.
colList := make(opt.ColList, 0, len(ups.InsertCols)+len(ups.FetchCols)+len(ups.UpdateCols))
colList := make(opt.ColList, 0, len(ups.InsertCols)+len(ups.FetchCols)+len(ups.UpdateCols)+1)
colList = appendColsWhenPresent(colList, ups.InsertCols)
colList = appendColsWhenPresent(colList, ups.FetchCols)
colList = appendColsWhenPresent(colList, ups.UpdateCols)
colList = append(colList, ups.CanaryCol)
input, err = b.ensureColumns(input, colList, nil, ups.ProvidedPhysical().Ordering)
if err != nil {
return execPlan{}, err
Expand Down
22 changes: 11 additions & 11 deletions pkg/sql/opt/exec/execbuilder/testdata/orderby
Original file line number Diff line number Diff line change
Expand Up @@ -482,17 +482,17 @@ ordinality · · (x, "ordinality") ·
query TTTTT
EXPLAIN (VERBOSE) INSERT INTO t(a, b) SELECT * FROM (SELECT 1 AS x, 2 AS y) ORDER BY x RETURNING b
----
render · · (b) ·
│ render 0 b · ·
└── run · · (a, b, c) ·
└── insert · · (a, b, c) ·
│ into t(a, b, c) · ·
│ strategy inserter · ·
└── values · · (x, y, column6) ·
· size 3 columns, 1 row · ·
· row 0, expr 0 1 · ·
· row 0, expr 1 2 · ·
· row 0, expr 2 NULL · ·
render · · (b) ·
│ render 0 b · ·
└── run · · (a, b, c) ·
└── insert · · (a, b, c) ·
│ into t(a, b, c) · ·
│ strategy inserter · ·
└── values · · (x, y, column6) ·
· size 3 columns, 1 row · ·
· row 0, expr 0 1 · ·
· row 0, expr 1 2 · ·
· row 0, expr 2 CAST(NULL AS BOOL) · ·

query TTTTT
EXPLAIN (VERBOSE) DELETE FROM t WHERE a = 3 RETURNING b
Expand Down
60 changes: 32 additions & 28 deletions pkg/sql/opt/exec/execbuilder/testdata/spool
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,22 @@ query TTT
EXPLAIN WITH a AS (UPSERT INTO t VALUES (2), (3) RETURNING x)
SELECT * FROM a LIMIT 1
----
limit · ·
│ count 1
└── spool · ·
│ limit 1
└── run · ·
└── upsert · ·
│ into t(x)
│ strategy opt upserter
└── lookup-join · ·
│ type left outer
├── values · ·
│ size 1 column, 2 rows
└── scan · ·
· table t@primary
limit · ·
│ count 1
└── spool · ·
│ limit 1
└── run · ·
└── upsert · ·
│ into t(x)
│ strategy opt upserter
└── render · ·
└── render · ·
└── lookup-join · ·
│ type left outer
├── values · ·
│ size 1 column, 2 rows
└── scan · ·
· table t@primary

# Ditto all mutations, with the statement source syntax.
query TTT
Expand Down Expand Up @@ -128,20 +130,22 @@ limit · ·
query TTT
EXPLAIN SELECT * FROM [UPSERT INTO t VALUES (2), (3) RETURNING x] LIMIT 1
----
limit · ·
│ count 1
└── spool · ·
│ limit 1
└── run · ·
└── upsert · ·
│ into t(x)
│ strategy opt upserter
└── lookup-join · ·
│ type left outer
├── values · ·
│ size 1 column, 2 rows
└── scan · ·
· table t@primary
limit · ·
│ count 1
└── spool · ·
│ limit 1
└── run · ·
└── upsert · ·
│ into t(x)
│ strategy opt upserter
└── render · ·
└── render · ·
└── lookup-join · ·
│ type left outer
├── values · ·
│ size 1 column, 2 rows
└── scan · ·
· table t@primary

# Check that a spool is also inserted for other processings than LIMIT.
query TTT
Expand Down
48 changes: 27 additions & 21 deletions pkg/sql/opt/exec/execbuilder/testdata/upsert
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,33 @@ SELECT tree, field, description FROM [
EXPLAIN (VERBOSE) UPSERT INTO kv TABLE kv ORDER BY v DESC LIMIT 2
]
----
count · ·
└── upsert · ·
│ into kv(k, v)
│ strategy opt upserter
└── render · ·
│ render 0 k
│ render 1 v
│ render 2 k
│ render 3 v
│ render 4 v
└── lookup-join · ·
│ type inner
├── limit · ·
│ │ count 2
│ └── sort · ·
│ │ order -v
│ └── scan · ·
│ table kv@primary
│ spans ALL
└── scan · ·
· table kv@primary
count · ·
└── upsert · ·
│ into kv(k, v)
│ strategy opt upserter
└── render · ·
│ render 0 upsert_k
│ render 1 upsert_v
│ render 2 upsert_k
│ render 3 v
│ render 4 upsert_v
│ render 5 k
└── render · ·
│ render 0 CASE WHEN k IS NULL THEN k ELSE k END
│ render 1 CASE WHEN k IS NULL THEN v ELSE v END
│ render 2 k
│ render 3 v
└── lookup-join · ·
│ type inner
├── limit · ·
│ │ count 2
│ └── sort · ·
│ │ order -v
│ └── scan · ·
│ table kv@primary
│ spans ALL
└── scan · ·
· table kv@primary

# Regression test for #25726.
# UPSERT over tables with column families, on the fast path, use the
Expand Down
78 changes: 42 additions & 36 deletions pkg/sql/opt/memo/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,63 +275,69 @@ func (sf *ScanFlags) Empty() bool {
return !sf.NoIndexJoin && !sf.ForceIndex
}

// MapToInputIDs maps from the ID of a target table column to the ID(s) of the
// corresponding input column(s) that provides the value for it:
// MapToInputID maps from the ID of a target table column to the ID of the
// corresponding input column that provides the value for it:
//
// Insert: a = InsertCols
// Update: a = UpdateCols/FetchCols
// Upsert: a = UpdateCols/FetchCols, b = InsertCols
// Delete: a = FetchCols
// Insert: InsertCols
// Update: UpdateCols/FetchCols
// Upsert: InsertCols/UpdateCols/FetchCols
// Delete: FetchCols
//
// For the UpdateCols/FetchCols case, use the corresponding UpdateCol if it is
// non-zero (meaning that column will be updated), else use the FetchCol (which
// holds the existing value of the column).
func (m *MutationPrivate) MapToInputIDs(tabColID opt.ColumnID) (a, b opt.ColumnID) {
// When choosing from UpdateCols/FetchCols, use the corresponding UpdateCol if
// it is non-zero (meaning that column will be updated), else use the FetchCol
// (which holds the existing value of the column). And for the Upsert case, use
// either the UpdateCols/FetchCols or the InsertCols value, as long as it is
// non-zero. If both are non-zero, then they must be the same, since they are
// set to the same CASE expression column.
//
// If there is no matching input column ID, MapToInputID returns 0.
func (m *MutationPrivate) MapToInputID(tabColID opt.ColumnID) opt.ColumnID {
ord := m.Table.ColumnOrdinal(tabColID)
if m.FetchCols != nil {
if m.UpdateCols != nil && m.UpdateCols[ord] != 0 {
a = m.UpdateCols[ord]
} else {
a = m.FetchCols[ord]
}
}

var ins opt.ColumnID
if m.InsertCols != nil {
if a == 0 {
a = m.InsertCols[ord]
} else {
b = m.InsertCols[ord]
ins = m.InsertCols[ord]
}

var upd opt.ColumnID
if m.UpdateCols != nil {
upd = m.UpdateCols[ord]
}
if upd == 0 && m.FetchCols != nil {
upd = m.FetchCols[ord]
}

if ins != 0 {
if upd != 0 && ins != upd {
panic("insert and update columns should be the same")
}
return ins
}
return a, b
return upd
}

// MapToInputCols maps the given set of table columns to a corresponding set of
// input columns using the MapToInputID function. This method should not be
// called for Upsert ops, since the mapping is ambiguous.
// input columns using the MapToInputID function.
func (m *MutationPrivate) MapToInputCols(tabCols opt.ColSet) opt.ColSet {
var inCols opt.ColSet
tabCols.ForEach(func(t int) {
a, b := m.MapToInputIDs(opt.ColumnID(t))
if b != 0 {
panic("MapToInputCols cannot be called for Upsert case")
id := m.MapToInputID(opt.ColumnID(t))
if id == 0 {
panic(fmt.Sprintf("could not find input column for %d", t))
}
inCols.Add(int(a))
inCols.Add(int(id))
})
return inCols
}

// AddEquivTableCols adds an FD to the given set that declares an equivalence
// between each table column and its corresponding input column. This method
// should not be called for Upsert ops, since the mapping is ambiguous.
// between each table column and its corresponding input column.
func (m *MutationPrivate) AddEquivTableCols(md *opt.Metadata, fdset *props.FuncDepSet) {
for i, n := 0, md.Table(m.Table).ColumnCount(); i < n; i++ {
t := m.Table.ColumnID(i)
a, b := m.MapToInputIDs(t)
if b != 0 {
panic("AddEquivTableCols cannot be called for Upsert case")
}
if a != 0 {
fdset.AddEquivalency(t, a)
id := m.MapToInputID(t)
if id != 0 {
fdset.AddEquivalency(t, id)
}
}
}
30 changes: 10 additions & 20 deletions pkg/sql/opt/memo/logical_props_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,14 +1011,10 @@ func (b *logicalPropsBuilder) buildMutationProps(mutation RelExpr, rel *props.Re
continue
}

// Either one or two input columns can provide the source value for the
// mutation. If two are present, then both must be not-null in order to
// guarantee that the result will be not-null as well.
a, b := private.MapToInputIDs(tabColID)
if inputProps.NotNullCols.Contains(int(a)) {
if b == 0 || inputProps.NotNullCols.Contains(int(b)) {
rel.NotNullCols.Add(int(private.Table.ColumnID(i)))
}
// If the input column is not null, then the result will be not null.
inputColID := private.MapToInputID(tabColID)
if inputProps.NotNullCols.Contains(int(inputColID)) {
rel.NotNullCols.Add(int(private.Table.ColumnID(i)))
}
}

Expand All @@ -1028,18 +1024,12 @@ func (b *logicalPropsBuilder) buildMutationProps(mutation RelExpr, rel *props.Re

// Functional Dependencies
// -----------------------
// Start with copy of FuncDepSet from input. For Insert, Update, and Delete
// ops, map the FDs of each source column to the corresponding destination
// column by making the columns equivalent and then filtering out the source
// columns via a call to ProjectCols. Note that this will not work for Upsert
// ops, since destination values can be sourced from multiple input columns.
// In that case, the functional dependencies are empty.
switch mutation.Op() {
case opt.InsertOp, opt.UpdateOp, opt.DeleteOp:
rel.FuncDeps.CopyFrom(&inputProps.FuncDeps)
private.AddEquivTableCols(md, &rel.FuncDeps)
rel.FuncDeps.ProjectCols(rel.OutputCols)
}
// Start with copy of FuncDepSet from input. Map the FDs of each source column
// to the corresponding destination column by making the columns equivalent
// and then filtering out the source columns via a call to ProjectCols.
rel.FuncDeps.CopyFrom(&inputProps.FuncDeps)
private.AddEquivTableCols(md, &rel.FuncDeps)
rel.FuncDeps.ProjectCols(rel.OutputCols)

// Cardinality
// -----------
Expand Down
12 changes: 1 addition & 11 deletions pkg/sql/opt/memo/statistics_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1902,17 +1902,7 @@ func (sb *statisticsBuilder) colStatMutation(

// Get colstat from child by mapping requested columns to corresponding
// input columns.
var inColSet opt.ColSet
if mutation.Op() == opt.UpsertOp {
// Treat the Upsert operator as if it was an Insert. This is not precise,
// as some percentage of rows will come from the fetch/update columns.
temp := *private
temp.FetchCols = nil
temp.UpdateCols = nil
inColSet = temp.MapToInputCols(colSet)
} else {
inColSet = private.MapToInputCols(colSet)
}
inColSet := private.MapToInputCols(colSet)
inColStat := sb.colStatFromChild(inColSet, mutation, 0 /* childIdx */)

// Construct mutation colstat using the corresponding input stats.
Expand Down
Loading

0 comments on commit 1c552ab

Please sign in to comment.