From 0872c98bed2f0a96b540658779117371dcec86b8 Mon Sep 17 00:00:00 2001 From: lysu Date: Sun, 28 Jun 2020 22:01:44 +0800 Subject: [PATCH 1/2] tikv: fix primary selection when delete-your-writes --- store/tikv/2pc.go | 12 ++++++++++++ store/tikv/2pc_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index fd1e81e563692..2726fe52949d1 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -159,6 +159,11 @@ type twoPhaseCommitter struct { regionTxnSize map[uint64]int // Used by pessimistic transaction and large transaction. ttlManager + + testingKnobs struct { + acAfterCommitPrimary chan struct{} + bkAfterCommitPrimary chan struct{} + } } type committerMutations struct { @@ -328,6 +333,9 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { lockIdx++ } } + if len(c.primaryKey) == 0 && op != pb.Op_CheckNotExists { + c.primaryKey = k + } mutations.push(op, k, value, isPessimisticLock) entrySize := len(k) + len(v) if entrySize > kv.TxnEntrySizeLimit { @@ -569,6 +577,10 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh if err != nil { return errors.Trace(err) } + if actionIsCommit && c.testingKnobs.bkAfterCommitPrimary != nil && c.testingKnobs.acAfterCommitPrimary != nil { + c.testingKnobs.acAfterCommitPrimary <- struct{}{} + <-c.testingKnobs.bkAfterCommitPrimary + } batches = batches[1:] } if actionIsCommit && !actionCommit.retry { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 5c0a08663f8ef..206539c615bad 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -681,6 +681,50 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) { c.Assert(lockInfo.LockTtl-atomic.LoadUint64(&ManagedLockTTL), Less, uint64(150)) } +func (s *testCommitterSuite) TestNoPrimary(c *C) { + s.cluster.SplitKeys(kv.Key("d"), kv.Key("a"), 4) + k1 := kv.Key("a") // insert but deleted key at first pos in txn1 + k2 := kv.Key("b") // insert key at second pos in txn1 + k3 := kv.Key("c") // insert key in txn1 and will be conflict read by txn2 + + // insert k1, k2, k2 and delete k1 + txn1 := s.begin(c) + txn1.DelOption(kv.Pessimistic) + txn1.SetOption(kv.PresumeKeyNotExists, nil) + txn1.SetOption(kv.PresumeKeyNotExistsError, kv.NewExistErrInfo("name", "value")) + txn1.store.txnLatches = nil + txn1.Get(context.Background(), k1) + txn1.Set(k1, []byte{0}) + txn1.Set(k2, []byte{1}) + txn1.Set(k3, []byte{2}) + txn1.Delete(k1) + committer1, err := newTwoPhaseCommitter(txn1, 0) + c.Assert(err, IsNil) + // setup test knob in txn's committer + committer1.testingKnobs.acAfterCommitPrimary = make(chan struct{}) + committer1.testingKnobs.bkAfterCommitPrimary = make(chan struct{}) + txn1.committer = committer1 + var txn1Done sync.WaitGroup + txn1Done.Add(1) + go func() { + err1 := txn1.Commit(context.Background()) + c.Assert(err1, IsNil) + txn1Done.Done() + }() + // resume after after primary key be committed + <-txn1.committer.testingKnobs.acAfterCommitPrimary + + // start txn2 to read k3(prewrite success and primary should be commited) + txn2 := s.begin(c) + txn2.DelOption(kv.Pessimistic) + txn2.store.txnLatches = nil + v, err := txn2.Get(context.Background(), k3) + c.Assert(err, IsNil) // should resolve lock and read txn1 k3 result instead of rollback it. + c.Assert(v[0], Equals, byte(2)) + txn1.committer.testingKnobs.bkAfterCommitPrimary <- struct{}{} + txn1Done.Wait() +} + // TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction. // The lock's own TTL is expired but the primary key is still alive due to heartbeats. func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) { From 6db34ff00abb1197c637b00716fd43635113dc94 Mon Sep 17 00:00:00 2001 From: lysu Date: Sun, 28 Jun 2020 22:30:15 +0800 Subject: [PATCH 2/2] make lint happy --- store/tikv/2pc_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 206539c615bad..ea36ca9ddee90 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -681,7 +681,7 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) { c.Assert(lockInfo.LockTtl-atomic.LoadUint64(&ManagedLockTTL), Less, uint64(150)) } -func (s *testCommitterSuite) TestNoPrimary(c *C) { +func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) { s.cluster.SplitKeys(kv.Key("d"), kv.Key("a"), 4) k1 := kv.Key("a") // insert but deleted key at first pos in txn1 k2 := kv.Key("b") // insert key at second pos in txn1 @@ -714,7 +714,7 @@ func (s *testCommitterSuite) TestNoPrimary(c *C) { // resume after after primary key be committed <-txn1.committer.testingKnobs.acAfterCommitPrimary - // start txn2 to read k3(prewrite success and primary should be commited) + // start txn2 to read k3(prewrite success and primary should be committed) txn2 := s.begin(c) txn2.DelOption(kv.Pessimistic) txn2.store.txnLatches = nil