Skip to content

Commit

Permalink
tikv: choose right primary only contains SelectForUpdate & Delete-you…
Browse files Browse the repository at this point in the history
…r-writes (#18264)
  • Loading branch information
lysu authored Jun 30, 2020
1 parent 9d9f330 commit 402fd2a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 4 deletions.
12 changes: 9 additions & 3 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,6 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
lockIdx++
}
}
if len(c.primaryKey) == 0 && op != pb.Op_CheckNotExists {
c.primaryKey = k
}
mutations.push(op, k, value, isPessimisticLock)
entrySize := len(k) + len(v)
if uint64(entrySize) > kv.TxnEntrySizeLimit {
Expand All @@ -358,6 +355,15 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
}
c.txnSize = size

if len(c.primaryKey) == 0 {
for i, op := range mutations.ops {
if op != pb.Op_CheckNotExists {
c.primaryKey = mutations.keys[i]
break
}
}
}

if size > int(kv.TxnTotalSizeLimit) {
return kv.ErrTxnTooLarge.GenWithStackByArgs(size)
}
Expand Down
77 changes: 76 additions & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) {
k2 := kv.Key("b") // insert key at second pos in txn1
k3 := kv.Key("c") // insert key in txn1 and will be conflict read by txn2

// insert k1, k2, k2 and delete k1
// insert k1, k2, k3 and delete k1
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetOption(kv.PresumeKeyNotExists, nil)
Expand Down Expand Up @@ -725,6 +725,81 @@ func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) {
txn1Done.Wait()
}

func (s *testCommitterSuite) TestDeleteAllYourWrites(c *C) {
s.cluster.SplitKeys(kv.Key("d"), kv.Key("a"), 4)
k1 := kv.Key("a")
k2 := kv.Key("b")
k3 := kv.Key("c")

// insert k1, k2, k3 and delete k1, k2, k3
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetOption(kv.PresumeKeyNotExists, nil)
txn1.SetOption(kv.PresumeKeyNotExistsError, kv.NewExistErrInfo("name", "value"))
txn1.store.txnLatches = nil
txn1.Get(context.Background(), k1)
txn1.Set(k1, []byte{0})
txn1.Delete(k1)
txn1.Get(context.Background(), k2)
txn1.Set(k2, []byte{1})
txn1.Delete(k2)
txn1.Get(context.Background(), k3)
txn1.Set(k3, []byte{2})
txn1.Delete(k3)
err1 := txn1.Commit(context.Background())
c.Assert(err1, IsNil)
}

func (s *testCommitterSuite) TestDeleteAllYourWritesWithSFU(c *C) {
s.cluster.SplitKeys(kv.Key("d"), kv.Key("a"), 4)
k1 := kv.Key("a")
k2 := kv.Key("b")
k3 := kv.Key("c")

// insert k1, k2, k2 and delete k1
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetOption(kv.PresumeKeyNotExists, nil)
txn1.SetOption(kv.PresumeKeyNotExistsError, kv.NewExistErrInfo("name", "value"))
txn1.store.txnLatches = nil
txn1.Get(context.Background(), k1)
txn1.Set(k1, []byte{0})
txn1.Delete(k1)
err := txn1.LockKeys(context.Background(), &kv.LockCtx{}, k2, k3) // select * from t where x in (k2, k3) for update
c.Assert(err, IsNil)

committer1, err := newTwoPhaseCommitter(txn1, 0)
c.Assert(err, IsNil)
// setup test knob in txn's committer
committer1.testingKnobs.acAfterCommitPrimary = make(chan struct{})
committer1.testingKnobs.bkAfterCommitPrimary = make(chan struct{})
txn1.committer = committer1
var txn1Done sync.WaitGroup
txn1Done.Add(1)
go func() {
err1 := txn1.Commit(context.Background())
c.Assert(err1, IsNil)
txn1Done.Done()
}()
// resume after after primary key be committed
<-txn1.committer.testingKnobs.acAfterCommitPrimary
// start txn2 to read k3
txn2 := s.begin(c)
txn2.DelOption(kv.Pessimistic)
txn2.store.txnLatches = nil
err = txn2.Set(k3, []byte{33})
c.Assert(err, IsNil)
var meetLocks []*Lock
txn2.store.lockResolver.testingKnobs.meetLock = func(locks []*Lock) {
meetLocks = append(meetLocks, locks...)
}
err = txn2.Commit(context.Background())
c.Assert(err, IsNil)
txn1.committer.testingKnobs.bkAfterCommitPrimary <- struct{}{}
txn1Done.Wait()
c.Assert(meetLocks[0].Primary[0], Equals, k2[0])
}

// TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction.
// The lock's own TTL is expired but the primary key is still alive due to heartbeats.
func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
Expand Down
6 changes: 6 additions & 0 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type LockResolver struct {
resolved map[uint64]TxnStatus
recentResolved *list.List
}
testingKnobs struct {
meetLock func(locks []*Lock)
}
}

func newLockResolver(store Storage) *LockResolver {
Expand Down Expand Up @@ -298,6 +301,9 @@ func (lr *LockResolver) resolveLocksLite(bo *Backoffer, callerStartTS uint64, lo
}

func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock, forWrite bool, lite bool) (int64, []uint64 /*pushed*/, error) {
if lr.testingKnobs.meetLock != nil {
lr.testingKnobs.meetLock(locks)
}
var msBeforeTxnExpired txnExpireTime
if len(locks) == 0 {
return msBeforeTxnExpired.value(), nil, nil
Expand Down

0 comments on commit 402fd2a

Please sign in to comment.