sql: extract k/v operations from insert/update/delete #6211

Merged · 1 commit · Apr 25, 2016
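This change replaces the inline key/value encoding in deleteNode and insertNode with calls to dedicated row-writer helpers built by makeRowDeleter and makeRowInserter. As a reading aid, here is a minimal sketch of the helper surface with method sets inferred purely from the call sites in this diff; the interface names are illustrative (the PR adds concrete structs, not interfaces), and the real declarations may differ:

```go
// Sketch only: written as if it sat in package sql beside the edited files,
// so internal types (scanNode, parser.DTuple, etc.) resolve.
package sql

import (
	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/roachpb"
	"github.com/cockroachdb/cockroach/sql/parser"
)

// rowDeleterAPI is an illustrative name, not one introduced by the PR.
type rowDeleterAPI interface {
	// fastPathAvailable reports whether rows can be removed by key span
	// alone, i.e. no secondary index needs the row values.
	fastPathAvailable() bool
	// deleteRow queues the KV deletions for one already-fetched row into b.
	deleteRow(b *client.Batch, values parser.DTuple) *roachpb.Error
	// fastDelete deletes the scan's spans without reading them, calls
	// commit to flush the batch, and reports how many rows were removed.
	fastDelete(b *client.Batch, scan *scanNode,
		commit func(*client.Batch) *roachpb.Error) (int, *roachpb.Error)
}

// rowInserterAPI is an illustrative name, not one introduced by the PR.
type rowInserterAPI interface {
	// insertRow queues the CPuts for one new row (sentinel key, column
	// values, secondary index entries) into b.
	insertRow(b *client.Batch, values parser.DTuple) *roachpb.Error
}
```

The constructors visible in the diff, makeRowDeleter(tableDesc, colIDtoRowIndex) and makeRowInserter(tableDesc, colIDtoRowIndex, cols), build the concrete helpers that the planner nodes now delegate to.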
96 changes: 24 additions & 72 deletions sql/delete.go
@@ -20,6 +20,7 @@ import (
"bytes"
"fmt"

"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/sql/parser"
"github.com/cockroachdb/cockroach/sql/privilege"
@@ -31,10 +32,11 @@ type deleteNode struct {
n *parser.Delete

run struct {
// The following fields are populated during Start().
editNodeRun

colIDtoRowIndex map[ColumnID]int
fastPath bool
rd rowDeleter
fastPath bool
}
}

@@ -93,26 +95,21 @@ func (d *deleteNode) Start() *roachpb.Error {
if err != nil {
return roachpb.NewError(err)
}
d.run.colIDtoRowIndex = colIDtoRowIndex

// Determine the secondary indexes that need to be updated as well.
indexes := d.tableDesc.Indexes
// Also include all the indexes under mutation; mutation state is
// irrelevant for deletions.
for _, m := range d.tableDesc.Mutations {
if index := m.GetIndex(); index != nil {
indexes = append(indexes, *index)
}

rd, err := makeRowDeleter(d.tableDesc, colIDtoRowIndex)
if err != nil {
return roachpb.NewError(err)
}
d.run.rd = rd

d.run.startEditNode(&d.editNodeBase, rows, indexes)
d.run.startEditNode(&d.editNodeBase, rows)

// Check if we can avoid doing a round-trip to read the values and just
// "fast-path" skip to deleting the key ranges without reading them first.
// TODO(dt): We could probably be smarter when presented with an index-join,
// but this goes away anyway once we push-down more of SQL.
sel := rows.(*selectNode)
if scan, ok := sel.table.node.(*scanNode); ok && canDeleteWithoutScan(d.n, scan, len(indexes)) {
if scan, ok := sel.table.node.(*scanNode); ok && canDeleteWithoutScan(d.n, scan, &d.run.rd) {
d.run.fastPath = true
d.run.pErr = d.fastDelete()
d.run.done = true
@@ -142,35 +139,11 @@ func (d *deleteNode) Next() bool {

rowVals := d.run.rows.Values()

primaryIndexKey, _, err := encodeIndexKey(
&d.run.primaryIndex, d.run.colIDtoRowIndex, rowVals, d.run.primaryIndexKeyPrefix)
if err != nil {
d.run.pErr = roachpb.NewError(err)
return false
}

secondaryIndexEntries, err := encodeSecondaryIndexes(
d.tableDesc.ID, d.run.indexes, d.run.colIDtoRowIndex, rowVals)
if err != nil {
d.run.pErr = roachpb.NewError(err)
d.run.pErr = d.run.rd.deleteRow(d.run.b, rowVals)
if d.run.pErr != nil {
return false
}

for _, secondaryIndexEntry := range secondaryIndexEntries {
if log.V(2) {
log.Infof("Del %s", secondaryIndexEntry.key)
}
d.run.b.Del(secondaryIndexEntry.key)
}

// Delete the row.
rowStartKey := roachpb.Key(primaryIndexKey)
rowEndKey := rowStartKey.PrefixEnd()
if log.V(2) {
log.Infof("DelRange %s - %s", rowStartKey, rowEndKey)
}
d.run.b.DelRange(rowStartKey, rowEndKey, false)

resultRow, err := d.rh.cookResultRow(rowVals)
if err != nil {
d.run.pErr = roachpb.NewError(err)
@@ -184,11 +157,8 @@ func (d *deleteNode) Next() bool {
// Determine if the deletion of `rows` can be done without actually scanning them,
// i.e. if we do not need to know their values for filtering expressions or a
// RETURNING clause or for updating secondary indexes.
func canDeleteWithoutScan(n *parser.Delete, scan *scanNode, indexCount int) bool {
if indexCount != 0 {
if log.V(2) {
log.Infof("delete forced to scan: values required to update %d secondary indexes", indexCount)
}
func canDeleteWithoutScan(n *parser.Delete, scan *scanNode, rd *rowDeleter) bool {
if !rd.fastPathAvailable() {
return false
}
if n.Returning != nil {
@@ -215,45 +185,27 @@ func (d *deleteNode) fastDelete() *roachpb.Error {
return scan.pErr
}

for _, span := range scan.spans {
if log.V(2) {
log.Infof("Skipping scan and just deleting %s - %s", span.start, span.end)
}
d.run.b.DelRange(span.start, span.end, true)
rowCount, pErr := d.run.rd.fastDelete(d.run.b, scan, d.fastDeleteCommitFunc)
if pErr != nil {
return pErr
}
d.rh.rowCount += rowCount
return nil
}

func (d *deleteNode) fastDeleteCommitFunc(b *client.Batch) *roachpb.Error {
if d.autoCommit {
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
// coordinator.
if pErr := d.p.txn.CommitInBatch(d.run.b); pErr != nil {
if pErr := d.p.txn.CommitInBatch(b); pErr != nil {
return pErr
}
} else {
if pErr := d.p.txn.Run(d.run.b); pErr != nil {
if pErr := d.p.txn.Run(b); pErr != nil {
return pErr
}
}

for _, r := range d.run.b.Results {
var prev []byte
for _, i := range r.Keys {
// If prefix is same, don't bother decoding key.
if len(prev) > 0 && bytes.HasPrefix(i, prev) {
continue
}

after, err := scan.fetcher.readIndexKey(i)
if err != nil {
return roachpb.NewError(err)
}
k := i[:len(i)-len(after)]
if !bytes.Equal(k, prev) {
prev = k
d.rh.rowCount++
}
}
}
return nil
}

(remainder of sql/delete.go not shown)
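The fast-path delete also moves behind the helper: canDeleteWithoutScan now asks rd.fastPathAvailable() instead of counting secondary indexes itself, and fastDelete hands the batch, the scan node, and a commit callback to rd.fastDelete, keeping the helper independent of the planner's txn and autoCommit state. A sketch of what those two methods plausibly do, reassembled from the inline code this diff deletes (package sql context assumed; the rd.indexes field is a placeholder, and the real implementations may differ):

```go
// fastPathAvailable: with no secondary indexes to maintain, nothing needs the
// row values, so whole key spans can be deleted blindly. This mirrors the old
// `indexCount != 0` check in canDeleteWithoutScan.
func (rd *rowDeleter) fastPathAvailable() bool {
	return len(rd.indexes) == 0 // rd.indexes is assumed helper state
}

// fastDelete: reassembled from the deleted inline span deletion and the old
// key-counting loop in the delete node.
func (rd *rowDeleter) fastDelete(
	b *client.Batch, scan *scanNode, commit func(*client.Batch) *roachpb.Error,
) (int, *roachpb.Error) {
	// Delete each scanned span wholesale instead of reading the rows first.
	for _, span := range scan.spans {
		b.DelRange(span.start, span.end, true /* return the deleted keys */)
	}

	// Flush (and, for an auto-txn, commit) the batch via the caller's callback.
	if pErr := commit(b); pErr != nil {
		return 0, pErr
	}

	// Count deleted rows by counting distinct index-key prefixes among the
	// keys returned by DelRange.
	rowCount := 0
	for _, r := range b.Results {
		var prev []byte
		for _, k := range r.Keys {
			if len(prev) > 0 && bytes.HasPrefix(k, prev) {
				// Same prefix as the previous key: still the same row.
				continue
			}
			after, err := scan.fetcher.readIndexKey(k)
			if err != nil {
				return 0, roachpb.NewError(err)
			}
			prefix := k[:len(k)-len(after)]
			if !bytes.Equal(prefix, prev) {
				prev = prefix
				rowCount++
			}
		}
	}
	return rowCount, nil
}
```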
88 changes: 10 additions & 78 deletions sql/insert.go
@@ -20,12 +20,10 @@ import (
"bytes"
"fmt"

"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/sql/parser"
"github.com/cockroachdb/cockroach/sql/privilege"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
)

type insertNode struct {
@@ -38,7 +36,9 @@ type insertNode struct {

run struct {
// The following fields are populated during Start().
rowCreatorNodeRun
editNodeRun

ri rowInserter
rowIdxToRetIdx []int
rowTemplate parser.DTuple
}
@@ -201,18 +201,13 @@ func (n *insertNode) Start() *roachpb.Error {
return pErr
}

// Determine the secondary indexes that need to be updated as well.
indexes := n.tableDesc.Indexes
// Also include the secondary indexes in mutation state WRITE_ONLY.
for _, m := range n.tableDesc.Mutations {
if m.State == DescriptorMutation_WRITE_ONLY {
if index := m.GetIndex(); index != nil {
indexes = append(indexes, *index)
}
}
ri, err := makeRowInserter(n.tableDesc, n.colIDtoRowIndex, n.cols)
if err != nil {
return roachpb.NewError(err)
}
n.run.ri = ri

n.run.startRowCreatorNode(&n.rowCreatorNodeBase, rows, indexes)
n.run.startEditNode(&n.editNodeBase, rows)

// Prepare structures for building values to pass to rh.
if n.rh.exprs != nil {
@@ -313,78 +308,15 @@ func (n *insertNode) Next() bool {
}
}

// Encode the values to the expected column type. This needs to
// happen before index encoding because certain datum types (i.e. tuple)
// cannot be used as index values.
for i, val := range rowVals {
// Make sure the value can be written to the column before proceeding.
var mErr error
if n.run.marshalled[i], mErr = marshalColumnValue(n.cols[i], val); mErr != nil {
n.run.pErr = roachpb.NewError(mErr)
return false
}
}

primaryIndexKey, _, eErr := encodeIndexKey(
&n.run.primaryIndex, n.colIDtoRowIndex, rowVals, n.run.primaryIndexKeyPrefix)
if eErr != nil {
n.run.pErr = roachpb.NewError(eErr)
n.run.pErr = n.run.ri.insertRow(n.run.b, rowVals)
if n.run.pErr != nil {
return false
}

// Write the row sentinel. We want to write the sentinel first in case
// we are trying to insert a duplicate primary key: if we write the
// secondary indexes first, we may get an error that looks like a
// uniqueness violation on a non-unique index.
sentinelKey := keys.MakeNonColumnKey(primaryIndexKey)
if log.V(2) {
log.Infof("CPut %s -> NULL", roachpb.Key(sentinelKey))
}
// This is subtle: An interface{}(nil) deletes the value, so we pass in
// []byte{} as a non-nil value.
n.run.b.CPut(sentinelKey, []byte{}, nil)

secondaryIndexEntries, eErr := encodeSecondaryIndexes(
n.tableDesc.ID, n.run.indexes, n.colIDtoRowIndex, rowVals)
if eErr != nil {
n.run.pErr = roachpb.NewError(eErr)
return false
}

for _, secondaryIndexEntry := range secondaryIndexEntries {
if log.V(2) {
log.Infof("CPut %s -> %v", secondaryIndexEntry.key,
secondaryIndexEntry.value)
}
n.run.b.CPut(secondaryIndexEntry.key, secondaryIndexEntry.value, nil)
}

// Write the row columns.
for i, val := range rowVals {
col := n.cols[i]
if n.run.rowTemplate != nil {
n.run.rowTemplate[n.run.rowIdxToRetIdx[i]] = val
}

if _, ok := n.primaryKeyCols[col.ID]; ok {
// Skip primary key columns as their values are encoded in the row
// sentinel key which is guaranteed to exist for as long as the row
// exists.
continue
}

if n.run.marshalled[i] != nil {
// We only output non-NULL values. Non-existent column keys are
// considered NULL during scanning and the row sentinel ensures we know
// the row exists.

key := keys.MakeColumnKey(primaryIndexKey, uint32(col.ID))
if log.V(2) {
log.Infof("CPut %s -> %v", roachpb.Key(key), val)
}

n.run.b.CPut(key, n.run.marshalled[i], nil)
}
}

resultRow, err := n.rh.cookResultRow(n.run.rowTemplate)
(remainder of sql/insert.go not shown)
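For reference, the per-row KV writes that rowInserter.insertRow now owns, reassembled from the inline code deleted from insertNode.Next() above: marshal the values, encode the primary index key, CPut the sentinel key first (so a duplicate primary key surfaces as such rather than as a spurious secondary-index violation), then CPut each secondary index entry and each non-NULL, non-primary-key column. Package sql context is assumed, and the ri fields used below (cols, primaryIndex, primaryIndexKeyPrefix, indexes, tableDesc, primaryKeyCols, colIDtoRowIndex) are placeholders for whatever state the real helper carries:

```go
// Sketch of rowInserter.insertRow; the actual helper may differ in detail.
func (ri *rowInserter) insertRow(b *client.Batch, values parser.DTuple) *roachpb.Error {
	// Marshal the values first: some datum types (e.g. tuples) cannot be
	// used as index values.
	marshalled := make([]interface{}, len(ri.cols))
	for i, val := range values {
		var err error
		if marshalled[i], err = marshalColumnValue(ri.cols[i], val); err != nil {
			return roachpb.NewError(err)
		}
	}

	primaryIndexKey, _, err := encodeIndexKey(
		&ri.primaryIndex, ri.colIDtoRowIndex, values, ri.primaryIndexKeyPrefix)
	if err != nil {
		return roachpb.NewError(err)
	}

	// Write the row sentinel first so a duplicate primary key is reported as
	// such. []byte{} (not nil) is passed so the CPut writes a value rather
	// than deleting the key.
	b.CPut(keys.MakeNonColumnKey(primaryIndexKey), []byte{}, nil)

	// One CPut per secondary index entry.
	secondaryIndexEntries, err := encodeSecondaryIndexes(
		ri.tableDesc.ID, ri.indexes, ri.colIDtoRowIndex, values)
	if err != nil {
		return roachpb.NewError(err)
	}
	for _, e := range secondaryIndexEntries {
		b.CPut(e.key, e.value, nil)
	}

	// One CPut per non-NULL, non-primary-key column. Primary key columns are
	// already encoded in the sentinel key; NULL columns are simply not written.
	for i, col := range ri.cols {
		if _, ok := ri.primaryKeyCols[col.ID]; ok {
			continue
		}
		if marshalled[i] != nil {
			b.CPut(keys.MakeColumnKey(primaryIndexKey, uint32(col.ID)), marshalled[i], nil)
		}
	}
	return nil
}
```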