Skip to content

Commit

Permalink
sql: introduce tableWriter abstraction
Browse files Browse the repository at this point in the history
Migrates the client.Batch ownership for INSERT/UPDATE/DELETE to a new
tableWriter interface, the methods of which parallel the TableReader interface
being added in cockroachdb#6368.

This is in preparation for UPSERT, which has more complicated batch usage than
the previous three cases.

name                          old time/op    new time/op    delta
Insert1000_Cockroach-8          19.5ms ± 3%    19.8ms ± 6%    ~             (p=0.690 n=5+5)
Update1000_Cockroach-8          43.4ms ± 3%    43.5ms ± 6%    ~             (p=0.841 n=5+5)
Delete1000_Cockroach-8          65.1ms ± 2%    64.9ms ± 6%    ~             (p=1.000 n=5+5)
Scan1000_Cockroach-8            2.24ms ± 4%    2.15ms ± 2%  -3.97%          (p=0.016 n=5+5)
TrackChoices1000_Cockroach-8     153µs ± 6%     148µs ± 4%    ~             (p=0.222 n=5+5)

name                          old alloc/op   new alloc/op   delta
Insert1000_Cockroach-8          3.66MB ± 1%    3.64MB ± 1%    ~             (p=0.095 n=5+5)
Update1000_Cockroach-8          6.23MB ± 1%    6.21MB ± 0%    ~             (p=0.548 n=5+5)
Delete1000_Cockroach-8          2.68MB ± 0%    2.68MB ± 0%    ~             (p=0.413 n=5+4)
Scan1000_Cockroach-8             313kB ± 0%     313kB ± 0%    ~             (p=1.000 n=5+5)
TrackChoices1000_Cockroach-8    23.2kB ± 4%    23.7kB ± 6%    ~             (p=0.317 n=5+5)

name                          old allocs/op  new allocs/op  delta
Insert1000_Cockroach-8           25.6k ± 1%     25.5k ± 1%    ~             (p=0.095 n=5+5)
Update1000_Cockroach-8           41.5k ± 0%     41.4k ± 0%    ~             (p=0.206 n=5+5)
Delete1000_Cockroach-8           16.7k ± 0%     16.7k ± 0%    ~             (p=0.548 n=5+5)
Scan1000_Cockroach-8             2.29k ± 0%     2.29k ± 0%    ~     (all samples are equal)
TrackChoices1000_Cockroach-8       115 ± 2%       116 ± 2%    ~             (p=0.429 n=5+5)

(The Scan1000 movement is noise.)
  • Loading branch information
danhhz committed May 3, 2016
1 parent 7006f0c commit e2834f6
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 180 deletions.
48 changes: 19 additions & 29 deletions sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"fmt"

"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/sql/parser"
"github.com/cockroachdb/cockroach/sql/privilege"
Expand All @@ -31,7 +30,7 @@ type deleteNode struct {
editNodeBase
n *parser.Delete

rd rowDeleter
tw tableDeleter

run struct {
// The following fields are populated during Start().
Expand Down Expand Up @@ -74,12 +73,14 @@ func (p *planner) Delete(n *parser.Delete, autoCommit bool) (planNode, *roachpb.
return nil, roachpb.NewError(err)
}
// TODO(dan): Use rd.fetchCols to compute the fetch selectors.
tw := tableDeleter{rd: rd, autoCommit: autoCommit}

return &deleteNode{
dn := &deleteNode{
n: n,
editNodeBase: en,
rd: rd,
}, nil
tw: tw,
}
return dn, nil
}

func (d *deleteNode) Start() *roachpb.Error {
Expand All @@ -97,14 +98,16 @@ func (d *deleteNode) Start() *roachpb.Error {
return pErr
}

d.run.startEditNode(&d.editNodeBase, rows)
if pErr = d.run.startEditNode(&d.editNodeBase, rows, &d.tw); pErr != nil {
return pErr
}

// Check if we can avoid doing a round-trip to read the values and just
// "fast-path" skip to deleting the key ranges without reading them first.
// TODO(dt): We could probably be smarter when presented with an index-join,
// but this goes away anyway once we push-down more of SQL.
sel := rows.(*selectNode)
if scan, ok := sel.table.node.(*scanNode); ok && canDeleteWithoutScan(d.n, scan, &d.rd) {
if scan, ok := sel.table.node.(*scanNode); ok && canDeleteWithoutScan(d.n, scan, &d.tw) {
d.run.fastPath = true
d.run.pErr = d.fastDelete()
d.run.done = true
Expand All @@ -128,14 +131,14 @@ func (d *deleteNode) Next() bool {

if !d.run.rows.Next() {
// We're done. Finish the batch.
d.run.finalize(&d.editNodeBase, false)
d.run.pErr = d.tw.finalize()
d.run.done = true
return false
}

rowVals := d.run.rows.Values()

d.run.pErr = d.rd.deleteRow(d.run.b, rowVals)
if d.run.pErr != nil {
if _, d.run.pErr = d.tw.run(rowVals); d.run.pErr != nil {
return false
}

Expand All @@ -152,8 +155,8 @@ func (d *deleteNode) Next() bool {
// Determine if the deletion of `rows` can be done without actually scanning them,
// i.e. if we do not need to know their values for filtering expressions or a
// RETURNING clause or for updating secondary indexes.
func canDeleteWithoutScan(n *parser.Delete, scan *scanNode, rd *rowDeleter) bool {
if !rd.fastPathAvailable() {
func canDeleteWithoutScan(n *parser.Delete, scan *scanNode, td *tableDeleter) bool {
if !td.fastPathAvailable() {
return false
}
if n.Returning != nil {
Expand All @@ -180,30 +183,17 @@ func (d *deleteNode) fastDelete() *roachpb.Error {
return scan.pErr
}

rowCount, pErr := d.rd.fastDelete(d.run.b, scan, d.fastDeleteCommitFunc)
if pErr := d.tw.init(d.p.txn); pErr != nil {
return pErr
}
rowCount, pErr := d.tw.fastDelete(scan)
if pErr != nil {
return pErr
}
d.rh.rowCount += rowCount
return nil
}

// fastDeleteCommitFunc sends the batch b through the planner's transaction
// and returns the resulting error, which is nil on success.
func (d *deleteNode) fastDeleteCommitFunc(b *client.Batch) *roachpb.Error {
	if !d.autoCommit {
		// Not auto-committing: just run the batch inside the transaction.
		return d.p.txn.Run(b)
	}
	// An auto-txn can commit the transaction together with the batch, which
	// avoids an extra round-trip to the transaction coordinator.
	return d.p.txn.CommitInBatch(b)
}

// Columns returns the result columns recorded in d.rh.columns.
func (d *deleteNode) Columns() []ResultColumn {
return d.rh.columns
}
Expand Down
70 changes: 40 additions & 30 deletions sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ import (

type insertNode struct {
// The following fields are populated during makePlan.
rowCreatorNodeBase
n *parser.Insert
qvals qvalMap
insertRows parser.SelectStatement
checkExprs []parser.Expr
editNodeBase
defaultExprs []parser.Expr
n *parser.Insert
qvals qvalMap
insertRows parser.SelectStatement
checkExprs []parser.Expr

ri rowInserter
insertCols []ColumnDescriptor
insertColIDtoRowIndex map[ColumnID]int
tw tableWriter

run struct {
// The following fields are populated during Start().
Expand Down Expand Up @@ -99,13 +102,13 @@ func (p *planner) Insert(n *parser.Insert, autoCommit bool) (planNode, *roachpb.
}
}

rc, rcErr := p.makeRowCreatorNode(en, cols)
if rcErr != nil {
return nil, rcErr
defaultExprs, err := makeDefaultExprs(cols, &p.parser, p.evalCtx)
if err != nil {
return nil, roachpb.NewError(err)
}

// Replace any DEFAULT markers with the corresponding default expressions.
insertRows, err := p.fillDefaults(rc.defaultExprs, cols, n)
insertRows, err := p.fillDefaults(defaultExprs, cols, n)
if err != nil {
return nil, roachpb.NewError(err)
}
Expand Down Expand Up @@ -165,19 +168,23 @@ func (p *planner) Insert(n *parser.Insert, autoCommit bool) (planNode, *roachpb.
return nil, pErr
}

in := &insertNode{
n: n,
rowCreatorNodeBase: rc,
checkExprs: checkExprs,
qvals: qvals,
insertRows: insertRows,
}

in.ri, err = makeRowInserter(en.tableDesc, cols)
ri, err := makeRowInserter(en.tableDesc, cols)
if err != nil {
return in, roachpb.NewError(err)
return nil, roachpb.NewError(err)
}
tw := &tableInserter{ri: ri, autoCommit: autoCommit}

in := &insertNode{
n: n,
editNodeBase: en,
defaultExprs: defaultExprs,
checkExprs: checkExprs,
qvals: qvals,
insertRows: insertRows,
insertCols: ri.insertCols,
insertColIDtoRowIndex: ri.insertColIDtoRowIndex,
tw: tw,
}
return in, nil
}

Expand All @@ -198,7 +205,9 @@ func (n *insertNode) Start() *roachpb.Error {
return pErr
}

n.run.startEditNode(&n.editNodeBase, rows)
if pErr = n.run.startEditNode(&n.editNodeBase, rows, n.tw); pErr != nil {
return pErr
}

// Prepare structures for building values to pass to rh.
if n.rh.exprs != nil {
Expand All @@ -217,8 +226,8 @@ func (n *insertNode) Start() *roachpb.Error {
colIDToRetIndex[col.ID] = i
}

n.run.rowIdxToRetIdx = make([]int, len(n.ri.insertCols))
for i, col := range n.ri.insertCols {
n.run.rowIdxToRetIdx = make([]int, len(n.insertCols))
for i, col := range n.insertCols {
n.run.rowIdxToRetIdx[i] = colIDToRetIndex[col.ID]
}
}
Expand All @@ -233,7 +242,8 @@ func (n *insertNode) Next() bool {

if !n.run.rows.Next() {
// We're done. Finish the batch.
n.run.finalize(&n.editNodeBase, true)
n.run.pErr = n.tw.finalize()
n.run.done = true
return false
}

Expand All @@ -242,7 +252,7 @@ func (n *insertNode) Next() bool {
// The values for the row may be shorter than the number of columns being
// inserted into. Generate default values for those columns using the
// default expressions.
for i := len(rowVals); i < len(n.ri.insertCols); i++ {
for i := len(rowVals); i < len(n.insertCols); i++ {
if n.defaultExprs == nil {
rowVals = append(rowVals, parser.DNull)
continue
Expand All @@ -258,7 +268,7 @@ func (n *insertNode) Next() bool {
// Check to see if NULL is being inserted into any non-nullable column.
for _, col := range n.tableDesc.Columns {
if !col.Nullable {
if i, ok := n.ri.insertColIDtoRowIndex[col.ID]; !ok || rowVals[i] == parser.DNull {
if i, ok := n.insertColIDtoRowIndex[col.ID]; !ok || rowVals[i] == parser.DNull {
n.run.pErr = roachpb.NewUErrorf("null value in column %q violates not-null constraint", col.Name)
return false
}
Expand All @@ -267,7 +277,7 @@ func (n *insertNode) Next() bool {

// Ensure that the values honor the specified column widths.
for i := range rowVals {
if err := checkValueWidth(n.ri.insertCols[i], rowVals[i]); err != nil {
if err := checkValueWidth(n.insertCols[i], rowVals[i]); err != nil {
n.run.pErr = roachpb.NewError(err)
return false
}
Expand All @@ -277,7 +287,7 @@ func (n *insertNode) Next() bool {
// Populate qvals.
for ref, qval := range n.qvals {
// The colIdx is 0-based, we need to change it to 1-based.
ri, has := n.ri.insertColIDtoRowIndex[ColumnID(ref.colIdx+1)]
ri, has := n.insertColIDtoRowIndex[ColumnID(ref.colIdx+1)]
if !has {
n.run.pErr = roachpb.NewUErrorf("failed to to find column %d in row", ColumnID(ref.colIdx+1))
return false
Expand All @@ -299,7 +309,7 @@ func (n *insertNode) Next() bool {
}
}

n.run.pErr = n.ri.insertRow(n.run.b, rowVals)
_, n.run.pErr = n.tw.run(rowVals)
if n.run.pErr != nil {
return false
}
Expand Down Expand Up @@ -495,7 +505,7 @@ func (n *insertNode) ExplainPlan(v bool) (name, description string, children []p
var buf bytes.Buffer
if v {
fmt.Fprintf(&buf, "into %s (", n.tableDesc.Name)
for i, col := range n.ri.insertCols {
for i, col := range n.insertCols {
if i > 0 {
fmt.Fprintf(&buf, ", ")
}
Expand Down
60 changes: 3 additions & 57 deletions sql/rowwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,18 +293,17 @@ func makeRowUpdater(

if primaryKeyColChange {
// These fields are only used when the primary key is changing.
// TODO(dan): Is it safe for these to share rowHelper instead of creating
// two more?
var err error
ru.rd, err = makeRowDeleter(tableDesc)
if err != nil {
if ru.rd, err = makeRowDeleter(tableDesc); err != nil {
return rowUpdater{}, err
}
ru.fetchCols = ru.rd.fetchCols
if ru.ri, err = makeRowInserter(tableDesc, tableDesc.Columns); err != nil {
return rowUpdater{}, err
}
} else {
// TODO(radu): We only need to select the columns necessary to generate primary
// and secondary index keys, and the columns needed by returningHelper.
ru.fetchCols = make([]ColumnDescriptor, len(requestedCols))
copy(ru.fetchCols, requestedCols)
}
Expand Down Expand Up @@ -515,59 +514,6 @@ func (rd *rowDeleter) deleteRow(b *client.Batch, values []parser.Datum) *roachpb
return nil
}

// fastPathAvailable reports whether the fastDelete optimization can be used.
// It is unavailable whenever rd.helper.indexes is non-empty; in that case the
// reason is logged at verbosity level 2.
func (rd *rowDeleter) fastPathAvailable() bool {
	numIndexes := len(rd.helper.indexes)
	if numIndexes == 0 {
		return true
	}
	if log.V(2) {
		log.Infof("delete forced to scan: values required to update %d secondary indexes", numIndexes)
	}
	return false
}

// fastDelete adds to the batch the kv operations necessary to delete sql table
// rows without knowing the values that are currently present.
//
// One DelRange is queued on b for every span in scan.spans, and then
// commitFunc is invoked with b. If commitFunc succeeds, the keys in b.Results
// are walked to count how many distinct rows they cover.
//
// It returns the number of rows counted, or 0 and an error if commitFunc
// fails or a key cannot be decoded by scan.fetcher.readIndexKey.
func (rd *rowDeleter) fastDelete(
	b *client.Batch,
	scan *scanNode,
	commitFunc func(b *client.Batch) *roachpb.Error,
) (rowCount int, pErr *roachpb.Error) {
	for _, span := range scan.spans {
		if log.V(2) {
			log.Infof("Skipping scan and just deleting %s - %s", span.start, span.end)
		}
		b.DelRange(span.start, span.end, true)
	}

	if pErr = commitFunc(b); pErr != nil {
		return 0, pErr
	}

	for _, result := range b.Results {
		// lastRowKey is the index-key portion of the most recently counted
		// row; it starts out empty for every result.
		var lastRowKey []byte
		for _, key := range result.Keys {
			// A key that extends the last counted row key is skipped
			// without being decoded.
			if len(lastRowKey) > 0 && bytes.HasPrefix(key, lastRowKey) {
				continue
			}

			rest, err := scan.fetcher.readIndexKey(key)
			if err != nil {
				return 0, roachpb.NewError(err)
			}
			// Drop whatever readIndexKey left unconsumed to get the row key.
			rowKey := key[:len(key)-len(rest)]
			if bytes.Equal(rowKey, lastRowKey) {
				continue
			}
			lastRowKey = rowKey
			rowCount++
		}
	}

	return rowCount, nil
}

func colIDtoRowIndexFromCols(cols []ColumnDescriptor) map[ColumnID]int {
colIDtoRowIndex := make(map[ColumnID]int, len(cols))
for i, col := range cols {
Expand Down
Loading

0 comments on commit e2834f6

Please sign in to comment.