sql: extract k/v operations from insert/update/delete
Separates the mapping between table rows/indexes and key/values for the
mutative operations. This will be useful immediately for implementing UPSERT and
eventually for TableWriter.

I tried to keep this as pure a refactor as I could, but it was a lot simpler to
model an UPDATE with a PK change as a delete followed by an insert (instead of
migrating the mess I committed before).

I'm still not clear on which component decides which columns are required, but I
think the additional complexity that UPSERT/REPLACE adds will make it more
obvious one way or another. Once that happens, it should also become clearer how
to handle all these colIDtoRowIndex maps.
danhhz committed Apr 25, 2016
1 parent 3d4256e commit 9115966
Showing 4 changed files with 644 additions and 398 deletions.
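As a toy sketch of the separation the commit message describes — plan nodes produce row values, and a small helper owns how a row becomes key/value operations in a batch — consider the following self-contained Go program. All types here are simplified stand-ins, not the actual API of the new helper file (which is not expanded in this view):

	// Toy sketch of the extraction: the node produces row values; the helper
	// owns how a row becomes key/value operations. Stand-in types only.
	package main

	import "fmt"

	type batch struct{ ops []string }

	func (b *batch) del(key string) { b.ops = append(b.ops, "Del "+key) }

	func (b *batch) delRange(start, end string) {
		b.ops = append(b.ops, fmt.Sprintf("DelRange %s - %s", start, end))
	}

	// indexEncoder derives one index's key from a row of values.
	type indexEncoder func(row []string) string

	type rowDeleter struct {
		primary     indexEncoder
		secondaries []indexEncoder
	}

	// deleteRow emits every KV delete for one row; the plan node
	// (deleteNode.Next below) no longer touches keys at all.
	func (rd *rowDeleter) deleteRow(b *batch, row []string) {
		for _, enc := range rd.secondaries {
			b.del(enc(row))
		}
		start := rd.primary(row)
		b.delRange(start, start+"\xff") // span covering every key of the row
	}

	func main() {
		rd := rowDeleter{
			primary: func(row []string) string { return "/t/primary/" + row[0] },
			secondaries: []indexEncoder{
				func(row []string) string { return "/t/name_idx/" + row[1] + "/" + row[0] },
			},
		}
		var b batch
		rd.deleteRow(&b, []string{"42", "carl"})
		fmt.Println(b.ops)
	}

The deleteNode diff below shows exactly this shape: Next() hands rowVals to rd.deleteRow and never encodes a key itself.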
96 changes: 24 additions & 72 deletions sql/delete.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"fmt"
 
+	"github.com/cockroachdb/cockroach/client"
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/sql/parser"
 	"github.com/cockroachdb/cockroach/sql/privilege"
@@ -31,10 +32,11 @@ type deleteNode struct {
 	n *parser.Delete
 
 	run struct {
 		// The following fields are populated during Start().
 		editNodeRun
 
-		colIDtoRowIndex map[ColumnID]int
-		fastPath        bool
+		rd       rowDeleter
+		fastPath bool
 	}
 }

@@ -93,26 +95,21 @@ func (d *deleteNode) Start() *roachpb.Error {
 	if err != nil {
 		return roachpb.NewError(err)
 	}
-	d.run.colIDtoRowIndex = colIDtoRowIndex
 
-	// Determine the secondary indexes that need to be updated as well.
-	indexes := d.tableDesc.Indexes
-	// Also include all the indexes under mutation; mutation state is
-	// irrelevant for deletions.
-	for _, m := range d.tableDesc.Mutations {
-		if index := m.GetIndex(); index != nil {
-			indexes = append(indexes, *index)
-		}
+	rd, err := makeRowDeleter(d.tableDesc, colIDtoRowIndex)
+	if err != nil {
+		return roachpb.NewError(err)
 	}
+	d.run.rd = rd
 
-	d.run.startEditNode(&d.editNodeBase, rows, indexes)
+	d.run.startEditNode(&d.editNodeBase, rows)
 
 	// Check if we can avoid doing a round-trip to read the values and just
 	// "fast-path" skip to deleting the key ranges without reading them first.
 	// TODO(dt): We could probably be smarter when presented with an index-join,
 	// but this goes away anyway once we push-down more of SQL.
 	sel := rows.(*selectNode)
-	if scan, ok := sel.table.node.(*scanNode); ok && canDeleteWithoutScan(d.n, scan, len(indexes)) {
+	if scan, ok := sel.table.node.(*scanNode); ok && canDeleteWithoutScan(d.n, scan, &d.run.rd) {
 		d.run.fastPath = true
 		d.run.pErr = d.fastDelete()
 		d.run.done = true
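The index-gathering loop removed above presumably moves into makeRowDeleter itself, so the "mutation state is irrelevant for deletions" policy lives in one place. A self-contained sketch of that relocation, using stand-in types rather than the real descriptors:

	// Stand-in types only; sketches where the removed loop plausibly lands.
	package main

	import "fmt"

	type indexDesc struct{ name string }

	type mutation struct{ index *indexDesc }

	func (m mutation) GetIndex() *indexDesc { return m.index }

	type tableDesc struct {
		indexes   []indexDesc
		mutations []mutation
	}

	type rowDeleter struct{ indexes []indexDesc }

	func makeRowDeleter(t *tableDesc) rowDeleter {
		// Mirror of the removed loop: for deletes, indexes under mutation are
		// always included, regardless of the mutation's state.
		indexes := append([]indexDesc(nil), t.indexes...)
		for _, m := range t.mutations {
			if idx := m.GetIndex(); idx != nil {
				indexes = append(indexes, *idx)
			}
		}
		return rowDeleter{indexes: indexes}
	}

	func main() {
		t := tableDesc{
			indexes:   []indexDesc{{name: "name_idx"}},
			mutations: []mutation{{index: &indexDesc{name: "new_idx"}}},
		}
		fmt.Println(len(makeRowDeleter(&t).indexes)) // 2
	}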
@@ -142,35 +139,11 @@ func (d *deleteNode) Next() bool {
 
 	rowVals := d.run.rows.Values()
 
-	primaryIndexKey, _, err := encodeIndexKey(
-		&d.run.primaryIndex, d.run.colIDtoRowIndex, rowVals, d.run.primaryIndexKeyPrefix)
-	if err != nil {
-		d.run.pErr = roachpb.NewError(err)
-		return false
-	}
-
-	secondaryIndexEntries, err := encodeSecondaryIndexes(
-		d.tableDesc.ID, d.run.indexes, d.run.colIDtoRowIndex, rowVals)
-	if err != nil {
-		d.run.pErr = roachpb.NewError(err)
+	d.run.pErr = d.run.rd.deleteRow(d.run.b, rowVals)
+	if d.run.pErr != nil {
 		return false
 	}
 
-	for _, secondaryIndexEntry := range secondaryIndexEntries {
-		if log.V(2) {
-			log.Infof("Del %s", secondaryIndexEntry.key)
-		}
-		d.run.b.Del(secondaryIndexEntry.key)
-	}
-
-	// Delete the row.
-	rowStartKey := roachpb.Key(primaryIndexKey)
-	rowEndKey := rowStartKey.PrefixEnd()
-	if log.V(2) {
-		log.Infof("DelRange %s - %s", rowStartKey, rowEndKey)
-	}
-	d.run.b.DelRange(rowStartKey, rowEndKey, false)
-
 	resultRow, err := d.rh.cookResultRow(rowVals)
 	if err != nil {
 		d.run.pErr = roachpb.NewError(err)
@@ -184,11 +157,8 @@
 // Determine if the deletion of `rows` can be done without actually scanning them,
 // i.e. if we do not need to know their values for filtering expressions or a
 // RETURNING clause or for updating secondary indexes.
-func canDeleteWithoutScan(n *parser.Delete, scan *scanNode, indexCount int) bool {
-	if indexCount != 0 {
-		if log.V(2) {
-			log.Infof("delete forced to scan: values required to update %d secondary indexes", indexCount)
-		}
+func canDeleteWithoutScan(n *parser.Delete, scan *scanNode, rd *rowDeleter) bool {
+	if !rd.fastPathAvailable() {
 		return false
 	}
 	if n.Returning != nil {
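Judging by the index-count check it replaces, fastPathAvailable presumably just reports whether any secondary indexes need maintenance: DelRange can remove primary-key spans blindly, but secondary index keys derive from row values, which forces a scan. A minimal stand-in illustration (not the real method, which is defined in the unexpanded helper file):

	package main

	import "fmt"

	type rowDeleter struct{ indexes []string }

	// fastPathAvailable: the fast path exists only when no secondary index
	// entries must be removed alongside the primary rows.
	func (rd *rowDeleter) fastPathAvailable() bool { return len(rd.indexes) == 0 }

	func main() {
		fmt.Println((&rowDeleter{}).fastPathAvailable())                              // true
		fmt.Println((&rowDeleter{indexes: []string{"name_idx"}}).fastPathAvailable()) // false
	}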
@@ -215,45 +185,27 @@ func (d *deleteNode) fastDelete() *roachpb.Error {
 		return scan.pErr
 	}
 
-	for _, span := range scan.spans {
-		if log.V(2) {
-			log.Infof("Skipping scan and just deleting %s - %s", span.start, span.end)
-		}
-		d.run.b.DelRange(span.start, span.end, true)
+	rowCount, pErr := d.run.rd.fastDelete(d.run.b, scan, d.fastDeleteCommitFunc)
+	if pErr != nil {
+		return pErr
 	}
+	d.rh.rowCount += rowCount
+	return nil
+}
 
+func (d *deleteNode) fastDeleteCommitFunc(b *client.Batch) *roachpb.Error {
 	if d.autoCommit {
 		// An auto-txn can commit the transaction with the batch. This is an
 		// optimization to avoid an extra round-trip to the transaction
 		// coordinator.
-		if pErr := d.p.txn.CommitInBatch(d.run.b); pErr != nil {
+		if pErr := d.p.txn.CommitInBatch(b); pErr != nil {
 			return pErr
 		}
 	} else {
-		if pErr := d.p.txn.Run(d.run.b); pErr != nil {
+		if pErr := d.p.txn.Run(b); pErr != nil {
 			return pErr
 		}
 	}
 
-	for _, r := range d.run.b.Results {
-		var prev []byte
-		for _, i := range r.Keys {
-			// If prefix is same, don't bother decoding key.
-			if len(prev) > 0 && bytes.HasPrefix(i, prev) {
-				continue
-			}
-
-			after, err := scan.fetcher.readIndexKey(i)
-			if err != nil {
-				return roachpb.NewError(err)
-			}
-			k := i[:len(i)-len(after)]
-			if !bytes.Equal(k, prev) {
-				prev = k
-				d.rh.rowCount++
-			}
-		}
-	}
 	return nil
 }
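The new fastDelete takes the node's commit function as a callback, keeping transaction policy (auto-commit vs. plain Run) in the plan node while the rowDeleter owns the spans and the row count. A self-contained sketch of that shape, with stand-in types in place of *client.Batch and *roachpb.Error:

	package main

	import "fmt"

	type batch struct{ delRanges [][2]string }

	type rowDeleter struct{}

	// fastDelete enqueues DelRanges for the scan's spans, asks the caller to
	// run or commit the batch, and reports a row count. (The real code derives
	// the count by decoding the keys in the batch results, as in the loop
	// removed above; we fake it here.)
	func (rd *rowDeleter) fastDelete(b *batch, spans [][2]string, commit func(*batch) error) (int, error) {
		b.delRanges = append(b.delRanges, spans...)
		if err := commit(b); err != nil {
			return 0, err
		}
		return len(b.delRanges), nil // stand-in for counting decoded keys
	}

	func main() {
		var rd rowDeleter
		var b batch
		rowCount, err := rd.fastDelete(&b, [][2]string{{"/t/1", "/t/5"}}, func(b *batch) error {
			// An auto-commit txn would CommitInBatch(b) here; otherwise txn.Run(b).
			return nil
		})
		fmt.Println(rowCount, err) // 1 <nil>
	}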

88 changes: 10 additions & 78 deletions sql/insert.go
@@ -20,12 +20,10 @@ import (
 	"bytes"
 	"fmt"
 
-	"github.com/cockroachdb/cockroach/keys"
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/sql/parser"
 	"github.com/cockroachdb/cockroach/sql/privilege"
 	"github.com/cockroachdb/cockroach/util"
-	"github.com/cockroachdb/cockroach/util/log"
 )
 
 type insertNode struct {
@@ -38,7 +36,9 @@ type insertNode struct {
 
 	run struct {
 		// The following fields are populated during Start().
-		rowCreatorNodeRun
+		editNodeRun
+
+		ri rowInserter
 
 		rowIdxToRetIdx []int
 		rowTemplate    parser.DTuple
 	}
@@ -201,18 +201,13 @@ func (n *insertNode) Start() *roachpb.Error {
 		return pErr
 	}
 
-	// Determine the secondary indexes that need to be updated as well.
-	indexes := n.tableDesc.Indexes
-	// Also include the secondary indexes in mutation state WRITE_ONLY.
-	for _, m := range n.tableDesc.Mutations {
-		if m.State == DescriptorMutation_WRITE_ONLY {
-			if index := m.GetIndex(); index != nil {
-				indexes = append(indexes, *index)
-			}
-		}
+	ri, err := makeRowInserter(n.tableDesc, n.colIDtoRowIndex, n.cols)
+	if err != nil {
+		return roachpb.NewError(err)
 	}
+	n.run.ri = ri
 
-	n.run.startRowCreatorNode(&n.rowCreatorNodeBase, rows, indexes)
+	n.run.startEditNode(&n.editNodeBase, rows)
 
 	// Prepare structures for building values to pass to rh.
 	if n.rh.exprs != nil {
@@ -313,78 +308,15 @@ func (n *insertNode) Next() bool {
 			}
 		}
 
-		// Encode the values to the expected column type. This needs to
-		// happen before index encoding because certain datum types (i.e. tuple)
-		// cannot be used as index values.
-		for i, val := range rowVals {
-			// Make sure the value can be written to the column before proceeding.
-			var mErr error
-			if n.run.marshalled[i], mErr = marshalColumnValue(n.cols[i], val); mErr != nil {
-				n.run.pErr = roachpb.NewError(mErr)
-				return false
-			}
-		}
-
-		primaryIndexKey, _, eErr := encodeIndexKey(
-			&n.run.primaryIndex, n.colIDtoRowIndex, rowVals, n.run.primaryIndexKeyPrefix)
-		if eErr != nil {
-			n.run.pErr = roachpb.NewError(eErr)
+		n.run.pErr = n.run.ri.insertRow(n.run.b, rowVals)
+		if n.run.pErr != nil {
 			return false
 		}
 
-		// Write the row sentinel. We want to write the sentinel first in case
-		// we are trying to insert a duplicate primary key: if we write the
-		// secondary indexes first, we may get an error that looks like a
-		// uniqueness violation on a non-unique index.
-		sentinelKey := keys.MakeNonColumnKey(primaryIndexKey)
-		if log.V(2) {
-			log.Infof("CPut %s -> NULL", roachpb.Key(sentinelKey))
-		}
-		// This is subtle: An interface{}(nil) deletes the value, so we pass in
-		// []byte{} as a non-nil value.
-		n.run.b.CPut(sentinelKey, []byte{}, nil)
-
-		secondaryIndexEntries, eErr := encodeSecondaryIndexes(
-			n.tableDesc.ID, n.run.indexes, n.colIDtoRowIndex, rowVals)
-		if eErr != nil {
-			n.run.pErr = roachpb.NewError(eErr)
-			return false
-		}
-
-		for _, secondaryIndexEntry := range secondaryIndexEntries {
-			if log.V(2) {
-				log.Infof("CPut %s -> %v", secondaryIndexEntry.key,
-					secondaryIndexEntry.value)
-			}
-			n.run.b.CPut(secondaryIndexEntry.key, secondaryIndexEntry.value, nil)
-		}
-
-		// Write the row columns.
 		for i, val := range rowVals {
-			col := n.cols[i]
 			if n.run.rowTemplate != nil {
 				n.run.rowTemplate[n.run.rowIdxToRetIdx[i]] = val
 			}
-
-			if _, ok := n.primaryKeyCols[col.ID]; ok {
-				// Skip primary key columns as their values are encoded in the row
-				// sentinel key which is guaranteed to exist for as long as the row
-				// exists.
-				continue
-			}
-
-			if n.run.marshalled[i] != nil {
-				// We only output non-NULL values. Non-existent column keys are
-				// considered NULL during scanning and the row sentinel ensures we know
-				// the row exists.
-
-				key := keys.MakeColumnKey(primaryIndexKey, uint32(col.ID))
-				if log.V(2) {
-					log.Infof("CPut %s -> %v", roachpb.Key(key), val)
-				}
-
-				n.run.b.CPut(key, n.run.marshalled[i], nil)
-			}
 		}
 
 		resultRow, err := n.rh.cookResultRow(n.run.rowTemplate)
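The write ordering spelled out by the removed comments — sentinel first so a duplicate primary key fails as a PK violation, then secondary indexes, then non-NULL, non-PK columns — presumably now lives inside insertRow. A self-contained sketch with stand-in types (not the real rowInserter API):

	package main

	import "fmt"

	type batch struct{ ops []string }

	func (b *batch) cput(key string, val interface{}) {
		b.ops = append(b.ops, fmt.Sprintf("CPut %s -> %v", key, val))
	}

	type rowInserter struct{}

	func (ri *rowInserter) insertRow(b *batch, pk string, indexKeys []string, cols map[string]interface{}) {
		// 1. Row sentinel first: it guarantees the row exists even if every
		//    other column is NULL, and it makes a duplicate primary key fail
		//    as a PK violation rather than a spurious secondary-index error.
		b.cput(pk, []byte{})
		// 2. Secondary index entries.
		for _, k := range indexKeys {
			b.cput(k, nil)
		}
		// 3. Non-NULL column values; absent keys read back as NULL, and PK
		//    values are already encoded in the sentinel key.
		for col, v := range cols {
			if v != nil {
				b.cput(pk+"/"+col, v)
			}
		}
	}

	func main() {
		var b batch
		ri := rowInserter{}
		ri.insertRow(&b, "/t/primary/42", []string{"/t/name_idx/carl/42"},
			map[string]interface{}{"name": "carl", "nickname": nil})
		for _, op := range b.ops {
			fmt.Println(op)
		}
	}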
(The remaining two changed files are not rendered in this view.)
