From e2834f6086ef7e04bd489d3d9100a5baae83abff Mon Sep 17 00:00:00 2001
From: Daniel Harrison
Date: Fri, 29 Apr 2016 16:02:18 -0400
Subject: [PATCH] sql: introduce tableWriter abstraction
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Migrates the client.Batch ownership for INSERT/UPDATE/DELETE to a new
tableWriter interface, the methods of which parallel the TableReader
interface being added in https://github.com/cockroachdb/cockroach/pull/6368.

This is in preparation for UPSERT, which has more complicated batch
usage than the three existing statements.

name                          old time/op    new time/op    delta
Insert1000_Cockroach-8        19.5ms ± 3%    19.8ms ± 6%      ~     (p=0.690 n=5+5)
Update1000_Cockroach-8        43.4ms ± 3%    43.5ms ± 6%      ~     (p=0.841 n=5+5)
Delete1000_Cockroach-8        65.1ms ± 2%    64.9ms ± 6%      ~     (p=1.000 n=5+5)
Scan1000_Cockroach-8          2.24ms ± 4%    2.15ms ± 2%    -3.97%  (p=0.016 n=5+5)
TrackChoices1000_Cockroach-8   153µs ± 6%     148µs ± 4%      ~     (p=0.222 n=5+5)

name                          old alloc/op   new alloc/op   delta
Insert1000_Cockroach-8        3.66MB ± 1%    3.64MB ± 1%      ~     (p=0.095 n=5+5)
Update1000_Cockroach-8        6.23MB ± 1%    6.21MB ± 0%      ~     (p=0.548 n=5+5)
Delete1000_Cockroach-8        2.68MB ± 0%    2.68MB ± 0%      ~     (p=0.413 n=5+4)
Scan1000_Cockroach-8           313kB ± 0%     313kB ± 0%      ~     (p=1.000 n=5+5)
TrackChoices1000_Cockroach-8  23.2kB ± 4%    23.7kB ± 6%      ~     (p=0.317 n=5+5)

name                          old allocs/op  new allocs/op  delta
Insert1000_Cockroach-8         25.6k ± 1%     25.5k ± 1%      ~     (p=0.095 n=5+5)
Update1000_Cockroach-8         41.5k ± 0%     41.4k ± 0%      ~     (p=0.206 n=5+5)
Delete1000_Cockroach-8         16.7k ± 0%     16.7k ± 0%      ~     (p=0.548 n=5+5)
Scan1000_Cockroach-8           2.29k ± 0%     2.29k ± 0%      ~     (all samples are equal)
TrackChoices1000_Cockroach-8     115 ± 2%       116 ± 2%      ~     (p=0.429 n=5+5)

(The Scan1000 movement is noise.)
---
 sql/delete.go      |  48 ++++------
 sql/insert.go      |  70 ++++++++-------
 sql/rowwriter.go   |  60 +------------
 sql/tablewriter.go | 218 +++++++++++++++++++++++++++++++++++++++++++++++
 sql/update.go      |  98 +++++++-------
 5 files changed, 314 insertions(+), 180 deletions(-)
 create mode 100644 sql/tablewriter.go

diff --git a/sql/delete.go b/sql/delete.go
index 53f0b94d1b4a..47c099fcb12e 100644
--- a/sql/delete.go
+++ b/sql/delete.go
@@ -20,7 +20,6 @@ import (
 	"bytes"
 	"fmt"
 
-	"github.com/cockroachdb/cockroach/client"
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/sql/parser"
 	"github.com/cockroachdb/cockroach/sql/privilege"
@@ -31,7 +30,7 @@ type deleteNode struct {
 	editNodeBase
 	n *parser.Delete
 
-	rd rowDeleter
+	tw tableDeleter
 
 	run struct {
 		// The following fields are populated during Start().
@@ -74,12 +73,14 @@ func (p *planner) Delete(n *parser.Delete, autoCommit bool) (planNode, *roachpb.
 		return nil, roachpb.NewError(err)
 	}
 	// TODO(dan): Use rd.fetchCols to compute the fetch selectors.
+	tw := tableDeleter{rd: rd, autoCommit: autoCommit}
 
-	return &deleteNode{
+	dn := &deleteNode{
 		n:            n,
 		editNodeBase: en,
-		rd:           rd,
-	}, nil
+		tw:           tw,
+	}
+	return dn, nil
 }
 
 func (d *deleteNode) Start() *roachpb.Error {
@@ -97,14 +98,16 @@ func (d *deleteNode) Start() *roachpb.Error {
 		return pErr
 	}
 
-	d.run.startEditNode(&d.editNodeBase, rows)
+	if pErr = d.run.startEditNode(&d.editNodeBase, rows, &d.tw); pErr != nil {
+		return pErr
+	}
 
 	// Check if we can avoid doing a round-trip to read the values and just
 	// "fast-path" skip to deleting the key ranges without reading them first.
 	// TODO(dt): We could probably be smarter when presented with an index-join,
 	// but this goes away anyway once we push-down more of SQL.
sel := rows.(*selectNode) - if scan, ok := sel.table.node.(*scanNode); ok && canDeleteWithoutScan(d.n, scan, &d.rd) { + if scan, ok := sel.table.node.(*scanNode); ok && canDeleteWithoutScan(d.n, scan, &d.tw) { d.run.fastPath = true d.run.pErr = d.fastDelete() d.run.done = true @@ -128,14 +131,14 @@ func (d *deleteNode) Next() bool { if !d.run.rows.Next() { // We're done. Finish the batch. - d.run.finalize(&d.editNodeBase, false) + d.run.pErr = d.tw.finalize() + d.run.done = true return false } rowVals := d.run.rows.Values() - d.run.pErr = d.rd.deleteRow(d.run.b, rowVals) - if d.run.pErr != nil { + if _, d.run.pErr = d.tw.run(rowVals); d.run.pErr != nil { return false } @@ -152,8 +155,8 @@ func (d *deleteNode) Next() bool { // Determine if the deletion of `rows` can be done without actually scanning them, // i.e. if we do not need to know their values for filtering expressions or a // RETURNING clause or for updating secondary indexes. -func canDeleteWithoutScan(n *parser.Delete, scan *scanNode, rd *rowDeleter) bool { - if !rd.fastPathAvailable() { +func canDeleteWithoutScan(n *parser.Delete, scan *scanNode, td *tableDeleter) bool { + if !td.fastPathAvailable() { return false } if n.Returning != nil { @@ -180,7 +183,10 @@ func (d *deleteNode) fastDelete() *roachpb.Error { return scan.pErr } - rowCount, pErr := d.rd.fastDelete(d.run.b, scan, d.fastDeleteCommitFunc) + if pErr := d.tw.init(d.p.txn); pErr != nil { + return pErr + } + rowCount, pErr := d.tw.fastDelete(scan) if pErr != nil { return pErr } @@ -188,22 +194,6 @@ func (d *deleteNode) fastDelete() *roachpb.Error { return nil } -func (d *deleteNode) fastDeleteCommitFunc(b *client.Batch) *roachpb.Error { - if d.autoCommit { - // An auto-txn can commit the transaction with the batch. This is an - // optimization to avoid an extra round-trip to the transaction - // coordinator. - if pErr := d.p.txn.CommitInBatch(b); pErr != nil { - return pErr - } - } else { - if pErr := d.p.txn.Run(b); pErr != nil { - return pErr - } - } - return nil -} - func (d *deleteNode) Columns() []ResultColumn { return d.rh.columns } diff --git a/sql/insert.go b/sql/insert.go index 1885ace6f6cc..11b6e590d3ca 100644 --- a/sql/insert.go +++ b/sql/insert.go @@ -28,13 +28,16 @@ import ( type insertNode struct { // The following fields are populated during makePlan. - rowCreatorNodeBase - n *parser.Insert - qvals qvalMap - insertRows parser.SelectStatement - checkExprs []parser.Expr + editNodeBase + defaultExprs []parser.Expr + n *parser.Insert + qvals qvalMap + insertRows parser.SelectStatement + checkExprs []parser.Expr - ri rowInserter + insertCols []ColumnDescriptor + insertColIDtoRowIndex map[ColumnID]int + tw tableWriter run struct { // The following fields are populated during Start(). @@ -99,13 +102,13 @@ func (p *planner) Insert(n *parser.Insert, autoCommit bool) (planNode, *roachpb. } } - rc, rcErr := p.makeRowCreatorNode(en, cols) - if rcErr != nil { - return nil, rcErr + defaultExprs, err := makeDefaultExprs(cols, &p.parser, p.evalCtx) + if err != nil { + return nil, roachpb.NewError(err) } // Replace any DEFAULT markers with the corresponding default expressions. - insertRows, err := p.fillDefaults(rc.defaultExprs, cols, n) + insertRows, err := p.fillDefaults(defaultExprs, cols, n) if err != nil { return nil, roachpb.NewError(err) } @@ -165,19 +168,23 @@ func (p *planner) Insert(n *parser.Insert, autoCommit bool) (planNode, *roachpb. 
return nil, pErr } - in := &insertNode{ - n: n, - rowCreatorNodeBase: rc, - checkExprs: checkExprs, - qvals: qvals, - insertRows: insertRows, - } - - in.ri, err = makeRowInserter(en.tableDesc, cols) + ri, err := makeRowInserter(en.tableDesc, cols) if err != nil { - return in, roachpb.NewError(err) + return nil, roachpb.NewError(err) } + tw := &tableInserter{ri: ri, autoCommit: autoCommit} + in := &insertNode{ + n: n, + editNodeBase: en, + defaultExprs: defaultExprs, + checkExprs: checkExprs, + qvals: qvals, + insertRows: insertRows, + insertCols: ri.insertCols, + insertColIDtoRowIndex: ri.insertColIDtoRowIndex, + tw: tw, + } return in, nil } @@ -198,7 +205,9 @@ func (n *insertNode) Start() *roachpb.Error { return pErr } - n.run.startEditNode(&n.editNodeBase, rows) + if pErr = n.run.startEditNode(&n.editNodeBase, rows, n.tw); pErr != nil { + return pErr + } // Prepare structures for building values to pass to rh. if n.rh.exprs != nil { @@ -217,8 +226,8 @@ func (n *insertNode) Start() *roachpb.Error { colIDToRetIndex[col.ID] = i } - n.run.rowIdxToRetIdx = make([]int, len(n.ri.insertCols)) - for i, col := range n.ri.insertCols { + n.run.rowIdxToRetIdx = make([]int, len(n.insertCols)) + for i, col := range n.insertCols { n.run.rowIdxToRetIdx[i] = colIDToRetIndex[col.ID] } } @@ -233,7 +242,8 @@ func (n *insertNode) Next() bool { if !n.run.rows.Next() { // We're done. Finish the batch. - n.run.finalize(&n.editNodeBase, true) + n.run.pErr = n.tw.finalize() + n.run.done = true return false } @@ -242,7 +252,7 @@ func (n *insertNode) Next() bool { // The values for the row may be shorter than the number of columns being // inserted into. Generate default values for those columns using the // default expressions. - for i := len(rowVals); i < len(n.ri.insertCols); i++ { + for i := len(rowVals); i < len(n.insertCols); i++ { if n.defaultExprs == nil { rowVals = append(rowVals, parser.DNull) continue @@ -258,7 +268,7 @@ func (n *insertNode) Next() bool { // Check to see if NULL is being inserted into any non-nullable column. for _, col := range n.tableDesc.Columns { if !col.Nullable { - if i, ok := n.ri.insertColIDtoRowIndex[col.ID]; !ok || rowVals[i] == parser.DNull { + if i, ok := n.insertColIDtoRowIndex[col.ID]; !ok || rowVals[i] == parser.DNull { n.run.pErr = roachpb.NewUErrorf("null value in column %q violates not-null constraint", col.Name) return false } @@ -267,7 +277,7 @@ func (n *insertNode) Next() bool { // Ensure that the values honor the specified column widths. for i := range rowVals { - if err := checkValueWidth(n.ri.insertCols[i], rowVals[i]); err != nil { + if err := checkValueWidth(n.insertCols[i], rowVals[i]); err != nil { n.run.pErr = roachpb.NewError(err) return false } @@ -277,7 +287,7 @@ func (n *insertNode) Next() bool { // Populate qvals. for ref, qval := range n.qvals { // The colIdx is 0-based, we need to change it to 1-based. 
- ri, has := n.ri.insertColIDtoRowIndex[ColumnID(ref.colIdx+1)] + ri, has := n.insertColIDtoRowIndex[ColumnID(ref.colIdx+1)] if !has { n.run.pErr = roachpb.NewUErrorf("failed to to find column %d in row", ColumnID(ref.colIdx+1)) return false @@ -299,7 +309,7 @@ func (n *insertNode) Next() bool { } } - n.run.pErr = n.ri.insertRow(n.run.b, rowVals) + _, n.run.pErr = n.tw.run(rowVals) if n.run.pErr != nil { return false } @@ -495,7 +505,7 @@ func (n *insertNode) ExplainPlan(v bool) (name, description string, children []p var buf bytes.Buffer if v { fmt.Fprintf(&buf, "into %s (", n.tableDesc.Name) - for i, col := range n.ri.insertCols { + for i, col := range n.insertCols { if i > 0 { fmt.Fprintf(&buf, ", ") } diff --git a/sql/rowwriter.go b/sql/rowwriter.go index 827052302d51..cad0bcd35392 100644 --- a/sql/rowwriter.go +++ b/sql/rowwriter.go @@ -293,11 +293,8 @@ func makeRowUpdater( if primaryKeyColChange { // These fields are only used when the primary key is changing. - // TODO(dan): Is it safe for these to share rowHelper instead of creating - // two more? var err error - ru.rd, err = makeRowDeleter(tableDesc) - if err != nil { + if ru.rd, err = makeRowDeleter(tableDesc); err != nil { return rowUpdater{}, err } ru.fetchCols = ru.rd.fetchCols @@ -305,6 +302,8 @@ func makeRowUpdater( return rowUpdater{}, err } } else { + // TODO(radu): we only need to select columns necessary to generate primary and + // secondary indexes keys, and columns needed by returningHelper. ru.fetchCols = make([]ColumnDescriptor, len(requestedCols)) copy(ru.fetchCols, requestedCols) } @@ -515,59 +514,6 @@ func (rd *rowDeleter) deleteRow(b *client.Batch, values []parser.Datum) *roachpb return nil } -// fastPathAvailable returns true if the fastDelete optimization can be used. -func (rd *rowDeleter) fastPathAvailable() bool { - if len(rd.helper.indexes) != 0 { - if log.V(2) { - log.Infof("delete forced to scan: values required to update %d secondary indexes", len(rd.helper.indexes)) - } - return false - } - return true -} - -// fastDelete adds to the batch the kv operations necessary to delete a sql -// table row without knowing the values that are currently present. -func (rd *rowDeleter) fastDelete( - b *client.Batch, - scan *scanNode, - commitFunc func(b *client.Batch) *roachpb.Error, -) (rowCount int, pErr *roachpb.Error) { - for _, span := range scan.spans { - if log.V(2) { - log.Infof("Skipping scan and just deleting %s - %s", span.start, span.end) - } - b.DelRange(span.start, span.end, true) - } - - pErr = commitFunc(b) - if pErr != nil { - return 0, pErr - } - - for _, r := range b.Results { - var prev []byte - for _, i := range r.Keys { - // If prefix is same, don't bother decoding key. - if len(prev) > 0 && bytes.HasPrefix(i, prev) { - continue - } - - after, err := scan.fetcher.readIndexKey(i) - if err != nil { - return 0, roachpb.NewError(err) - } - k := i[:len(i)-len(after)] - if !bytes.Equal(k, prev) { - prev = k - rowCount++ - } - } - } - - return rowCount, nil -} - func colIDtoRowIndexFromCols(cols []ColumnDescriptor) map[ColumnID]int { colIDtoRowIndex := make(map[ColumnID]int, len(cols)) for i, col := range cols { diff --git a/sql/tablewriter.go b/sql/tablewriter.go new file mode 100644 index 000000000000..dbdc91006796 --- /dev/null +++ b/sql/tablewriter.go @@ -0,0 +1,218 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+//
+// Author: Daniel Harrison (daniel.harrison@gmail.com)
+
+package sql
+
+import (
+	"bytes"
+
+	"github.com/cockroachdb/cockroach/client"
+	"github.com/cockroachdb/cockroach/roachpb"
+	"github.com/cockroachdb/cockroach/sql/parser"
+	"github.com/cockroachdb/cockroach/util/log"
+)
+
+// tableWriter handles writing kvs and forming table rows.
+//
+// Usage:
+//   pErr := tw.init(txn)
+//   // Handle pErr.
+//   for {
+//      var values []parser.Datum
+//      values = ...
+//      row, pErr := tw.run(values)
+//      // Handle pErr.
+//   }
+//   pErr := tw.finalize()
+//   // Handle pErr.
+type tableWriter interface {
+
+	// init provides the tableWriter with a Txn to write to and returns an error
+	// if it was misconfigured.
+	init(*client.Txn) *roachpb.Error
+
+	// run performs a sql row modification (tableInserter performs an insert,
+	// etc). It batches up writes to the init'd txn and periodically sends them.
+	// The returned DTuple is suitable for use with returningHelper.
+	run(parser.DTuple) (parser.DTuple, *roachpb.Error)
+
+	// finalize flushes out any remaining writes. It is called after all calls
+	// to run.
+	finalize() *roachpb.Error
+}
+
+var _ tableWriter = (*tableInserter)(nil)
+var _ tableWriter = (*tableUpdater)(nil)
+var _ tableWriter = (*tableDeleter)(nil)
+
+// tableInserter handles writing kvs and forming table rows for inserts.
+type tableInserter struct {
+	ri         rowInserter
+	autoCommit bool
+
+	// Set by init.
+	txn *client.Txn
+	b   *client.Batch
+}
+
+func (ti *tableInserter) init(txn *client.Txn) *roachpb.Error {
+	ti.txn = txn
+	ti.b = txn.NewBatch()
+	return nil
+}
+
+func (ti *tableInserter) run(values parser.DTuple) (parser.DTuple, *roachpb.Error) {
+	return nil, ti.ri.insertRow(ti.b, values)
+}
+
+func (ti *tableInserter) finalize() *roachpb.Error {
+	var pErr *roachpb.Error
+	if ti.autoCommit {
+		// An auto-txn can commit the transaction with the batch. This is an
+		// optimization to avoid an extra round-trip to the transaction
+		// coordinator.
+		pErr = ti.txn.CommitInBatch(ti.b)
+	} else {
+		pErr = ti.txn.Run(ti.b)
+	}
+
+	if pErr != nil {
+		pErr = convertBatchError(ti.ri.helper.tableDesc, *ti.b, pErr)
+	}
+	return pErr
+}
+
+// tableUpdater handles writing kvs and forming table rows for updates.
+type tableUpdater struct {
+	ru         rowUpdater
+	autoCommit bool
+
+	// Set by init.
+	txn *client.Txn
+	b   *client.Batch
+}
+
+func (tu *tableUpdater) init(txn *client.Txn) *roachpb.Error {
+	tu.txn = txn
+	tu.b = txn.NewBatch()
+	return nil
+}
+
+func (tu *tableUpdater) run(values parser.DTuple) (parser.DTuple, *roachpb.Error) {
+	oldValues := values[:len(tu.ru.fetchCols)]
+	updateValues := values[len(tu.ru.fetchCols):]
+	return tu.ru.updateRow(tu.b, oldValues, updateValues)
+}
+
+func (tu *tableUpdater) finalize() *roachpb.Error {
+	var pErr *roachpb.Error
+	if tu.autoCommit {
+		// An auto-txn can commit the transaction with the batch. This is an
+		// optimization to avoid an extra round-trip to the transaction
+		// coordinator.
+ pErr = tu.txn.CommitInBatch(tu.b) + } else { + pErr = tu.txn.Run(tu.b) + } + + if pErr != nil { + pErr = convertBatchError(tu.ru.helper.tableDesc, *tu.b, pErr) + } + return pErr +} + +// tableDeleter handles writing kvs and forming table rows for deletes. +type tableDeleter struct { + rd rowDeleter + autoCommit bool + + // Set by init. + txn *client.Txn + b *client.Batch +} + +func (td *tableDeleter) init(txn *client.Txn) *roachpb.Error { + td.txn = txn + td.b = txn.NewBatch() + return nil +} + +func (td *tableDeleter) run(values parser.DTuple) (parser.DTuple, *roachpb.Error) { + return nil, td.rd.deleteRow(td.b, values) +} + +func (td *tableDeleter) finalize() *roachpb.Error { + if td.autoCommit { + // An auto-txn can commit the transaction with the batch. This is an + // optimization to avoid an extra round-trip to the transaction + // coordinator. + return td.txn.CommitInBatch(td.b) + } + return td.txn.Run(td.b) +} + +// fastPathAvailable returns true if the fastDelete optimization can be used. +func (td *tableDeleter) fastPathAvailable() bool { + if len(td.rd.helper.indexes) != 0 { + if log.V(2) { + log.Infof("delete forced to scan: values required to update %d secondary indexes", len(td.rd.helper.indexes)) + } + return false + } + return true +} + +// fastDelete adds to the batch the kv operations necessary to delete sql rows +// without knowing the values that are currently present. fastDelete calls +// finalize, so it should not be called after. +func (td *tableDeleter) fastDelete( + scan *scanNode, +) (rowCount int, pErr *roachpb.Error) { + for _, span := range scan.spans { + if log.V(2) { + log.Infof("Skipping scan and just deleting %s - %s", span.start, span.end) + } + td.b.DelRange(span.start, span.end, true) + } + + pErr = td.finalize() + if pErr != nil { + return 0, pErr + } + + for _, r := range td.b.Results { + var prev []byte + for _, i := range r.Keys { + // If prefix is same, don't bother decoding key. + if len(prev) > 0 && bytes.HasPrefix(i, prev) { + continue + } + + after, err := scan.fetcher.readIndexKey(i) + if err != nil { + return 0, roachpb.NewError(err) + } + k := i[:len(i)-len(after)] + if !bytes.Equal(k, prev) { + prev = k + rowCount++ + } + } + } + + td.b = nil + return rowCount, nil +} diff --git a/sql/update.go b/sql/update.go index 739e2a430bdd..0888468972e7 100644 --- a/sql/update.go +++ b/sql/update.go @@ -20,7 +20,6 @@ import ( "bytes" "fmt" - "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/roachpb" "github.com/cockroachdb/cockroach/sql/parser" "github.com/cockroachdb/cockroach/sql/privilege" @@ -67,67 +66,30 @@ func (p *planner) makeEditNode(t parser.TableExpr, r parser.ReturningExprs, auto type editNodeRun struct { rows planNode pErr *roachpb.Error - b *client.Batch + tw tableWriter resultRow parser.DTuple done bool } -func (r *editNodeRun) startEditNode(en *editNodeBase, rows planNode) { +func (r *editNodeRun) startEditNode(en *editNodeBase, rows planNode, tw tableWriter) *roachpb.Error { if isSystemConfigID(en.tableDesc.GetID()) { // Mark transaction as operating on the system DB. en.p.txn.SetSystemConfigTrigger() } r.rows = rows - r.b = en.p.txn.NewBatch() -} - -func (r *editNodeRun) finalize(en *editNodeBase, convertError bool) { - if en.autoCommit { - // An auto-txn can commit the transaction with the batch. This is an - // optimization to avoid an extra round-trip to the transaction - // coordinator. 
- r.pErr = en.p.txn.CommitInBatch(r.b) - } else { - r.pErr = en.p.txn.Run(r.b) - } - if r.pErr != nil && convertError { - // TODO(dan): Move this logic into rowInsert/rowUpdate. - r.pErr = convertBatchError(en.tableDesc, *r.b, r.pErr) - } - - r.done = true - r.b = nil -} - -// recordCreatorNode (Base, Run) is shared by row creating statements -// (UPDATE, INSERT). - -// rowCreatorNodeBase holds the common (prepare+execute) state needed -// to run statements that create row values. -type rowCreatorNodeBase struct { - editNodeBase - defaultExprs []parser.Expr -} - -func (p *planner) makeRowCreatorNode(en editNodeBase, cols []ColumnDescriptor) (rowCreatorNodeBase, *roachpb.Error) { - defaultExprs, err := makeDefaultExprs(cols, &p.parser, p.evalCtx) - if err != nil { - return rowCreatorNodeBase{}, roachpb.NewError(err) - } - - return rowCreatorNodeBase{ - editNodeBase: en, - defaultExprs: defaultExprs, - }, nil + r.tw = tw + return r.tw.init(en.p.txn) } type updateNode struct { // The following fields are populated during makePlan. - rowCreatorNodeBase - n *parser.Update + editNodeBase + defaultExprs []parser.Expr + n *parser.Update - ru rowUpdater + updateCols []ColumnDescriptor + tw tableUpdater run struct { // The following fields are populated during Start(). @@ -187,20 +149,17 @@ func (p *planner) Update(n *parser.Update, autoCommit bool) (planNode, *roachpb. return nil, roachpb.NewError(err) } - rc, rcErr := p.makeRowCreatorNode(en, updateCols) - if rcErr != nil { - return nil, rcErr - } - - un := &updateNode{ - n: n, - rowCreatorNodeBase: rc, + defaultExprs, err := makeDefaultExprs(updateCols, &p.parser, p.evalCtx) + if err != nil { + return nil, roachpb.NewError(err) } - if un.ru, err = makeRowUpdater(en.tableDesc, updateCols); err != nil { + ru, err := makeRowUpdater(en.tableDesc, updateCols) + if err != nil { return nil, roachpb.NewError(err) } // TODO(dan): Use ru.fetchCols to compute the fetch selectors. + tw := tableUpdater{ru: ru, autoCommit: autoCommit} tracing.AnnotateTrace() @@ -220,13 +179,13 @@ func (p *planner) Update(n *parser.Update, autoCommit bool) (planNode, *roachpb. if expr.Tuple { if t, ok := expr.Expr.(*parser.Tuple); ok { for _, e := range t.Exprs { - e = fillDefault(e, i, rc.defaultExprs) + e = fillDefault(e, i, defaultExprs) targets = append(targets, parser.SelectExpr{Expr: e}) i++ } } } else { - e := fillDefault(expr.Expr, i, rc.defaultExprs) + e := fillDefault(expr.Expr, i, defaultExprs) targets = append(targets, parser.SelectExpr{Expr: e}) i++ } @@ -269,6 +228,13 @@ func (p *planner) Update(n *parser.Update, autoCommit bool) (planNode, *roachpb. return nil, pErr } + un := &updateNode{ + n: n, + editNodeBase: en, + defaultExprs: defaultExprs, + updateCols: ru.updateCols, + tw: tw, + } return un, nil } @@ -329,7 +295,9 @@ func (u *updateNode) Start() *roachpb.Error { return pErr } - u.run.startEditNode(&u.editNodeBase, rows) + if pErr = u.run.startEditNode(&u.editNodeBase, rows, &u.tw); pErr != nil { + return pErr + } return nil } @@ -340,7 +308,9 @@ func (u *updateNode) Next() bool { } if !u.run.rows.Next() { - u.run.finalize(&u.editNodeBase, true) + // We're done. Finish the batch. + u.run.pErr = u.tw.finalize() + u.run.done = true return false } @@ -355,14 +325,14 @@ func (u *updateNode) Next() bool { // Ensure that the values honor the specified column widths. 
for i := range updateValues { - if err := checkValueWidth(u.ru.updateCols[i], updateValues[i]); err != nil { + if err := checkValueWidth(u.updateCols[i], updateValues[i]); err != nil { u.run.pErr = roachpb.NewError(err) return false } } // Update the row values. - for i, col := range u.ru.updateCols { + for i, col := range u.updateCols { val := updateValues[i] if !col.Nullable && val == parser.DNull { u.run.pErr = roachpb.NewUErrorf("null value in column %q violates not-null constraint", col.Name) @@ -370,7 +340,7 @@ func (u *updateNode) Next() bool { } } - newValues, pErr := u.ru.updateRow(u.run.b, oldValues, updateValues) + newValues, pErr := u.tw.run(append(oldValues, updateValues...)) if pErr != nil { u.run.pErr = pErr return false @@ -422,7 +392,7 @@ func (u *updateNode) ExplainPlan(v bool) (name, description string, children []p var buf bytes.Buffer if v { fmt.Fprintf(&buf, "set %s (", u.tableDesc.Name) - for i, col := range u.ru.updateCols { + for i, col := range u.updateCols { if i > 0 { fmt.Fprintf(&buf, ", ") }
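
For reference, a minimal sketch of how a caller drives the tableWriter
contract introduced above. It simply follows the init/run/finalize usage
comment on the interface; the writeAllRows helper and its rows argument are
hypothetical illustrations and are not part of this patch.

package sql

import (
	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/roachpb"
	"github.com/cockroachdb/cockroach/sql/parser"
)

// writeAllRows is a hypothetical helper, not part of this change. It shows the
// intended calling sequence for any tableWriter implementation: init binds the
// writer to a txn, run queues the kv writes for one row at a time, and
// finalize flushes (and, with autoCommit, commits) the batch.
func writeAllRows(txn *client.Txn, tw tableWriter, rows []parser.DTuple) *roachpb.Error {
	if pErr := tw.init(txn); pErr != nil {
		return pErr
	}
	for _, values := range rows {
		// The returned DTuple feeds returningHelper; it is ignored here.
		if _, pErr := tw.run(values); pErr != nil {
			return pErr
		}
	}
	return tw.finalize()
}

This is the same loop shape that deleteNode, insertNode, and updateNode now
express through startEditNode/Next/finalize, and it is the seam where UPSERT's
more involved batch handling is intended to plug in.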