40518: storage: don't perform one-phase commit transactions after restarts r=nvanbenschoten a=nvanbenschoten

Fixes cockroachdb#40466.

This commit adjusts the Replica-side logic around executing one-phase commit transactions. It prevents batches that could be executed on the 1PC fast-path from doing so if they contain transactions in any epoch other than their first epoch. We saw in cockroachdb#40466 that this could lead to intent abandonment if the txn had written intents in earlier epochs.

I believe that this is a new issue that was introduced when we moved from using the presence of a BeginTxn and EndTxn in the same batch to detect 1PC transactions to using the sequence numbers of writes and the presence of an EndTxn in a batch to detect 1PC transactions. That change was necessary for parallel commits. Before that change, I think logic in DistSender was preventing us from hitting this case because it would always split EndTxn requests from the rest of its batch when a txn had been restarted: https://github.com/cockroachdb/cockroach/blob/18bdfe1a691fa6d785d510e86d27cecdac9c436e/pkg/kv/dist_sender.go#L692-L697
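
The failure mode is easiest to see as a client-side sequence (a minimal test-style sketch mirroring the regression test added below in pkg/kv/txn_coord_sender_test.go; db, clock, and t are assumed test fixtures):

// The txn writes an intent in epoch 0, restarts, and immediately commits in
// epoch 1 without writing anything new. Before this fix, the commit batch (a
// lone EndTransaction) could be detected as a 1PC transaction and skip intent
// resolution, abandoning the epoch-0 intent at "a".
txn := client.NewTxn(ctx, db, 0 /* gatewayNodeID */, client.RootTxn)
if err := txn.Put(ctx, roachpb.Key("a"), []byte("value")); err != nil { // intent at epoch 0
	t.Fatal(err)
}
txn.ManualRestart(ctx, clock.Now()) // bump to epoch 1
if err := txn.CommitOrCleanup(ctx); err != nil { // must still resolve the epoch-0 intent
	t.Fatal(err)
}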

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
craig[bot] and nvanbenschoten committed Sep 6, 2019
2 parents 1dcd427 + b738ed2 commit a32d225
Showing 6 changed files with 167 additions and 75 deletions.
6 changes: 0 additions & 6 deletions pkg/kv/dist_sender_server_test.go
@@ -2464,9 +2464,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b)
},
// Parallel commits do not support the canForwardSerializableTimestamp
// optimization. That's ok because we need to remove that optimization
// anyway. See #36431.
txnCoordRetry: true,
},
{
@@ -2549,9 +2546,6 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.Put("c", "put")
return txn.CommitInBatch(ctx, b) // both puts will succeed, et will retry
},
// Parallel commits do not support the canForwardSerializableTimestamp
// optimization. That's ok because we need to remove that optimization
// anyway. See #36431.
txnCoordRetry: true,
},
{
28 changes: 28 additions & 0 deletions pkg/kv/txn_coord_sender_test.go
@@ -608,6 +608,34 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
verifyCleanup(key, s.Eng, t, txn1.Sender().(*TxnCoordSender), txn2.Sender().(*TxnCoordSender))
}

// TestTxnCoordSenderCleanupOnCommitAfterRestart verifies that if a txn restarts
// at a higher epoch and then commits before it has written anything in the new
// epoch, the coordinator still cleans up the transaction. In #40466, we saw that
// this case could be detected as a 1PC transaction and the cleanup during the
// commit could be omitted.
func TestTxnCoordSenderCleanupOnCommitAfterRestart(t *testing.T) {
defer leaktest.AfterTest(t)()
s := createTestDB(t)
defer s.Stop()
ctx := context.Background()

// Create a transaction with intent at "a".
key := roachpb.Key("a")
txn := client.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */, client.RootTxn)
if err := txn.Put(ctx, key, []byte("value")); err != nil {
t.Fatal(err)
}

// Restart the transaction with a new epoch.
txn.ManualRestart(ctx, s.Clock.Now())

// Now immediately commit.
if err := txn.CommitOrCleanup(ctx); err != nil {
t.Fatal(err)
}
verifyCleanup(key, s.Eng, t, txn.Sender().(*TxnCoordSender))
}

// TestTxnCoordSenderGCWithAmbiguousResultErr verifies that the coordinator
// cleans up extant transactions and intents after an ambiguous result error is
// observed, even if the error is on the first request.
33 changes: 17 additions & 16 deletions pkg/storage/batcheval/cmd_end_transaction.go
@@ -402,33 +402,34 @@ func IsEndTransactionTriggeringRetryError(
}

// A transaction can still avoid a retry under certain conditions.
if retry && canForwardSerializableTimestamp(txn, args.NoRefreshSpans) {
if retry && CanForwardCommitTimestampWithoutRefresh(txn, args) {
retry, reason = false, 0
}

if !retry {
if IsEndTransactionExceedingDeadline(txn.Timestamp, args) {
exceededBy := txn.Timestamp.GoTime().Sub(args.Deadline.GoTime())
fromStart := txn.Timestamp.GoTime().Sub(txn.OrigTimestamp.GoTime())
extraMsg = fmt.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+
"original timestamp %s ago (%s)",
exceededBy, txn.Timestamp, args.Deadline, fromStart, txn.OrigTimestamp)
retry, reason = true, roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED
}
// However, a transaction must obey its deadline, if set.
if !retry && IsEndTransactionExceedingDeadline(txn.Timestamp, args) {
exceededBy := txn.Timestamp.GoTime().Sub(args.Deadline.GoTime())
fromStart := txn.Timestamp.GoTime().Sub(txn.OrigTimestamp.GoTime())
extraMsg = fmt.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+
"original timestamp %s ago (%s)",
exceededBy, txn.Timestamp, args.Deadline, fromStart, txn.OrigTimestamp)
retry, reason = true, roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED
}

return retry, reason, extraMsg
}

// canForwardSerializableTimestamp returns whether a serializable txn can
// be safely committed with a forwarded timestamp. This requires that
// CanForwardCommitTimestampWithoutRefresh returns whether a txn can be
// safely committed with a timestamp above its read timestamp without
// requiring a read refresh (see txnSpanRefresher). This requires that
// the transaction's timestamp has not leaked and that the transaction
// has encountered no spans which require refreshing at the forwarded
// timestamp. If either of those conditions does not hold, a client-side
// retry is required.
func canForwardSerializableTimestamp(txn *roachpb.Transaction, noRefreshSpans bool) bool {
return !txn.OrigTimestampWasObserved && noRefreshSpans
func CanForwardCommitTimestampWithoutRefresh(
txn *roachpb.Transaction, args *roachpb.EndTransactionRequest,
) bool {
return !txn.OrigTimestampWasObserved && args.NoRefreshSpans
}

const intentResolutionBatchSize = 500
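As a concrete reading of the renamed helper (a hedged sketch; the scenario is assumed for illustration, while the function signature and fields are as in the diff above): a transaction that performed only blind writes and whose client never observed its provisional commit timestamp can have that timestamp forwarded silently.

// A blind-write txn pushed to a higher timestamp: no refresh spans to
// re-validate, and the original timestamp never leaked to the client.
txn := &roachpb.Transaction{} // OrigTimestampWasObserved == false
et := &roachpb.EndTransactionRequest{
	Commit:         true,
	NoRefreshSpans: true, // no reads to refresh at the forwarded timestamp
}
ok := batcheval.CanForwardCommitTimestampWithoutRefresh(txn, et) // ok == true
_ = ok
// Had the client observed the timestamp (e.g. via something like
// txn.CommitTimestamp()) or had the txn accumulated refresh spans, this
// would return false and a client-side retry would be required.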
123 changes: 94 additions & 29 deletions pkg/storage/replica_test.go
@@ -521,50 +521,115 @@ func TestMaybeStripInFlightWrites(t *testing.T) {
// transactional batch can be committed as an atomic write.
func TestIsOnePhaseCommit(t *testing.T) {
defer leaktest.AfterTest(t)()
txnReqs := make([]roachpb.RequestUnion, 3)
txnReqs[0].MustSetInner(&roachpb.BeginTransactionRequest{})
txnReqs[1].MustSetInner(&roachpb.PutRequest{})
txnReqs[2].MustSetInner(&roachpb.EndTransactionRequest{Commit: true})
txnReqsNoRefresh := make([]roachpb.RequestUnion, 3)
txnReqsNoRefresh[0].MustSetInner(&roachpb.BeginTransactionRequest{})
txnReqsNoRefresh[1].MustSetInner(&roachpb.PutRequest{})
txnReqsNoRefresh[2].MustSetInner(&roachpb.EndTransactionRequest{Commit: true, NoRefreshSpans: true})
withSeq := func(req roachpb.Request, seq enginepb.TxnSeq) roachpb.Request {
h := req.Header()
h.Sequence = seq
req.SetHeader(h)
return req
}
makeReqs := func(reqs ...roachpb.Request) []roachpb.RequestUnion {
ru := make([]roachpb.RequestUnion, len(reqs))
for i, r := range reqs {
ru[i].MustSetInner(r)
}
return ru
}

noReqs := makeReqs()
getReq := makeReqs(withSeq(&roachpb.GetRequest{}, 0))
putReq := makeReqs(withSeq(&roachpb.PutRequest{}, 1))
etReq := makeReqs(withSeq(&roachpb.EndTransactionRequest{Commit: true}, 1))
txnReqs := makeReqs(
withSeq(&roachpb.BeginTransactionRequest{}, 0),
withSeq(&roachpb.PutRequest{}, 1),
withSeq(&roachpb.EndTransactionRequest{Commit: true}, 2),
)
txnReqsNoRefresh := makeReqs(
withSeq(&roachpb.BeginTransactionRequest{}, 0),
withSeq(&roachpb.PutRequest{}, 1),
withSeq(&roachpb.EndTransactionRequest{Commit: true, NoRefreshSpans: true}, 2),
)
txnReqsRequire1PC := makeReqs(
withSeq(&roachpb.BeginTransactionRequest{}, 0),
withSeq(&roachpb.PutRequest{}, 1),
withSeq(&roachpb.EndTransactionRequest{Commit: true, Require1PC: true}, 2),
)

testCases := []struct {
bu []roachpb.RequestUnion
isTxn bool
isWTO bool
isTSOff bool
exp1PC bool
ru []roachpb.RequestUnion
isTxn bool
isRestarted bool
isWTO bool
isTSOff bool
exp1PC bool
}{
{[]roachpb.RequestUnion{}, false, false, false, false},
{[]roachpb.RequestUnion{}, true, false, false, false},
{[]roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Get{Get: &roachpb.GetRequest{}}}}, true, false, false, false},
{[]roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Put{Put: &roachpb.PutRequest{}}}}, true, false, false, false},
{txnReqs[0 : len(txnReqs)-1], true, false, false, false},
{txnReqs[1:], true, false, false, false},
{txnReqs, true, false, false, true},
{txnReqs, true, true, false, false},
{txnReqs, true, false, true, false},
{txnReqs, true, true, true, false},
{txnReqsNoRefresh, true, false, false, true},
{txnReqsNoRefresh, true, true, false, true},
{txnReqsNoRefresh, true, false, true, true},
{txnReqsNoRefresh, true, true, true, true},
{ru: noReqs, isTxn: false, exp1PC: false},
{ru: noReqs, isTxn: true, exp1PC: false},
{ru: getReq, isTxn: true, exp1PC: false},
{ru: putReq, isTxn: true, exp1PC: false},
{ru: etReq, isTxn: true, exp1PC: true},
{ru: etReq, isTxn: true, isTSOff: true, exp1PC: false},
{ru: etReq, isTxn: true, isWTO: true, exp1PC: false},
{ru: etReq, isTxn: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: etReq, isTxn: true, isRestarted: true, exp1PC: false},
{ru: etReq, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: etReq, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: etReq, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqs[0:2], isTxn: true, exp1PC: false},
{ru: txnReqs[1:], isTxn: true, exp1PC: true},
{ru: txnReqs[2:], isTxn: true, exp1PC: false},
{ru: txnReqs, isTxn: true, exp1PC: true},
{ru: txnReqs, isTxn: true, isTSOff: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isWTO: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqsNoRefresh[0:2], isTxn: true, exp1PC: false},
{ru: txnReqsNoRefresh[1:], isTxn: true, exp1PC: true},
{ru: txnReqsNoRefresh[2:], isTxn: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isTSOff: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isWTO: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isWTO: true, isTSOff: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC[0:2], isTxn: true, exp1PC: false},
{ru: txnReqsRequire1PC[1:], isTxn: true, exp1PC: true},
{ru: txnReqsRequire1PC[2:], isTxn: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, exp1PC: true},
{ru: txnReqsRequire1PC, isTxn: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isWTO: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, exp1PC: true},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
}

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
for i, c := range testCases {
ba := roachpb.BatchRequest{Requests: c.bu}
ba := roachpb.BatchRequest{Requests: c.ru}
if c.isTxn {
ba.Txn = newTransaction("txn", roachpb.Key("a"), 1, clock)
if c.isRestarted {
ba.Txn.Restart(-1, 0, clock.Now())
}
if c.isWTO {
ba.Txn.WriteTooOld = true
}
if c.isTSOff {
ba.Txn.Timestamp = ba.Txn.OrigTimestamp.Add(1, 0)
}
} else {
require.False(t, c.isRestarted)
require.False(t, c.isWTO)
require.False(t, c.isTSOff)
}
if is1PC := isOnePhaseCommit(&ba, &StoreTestingKnobs{}); is1PC != c.exp1PC {
if is1PC := isOnePhaseCommit(&ba); is1PC != c.exp1PC {
t.Errorf("%d: expected 1pc=%t; got %t", i, c.exp1PC, is1PC)
}
}
47 changes: 28 additions & 19 deletions pkg/storage/replica_write.go
@@ -261,7 +261,7 @@ func (r *Replica) evaluateWriteBatch(
ms := enginepb.MVCCStats{}
// If not transactional or there are indications that the batch's txn will
// require restart or retry, execute as normal.
if isOnePhaseCommit(ba, r.store.TestingKnobs()) {
if isOnePhaseCommit(ba) {
_, hasBegin := ba.GetArg(roachpb.BeginTransaction)
arg, _ := ba.GetArg(roachpb.EndTransaction)
etArg := arg.(*roachpb.EndTransactionRequest)
@@ -277,18 +277,18 @@
strippedBa.Requests = ba.Requests[:len(ba.Requests)-1] // strip end txn req
}

// If there were no refreshable spans earlier in the txn
// (e.g. earlier gets or scans), then the batch can be retried
// locally in the event of write too old errors.
retryLocally := etArg.NoRefreshSpans && !ba.Txn.OrigTimestampWasObserved
// Is the transaction allowed to retry locally in the event of
// write too old errors? This is only allowed if it is able to
// forward its commit timestamp without a read refresh.
canForwardTimestamp := batcheval.CanForwardCommitTimestampWithoutRefresh(ba.Txn, etArg)

// If all writes occurred at the intended timestamp, we've succeeded on the fast path.
rec := NewReplicaEvalContext(r, spans)
batch, br, res, pErr := r.evaluateWriteBatchWithLocalRetries(
ctx, idKey, rec, &ms, &strippedBa, spans, retryLocally,
ctx, idKey, rec, &ms, &strippedBa, spans, canForwardTimestamp,
)
if pErr == nil && (ba.Timestamp == br.Timestamp ||
(retryLocally && !batcheval.IsEndTransactionExceedingDeadline(br.Timestamp, etArg))) {
(canForwardTimestamp && !batcheval.IsEndTransactionExceedingDeadline(br.Timestamp, etArg))) {
clonedTxn := ba.Txn.Clone()
clonedTxn.Status = roachpb.COMMITTED
// Make sure the returned txn has the actual commit
@@ -431,14 +431,15 @@ func (r *Replica) evaluateWriteBatchWithLocalRetries(
return
}

// isOnePhaseCommit returns true iff the BatchRequest contains all commands in
// the transaction, starting with BeginTransaction and ending with
// EndTransaction. One phase commits are disallowed if (1) the transaction has
// already been flagged with a write too old error, or (2) if isolation is
// serializable and the commit timestamp has been forwarded, or (3) the
// transaction exceeded its deadline, or (4) the testing knobs disallow optional
// one phase commits and the BatchRequest does not require one phase commit.
func isOnePhaseCommit(ba *roachpb.BatchRequest, knobs *StoreTestingKnobs) bool {
// isOnePhaseCommit returns true iff the BatchRequest contains all writes in the
// transaction and ends with an EndTransaction. One phase commits are disallowed
// if any of the following conditions are true:
// (1) the transaction has already been flagged with a write too old error
// (2) the transaction's commit timestamp has been forwarded
// (3) the transaction exceeded its deadline
// (4) the transaction is not in its first epoch and the EndTransaction request
// does not require one phase commit.
func isOnePhaseCommit(ba *roachpb.BatchRequest) bool {
if ba.Txn == nil {
return false
}
Expand All @@ -447,13 +448,21 @@ func isOnePhaseCommit(ba *roachpb.BatchRequest, knobs *StoreTestingKnobs) bool {
}
arg, _ := ba.GetArg(roachpb.EndTransaction)
etArg := arg.(*roachpb.EndTransactionRequest)
if batcheval.IsEndTransactionExceedingDeadline(ba.Txn.Timestamp, etArg) {
return false
}
if retry, _, _ := batcheval.IsEndTransactionTriggeringRetryError(ba.Txn, etArg); retry {
return false
}
return !knobs.DisableOptional1PC || etArg.Require1PC
// If the transaction has already restarted at least once then it may have
// left intents at prior epochs that need to be cleaned up during the
// process of committing the transaction. Even if the current epoch could
// perform a one phase commit, we don't allow it to because that could
// prevent it from properly resolving intents from prior epochs and cause
// it to abandon them instead.
//
// The exception to this rule is transactions that require a one phase
// commit. We know that if they also required a one phase commit in past
// epochs then they couldn't have left any intents that they now need to
// clean up.
return ba.Txn.Epoch == 0 || etArg.Require1PC
}

// maybeStripInFlightWrites attempts to remove all point writes and query
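Read against the test table above, the new epoch gate behaves as follows (a hedged, test-style sketch inside the storage package, reusing newTransaction and the makeReqs/withSeq helpers and the etReq batch from TestIsOnePhaseCommit; the results match the table's expectations):

ba := roachpb.BatchRequest{Requests: etReq} // a lone EndTransaction(Commit: true)
ba.Txn = newTransaction("txn", roachpb.Key("a"), 1, clock)
isOnePhaseCommit(&ba) // true: epoch 0, no prior epochs could have left intents

ba.Txn.Restart(-1, 0, clock.Now()) // txn is now at epoch 1
isOnePhaseCommit(&ba) // false: epoch-0 intents may exist and must be resolved at commit

// The exception: a batch that requires 1PC is still allowed after a restart,
// since any earlier epoch must also have gone through 1PC and therefore
// cannot have left intents behind.
ba.Requests = makeReqs(withSeq(&roachpb.EndTransactionRequest{Commit: true, Require1PC: true}, 1))
isOnePhaseCommit(&ba) // true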
5 changes: 0 additions & 5 deletions pkg/storage/testing_knobs.go
@@ -64,11 +64,6 @@ type StoreTestingKnobs struct {
// error returned to the client, or to simulate network failures.
TestingResponseFilter storagebase.ReplicaResponseFilter

// Disables the use of optional one phase commits. Even when enabled, requests
// that set the Require1PC flag are permitted to use one phase commits. This
// prevents wedging node liveness, which requires one phase commits during
// liveness updates.
DisableOptional1PC bool
// A hack to manipulate the clock before sending a batch request to a replica.
// TODO(kaneda): This hook is not encouraged to use. Get rid of it once
// we make TestServer take a ManualClock.
