Skip to content

Commit

Permalink
enable 1PC txns
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg committed Oct 12, 2015
1 parent b5434f3 commit d37683d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 48 deletions.
101 changes: 59 additions & 42 deletions kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kv

import (
"errors"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -466,49 +467,57 @@ func (ds *DistSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roach
panic("empty batch")
}

// Deterministically create ClientCmdIDs for all parts of the batch if
// a CmdID is already set (otherwise, leave them empty).
var nextID func() roachpb.ClientCmdID
empty := roachpb.ClientCmdID{}
if empty == ba.CmdID {
nextID = func() roachpb.ClientCmdID {
return empty
}
} else {
rng := rand.New(rand.NewSource(ba.CmdID.Random))
id := ba.CmdID
nextID = func() roachpb.ClientCmdID {
curID := id // copy
id.Random = rng.Int63() // adjust for next call
return curID
splitAndSend := func(ba roachpb.BatchRequest, splitET bool) (*roachpb.BatchResponse, *roachpb.Error, bool) {
// Deterministically create ClientCmdIDs for all parts of the batch if
// a CmdID is already set (otherwise, leave them empty).
var nextID func() roachpb.ClientCmdID
empty := roachpb.ClientCmdID{}
if empty == ba.CmdID {
nextID = func() roachpb.ClientCmdID {
return empty
}
} else {
rng := rand.New(rand.NewSource(ba.CmdID.Random))
id := ba.CmdID
nextID = func() roachpb.ClientCmdID {
curID := id // copy
id.Random = rng.Int63() // adjust for next call
return curID
}
}
}

parts := ba.Split(true /* canSplitET */)
var rplChunks []*roachpb.BatchResponse
for _, part := range parts {
ba.Requests = part
ba.CmdID = nextID()
rpl, err, _ := ds.sendChunk(ctx, ba)
if err != nil {
return nil, err
var rplChunks []*roachpb.BatchResponse
parts := ba.Split(splitET)
for _, part := range parts {
ba.Requests = part
ba.CmdID = nextID()
rpl, pErr, shouldSplitET := ds.sendChunk(ctx, ba)
if pErr != nil {
return nil, pErr, shouldSplitET
}
// Propagate transaction from last reply to next request. The final
// update is taken and put into the response's main header.
ba.Txn.Update(rpl.Header().Txn)
rplChunks = append(rplChunks, rpl)
}
// Propagate transaction from last reply to next request. The final
// update is taken and put into the response's main header.
ba.Txn.Update(rpl.Header().Txn)

rplChunks = append(rplChunks, rpl)
reply := rplChunks[0]
for _, rpl := range rplChunks[1:] {
reply.Responses = append(reply.Responses, rpl.Responses...)
}
lastHeader := rplChunks[len(rplChunks)-1].BatchResponse_Header
reply.Error = lastHeader.Error
reply.Timestamp = lastHeader.Timestamp
reply.Txn = ba.Txn
return reply, nil, false
}

reply := rplChunks[0]
for _, rpl := range rplChunks[1:] {
reply.Responses = append(reply.Responses, rpl.Responses...)
rpl, pErr, shouldSplitET := splitAndSend(ba, false /* !splitET */)
if pErr == nil || !shouldSplitET {
return rpl, pErr
}
lastHeader := rplChunks[len(rplChunks)-1].BatchResponse_Header
reply.Error = lastHeader.Error
reply.Timestamp = lastHeader.Timestamp
reply.Txn = ba.Txn
return reply, nil
rpl, pErr, _ = splitAndSend(ba, true /* splitET */)
return rpl, pErr
}

// sendChunk is in charge of sending an "admissible" piece of batch, i.e. one
Expand Down Expand Up @@ -559,12 +568,20 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*
break
}

// If there's no transaction and op spans ranges, possibly
// re-run as part of a transaction for consistency. The
// case where we don't need to re-run is if the read
// consistency is not required.
if needAnother && ba.Txn == nil && ba.ReadConsistency != roachpb.INCONSISTENT {
return nil, roachpb.NewError(&roachpb.OpRequiresTxnError{}), false
if needAnother {
// If there's no transaction and op spans ranges, possibly
// re-run as part of a transaction for consistency. The
// case where we don't need to re-run is if the read
// consistency is not required.
if ba.Txn == nil && ba.ReadConsistency != roachpb.INCONSISTENT {
return nil, roachpb.NewError(&roachpb.OpRequiresTxnError{}), false
}
// If the request is more than but ends with EndTransaction, we
// want the caller to come again with the EndTransaction in an
// extra call.
if l := len(ba.Requests) - 1; l > 0 && ba.Requests[l].GetInner().Method() == roachpb.EndTransaction {
return nil, roachpb.NewError(errors.New("cannot send 1PC txn to multiple ranges")), true /* shouldSplitET */
}
}

// It's possible that the returned descriptor misses parts of the
Expand Down
6 changes: 0 additions & 6 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,12 +1440,6 @@ func (s *Store) proposeRaftCommandImpl(idKey cmdIDKey, cmd roachpb.RaftCommand)
args := union.GetInner()
etr, ok := args.(*roachpb.EndTransactionRequest)
if ok && etr.InternalCommitTrigger != nil && etr.InternalCommitTrigger.ChangeReplicasTrigger != nil {
// TODO(tschottdorf): the real check is that EndTransaction needs
// to be the last element in the batch. Any caveats to solve before
// changing this?
if len(cmd.Cmd.Requests) != 1 {
panic("EndTransaction should only ever occur by itself in a batch")
}
// EndTransactionRequest with a ChangeReplicasTrigger is special because raft
// needs to understand it; it cannot simply be an opaque command.
crt := etr.InternalCommitTrigger.ChangeReplicasTrigger
Expand Down

0 comments on commit d37683d

Please sign in to comment.