Skip to content

Commit

Permalink
sql: implement limited first version of UPSERT
Browse files Browse the repository at this point in the history
Specifically, add a batching tableWriter for the "short form" described in
https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/upsert.md.

For cockroachdb#1962.

Notable followups:
- Add support for the full ON CONFLICT DO UPDATE form.
- Hook returningHelper up to the tableWriter abstraction.
- Using fetchRows to compute fetch selectors in Insert/Upsert/Delete.
  • Loading branch information
danhhz committed May 3, 2016
1 parent e2834f6 commit 0323834
Show file tree
Hide file tree
Showing 9 changed files with 3,904 additions and 3,605 deletions.
47 changes: 45 additions & 2 deletions sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,23 @@ type insertNode struct {
}

// Insert inserts rows into the database.
// Privileges: INSERT on table
// Privileges: INSERT on table. Also requires UPDATE on "ON DUPLICATE KEY UPDATE".
// Notes: postgres requires INSERT. No "on duplicate key update" option.
// mysql requires INSERT. Also requires UPDATE on "ON DUPLICATE KEY UPDATE".
func (p *planner) Insert(n *parser.Insert, autoCommit bool) (planNode, *roachpb.Error) {
en, pErr := p.makeEditNode(n.Table, n.Returning, autoCommit, privilege.INSERT)
if pErr != nil {
return nil, pErr
}
if n.OnConflict != nil {
if err := p.checkPrivilege(en.tableDesc, privilege.UPDATE); err != nil {
return nil, roachpb.NewError(err)
}
// TODO(dan): Support RETURNING in UPSERTs.
if n.Returning != nil {
return nil, roachpb.NewErrorf("RETURNING is not supported with UPSERT")
}
}

var cols []ColumnDescriptor
// Determine which columns we're inserting into.
Expand Down Expand Up @@ -172,7 +181,41 @@ func (p *planner) Insert(n *parser.Insert, autoCommit bool) (planNode, *roachpb.
if err != nil {
return nil, roachpb.NewError(err)
}
tw := &tableInserter{ri: ri, autoCommit: autoCommit}

var tw tableWriter
if n.OnConflict == nil {
tw = &tableInserter{ri: ri, autoCommit: autoCommit}
} else {
// TODO(dan): These are both implied by the short form of UPSERT. When the
// INSERT INTO ON CONFLICT form is implemented, get these values from
// n.OnConfict.
upsertConflictIndex := en.tableDesc.PrimaryIndex
insertCols := ri.insertCols

indexColSet := make(map[ColumnID]struct{}, len(upsertConflictIndex.ColumnIDs))
for _, colID := range upsertConflictIndex.ColumnIDs {
indexColSet[colID] = struct{}{}
}

// updateCols contains the columns that will be updated when a conflict is
// found. For the UPSERT short form, it is the set of columns in insertCols
// minus any columns in the conflict index. Example:
// `UPSERT INTO abc VALUES (1, 2, 3)` is syntactic sugar for
// `INSERT INTO abc VALUES (1, 2, 3) ON CONFLICT a DO UPDATE SET b = 2, c = 3`.
updateCols := make([]ColumnDescriptor, 0, len(insertCols))
for _, c := range insertCols {
if _, ok := indexColSet[c.ID]; !ok {
updateCols = append(updateCols, c)
}
}
ru, err := makeRowUpdater(en.tableDesc, updateCols)
if err != nil {
return nil, roachpb.NewError(err)
}
// TODO(dan): Use ru.fetchCols to compute the fetch selectors.

tw = &tableUpserter{ri: ri, ru: ru, autoCommit: autoCommit}
}

in := &insertNode{
n: n,
Expand Down
25 changes: 20 additions & 5 deletions sql/parser/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,21 @@ import (

// Insert represents an INSERT statement.
type Insert struct {
Table TableExpr
Columns QualifiedNames
Rows *Select
Returning ReturningExprs
Table TableExpr
Columns QualifiedNames
Rows *Select
Returning ReturningExprs
OnConflict *OnConflict
}

func (node *Insert) String() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "INSERT INTO %s", node.Table)
if node.OnConflict != nil {
buf.WriteString("UPSERT")
} else {
buf.WriteString("INSERT")
}
fmt.Fprintf(&buf, " INTO %s", node.Table)
if node.Columns != nil {
fmt.Fprintf(&buf, "(%s)", node.Columns)
}
Expand All @@ -54,3 +60,12 @@ func (node *Insert) String() string {
func (node *Insert) DefaultValues() bool {
return node.Rows.Select == nil
}

// OnConflict represents an `ON CONFLICT index DO UPDATE SET` statement.
//
// The zero value for OnConflict is used to signal the UPSERT short form, which
// uses the primary key for Index and the values being inserted for Exprs.
type OnConflict struct {
Index Name
Exprs UpdateExprs
}
1 change: 1 addition & 0 deletions sql/parser/keywords.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions sql/parser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,17 @@ func TestParse(t *testing.T) {
{`INSERT INTO a VALUES (1, 2) RETURNING 1, 2`},
{`INSERT INTO a VALUES (1, 2) RETURNING a + b, c`},

{`UPSERT INTO a VALUES (1)`},
{`UPSERT INTO a.b VALUES (1)`},
{`UPSERT INTO a VALUES (1, 2)`},
{`UPSERT INTO a VALUES (1, DEFAULT)`},
{`UPSERT INTO a VALUES (1, 2), (3, 4)`},
{`UPSERT INTO a VALUES (a + 1, 2 * 3)`},
{`UPSERT INTO a(a, b) VALUES (1, 2)`},
{`UPSERT INTO a(a, a.b) VALUES (1, 2)`},
{`UPSERT INTO a SELECT b, c FROM d`},
{`UPSERT INTO a DEFAULT VALUES`},

{`SELECT 1 + 1`},
{`SELECT - 1`},
{`SELECT + 1`},
Expand Down
Loading

0 comments on commit 0323834

Please sign in to comment.