Skip to content

Commit

Permalink
single-roundtrip transactions
Browse files Browse the repository at this point in the history
resuscitation of #2818.

This change enables 1PC txns for transactions contained in a single batch and
sent to a single Range.

Contrary to the previous incarnation, the chunk of the batch
containing EndTransaction is now split and re-sent before any other
redundant requests have been made.

More improvement is possible: If the EndTransaction request were to end up in
the last Batch sent over the course of the multi-range request anyway, it would
not need to be split up into its own chunk. There's no way of knowing that
until the last range descriptor has been looked up though, at which point in
time we've already sent requests, complicating the code. Hence, this
optimization (which amounts to @bdarnell's suggestion in #2818) has been
omitted for now due to the amount of code changes required.

Unfortunately, the improvements here are not directly reflected in SQL
benchmarks because of the way the Txn is used there.
We need to optimize the SQL server to batch up as many commands as it
can (basically until it reads data) and commit in the same round-trip
if possible. Currently the Txn object is used deep down in the individual
operations (for example `(*planner).Insert()`) so this isn't trivial.
I will file an issue.
  • Loading branch information
tbg committed Dec 9, 2015
1 parent 7d11ac4 commit bc8f1aa
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 115 deletions.
60 changes: 0 additions & 60 deletions kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
package kv

import (
"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util"
"github.com/gogo/protobuf/proto"
"golang.org/x/net/context"
)

var emptySpan = roachpb.Span{}
Expand Down Expand Up @@ -111,64 +109,6 @@ func truncate(ba roachpb.BatchRequest, rs roachpb.RSpan) (roachpb.BatchRequest,
return ba, len(ba.Requests) - numNoop, nil
}

// senderFn is a function type that can be used as a Sender: it takes a
// context and a batch request and returns a batch response or an error.
type senderFn func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)

// Send implements Sender by invoking the underlying function with the given
// context and batch and handing back its results unchanged.
func (fn senderFn) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
	br, pErr := fn(ctx, ba)
	return br, pErr
}

// chunkingSender sends batches, subdividing them appropriately. Each
// resulting chunk is passed to the wrapped senderFn.
type chunkingSender struct {
// f is the function through which every chunk of a batch is sent.
f senderFn
}

// newChunkingSender builds a chunkingSender around the supplied senderFn and
// returns it as a client.Sender.
func newChunkingSender(f senderFn) client.Sender {
	cs := chunkingSender{f: f}
	return &cs
}

// Send implements Sender. The batch is divided via ba.Split() and the
// resulting parts are passed to the wrapped senderFn one after another; the
// first error aborts the call and is returned as-is. After every successful
// part, the transaction from that reply's header is folded into ba.Txn so
// that it is propagated to the next request.
//
// The returned reply is the first part's reply with the responses of all
// later parts appended. Its Error and Timestamp are taken from the last
// part's header and its Txn is the final ba.Txn. Send panics when called
// with an empty batch.
//
// TODO(tschottdorf): We actually don't want to chop EndTransaction off for
// single-range requests (but that happens now since EndTransaction has the
// isAlone flag). Whether a request is single-range is unknown at this point;
// you can only find out after you've sent to the Range or looked up a
// descriptor that suggests that you're multi-range. In those cases, the
// wrapped sender should return an error so that we split and retry the chunk
// which contains EndTransaction (i.e. the last one).
func (cs *chunkingSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
	if len(ba.Requests) == 0 {
		panic("empty batch")
	}

	// Phase one: send every part in order, remembering each reply.
	var collected []*roachpb.BatchResponse
	chunks := ba.Split()
	for i := range chunks {
		ba.Requests = chunks[i]
		br, pErr := cs.f(ctx, ba)
		if pErr != nil {
			return nil, pErr
		}
		// Carry the transaction returned by this part over to the next
		// one; the last update ends up in the combined reply's header.
		ba.Txn.Update(br.Header().Txn)

		collected = append(collected, br)
	}

	// Phase two: fold all replies into the first one.
	merged := collected[0]
	for i := 1; i < len(collected); i++ {
		merged.Responses = append(merged.Responses, collected[i].Responses...)
	}
	final := collected[len(collected)-1].BatchResponse_Header
	merged.Error, merged.Timestamp, merged.Txn = final.Error, final.Timestamp, ba.Txn
	return merged, nil
}

// prev gives the right boundary of the union of all requests which don't
// affect keys larger than the given key.
// TODO(tschottdorf): again, better on BatchRequest itself, but can't pull
Expand Down
111 changes: 78 additions & 33 deletions kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kv

import (
"errors"
"fmt"
"net"
"sync/atomic"
Expand Down Expand Up @@ -409,8 +410,8 @@ func (ds *DistSender) getDescriptors(rs roachpb.RSpan, considerIntents, useRever
return desc, needAnother(desc, useReverseScan), evict, nil
}

// sendAttempt gathers and rearranges the replicas, and makes an RPC call.
func (ds *DistSender) sendAttempt(trace *tracer.Trace, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor) (*roachpb.BatchResponse, *roachpb.Error) {
// sendSingleRange gathers and rearranges the replicas, and makes an RPC call.
func (ds *DistSender) sendSingleRange(trace *tracer.Trace, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor) (*roachpb.BatchResponse, *roachpb.Error) {
defer trace.Epoch("sending RPC")()

leader := ds.leaderCache.Lookup(roachpb.RangeID(desc.RangeID))
Expand Down Expand Up @@ -464,12 +465,7 @@ func (ds *DistSender) sendAttempt(trace *tracer.Trace, ba roachpb.BatchRequest,
func (ds *DistSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
// In the event that timestamp isn't set and read consistency isn't
// required, set the timestamp using the local clock.
// TODO(tschottdorf): right place for this?
if ba.ReadConsistency == roachpb.INCONSISTENT && ba.Timestamp.Equal(roachpb.ZeroTimestamp) {
// Make sure that after the call, args hasn't changed.
defer func(timestamp roachpb.Timestamp) {
ba.Timestamp = timestamp
}(ba.Timestamp)
ba.Timestamp = ds.clock.Now()
}

Expand All @@ -487,14 +483,54 @@ func (ds *DistSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roach
}
}

// TODO(tschottdorf): provisional instantiation.
return newChunkingSender(ds.sendChunk).Send(ctx, ba)
if len(ba.Requests) < 1 {
panic("empty batch")
}

var rplChunks []*roachpb.BatchResponse
parts := ba.Split(false /* don't split ET */)
for len(parts) > 0 {
part := parts[0]
ba.Requests = part
rpl, pErr, shouldSplitET := ds.sendChunk(ctx, ba)
if shouldSplitET {
// If we tried to send a single round-trip EndTransaction but
// it looks like it's going to hit multiple ranges, split it
// here and try again.
if len(parts) != 1 {
panic("EndTransaction not in last chunk of batch")
}
parts = ba.Split(true /* split ET */)
if len(parts) != 2 {
panic("split of final EndTransaction chunk resulted in != 2 parts")
}
continue
}
if pErr != nil {
return nil, pErr
}
// Propagate transaction from last reply to next request. The final
// update is taken and put into the response's main header.
ba.Txn.Update(rpl.Header().Txn)
rplChunks = append(rplChunks, rpl)
parts = parts[1:]
}

reply := rplChunks[0]
for _, rpl := range rplChunks[1:] {
reply.Responses = append(reply.Responses, rpl.Responses...)
}
*reply.Header() = rplChunks[len(rplChunks)-1].BatchResponse_Header
return reply, nil
}

// sendChunk is in charge of sending an "admissible" piece of batch, i.e. one
// which doesn't need to be subdivided further before going to a range (so no
// mixing of forward and reverse scans, etc).
func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
// mixing of forward and reverse scans, etc). The parameters and return values
// correspond to client.Sender with the exception of the returned boolean,
// which is true when indicating that the caller should retry but needs to send
// EndTransaction in a separate request.
func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error, bool) {
isReverse := ba.IsReverse()

trace := tracer.FromCtx(ctx)
Expand Down Expand Up @@ -533,13 +569,26 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*
break
}

// If there's no transaction and op spans ranges, possibly
// re-run as part of a transaction for consistency. The
// case where we don't need to re-run is if the read
// consistency is not required.
if needAnother && ba.Txn == nil && ba.IsPossibleTransaction() &&
ba.ReadConsistency != roachpb.INCONSISTENT {
return nil, roachpb.NewError(&roachpb.OpRequiresTxnError{})
if needAnother && br == nil {
// TODO(tschottdorf): we should have a mechanism for discovering
// range merges (descriptor staleness will mostly go unnoticed),
// or we'll be turning single-range queries into multi-range
// queries for no good reason.

// If there's no transaction and op spans ranges, possibly
// re-run as part of a transaction for consistency. The
// case where we don't need to re-run is if the read
// consistency is not required.
if ba.Txn == nil && ba.IsPossibleTransaction() &&
ba.ReadConsistency != roachpb.INCONSISTENT {
return nil, roachpb.NewError(&roachpb.OpRequiresTxnError{}), false
}
// If the batch contains more than one request and ends with
// EndTransaction, we want the caller to come again with the
// EndTransaction in an extra call.
if l := len(ba.Requests) - 1; l > 0 && ba.Requests[l].GetInner().Method() == roachpb.EndTransaction {
return nil, roachpb.NewError(errors.New("cannot send 1PC txn to multiple ranges")), true /* shouldSplitET */
}
}

// It's possible that the returned descriptor misses parts of the
Expand All @@ -558,7 +607,7 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*
if iErr != nil {
return nil, roachpb.NewError(iErr)
}
baNew, numActive, trErr := truncate(ba, intersected)
truncBA, numActive, trErr := truncate(ba, intersected)
if numActive == 0 && trErr == nil {
// This shouldn't happen in the wild, but some tests
// exercise it.
Expand All @@ -568,20 +617,18 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*
if trErr != nil {
return nil, roachpb.NewError(trErr)
}
reply, err := ds.sendAttempt(trace, baNew, desc)

if err != nil {
if log.V(1) {
log.Warningf("failed to invoke %s: %s", baNew, err)
}
}
return reply, err
return ds.sendSingleRange(trace, truncBA, desc)
}()
// If sending succeeded, break this loop.
if pErr == nil {
break
}

if log.V(1) {
log.Warningf("failed to invoke %s: %s", ba, pErr)
}

// Error handling below.
// If retryable, allow retry. For range not found or range
// key mismatch errors, we don't backoff on the retry,
Expand Down Expand Up @@ -656,21 +703,19 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*

// Immediately return if querying a range failed non-retryably.
if pErr != nil {
return nil, pErr
return nil, pErr, false
}

ba.Txn.Update(curReply.Txn)

first := br == nil
if first {
if br == nil {
// First response from a Range.
br = curReply
} else {
// This was the second or later call in a cross-Range request.
// Combine the new response with the existing one.
if err := br.Combine(curReply); err != nil {
// TODO(tschottdorf): return nil, roachpb.NewError(err)
panic(err)
return nil, roachpb.NewError(err), false
}
}

Expand All @@ -686,7 +731,7 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*
// batch request; only if that's all requests does needAnother
// remain false.
needAnother = false
if first {
if br == nil {
// Clone ba.Requests. This is because we're multi-range, and
// some requests may be bounded, which could lead to them being
// masked out once they're saturated. We don't want to risk
Expand Down Expand Up @@ -736,7 +781,7 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*

// If this was the last range accessed by this call, exit loop.
if !needAnother {
return br, nil
return br, nil, false
}

if isReverse {
Expand Down
Loading

0 comments on commit bc8f1aa

Please sign in to comment.