Skip to content

Commit

Permalink
tikv: fix primary selection when delete-your-writes (#18244) (#18248)
Browse files Browse the repository at this point in the history
* cherry pick #18244 to release-3.0

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

* fix conflict to 4.0

* fix test case

Co-authored-by: lysu <sulifx@gmail.com>
  • Loading branch information
ti-srebot and lysu authored Jun 29, 2020
1 parent e0950dc commit f588330
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
16 changes: 14 additions & 2 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ type twoPhaseCommitter struct {
regionTxnSize map[uint64]int
// Used by pessimistic transaction and large transaction.
ttlManager

testingKnobs struct {
acAfterCommitPrimary chan struct{}
bkAfterCommitPrimary chan struct{}
}
}

type mutationEx struct {
Expand Down Expand Up @@ -219,11 +224,12 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
}
}
err := txn.us.WalkBuffer(func(k kv.Key, v []byte) error {
var op pb.Op
if len(v) > 0 {
if tablecodec.IsUntouchedIndexKValue(k, v) {
return nil
}
op := pb.Op_Put
op = pb.Op_Put
if c := txn.us.LookupConditionPair(k); c != nil && c.ShouldNotExist() {
op = pb.Op_Insert
}
Expand All @@ -236,7 +242,6 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
}
putCnt++
} else {
var op pb.Op
if !txn.IsPessimistic() && txn.us.LookupConditionPair(k) != nil {
// delete-your-writes keys in optimistic txn need check not exists in prewrite-phase
// due to `Op_CheckNotExists` doesn't prewrite lock, so mark those keys should not be used in commit-phase.
Expand All @@ -260,6 +265,9 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
keys = append(keys, k)
}
} else {
if len(c.primaryKey) == 0 && op != pb.Op_CheckNotExists {
c.primaryKey = k
}
keys = append(keys, k)
}
entrySize := len(k) + len(v)
Expand Down Expand Up @@ -435,6 +443,10 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
if err != nil {
return errors.Trace(err)
}
if actionIsCommit && c.testingKnobs.bkAfterCommitPrimary != nil && c.testingKnobs.acAfterCommitPrimary != nil {
c.testingKnobs.acAfterCommitPrimary <- struct{}{}
<-c.testingKnobs.bkAfterCommitPrimary
}
batches = batches[1:]
}
if actionIsCommit {
Expand Down
55 changes: 51 additions & 4 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math"
"math/rand"
"strings"
"sync"
"time"

. "github.com/pingcap/check"
Expand All @@ -31,8 +32,9 @@ import (

type testCommitterSuite struct {
OneByOneSuite
cluster *mocktikv.Cluster
store *tikvStore
cluster *mocktikv.Cluster
store *tikvStore
mvccStore mocktikv.MVCCStore
}

var _ = Suite(&testCommitterSuite{})
Expand All @@ -45,9 +47,10 @@ func (s *testCommitterSuite) SetUpSuite(c *C) {
func (s *testCommitterSuite) SetUpTest(c *C) {
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c"))
mvccStore, err := mocktikv.NewMVCCLevelDB("")
var err error
s.mvccStore, err = mocktikv.NewMVCCLevelDB("")
c.Assert(err, IsNil)
client := mocktikv.NewRPCClient(s.cluster, mvccStore)
client := mocktikv.NewRPCClient(s.cluster, s.mvccStore)
pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)}
spkv := NewMockSafePointKV()
store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false)
Expand Down Expand Up @@ -593,6 +596,50 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) {
c.Assert(lockInfo.LockTtl-PessimisticLockTTL, Less, uint64(150))
}

func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) {
s.cluster.SplitKeys(s.mvccStore, kv.Key("d"), kv.Key("a"), 4)
k1 := kv.Key("a") // insert but deleted key at first pos in txn1
k2 := kv.Key("b") // insert key at second pos in txn1
k3 := kv.Key("c") // insert key in txn1 and will be conflict read by txn2

// insert k1, k2, k2 and delete k1
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetOption(kv.PresumeKeyNotExists, nil)
txn1.SetOption(kv.PresumeKeyNotExistsError, kv.ErrKeyExists.FastGen(""))
txn1.store.txnLatches = nil
txn1.Get(k1)
txn1.Set(k1, []byte{0})
txn1.Set(k2, []byte{1})
txn1.Set(k3, []byte{2})
txn1.Delete(k1)
committer1, err := newTwoPhaseCommitter(txn1, 0)
c.Assert(err, IsNil)
// setup test knob in txn's committer
committer1.testingKnobs.acAfterCommitPrimary = make(chan struct{})
committer1.testingKnobs.bkAfterCommitPrimary = make(chan struct{})
txn1.committer = committer1
var txn1Done sync.WaitGroup
txn1Done.Add(1)
go func() {
err1 := txn1.Commit(context.Background())
c.Assert(err1, IsNil)
txn1Done.Done()
}()
// resume after after primary key be committed
<-txn1.committer.testingKnobs.acAfterCommitPrimary

// start txn2 to read k3(prewrite success and primary should be committed)
txn2 := s.begin(c)
txn2.DelOption(kv.Pessimistic)
txn2.store.txnLatches = nil
v, err := txn2.Get(k3)
c.Assert(err, IsNil) // should resolve lock and read txn1 k3 result instead of rollback it.
c.Assert(v[0], Equals, byte(2))
txn1.committer.testingKnobs.bkAfterCommitPrimary <- struct{}{}
txn1Done.Wait()
}

// TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction.
// The lock's own TTL is expired but the primary key is still alive due to heartbeats.
func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
Expand Down
1 change: 1 addition & 0 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byt
}
c.Assert(err, IsNil)
tpc, err := newTwoPhaseCommitterWithInit(txn, 0)
tpc.primaryKey = primaryKey
c.Assert(err, IsNil)
if bytes.Equal(key, primaryKey) {
tpc.keys = [][]byte{primaryKey}
Expand Down

0 comments on commit f588330

Please sign in to comment.