Skip to content

Commit

Permalink
storage: don't perform one-phase commit transactions after restarts
Browse files Browse the repository at this point in the history
Fixes cockroachdb#40466.

This commit adjusts the Replica-side logic around executing one-phase commit
transactions. It prevents a batch that would otherwise qualify for the 1PC
fast-path from using it when the batch's transaction is in any epoch other than
its first epoch. We saw in cockroachdb#40466 that taking the fast-path in a later
epoch could lead to intent abandonment if the txn had written intents in earlier
epochs.

I believe that this is a new issue that was introduced when we moved from using
the presence of a BeginTxn and EndTxn in the same batch to detect 1PC transactions
to using the sequence numbers of writes and the presence of an EndTxn in a batch
to detect 1PC transactions. That change was necessary for parallel commits. Before
that change, I think logic in DistSender was preventing us from hitting this case
because it would always split EndTxn requests from the rest of its batch when a
txn had been restarted:
 https://github.com/cockroachdb/cockroach/blob/18bdfe1a691fa6d785d510e86d27cecdac9c436e/pkg/kv/dist_sender.go#L692-L697

Release note: None
  • Loading branch information
nvanbenschoten committed Sep 5, 2019
1 parent 6f87f25 commit 8cc73c4
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 21 deletions.
28 changes: 28 additions & 0 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,34 @@ func TestTxnCoordSenderCleanupOnAborted(t *testing.T) {
verifyCleanup(key, s.Eng, t, txn1.Sender().(*TxnCoordSender), txn2.Sender().(*TxnCoordSender))
}

// TestTxnCoordSenderCleanupOnCommitAfterRestart is a regression test for
// #40466. A transaction writes an intent, is manually restarted, and then
// commits right away without issuing any writes after the restart. The test
// asserts, via verifyCleanup, that the coordinator still cleans up after the
// transaction; in #40466 such a commit could be detected as a 1PC transaction
// and the cleanup during the commit could be omitted.
func TestTxnCoordSenderCleanupOnCommitAfterRestart(t *testing.T) {
	defer leaktest.AfterTest(t)()
	s := createTestDB(t)
	defer s.Stop()
	ctx := context.Background()

	// Write an intent at "a" before the restart.
	intentKey := roachpb.Key("a")
	txn := client.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */, client.RootTxn)
	err := txn.Put(ctx, intentKey, []byte("value"))
	if err != nil {
		t.Fatal(err)
	}

	// Restart the transaction so that it continues in a new epoch.
	txn.ManualRestart(ctx, s.Clock.Now())

	// Commit straight away, with no writes issued since the restart.
	err = txn.CommitOrCleanup(ctx)
	if err != nil {
		t.Fatal(err)
	}

	// The intent written before the restart must still be cleaned up.
	coord := txn.Sender().(*TxnCoordSender)
	verifyCleanup(intentKey, s.Eng, t, coord)
}

// TestTxnCoordSenderGCWithAmbiguousResultErr verifies that the coordinator
// cleans up extant transactions and intents after an ambiguous result error is
// observed, even if the error is on the first request.
Expand Down
73 changes: 53 additions & 20 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,11 @@ func TestMaybeStripInFlightWrites(t *testing.T) {
// transactional batch can be committed as an atomic write.
func TestIsOnePhaseCommit(t *testing.T) {
defer leaktest.AfterTest(t)()
noReqs := []roachpb.RequestUnion{}
getReq := make([]roachpb.RequestUnion, 1)
getReq[0].MustSetInner(&roachpb.GetRequest{})
putReq := make([]roachpb.RequestUnion, 1)
putReq[0].MustSetInner(&roachpb.PutRequest{})
txnReqs := make([]roachpb.RequestUnion, 3)
txnReqs[0].MustSetInner(&roachpb.BeginTransactionRequest{})
txnReqs[1].MustSetInner(&roachpb.PutRequest{})
Expand All @@ -529,40 +534,68 @@ func TestIsOnePhaseCommit(t *testing.T) {
txnReqsNoRefresh[0].MustSetInner(&roachpb.BeginTransactionRequest{})
txnReqsNoRefresh[1].MustSetInner(&roachpb.PutRequest{})
txnReqsNoRefresh[2].MustSetInner(&roachpb.EndTransactionRequest{Commit: true, NoRefreshSpans: true})
txnReqsRequire1PC := make([]roachpb.RequestUnion, 3)
txnReqsRequire1PC[0].MustSetInner(&roachpb.BeginTransactionRequest{})
txnReqsRequire1PC[1].MustSetInner(&roachpb.PutRequest{})
txnReqsRequire1PC[2].MustSetInner(&roachpb.EndTransactionRequest{Commit: true, Require1PC: true})
testCases := []struct {
bu []roachpb.RequestUnion
isTxn bool
isWTO bool
isTSOff bool
exp1PC bool
ru []roachpb.RequestUnion
isTxn bool
isRestarted bool
isWTO bool
isTSOff bool
exp1PC bool
}{
{[]roachpb.RequestUnion{}, false, false, false, false},
{[]roachpb.RequestUnion{}, true, false, false, false},
{[]roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Get{Get: &roachpb.GetRequest{}}}}, true, false, false, false},
{[]roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Put{Put: &roachpb.PutRequest{}}}}, true, false, false, false},
{txnReqs[0 : len(txnReqs)-1], true, false, false, false},
{txnReqs[1:], true, false, false, false},
{txnReqs, true, false, false, true},
{txnReqs, true, true, false, false},
{txnReqs, true, false, true, false},
{txnReqs, true, true, true, false},
{txnReqsNoRefresh, true, false, false, true},
{txnReqsNoRefresh, true, true, false, true},
{txnReqsNoRefresh, true, false, true, true},
{txnReqsNoRefresh, true, true, true, true},
{ru: noReqs, isTxn: false, exp1PC: false},
{ru: noReqs, isTxn: true, exp1PC: false},
{ru: getReq, isTxn: true, exp1PC: false},
{ru: putReq, isTxn: true, exp1PC: false},
{ru: txnReqs[0 : len(txnReqs)-1], isTxn: true, exp1PC: false},
{ru: txnReqs[1:], isTxn: true, exp1PC: false},
{ru: txnReqs, isTxn: true, exp1PC: true},
{ru: txnReqs, isTxn: true, isTSOff: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isWTO: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: txnReqs, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isTSOff: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isWTO: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isWTO: true, isTSOff: true, exp1PC: true},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: txnReqsNoRefresh, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, exp1PC: true},
{ru: txnReqsRequire1PC, isTxn: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isWTO: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isWTO: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, exp1PC: true},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, isTSOff: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, isWTO: true, exp1PC: false},
{ru: txnReqsRequire1PC, isTxn: true, isRestarted: true, isWTO: true, isTSOff: true, exp1PC: false},
}

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
for i, c := range testCases {
ba := roachpb.BatchRequest{Requests: c.bu}
ba := roachpb.BatchRequest{Requests: c.ru}
if c.isTxn {
ba.Txn = newTransaction("txn", roachpb.Key("a"), 1, clock)
if c.isRestarted {
ba.Txn.Restart(-1, 0, clock.Now())
}
if c.isWTO {
ba.Txn.WriteTooOld = true
}
if c.isTSOff {
ba.Txn.Timestamp = ba.Txn.OrigTimestamp.Add(1, 0)
}
} else {
require.False(t, c.isRestarted)
require.False(t, c.isWTO)
require.False(t, c.isTSOff)
}
if is1PC := isOnePhaseCommit(&ba); is1PC != c.exp1PC {
t.Errorf("%d: expected 1pc=%t; got %t", i, c.exp1PC, is1PC)
Expand Down
15 changes: 14 additions & 1 deletion pkg/storage/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ func (r *Replica) evaluateWriteBatchWithLocalRetries(
// (1) the transaction has already been flagged with a write too old error
// (2) the transaction's commit timestamp has been forwarded
// (3) the transaction exceeded its deadline
// (4) the transaction is not in its first epoch and the EndTransaction request
// does not require one phase commit.
func isOnePhaseCommit(ba *roachpb.BatchRequest) bool {
if ba.Txn == nil {
return false
Expand All @@ -452,7 +454,18 @@ func isOnePhaseCommit(ba *roachpb.BatchRequest) bool {
if retry, _, _ := batcheval.IsEndTransactionTriggeringRetryError(ba.Txn, etArg); retry {
return false
}
return true
// If the transaction has already restarted at least once then it may have
// left intents at prior epochs that need to be cleaned up during the
// process of committing the transaction. Even if the current epoch could
// perform a one phase commit, we don't allow it to because that could
// prevent it from properly resolving intents from prior epochs and cause
// it to abandon them instead.
//
// The exception to this rule is transactions that require a one phase
// commit. We know that if they also required a one phase commit in past
// epochs then they couldn't have left any intents that they now need to
// clean up.
return ba.Txn.Epoch == 0 || etArg.Require1PC
}

// maybeStripInFlightWrites attempts to remove all point writes and query
Expand Down

0 comments on commit 8cc73c4

Please sign in to comment.