From 91159662fef3a649c973a072c823384a147ec2f6 Mon Sep 17 00:00:00 2001
From: Daniel Harrison
Date: Wed, 20 Apr 2016 13:28:59 -0400
Subject: [PATCH] sql: extract k/v operations from insert/update/delete

Separates the mapping between table rows/indexes and key/values for the
mutative operations. This will be useful immediately for implementing
UPSERT and eventually for TableWriter.

I tried to keep this as pure a refactor as I could, but it was a lot
simpler to model an UPDATE with a PK change as a delete followed by an
insert (instead of migrating the mess I committed before).

I'm still not clear on which component decides which columns are
required, but I think the additional complexity that UPSERT/REPLACE adds
will make it more obvious one way or another. Once that happens it
should be clearer how to handle all these colIDtoRowIndex maps.
---
 sql/delete.go    |  96 ++------
 sql/insert.go    |  88 +-------
 sql/rowwriter.go | 577 +++++++++++++++++++++++++++++++++++++++++++++++
 sql/update.go    | 281 +++--------------------
 4 files changed, 644 insertions(+), 398 deletions(-)
 create mode 100644 sql/rowwriter.go

diff --git a/sql/delete.go b/sql/delete.go
index c11b920f1394..c46c69811c9e 100644
--- a/sql/delete.go
+++ b/sql/delete.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"fmt"
 
+	"github.com/cockroachdb/cockroach/client"
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/sql/parser"
 	"github.com/cockroachdb/cockroach/sql/privilege"
@@ -31,10 +32,11 @@ type deleteNode struct {
 	n *parser.Delete
 
 	run struct {
+		// The following fields are populated during Start().
 		editNodeRun
 
-		colIDtoRowIndex map[ColumnID]int
-		fastPath        bool
+		rd       rowDeleter
+		fastPath bool
 	}
 }
 
@@ -93,26 +95,21 @@ func (d *deleteNode) Start() *roachpb.Error {
 	if err != nil {
 		return roachpb.NewError(err)
 	}
-	d.run.colIDtoRowIndex = colIDtoRowIndex
-
-	// Determine the secondary indexes that need to be updated as well.
-	indexes := d.tableDesc.Indexes
-	// Also include all the indexes under mutation; mutation state is
-	// irrelevant for deletions.
-	for _, m := range d.tableDesc.Mutations {
-		if index := m.GetIndex(); index != nil {
-			indexes = append(indexes, *index)
-		}
+
+	rd, err := makeRowDeleter(d.tableDesc, colIDtoRowIndex)
+	if err != nil {
+		return roachpb.NewError(err)
 	}
+	d.run.rd = rd
 
-	d.run.startEditNode(&d.editNodeBase, rows, indexes)
+	d.run.startEditNode(&d.editNodeBase, rows)
 
 	// Check if we can avoid doing a round-trip to read the values and just
 	// "fast-path" skip to deleting the key ranges without reading them first.
 	// TODO(dt): We could probably be smarter when presented with an index-join,
 	// but this goes away anyway once we push-down more of SQL.
sel := rows.(*selectNode) - if scan, ok := sel.table.node.(*scanNode); ok && canDeleteWithoutScan(d.n, scan, len(indexes)) { + if scan, ok := sel.table.node.(*scanNode); ok && canDeleteWithoutScan(d.n, scan, &d.run.rd) { d.run.fastPath = true d.run.pErr = d.fastDelete() d.run.done = true @@ -142,35 +139,11 @@ func (d *deleteNode) Next() bool { rowVals := d.run.rows.Values() - primaryIndexKey, _, err := encodeIndexKey( - &d.run.primaryIndex, d.run.colIDtoRowIndex, rowVals, d.run.primaryIndexKeyPrefix) - if err != nil { - d.run.pErr = roachpb.NewError(err) - return false - } - - secondaryIndexEntries, err := encodeSecondaryIndexes( - d.tableDesc.ID, d.run.indexes, d.run.colIDtoRowIndex, rowVals) - if err != nil { - d.run.pErr = roachpb.NewError(err) + d.run.pErr = d.run.rd.deleteRow(d.run.b, rowVals) + if d.run.pErr != nil { return false } - for _, secondaryIndexEntry := range secondaryIndexEntries { - if log.V(2) { - log.Infof("Del %s", secondaryIndexEntry.key) - } - d.run.b.Del(secondaryIndexEntry.key) - } - - // Delete the row. - rowStartKey := roachpb.Key(primaryIndexKey) - rowEndKey := rowStartKey.PrefixEnd() - if log.V(2) { - log.Infof("DelRange %s - %s", rowStartKey, rowEndKey) - } - d.run.b.DelRange(rowStartKey, rowEndKey, false) - resultRow, err := d.rh.cookResultRow(rowVals) if err != nil { d.run.pErr = roachpb.NewError(err) @@ -184,11 +157,8 @@ func (d *deleteNode) Next() bool { // Determine if the deletion of `rows` can be done without actually scanning them, // i.e. if we do not need to know their values for filtering expressions or a // RETURNING clause or for updating secondary indexes. -func canDeleteWithoutScan(n *parser.Delete, scan *scanNode, indexCount int) bool { - if indexCount != 0 { - if log.V(2) { - log.Infof("delete forced to scan: values required to update %d secondary indexes", indexCount) - } +func canDeleteWithoutScan(n *parser.Delete, scan *scanNode, rd *rowDeleter) bool { + if !rd.fastPathAvailable() { return false } if n.Returning != nil { @@ -215,45 +185,27 @@ func (d *deleteNode) fastDelete() *roachpb.Error { return scan.pErr } - for _, span := range scan.spans { - if log.V(2) { - log.Infof("Skipping scan and just deleting %s - %s", span.start, span.end) - } - d.run.b.DelRange(span.start, span.end, true) + rowCount, pErr := d.run.rd.fastDelete(d.run.b, scan, d.fastDeleteCommitFunc) + if pErr != nil { + return pErr } + d.rh.rowCount += rowCount + return nil +} +func (d *deleteNode) fastDeleteCommitFunc(b *client.Batch) *roachpb.Error { if d.autoCommit { // An auto-txn can commit the transaction with the batch. This is an // optimization to avoid an extra round-trip to the transaction // coordinator. - if pErr := d.p.txn.CommitInBatch(d.run.b); pErr != nil { + if pErr := d.p.txn.CommitInBatch(b); pErr != nil { return pErr } } else { - if pErr := d.p.txn.Run(d.run.b); pErr != nil { + if pErr := d.p.txn.Run(b); pErr != nil { return pErr } } - - for _, r := range d.run.b.Results { - var prev []byte - for _, i := range r.Keys { - // If prefix is same, don't bother decoding key. 
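As an aside (not part of the patch): the new rowDeleter.deleteRow, called from Next above and defined in rowwriter.go below, maps one table row onto KV delete operations, a point delete per secondary-index entry plus a ranged delete over the row's primary-key span. A minimal, self-contained sketch of that shape; the kvBatch type and the key strings are invented stand-ins for client.Batch and the real encoded keys.

package main

import "fmt"

// kvBatch stands in for client.Batch; it only records the operations issued.
type kvBatch struct{ ops []string }

func (b *kvBatch) Del(key string)             { b.ops = append(b.ops, "Del "+key) }
func (b *kvBatch) DelRange(start, end string) { b.ops = append(b.ops, "DelRange "+start+" - "+end) }

// deleteRowSketch mirrors the shape of rowDeleter.deleteRow: point-delete every
// secondary-index entry, then delete the whole primary-key span for the row.
func deleteRowSketch(b *kvBatch, primaryIndexKey string, secondaryIndexKeys []string) {
	for _, k := range secondaryIndexKeys {
		b.Del(k)
	}
	// The real code uses roachpb.Key.PrefixEnd(); "~" is a toy stand-in for
	// the end of the row's key span.
	b.DelRange(primaryIndexKey, primaryIndexKey+"~")
}

func main() {
	b := &kvBatch{}
	deleteRowSketch(b, "/t/primary/5", []string{"/t/idx_name/'bob'/5"})
	for _, op := range b.ops {
		fmt.Println(op)
	}
}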
- if len(prev) > 0 && bytes.HasPrefix(i, prev) { - continue - } - - after, err := scan.fetcher.readIndexKey(i) - if err != nil { - return roachpb.NewError(err) - } - k := i[:len(i)-len(after)] - if !bytes.Equal(k, prev) { - prev = k - d.rh.rowCount++ - } - } - } return nil } diff --git a/sql/insert.go b/sql/insert.go index eeea4ea72f3b..30c65169629c 100644 --- a/sql/insert.go +++ b/sql/insert.go @@ -20,12 +20,10 @@ import ( "bytes" "fmt" - "github.com/cockroachdb/cockroach/keys" "github.com/cockroachdb/cockroach/roachpb" "github.com/cockroachdb/cockroach/sql/parser" "github.com/cockroachdb/cockroach/sql/privilege" "github.com/cockroachdb/cockroach/util" - "github.com/cockroachdb/cockroach/util/log" ) type insertNode struct { @@ -38,7 +36,9 @@ type insertNode struct { run struct { // The following fields are populated during Start(). - rowCreatorNodeRun + editNodeRun + + ri rowInserter rowIdxToRetIdx []int rowTemplate parser.DTuple } @@ -201,18 +201,13 @@ func (n *insertNode) Start() *roachpb.Error { return pErr } - // Determine the secondary indexes that need to be updated as well. - indexes := n.tableDesc.Indexes - // Also include the secondary indexes in mutation state WRITE_ONLY. - for _, m := range n.tableDesc.Mutations { - if m.State == DescriptorMutation_WRITE_ONLY { - if index := m.GetIndex(); index != nil { - indexes = append(indexes, *index) - } - } + ri, err := makeRowInserter(n.tableDesc, n.colIDtoRowIndex, n.cols) + if err != nil { + return roachpb.NewError(err) } + n.run.ri = ri - n.run.startRowCreatorNode(&n.rowCreatorNodeBase, rows, indexes) + n.run.startEditNode(&n.editNodeBase, rows) // Prepare structures for building values to pass to rh. if n.rh.exprs != nil { @@ -313,78 +308,15 @@ func (n *insertNode) Next() bool { } } - // Encode the values to the expected column type. This needs to - // happen before index encoding because certain datum types (i.e. tuple) - // cannot be used as index values. - for i, val := range rowVals { - // Make sure the value can be written to the column before proceeding. - var mErr error - if n.run.marshalled[i], mErr = marshalColumnValue(n.cols[i], val); mErr != nil { - n.run.pErr = roachpb.NewError(mErr) - return false - } - } - - primaryIndexKey, _, eErr := encodeIndexKey( - &n.run.primaryIndex, n.colIDtoRowIndex, rowVals, n.run.primaryIndexKeyPrefix) - if eErr != nil { - n.run.pErr = roachpb.NewError(eErr) + n.run.pErr = n.run.ri.insertRow(n.run.b, rowVals) + if n.run.pErr != nil { return false } - // Write the row sentinel. We want to write the sentinel first in case - // we are trying to insert a duplicate primary key: if we write the - // secondary indexes first, we may get an error that looks like a - // uniqueness violation on a non-unique index. - sentinelKey := keys.MakeNonColumnKey(primaryIndexKey) - if log.V(2) { - log.Infof("CPut %s -> NULL", roachpb.Key(sentinelKey)) - } - // This is subtle: An interface{}(nil) deletes the value, so we pass in - // []byte{} as a non-nil value. - n.run.b.CPut(sentinelKey, []byte{}, nil) - - secondaryIndexEntries, eErr := encodeSecondaryIndexes( - n.tableDesc.ID, n.run.indexes, n.colIDtoRowIndex, rowVals) - if eErr != nil { - n.run.pErr = roachpb.NewError(eErr) - return false - } - - for _, secondaryIndexEntry := range secondaryIndexEntries { - if log.V(2) { - log.Infof("CPut %s -> %v", secondaryIndexEntry.key, - secondaryIndexEntry.value) - } - n.run.b.CPut(secondaryIndexEntry.key, secondaryIndexEntry.value, nil) - } - - // Write the row columns. 
for i, val := range rowVals { - col := n.cols[i] if n.run.rowTemplate != nil { n.run.rowTemplate[n.run.rowIdxToRetIdx[i]] = val } - - if _, ok := n.primaryKeyCols[col.ID]; ok { - // Skip primary key columns as their values are encoded in the row - // sentinel key which is guaranteed to exist for as long as the row - // exists. - continue - } - - if n.run.marshalled[i] != nil { - // We only output non-NULL values. Non-existent column keys are - // considered NULL during scanning and the row sentinel ensures we know - // the row exists. - - key := keys.MakeColumnKey(primaryIndexKey, uint32(col.ID)) - if log.V(2) { - log.Infof("CPut %s -> %v", roachpb.Key(key), val) - } - - n.run.b.CPut(key, n.run.marshalled[i], nil) - } } resultRow, err := n.rh.cookResultRow(n.run.rowTemplate) diff --git a/sql/rowwriter.go b/sql/rowwriter.go new file mode 100644 index 000000000000..b55229736e60 --- /dev/null +++ b/sql/rowwriter.go @@ -0,0 +1,577 @@ +// Copyright 2015 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Peter Mattis (peter@cockroachlabs.com) +// Author: Daniel Harrison (daniel.harrison@gmail.com) + +package sql + +import ( + "bytes" + "fmt" + + "github.com/cockroachdb/cockroach/client" + "github.com/cockroachdb/cockroach/keys" + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/sql/parser" + "github.com/cockroachdb/cockroach/util/log" +) + +// rowHelper has the common methods for table row manipulations. +type rowHelper struct { + tableDesc *TableDescriptor + indexes []IndexDescriptor + primaryIndexKeyPrefix []byte + + // colIDtoRowIndex defines the expected order of parser.Datums passed to + // row{Inserter/Updater/Deleter} by mapping a column id from the table schema + // to the index it will appear at in the row. The set of columns present in a row + // varies depending on the row operation. + // + // - insert: columns provided by the user, plus any columns with default values + // - update: all columns + // - delete: all columns + // + // TODO(dan): These can be tightened up. Update, for example, only needs all + // columns if the primary key is changing. Otherwise, it needs the columns + // that are changing, plus the columns in each index that needs to be updated. + colIDtoRowIndex map[ColumnID]int + + // Computed and cached by InPrimaryIndex. 
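For illustration only (not part of the patch): the colIDtoRowIndex comment above is the core contract of the new rowwriter helpers. Callers hand over a slice of datums plus a map saying which slot each column occupies, so columns can arrive in any order. A toy sketch, with invented column IDs and a plain interface{} slice standing in for parser.DTuple.

package main

import "fmt"

type ColumnID int

func main() {
	// Table columns: k (ID 1, primary key), a (ID 2), b (ID 3). The caller
	// built the row in (a, k) order, e.g. for INSERT INTO t (a, k) VALUES (...).
	colIDtoRowIndex := map[ColumnID]int{2: 0, 1: 1}
	row := []interface{}{"hello", 42}

	// Mirrors rowHelper.requirePrimaryIndexCols: every primary-key column must
	// have a slot in the row, or the row cannot be keyed at all.
	for _, pkColID := range []ColumnID{1} {
		idx, ok := colIDtoRowIndex[pkColID]
		if !ok {
			panic(fmt.Sprintf("missing primary key column %d", pkColID))
		}
		fmt.Printf("pk column %d -> row[%d] = %v\n", pkColID, idx, row[idx])
	}
}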
+ primaryIndexCols map[ColumnID]struct{} +} + +func makeRowHelper( + tableDesc *TableDescriptor, + colIDtoRowIndex map[ColumnID]int, + indexes []IndexDescriptor, +) rowHelper { + return rowHelper{ + tableDesc: tableDesc, + colIDtoRowIndex: colIDtoRowIndex, + indexes: indexes, + primaryIndexKeyPrefix: MakeIndexKeyPrefix(tableDesc.ID, tableDesc.PrimaryIndex.ID), + } +} + +func (rw *rowHelper) requirePrimaryIndexCols() error { + for i, col := range rw.tableDesc.PrimaryIndex.ColumnIDs { + if _, ok := rw.colIDtoRowIndex[col]; !ok { + return fmt.Errorf("missing %q primary key column", rw.tableDesc.PrimaryIndex.ColumnNames[i]) + } + } + return nil +} + +func (rw *rowHelper) requireAllIndexCols() error { + for _, index := range rw.indexes { + for _, col := range index.ColumnIDs { + if _, ok := rw.colIDtoRowIndex[col]; !ok { + return fmt.Errorf("missing %q index column", col) + } + } + } + return nil +} + +func (rw *rowHelper) encodeIndexes(values []parser.Datum) ( + primaryIndexKey []byte, + secondaryIndexEntries []indexEntry, + err error, +) { + primaryIndexKey, _, err = encodeIndexKey( + &rw.tableDesc.PrimaryIndex, rw.colIDtoRowIndex, values, rw.primaryIndexKeyPrefix) + if err != nil { + return nil, nil, err + } + secondaryIndexEntries, err = encodeSecondaryIndexes( + rw.tableDesc.ID, rw.indexes, rw.colIDtoRowIndex, values) + if err != nil { + return nil, nil, err + } + + return primaryIndexKey, secondaryIndexEntries, nil +} + +// TODO(dan): This logic is common and being moved into TableDescriptor (see +// #6233). Once it is, use the shared one. +func (rw *rowHelper) columnInPK(colID ColumnID) bool { + if rw.primaryIndexCols == nil { + rw.primaryIndexCols = make(map[ColumnID]struct{}) + for _, colID := range rw.tableDesc.PrimaryIndex.ColumnIDs { + rw.primaryIndexCols[colID] = struct{}{} + } + } + _, ok := rw.primaryIndexCols[colID] + return ok +} + +// rowInserter abstracts the key/value operations for inserting table rows. +type rowInserter struct { + rowHelper rowHelper + cols []ColumnDescriptor + + // For allocation avoidance. + marshalled []interface{} +} + +// makeRowInserter creates a rowInserter for the given table. +// +// colIDtoRowIndex defines the expected order of parser.Datums passed to +// insertRow by mapping a column id from the table schema to the index it will +// appear at in the row. It contains the same set of columns as cols, including +// all default columns being inserted. +// +// TODO(dan): Eliminate the duplication. +func makeRowInserter( + tableDesc *TableDescriptor, + colIDtoRowIndex map[ColumnID]int, + cols []ColumnDescriptor, +) (rowInserter, error) { + indexes := tableDesc.Indexes + // Also include the secondary indexes in mutation state WRITE_ONLY. + for _, m := range tableDesc.Mutations { + if m.State == DescriptorMutation_WRITE_ONLY { + if index := m.GetIndex(); index != nil { + indexes = append(indexes, *index) + } + } + } + + rh := makeRowHelper(tableDesc, colIDtoRowIndex, indexes) + if err := rh.requirePrimaryIndexCols(); err != nil { + return rowInserter{}, err + } + + return rowInserter{ + rowHelper: rh, + cols: cols, + marshalled: make([]interface{}, len(cols)), + }, nil +} + +// insertRow adds to the batch the kv operations necessary to insert a table row +// with the given values. +func (ri *rowInserter) insertRow(b *client.Batch, values []parser.Datum) *roachpb.Error { + if len(values) != len(ri.cols) { + return roachpb.NewErrorf("got %d values but expected %d", len(values), len(ri.cols)) + } + + // Encode the values to the expected column type. 
This needs to + // happen before index encoding because certain datum types (i.e. tuple) + // cannot be used as index values. + for i, val := range values { + // Make sure the value can be written to the column before proceeding. + var err error + if ri.marshalled[i], err = marshalColumnValue(ri.cols[i], val); err != nil { + return roachpb.NewError(err) + } + } + + primaryIndexKey, secondaryIndexEntries, err := ri.rowHelper.encodeIndexes(values) + if err != nil { + return roachpb.NewError(err) + } + + // Write the row sentinel. We want to write the sentinel first in case + // we are trying to insert a duplicate primary key: if we write the + // secondary indexes first, we may get an error that looks like a + // uniqueness violation on a non-unique index. + sentinelKey := keys.MakeNonColumnKey(primaryIndexKey) + if log.V(2) { + log.Infof("CPut %s -> NULL", roachpb.Key(sentinelKey)) + } + // This is subtle: An interface{}(nil) deletes the value, so we pass in + // []byte{} as a non-nil value. + b.CPut(sentinelKey, []byte{}, nil) + + for _, secondaryIndexEntry := range secondaryIndexEntries { + if log.V(2) { + log.Infof("CPut %s -> %v", secondaryIndexEntry.key, secondaryIndexEntry.value) + } + b.CPut(secondaryIndexEntry.key, secondaryIndexEntry.value, nil) + } + + // Write the row columns. + for i, val := range values { + col := ri.cols[i] + + if ri.rowHelper.columnInPK(col.ID) { + // Skip primary key columns as their values are encoded in the row + // sentinel key which is guaranteed to exist for as long as the row + // exists. + continue + } + + if ri.marshalled[i] != nil { + // We only output non-NULL values. Non-existent column keys are + // considered NULL during scanning and the row sentinel ensures we know + // the row exists. + + key := keys.MakeColumnKey(primaryIndexKey, uint32(col.ID)) + if log.V(2) { + log.Infof("CPut %s -> %v", roachpb.Key(key), val) + } + + b.CPut(key, ri.marshalled[i], nil) + } + } + + return nil +} + +// rowUpdater abstracts the key/value operations for updating table rows. +type rowUpdater struct { + rowHelper rowHelper + rd rowDeleter + ri rowInserter + updateCols []ColumnDescriptor + deleteOnlyIndex map[int]struct{} + primaryKeyColChange bool + + // For allocation avoidance. + marshalled []interface{} + newValues []parser.Datum +} + +// makeRowUpdater creates a rowUpdater for the given table. +// +// colIDtoRowIndex defines the expected order of parser.Datums in values (which +// are the existing values) passed to updateRow by mapping a column id from the +// table schema to the index it will appear at in the row. +// +// updateCols are the columns being updated and corresponds to the updateValues +// that will be passed to updateRow. This means all columns if the primary key +// is changing. Otherwise, it needs the columns that are changing, plus the +// columns in each index that needs to be updated. 
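As an illustrative aside (not part of the patch): insertRow above writes three kinds of keys per row, the row sentinel, one entry per secondary index, and one key per column that is neither part of the primary key nor NULL. A toy walk-through for a hypothetical table t(k INT PRIMARY KEY, a STRING, b STRING) with a secondary index on a; the key strings here are invented, the real keys come from keys.MakeNonColumnKey, encodeSecondaryIndexes, and keys.MakeColumnKey.

package main

import "fmt"

func main() {
	type kv struct{ key, val string }
	ops := []kv{
		// 1. The row sentinel goes first so that a duplicate primary key fails
		//    here instead of surfacing as a bogus unique-index violation.
		{"/t/primary/1", "<sentinel>"},
		// 2. One entry per secondary index.
		{"/t/idx_a/'x'/1", "<empty>"},
		// 3. One key per non-primary-key, non-NULL column: k lives in the
		//    sentinel key and b is NULL, so only a gets a column key.
		{"/t/primary/1/a", "'x'"},
	}
	for _, op := range ops {
		fmt.Printf("CPut %s -> %s\n", op.key, op.val)
	}
}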
+func makeRowUpdater( + tableDesc *TableDescriptor, + colIDtoRowIndex map[ColumnID]int, + updateCols []ColumnDescriptor, +) (rowUpdater, error) { + primaryIndexCols := make(map[ColumnID]struct{}, len(tableDesc.PrimaryIndex.ColumnIDs)) + for _, colID := range tableDesc.PrimaryIndex.ColumnIDs { + primaryIndexCols[colID] = struct{}{} + } + + var primaryKeyColChange bool + for _, c := range updateCols { + if _, ok := primaryIndexCols[c.ID]; ok { + primaryKeyColChange = true + break + } + } + + updateColsMap := make(map[ColumnID]struct{}) + for _, updateCol := range updateCols { + updateColsMap[updateCol.ID] = struct{}{} + } + + // Secondary indexes needing updating. + needsUpdate := func(index IndexDescriptor) bool { + // If the primary key changed, we need to update all of them. + if primaryKeyColChange { + return true + } + for _, id := range index.ColumnIDs { + if _, ok := updateColsMap[id]; ok { + return true + } + } + return false + } + + indexes := make([]IndexDescriptor, 0, len(tableDesc.Indexes)+len(tableDesc.Mutations)) + for _, index := range tableDesc.Indexes { + if needsUpdate(index) { + indexes = append(indexes, index) + } + } + + var deleteOnlyIndex map[int]struct{} + for _, m := range tableDesc.Mutations { + if index := m.GetIndex(); index != nil { + if needsUpdate(*index) { + indexes = append(indexes, *index) + + switch m.State { + case DescriptorMutation_DELETE_ONLY: + if deleteOnlyIndex == nil { + // Allocate at most once. + deleteOnlyIndex = make(map[int]struct{}, len(tableDesc.Mutations)) + } + deleteOnlyIndex[len(indexes)-1] = struct{}{} + + case DescriptorMutation_WRITE_ONLY: + } + } + } + } + + rh := makeRowHelper(tableDesc, colIDtoRowIndex, indexes) + // We already had to compute this, so may as well save it. + rh.primaryIndexCols = primaryIndexCols + + ru := rowUpdater{ + rowHelper: rh, + updateCols: updateCols, + deleteOnlyIndex: deleteOnlyIndex, + primaryKeyColChange: primaryKeyColChange, + marshalled: make([]interface{}, len(updateCols)), + newValues: make([]parser.Datum, len(tableDesc.Columns)), + } + + if primaryKeyColChange { + // These fields are only used when the primary key is changing. + // TODO(dan): Is it safe for these to share rowHelper instead of creating + // two more? + var err error + ru.rd, err = makeRowDeleter(tableDesc, colIDtoRowIndex) + if err != nil { + return rowUpdater{}, err + } + ru.ri, err = makeRowInserter(tableDesc, colIDtoRowIndex, tableDesc.Columns) + if err != nil { + return rowUpdater{}, err + } + } + + return ru, nil +} + +// updateRow adds to the batch the kv operations necessary to update a table row +// with the given values. +// +// The row corresponding to values is updated with the ones in updateValues. +// Note that updateValues only contains the ones that are changing. +// +// The return value is only good until the next call to UpdateRow. +func (ru *rowUpdater) updateRow( + b *client.Batch, + values []parser.Datum, + updateValues []parser.Datum, +) ([]parser.Datum, *roachpb.Error) { + if len(values) != len(ru.rowHelper.tableDesc.Columns) { + return nil, roachpb.NewErrorf("got %d values but expected %d", len(values), len(ru.rowHelper.tableDesc.Columns)) + } + if len(updateValues) != len(ru.updateCols) { + return nil, roachpb.NewErrorf("got %d values but expected %d", len(updateValues), len(ru.updateCols)) + } + + primaryIndexKey, secondaryIndexEntries, err := ru.rowHelper.encodeIndexes(values) + if err != nil { + return nil, roachpb.NewError(err) + } + + // Check that the new value types match the column types. 
This needs to + // happen before index encoding because certain datum types (i.e. tuple) + // cannot be used as index values. + for i, val := range updateValues { + if ru.marshalled[i], err = marshalColumnValue(ru.updateCols[i], val); err != nil { + return nil, roachpb.NewError(err) + } + } + + // Update the row values. + copy(ru.newValues, values) + for i, updateCol := range ru.updateCols { + ru.newValues[ru.rowHelper.colIDtoRowIndex[updateCol.ID]] = updateValues[i] + } + + newPrimaryIndexKey := primaryIndexKey + rowPrimaryKeyChanged := false + var newSecondaryIndexEntries []indexEntry + if ru.primaryKeyColChange { + newPrimaryIndexKey, newSecondaryIndexEntries, err = ru.rowHelper.encodeIndexes(ru.newValues) + if err != nil { + return nil, roachpb.NewError(err) + } + rowPrimaryKeyChanged = !bytes.Equal(primaryIndexKey, newPrimaryIndexKey) + } else { + newSecondaryIndexEntries, err = encodeSecondaryIndexes( + ru.rowHelper.tableDesc.ID, ru.rowHelper.indexes, ru.rowHelper.colIDtoRowIndex, ru.newValues) + if err != nil { + return nil, roachpb.NewError(err) + } + } + + if rowPrimaryKeyChanged { + pErr := ru.rd.deleteRow(b, values) + if pErr != nil { + return nil, pErr + } + pErr = ru.ri.insertRow(b, ru.newValues) + return ru.newValues, pErr + } + + // Update secondary indexes. + for i, newSecondaryIndexEntry := range newSecondaryIndexEntries { + secondaryIndexEntry := secondaryIndexEntries[i] + secondaryKeyChanged := !bytes.Equal(newSecondaryIndexEntry.key, secondaryIndexEntry.key) + if secondaryKeyChanged { + if log.V(2) { + log.Infof("Del %s", secondaryIndexEntry.key) + } + b.Del(secondaryIndexEntry.key) + // Do not update Indexes in the DELETE_ONLY state. + if _, ok := ru.deleteOnlyIndex[i]; !ok { + if log.V(2) { + log.Infof("CPut %s -> %v", newSecondaryIndexEntry.key, newSecondaryIndexEntry.value) + } + b.CPut(newSecondaryIndexEntry.key, newSecondaryIndexEntry.value, nil) + } + } + } + + // Add the new values. + for i, val := range updateValues { + col := ru.updateCols[i] + + if ru.rowHelper.columnInPK(col.ID) { + // Skip primary key columns as their values are encoded in the row + // sentinel key which is guaranteed to exist for as long as the row + // exists. + continue + } + + key := keys.MakeColumnKey(newPrimaryIndexKey, uint32(col.ID)) + if ru.marshalled[i] != nil { + // We only output non-NULL values. Non-existent column keys are + // considered NULL during scanning and the row sentinel ensures we know + // the row exists. + if log.V(2) { + log.Infof("Put %s -> %v", roachpb.Key(key), val) + } + + b.Put(key, ru.marshalled[i]) + } else { + // The column might have already existed but is being set to NULL, so + // delete it. + if log.V(2) { + log.Infof("Del %s", roachpb.Key(key)) + } + + b.Del(key) + } + + } + + return ru.newValues, nil +} + +// rowDeleter abstracts the key/value operations for deleting table rows. +type rowDeleter struct { + rowHelper rowHelper +} + +// makeRowDeleter creates a rowDeleter for the given table. +// +// colIDtoRowIndex defines the expected order of parser.Datums passed to +// deleteRow by mapping a column id from the table schema to the index it will +// appear at in the row. 
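An aside, not part of the patch: when the primary key is unchanged, updateRow above only rewrites the secondary indexes whose keys actually changed, and it withholds the new entry from indexes still in the DELETE_ONLY mutation state. A simplified sketch of that per-index decision, with string keys standing in for the encoded indexEntry keys the real code compares via bytes.Equal.

package main

import "fmt"

// indexEntry and deleteOnly are simplified stand-ins for the real types.
type indexEntry struct{ key, value string }

// maintainIndexes mirrors the per-index logic of updateRow when the primary key
// did not change: untouched keys are left alone; changed ones get a Del of the
// old entry and, unless the index is delete-only, a CPut of the new one.
func maintainIndexes(oldEntries, newEntries []indexEntry, deleteOnly map[int]bool) []string {
	var ops []string
	for i, newEntry := range newEntries {
		oldEntry := oldEntries[i]
		if newEntry.key == oldEntry.key {
			continue
		}
		ops = append(ops, "Del  "+oldEntry.key)
		if !deleteOnly[i] {
			ops = append(ops, "CPut "+newEntry.key+" -> "+newEntry.value)
		}
	}
	return ops
}

func main() {
	ops := maintainIndexes(
		[]indexEntry{{"/t/idx_a/'x'/1", "<empty>"}},
		[]indexEntry{{"/t/idx_a/'y'/1", "<empty>"}},
		map[int]bool{},
	)
	for _, op := range ops {
		fmt.Println(op)
	}
}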
+func makeRowDeleter( + tableDesc *TableDescriptor, + colIDtoRowIndex map[ColumnID]int, +) (rowDeleter, error) { + indexes := tableDesc.Indexes + for _, m := range tableDesc.Mutations { + if index := m.GetIndex(); index != nil { + indexes = append(indexes, *index) + } + } + rowHelper := makeRowHelper(tableDesc, colIDtoRowIndex, indexes) + if err := rowHelper.requireAllIndexCols(); err != nil { + return rowDeleter{}, err + } + return rowDeleter{rowHelper}, nil +} + +// deleteRow adds to the batch the kv operations necessary to delete a table row +// with the given values. +func (rd *rowDeleter) deleteRow(b *client.Batch, values []parser.Datum) *roachpb.Error { + primaryIndexKey, secondaryIndexEntries, err := rd.rowHelper.encodeIndexes(values) + if err != nil { + return roachpb.NewError(err) + } + + for _, secondaryIndexEntry := range secondaryIndexEntries { + if log.V(2) { + log.Infof("Del %s", secondaryIndexEntry.key) + } + b.Del(secondaryIndexEntry.key) + } + + // Delete the row. + rowStartKey := roachpb.Key(primaryIndexKey) + rowEndKey := rowStartKey.PrefixEnd() + if log.V(2) { + log.Infof("DelRange %s - %s", rowStartKey, rowEndKey) + } + b.DelRange(rowStartKey, rowEndKey, false) + + return nil +} + +// fastPathAvailable returns true if the fastDelete optimization can be used. +func (rd *rowDeleter) fastPathAvailable() bool { + if len(rd.rowHelper.indexes) != 0 { + if log.V(2) { + log.Infof("delete forced to scan: values required to update %d secondary indexes", len(rd.rowHelper.indexes)) + } + return false + } + return true +} + +// fastDelete adds to the batch the kv operations necessary to delete a sql +// table row without knowing the values that are currently present. +func (rd *rowDeleter) fastDelete( + b *client.Batch, + scan *scanNode, + commitFunc func(b *client.Batch) *roachpb.Error, +) (rowCount int, pErr *roachpb.Error) { + for _, span := range scan.spans { + if log.V(2) { + log.Infof("Skipping scan and just deleting %s - %s", span.start, span.end) + } + b.DelRange(span.start, span.end, true) + } + + pErr = commitFunc(b) + if pErr != nil { + return 0, pErr + } + + for _, r := range b.Results { + var prev []byte + for _, i := range r.Keys { + // If prefix is same, don't bother decoding key. + if len(prev) > 0 && bytes.HasPrefix(i, prev) { + continue + } + + after, err := scan.fetcher.readIndexKey(i) + if err != nil { + return 0, roachpb.NewError(err) + } + k := i[:len(i)-len(after)] + if !bytes.Equal(k, prev) { + prev = k + rowCount++ + } + } + } + + return rowCount, nil +} diff --git a/sql/update.go b/sql/update.go index 603358908651..cff03fe2ae00 100644 --- a/sql/update.go +++ b/sql/update.go @@ -21,11 +21,9 @@ import ( "fmt" "github.com/cockroachdb/cockroach/client" - "github.com/cockroachdb/cockroach/keys" "github.com/cockroachdb/cockroach/roachpb" "github.com/cockroachdb/cockroach/sql/parser" "github.com/cockroachdb/cockroach/sql/privilege" - "github.com/cockroachdb/cockroach/util/log" "github.com/cockroachdb/cockroach/util/tracing" ) @@ -67,27 +65,21 @@ func (p *planner) makeEditNode(t parser.TableExpr, r parser.ReturningExprs, auto // editNodeRun holds the runtime (execute) state needed to run // row-modifying statements. 
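For illustration (not part of the patch): fastDelete above counts deleted rows from the keys the batch returns. Several keys belong to the same row, so the counter only advances when the row's key prefix changes. A self-contained sketch of that counting; rowPrefix is an invented stand-in for scan.fetcher.readIndexKey, which strips the per-column suffix from a key.

package main

import (
	"bytes"
	"fmt"
)

// countRows mirrors the loop in rowDeleter.fastDelete: skip keys that share a
// prefix with the last row seen, and bump the count when the prefix changes.
func countRows(keys [][]byte, rowPrefix func([]byte) []byte) int {
	rowCount := 0
	var prev []byte
	for _, k := range keys {
		// If the prefix is the same as the previous row, skip the decode.
		if len(prev) > 0 && bytes.HasPrefix(k, prev) {
			continue
		}
		p := rowPrefix(k)
		if !bytes.Equal(p, prev) {
			prev = p
			rowCount++
		}
	}
	return rowCount
}

func main() {
	keys := [][]byte{
		[]byte("/t/primary/1"),
		[]byte("/t/primary/1/a"),
		[]byte("/t/primary/2"),
	}
	// Toy prefix: keep everything up to and including the primary-key segment.
	rowPrefix := func(k []byte) []byte {
		parts := bytes.SplitN(k, []byte("/"), 5)
		if len(parts) > 4 {
			parts = parts[:4]
		}
		return bytes.Join(parts, []byte("/"))
	}
	fmt.Println(countRows(keys, rowPrefix)) // prints 2
}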
type editNodeRun struct { - rows planNode - primaryIndex IndexDescriptor - primaryIndexKeyPrefix []byte - indexes []IndexDescriptor - pErr *roachpb.Error - b *client.Batch - resultRow parser.DTuple - done bool + rows planNode + pErr *roachpb.Error + b *client.Batch + resultRow parser.DTuple + done bool } -func (r *editNodeRun) startEditNode(en *editNodeBase, rows planNode, indexes []IndexDescriptor) { +func (r *editNodeRun) startEditNode(en *editNodeBase, rows planNode) { if isSystemConfigID(en.tableDesc.GetID()) { // Mark transaction as operating on the system DB. en.p.txn.SetSystemConfigTrigger() } r.rows = rows - r.primaryIndex = en.tableDesc.PrimaryIndex - r.primaryIndexKeyPrefix = MakeIndexKeyPrefix(en.tableDesc.ID, r.primaryIndex.ID) r.b = en.p.txn.NewBatch() - r.indexes = indexes } func (r *editNodeRun) finalize(en *editNodeBase, convertError bool) { @@ -100,6 +92,7 @@ func (r *editNodeRun) finalize(en *editNodeBase, convertError bool) { r.pErr = en.p.txn.Run(r.b) } if r.pErr != nil && convertError { + // TODO(dan): Move this logic into rowInsert/rowUpdate. r.pErr = convertBatchError(en.tableDesc, *r.b, r.pErr) } @@ -146,29 +139,17 @@ func (p *planner) makeRowCreatorNode(en editNodeBase, cols []ColumnDescriptor, c }, nil } -// rowCreatorNodeBase holds the runtime (execute) state needed to run -// statements that create row values. -type rowCreatorNodeRun struct { - editNodeRun - marshalled []interface{} -} - -func (r *rowCreatorNodeRun) startRowCreatorNode(en *rowCreatorNodeBase, rows planNode, indexes []IndexDescriptor) { - r.startEditNode(&en.editNodeBase, rows, indexes) - r.marshalled = make([]interface{}, len(en.cols)) -} - type updateNode struct { // The following fields are populated during makePlan. rowCreatorNodeBase - n *parser.Update - colIDSet map[ColumnID]struct{} - primaryKeyColChange bool + n *parser.Update + colIDSet map[ColumnID]int run struct { // The following fields are populated during Start(). - rowCreatorNodeRun - deleteOnlyIndex map[int]struct{} + editNodeRun + + ru rowUpdater } } @@ -230,13 +211,9 @@ func (p *planner) Update(n *parser.Update, autoCommit bool) (planNode, *roachpb. } // Set of columns being updated - var primaryKeyColChange bool - colIDSet := map[ColumnID]struct{}{} - for _, c := range cols { - colIDSet[c.ID] = struct{}{} - if _, ok := rc.primaryKeyCols[c.ID]; ok { - primaryKeyColChange = true - } + colIDSet := make(map[ColumnID]int, len(cols)) + for i, c := range en.tableDesc.Columns { + colIDSet[c.ID] = i } tracing.AnnotateTrace() @@ -307,10 +284,9 @@ func (p *planner) Update(n *parser.Update, autoCommit bool) (planNode, *roachpb. } return &updateNode{ - n: n, - rowCreatorNodeBase: rc, - primaryKeyColChange: primaryKeyColChange, - colIDSet: colIDSet, + n: n, + rowCreatorNodeBase: rc, + colIDSet: colIDSet, }, nil } @@ -380,50 +356,13 @@ func (u *updateNode) Start() *roachpb.Error { } u.colIDtoRowIndex = colIDtoRowIndex - // Secondary indexes needing updating. - needsUpdate := func(index IndexDescriptor) bool { - // If the primary key changed, we need to update all of them. 
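Aside, not part of the patch: the removed needsUpdate helper just below, now living in makeRowUpdater, decides which secondary indexes must be rewritten: all of them when a primary-key column changes (every index entry embeds the primary key), otherwise only those that contain an updated column. A minimal sketch with invented types.

package main

import "fmt"

type ColumnID int

// needsUpdate mirrors the check in makeRowUpdater.
func needsUpdate(indexColumnIDs []ColumnID, updated map[ColumnID]bool, primaryKeyColChange bool) bool {
	if primaryKeyColChange {
		return true
	}
	for _, id := range indexColumnIDs {
		if updated[id] {
			return true
		}
	}
	return false
}

func main() {
	updated := map[ColumnID]bool{2: true} // UPDATE ... SET a = ...
	fmt.Println(needsUpdate([]ColumnID{2}, updated, false)) // true: index on a
	fmt.Println(needsUpdate([]ColumnID{3}, updated, false)) // false: index on b untouched
}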
- if u.primaryKeyColChange { - return true - } - for _, id := range index.ColumnIDs { - if _, ok := u.colIDSet[id]; ok { - return true - } - } - return false - } - - indexes := make([]IndexDescriptor, 0, len(u.tableDesc.Indexes)+len(u.tableDesc.Mutations)) - var deleteOnlyIndex map[int]struct{} - - for _, index := range u.tableDesc.Indexes { - if needsUpdate(index) { - indexes = append(indexes, index) - } - } - for _, m := range u.tableDesc.Mutations { - if index := m.GetIndex(); index != nil { - if needsUpdate(*index) { - indexes = append(indexes, *index) - - switch m.State { - case DescriptorMutation_DELETE_ONLY: - if deleteOnlyIndex == nil { - // Allocate at most once. - deleteOnlyIndex = make(map[int]struct{}, len(u.tableDesc.Mutations)) - } - deleteOnlyIndex[len(indexes)-1] = struct{}{} - - case DescriptorMutation_WRITE_ONLY: - } - } - } + ru, err := makeRowUpdater(u.tableDesc, u.colIDSet, u.cols) + if err != nil { + return roachpb.NewError(err) } + u.run.ru = ru - u.run.startRowCreatorNode(&u.rowCreatorNodeBase, rows, indexes) - - u.run.deleteOnlyIndex = deleteOnlyIndex + u.run.startEditNode(&u.editNodeBase, rows) return nil } @@ -440,29 +379,16 @@ func (u *updateNode) Next() bool { tracing.AnnotateTrace() - rowVals := u.run.rows.Values() - - primaryIndexKey, _, err := encodeIndexKey( - &u.run.primaryIndex, u.colIDtoRowIndex, rowVals, u.run.primaryIndexKeyPrefix) - if err != nil { - u.run.pErr = roachpb.NewError(err) - return false - } - // Compute the current secondary index key:value pairs for this row. - secondaryIndexEntries, err := encodeSecondaryIndexes( - u.tableDesc.ID, u.run.indexes, u.colIDtoRowIndex, rowVals) - if err != nil { - u.run.pErr = roachpb.NewError(err) - return false - } + oldValues := u.run.rows.Values() // Our updated value expressions occur immediately after the plain // columns in the output. - newVals := rowVals[len(u.tableDesc.Columns):] + updateValues := oldValues[len(u.tableDesc.Columns):] + oldValues = oldValues[:len(u.tableDesc.Columns)] // Ensure that the values honor the specified column widths. - for i := range newVals { - if err := checkValueWidth(u.cols[i], newVals[i]); err != nil { + for i := range updateValues { + if err := checkValueWidth(u.cols[i], updateValues[i]); err != nil { u.run.pErr = roachpb.NewError(err) return false } @@ -470,161 +396,20 @@ func (u *updateNode) Next() bool { // Update the row values. for i, col := range u.cols { - val := newVals[i] + val := updateValues[i] if !col.Nullable && val == parser.DNull { u.run.pErr = roachpb.NewUErrorf("null value in column %q violates not-null constraint", col.Name) return false } - rowVals[u.colIDtoRowIndex[col.ID]] = val - } - - // Check that the new value types match the column types. This needs to - // happen before index encoding because certain datum types (i.e. tuple) - // cannot be used as index values. - for i, val := range newVals { - var mErr error - if u.run.marshalled[i], mErr = marshalColumnValue(u.cols[i], val); mErr != nil { - u.run.pErr = roachpb.NewError(mErr) - return false - } - } - - // Compute the new primary index key for this row. - newPrimaryIndexKey := primaryIndexKey - var rowPrimaryKeyChanged bool - if u.primaryKeyColChange { - newPrimaryIndexKey, _, err = encodeIndexKey( - &u.run.primaryIndex, u.colIDtoRowIndex, rowVals, u.run.primaryIndexKeyPrefix) - if err != nil { - u.run.pErr = roachpb.NewError(err) - return false - } - // Note that even if primaryIndexColChange is true, it's possible that - // primary key fields in this particular row didn't change. 
- rowPrimaryKeyChanged = !bytes.Equal(primaryIndexKey, newPrimaryIndexKey) } - // Compute the new secondary index key:value pairs for this row. - newSecondaryIndexEntries, eErr := encodeSecondaryIndexes( - u.tableDesc.ID, u.run.indexes, u.colIDtoRowIndex, rowVals) - if eErr != nil { - u.run.pErr = roachpb.NewError(eErr) + newValues, pErr := u.run.ru.updateRow(u.run.b, oldValues, updateValues) + if pErr != nil { + u.run.pErr = pErr return false } - if rowPrimaryKeyChanged { - // Delete all the data stored under the old primary key. - rowStartKey := roachpb.Key(primaryIndexKey) - rowEndKey := rowStartKey.PrefixEnd() - if log.V(2) { - log.Infof("DelRange %s - %s", rowStartKey, rowEndKey) - } - u.run.b.DelRange(rowStartKey, rowEndKey, false) - - // Delete all the old secondary indexes. - for _, secondaryIndexEntry := range secondaryIndexEntries { - if log.V(2) { - log.Infof("Del %s", secondaryIndexEntry.key) - } - u.run.b.Del(secondaryIndexEntry.key) - } - - // Write the new row sentinel. We want to write the sentinel first in case - // we are trying to insert a duplicate primary key: if we write the - // secondary indexes first, we may get an error that looks like a - // uniqueness violation on a non-unique index. - sentinelKey := keys.MakeNonColumnKey(newPrimaryIndexKey) - if log.V(2) { - log.Infof("CPut %s -> NULL", roachpb.Key(sentinelKey)) - } - // This is subtle: An interface{}(nil) deletes the value, so we pass in - // []byte{} as a non-nil value. - u.run.b.CPut(sentinelKey, []byte{}, nil) - - // Write any fields from the old row that were not modified by the UPDATE. - for i, col := range u.tableDesc.Columns { - if _, ok := u.colIDSet[col.ID]; ok { - continue - } - if _, ok := u.primaryKeyCols[col.ID]; ok { - continue - } - key := keys.MakeColumnKey(newPrimaryIndexKey, uint32(col.ID)) - val := rowVals[i] - marshalledVal, mErr := marshalColumnValue(col, val) - if mErr != nil { - u.run.pErr = roachpb.NewError(mErr) - return false - } - - if log.V(2) { - log.Infof("Put %s -> %v", roachpb.Key(key), val) - } - u.run.b.Put(key, marshalledVal) - } - // At this point, we've deleted the old row and associated index data and - // written the sentinel keys and column keys for non-updated columns. Fall - // through to below where the index keys and updated column keys will be - // written. - } - - // Update secondary indexes. - for i, newSecondaryIndexEntry := range newSecondaryIndexEntries { - secondaryIndexEntry := secondaryIndexEntries[i] - secondaryKeyChanged := !bytes.Equal(newSecondaryIndexEntry.key, secondaryIndexEntry.key) - if secondaryKeyChanged { - if log.V(2) { - log.Infof("Del %s", secondaryIndexEntry.key) - } - u.run.b.Del(secondaryIndexEntry.key) - } - if rowPrimaryKeyChanged || secondaryKeyChanged { - // Do not update Indexes in the DELETE_ONLY state. - if _, ok := u.run.deleteOnlyIndex[i]; !ok { - if log.V(2) { - log.Infof("CPut %s -> %v", newSecondaryIndexEntry.key, - newSecondaryIndexEntry.value) - } - u.run.b.CPut(newSecondaryIndexEntry.key, newSecondaryIndexEntry.value, nil) - } - } - } - - // Add the new values. - for i, val := range newVals { - col := u.cols[i] - - if _, ok := u.primaryKeyCols[col.ID]; ok { - // Skip primary key columns as their values are encoded in the row - // sentinel key which is guaranteed to exist for as long as the row - // exists. - continue - } - - key := keys.MakeColumnKey(newPrimaryIndexKey, uint32(col.ID)) - if u.run.marshalled[i] != nil { - // We only output non-NULL values. 
Non-existent column keys are - // considered NULL during scanning and the row sentinel ensures we know - // the row exists. - if log.V(2) { - log.Infof("Put %s -> %v", roachpb.Key(key), val) - } - - u.run.b.Put(key, u.run.marshalled[i]) - } else { - // The column might have already existed but is being set to NULL, so - // delete it. - if log.V(2) { - log.Infof("Del %s", key) - } - - u.run.b.Del(key) - } - - } - - // rowVals[:len(tableDesc.Columns)] have been updated with the new values above. - resultRow, err := u.rh.cookResultRow(rowVals[:len(u.tableDesc.Columns)]) + resultRow, err := u.rh.cookResultRow(newValues) if err != nil { u.run.pErr = roachpb.NewError(err) return false
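One last illustrative aside (not part of the patch): after the refactor, updateNode.Next only has to split the select output into the existing row and the computed SET values, hand both to rowUpdater.updateRow, and build the RETURNING row from the returned newValues. A toy sketch of the slicing convention, using interface{} values in place of parser.Datum.

package main

import "fmt"

// splitRow mirrors the convention Next relies on: the select feeding the UPDATE
// emits all table columns first, followed by one value per SET expression.
func splitRow(values []interface{}, numTableColumns int) (oldValues, updateValues []interface{}) {
	return values[:numTableColumns], values[numTableColumns:]
}

func main() {
	// Table t(k, a, b); UPDATE t SET a = a || '!'
	fromSelect := []interface{}{1, "x", "y", "x!"}
	oldValues, updateValues := splitRow(fromSelect, 3)
	fmt.Println(oldValues)    // [1 x y] -> passed to updateRow as the existing row
	fmt.Println(updateValues) // [x!]    -> one value per updated column
}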