Skip to content

Commit

Permalink
tikv: fix primary selection when delete-your-writes (#18244) (#18250)
Browse files Browse the repository at this point in the history
* cherry pick #18244 to release-4.0

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

* fix test compile

Co-authored-by: lysu <sulifx@gmail.com>
  • Loading branch information
ti-srebot and lysu authored Jun 29, 2020
1 parent c6e7277 commit a0eef10
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
12 changes: 12 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ type twoPhaseCommitter struct {
regionTxnSize map[uint64]int
// Used by pessimistic transaction and large transaction.
ttlManager

testingKnobs struct {
acAfterCommitPrimary chan struct{}
bkAfterCommitPrimary chan struct{}
}
}

type committerMutations struct {
Expand Down Expand Up @@ -328,6 +333,9 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
lockIdx++
}
}
if len(c.primaryKey) == 0 && op != pb.Op_CheckNotExists {
c.primaryKey = k
}
mutations.push(op, k, value, isPessimisticLock)
entrySize := len(k) + len(v)
if entrySize > kv.TxnEntrySizeLimit {
Expand Down Expand Up @@ -569,6 +577,10 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
if err != nil {
return errors.Trace(err)
}
if actionIsCommit && c.testingKnobs.bkAfterCommitPrimary != nil && c.testingKnobs.acAfterCommitPrimary != nil {
c.testingKnobs.acAfterCommitPrimary <- struct{}{}
<-c.testingKnobs.bkAfterCommitPrimary
}
batches = batches[1:]
}
if actionIsCommit && !actionCommit.retry {
Expand Down
44 changes: 44 additions & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,50 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) {
c.Assert(lockInfo.LockTtl-atomic.LoadUint64(&ManagedLockTTL), Less, uint64(150))
}

func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) {
s.cluster.SplitKeys(s.mvccStore, kv.Key("d"), kv.Key("a"), 4)
k1 := kv.Key("a") // insert but deleted key at first pos in txn1
k2 := kv.Key("b") // insert key at second pos in txn1
k3 := kv.Key("c") // insert key in txn1 and will be conflict read by txn2

// insert k1, k2, k2 and delete k1
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetOption(kv.PresumeKeyNotExists, nil)
txn1.SetOption(kv.PresumeKeyNotExistsError, kv.NewExistErrInfo("name", "value"))
txn1.store.txnLatches = nil
txn1.Get(context.Background(), k1)
txn1.Set(k1, []byte{0})
txn1.Set(k2, []byte{1})
txn1.Set(k3, []byte{2})
txn1.Delete(k1)
committer1, err := newTwoPhaseCommitter(txn1, 0)
c.Assert(err, IsNil)
// setup test knob in txn's committer
committer1.testingKnobs.acAfterCommitPrimary = make(chan struct{})
committer1.testingKnobs.bkAfterCommitPrimary = make(chan struct{})
txn1.committer = committer1
var txn1Done sync.WaitGroup
txn1Done.Add(1)
go func() {
err1 := txn1.Commit(context.Background())
c.Assert(err1, IsNil)
txn1Done.Done()
}()
// resume after after primary key be committed
<-txn1.committer.testingKnobs.acAfterCommitPrimary

// start txn2 to read k3(prewrite success and primary should be committed)
txn2 := s.begin(c)
txn2.DelOption(kv.Pessimistic)
txn2.store.txnLatches = nil
v, err := txn2.Get(context.Background(), k3)
c.Assert(err, IsNil) // should resolve lock and read txn1 k3 result instead of rollback it.
c.Assert(v[0], Equals, byte(2))
txn1.committer.testingKnobs.bkAfterCommitPrimary <- struct{}{}
txn1Done.Wait()
}

// TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction.
// The lock's own TTL is expired but the primary key is still alive due to heartbeats.
func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
Expand Down

0 comments on commit a0eef10

Please sign in to comment.