
store/tikv,executor: redesign the latch scheduler #7711

Merged
13 commits merged on Oct 9, 2018
3 changes: 1 addition & 2 deletions executor/write_test.go
@@ -1808,8 +1808,7 @@ func (s *testBypassSuite) TestBypassLatch(c *C) {

// txn1 and txn2 data range do not overlap, but using latches result in txn conflict.
fn()
_, err = tk1.Exec("commit")
c.Assert(err, NotNil)
tk1.MustExec("commit")

tk1.MustExec("truncate table t")
fn()
177 changes: 98 additions & 79 deletions store/tikv/latch/latch.go
@@ -14,6 +14,7 @@
package latch

import (
"bytes"
"math/bits"
"sort"
"sync"
@@ -22,32 +23,25 @@ import (
"github.com/spaolacci/murmur3"
)

// latch stores a key's waiting transactions information.
type latch struct {
// Whether there is any transaction in waitingQueue except head.
hasMoreWaiting bool
// The startTS of the transaction which is the head of waiting transactions.
waitingQueueHead uint64
maxCommitTS uint64
sync.Mutex
}

func (l *latch) isEmpty() bool {
return l.waitingQueueHead == 0 && !l.hasMoreWaiting
}
type node struct {
slotID int
key []byte
maxCommitTS uint64
value *Lock

func (l *latch) free() {
l.waitingQueueHead = 0
next *node
Contributor:

Why not use list.List?

Contributor Author:

1. list.List would make unnecessary allocations. Using

       type Element struct {
           // The value stored with this element.
           Value interface{}
           // contains filtered or unexported fields
       }

   is similar to

       type node struct {
           Value *nodeValue
       }
       type nodeValue struct {
           slotID      int
           key         []byte
           maxCommitTS uint64
           value       *Lock
       }

2. list.List is a doubly linked list, while a singly linked list is sufficient here.
3. The list data structure is simple and common enough to implement directly.

(A minimal allocation-oriented sketch follows the struct below.)
}
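
For illustration only, a standalone sketch (not part of this PR; identifiers are hypothetical) of the allocation point in the reply: container/list wraps every entry in a heap-allocated Element and boxes the value as interface{}, while a hand-rolled node keeps the fields inline, so there is one allocation per entry and no boxing.

package sketch

import "container/list"

type entry struct {
	slotID      int
	key         []byte
	maxCommitTS uint64
}

// With container/list: PushFront allocates a list.Element and stores the
// value behind an interface{}, an extra indirection per entry.
func pushWithList(l *list.List, e *entry) *list.Element {
	return l.PushFront(e)
}

// With a hand-rolled singly linked node: the fields live inline in the node,
// so pushing is a single allocation with no interface boxing.
type listNode struct {
	entry
	next *listNode
}

func pushWithNode(head *listNode, e entry) *listNode {
	return &listNode{entry: e, next: head}
}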

func (l *latch) refreshCommitTS(commitTS uint64) {
l.Lock()
defer l.Unlock()
l.maxCommitTS = mathutil.MaxUint64(commitTS, l.maxCommitTS)
// latch stores a key's waiting transactions information.
type latch struct {
queue *node
waiting []*Lock
Contributor:

Why doesn't each node have its own waiting queue?

Contributor Author:

The waiting queue is moved from each node to the latch for these reasons:

1. Nodes are inserted into the queue all the time; if each node had its own waiting queue, queues would constantly be created and destroyed. That means more allocations and worse memory efficiency.
2. I assume the waiting queue will not be large, and when a list is small enough an array is very efficient.
3. You may still remember the "first waiting one automatically becomes running" problem; the old code is complex enough just handling the different states. If nodes don't own the waiting queue, that problem can be avoided.
   @zhangjinpeng1987

(A small layout sketch follows the struct below.)
sync.Mutex
}
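
To make the queue-placement trade-off concrete, here is a hedged standalone sketch of the two layouts the reply contrasts; the type names are hypothetical stand-ins, not code from this PR.

package sketch

type lockRef struct{ startTS uint64 } // stand-in for *Lock

// Per-node layout: every key's node owns a queue, so a queue is allocated the
// moment a key gets its first waiter and dropped again when it drains.
type perKeyNode struct {
	key     []byte
	waiting []*lockRef
	next    *perKeyNode
}

// Latch-level layout (as in this PR): one short shared slice per slot; waiters
// for any key hashed to this slot are appended here and scanned on release,
// which keeps allocations rare as long as the queue stays small.
type slotLatch struct {
	queue   *perKeyNode // per-key bookkeeping only
	waiting []*lockRef  // shared FIFO of waiters for the whole slot
}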

// Lock is the locks' information required for a transaction.
type Lock struct {
keys [][]byte
// The slot IDs of the latches(keys) that a startTS must acquire before being able to processed.
requiredSlots []int
// The number of latches that the transaction has acquired. For status is stale, it include the
@@ -96,9 +90,20 @@ func (l *Lock) SetCommitTS(commitTS uint64) {
// but conceptually a latch is a queue, and a slot is an index to the queue
type Latches struct {
slots []latch
// The waiting queue for each slot(slotID => slice of Lock).
waitingQueues map[int][]*Lock
sync.RWMutex
}

type bytesSlice [][]byte

func (s bytesSlice) Len() int {
return len(s)
}

func (s bytesSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s bytesSlice) Less(i, j int) bool {
return bytes.Compare(s[i], s[j]) < 0
}

// NewLatches create a Latches with fixed length,
@@ -107,14 +112,15 @@ func NewLatches(size uint) *Latches {
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
slots := make([]latch, powerOfTwoSize)
return &Latches{
slots: slots,
waitingQueues: make(map[int][]*Lock),
slots: slots,
}
}

// genLock generates Lock for the transaction with startTS and keys.
func (latches *Latches) genLock(startTS uint64, keys [][]byte) *Lock {
sort.Sort(bytesSlice(keys))
return &Lock{
keys: keys,
requiredSlots: latches.genSlotIDs(keys),
acquiredCount: 0,
startTS: startTS,
@@ -126,17 +132,7 @@ func (latches *Latches) genSlotIDs(keys [][]byte) []int {
for _, key := range keys {
slots = append(slots, latches.slotID(key))
}
sort.Ints(slots)
if len(slots) <= 1 {
return slots
}
dedup := slots[:1]
for i := 1; i < len(slots); i++ {
if slots[i] != slots[i-1] {
dedup = append(dedup, slots[i])
}
}
return dedup
return slots
}

// slotID return slotID for current key.
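
The slotID body is collapsed in this diff. The following is a hedged sketch of the mapping it needs to provide, assuming murmur3 hashing (per the import above) and the power-of-two slot count from NewLatches, plus a note on how slots now line up with keys after the dedup removal; the actual implementation may differ.

// Hypothetical sketch; not necessarily the real slotID implementation.
func slotIDSketch(latches *Latches, key []byte) int {
	// len(latches.slots) is a power of two, so masking keeps the hash in range.
	return int(murmur3.Sum32(key) & uint32(len(latches.slots)-1))
}

// With the dedup gone, requiredSlots[i] is always the slot of keys[i], e.g.
//   keys          = ["a", "b", "c"]            (sorted by genLock)
//   requiredSlots = [slot("a"), slot("b"), slot("c")]
// which lets acquireSlot and releaseSlot index keys and requiredSlots in
// lockstep via acquiredCount.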
@@ -150,8 +146,7 @@ func (latches *Latches) acquire(lock *Lock) acquireResult {
return acquireStale
}
for lock.acquiredCount < len(lock.requiredSlots) {
slotID := lock.requiredSlots[lock.acquiredCount]
status := latches.acquireSlot(slotID, lock)
status := latches.acquireSlot(lock)
if status != acquireSuccess {
return status
}
@@ -161,75 +156,99 @@

// release releases all latches owned by the `lock` and returns the wakeup list.
// Preconditions: the caller must ensure the transaction's status is not locked.
func (latches *Latches) release(lock *Lock, commitTS uint64, wakeupList []*Lock) []*Lock {
func (latches *Latches) release(lock *Lock, wakeupList []*Lock) []*Lock {
wakeupList = wakeupList[:0]
for i := 0; i < lock.acquiredCount; i++ {
slotID := lock.requiredSlots[i]
if nextLock := latches.releaseSlot(slotID, commitTS); nextLock != nil {
for lock.acquiredCount > 0 {
if nextLock := latches.releaseSlot(lock); nextLock != nil {
wakeupList = append(wakeupList, nextLock)
}
}
return wakeupList
}

// refreshCommitTS refreshes commitTS for keys.
func (latches *Latches) refreshCommitTS(keys [][]byte, commitTS uint64) {
slotIDs := latches.genSlotIDs(keys)
for _, slotID := range slotIDs {
latches.slots[slotID].refreshCommitTS(commitTS)
}
}

func (latches *Latches) releaseSlot(slotID int, commitTS uint64) (nextLock *Lock) {
func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) {
key := lock.keys[lock.acquiredCount-1]
slotID := lock.requiredSlots[lock.acquiredCount-1]
latch := &latches.slots[slotID]
lock.acquiredCount--
latch.Lock()
defer latch.Unlock()
latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS)
if !latch.hasMoreWaiting {
latch.free()

var find *node
for n := latch.queue; n != nil; n = n.next {
if bytes.Compare(n.key, key) == 0 {
find = n
break
}
}

if find.value != lock {
panic("releaseSlot wrong")
}
find.maxCommitTS = mathutil.MaxUint64(find.maxCommitTS, lock.commitTS)
find.value = nil
if len(latch.waiting) == 0 {
return nil
}
nextLock, latch.hasMoreWaiting = latches.popFromWaitingQueue(slotID)
latch.waitingQueueHead = nextLock.startTS
nextLock.acquiredCount++
if latch.maxCommitTS > nextLock.startTS {
nextLock.isStale = true

idx := 0
for i := 0; i < len(latch.waiting); i++ {
waiting := latch.waiting[i]
if bytes.Compare(waiting.keys[waiting.acquiredCount], key) == 0 {
nextLock = waiting
Contributor:

Is it possible that more than one Lock in the waiting list has the same key?

Contributor Author:

Possible! You found a bug.
I should only wake up the first one. Waking up the first one is FIFO; there is still room for improvement here in choosing which one to wake up.

(A first-match-only sketch follows releaseSlot below.)
} else {
idx++
latch.waiting[idx] = waiting
}
}
return nextLock
}
latch.waiting = latch.waiting[:idx]

func (latches *Latches) popFromWaitingQueue(slotID int) (front *Lock, hasMoreWaiting bool) {
latches.Lock()
defer latches.Unlock()
waiting := latches.waitingQueues[slotID]
front = waiting[0]
if len(waiting) == 1 {
delete(latches.waitingQueues, slotID)
} else {
latches.waitingQueues[slotID] = waiting[1:]
hasMoreWaiting = true
if nextLock != nil && find.maxCommitTS > nextLock.startTS {
nextLock.isStale = true
}
return
return nextLock
}
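
Following up on the duplicate-key thread above: a hedged sketch (not the merged fix) of a wakeup that hands the slot to only the first waiter whose next key matches, so later waiters for the same key stay queued until the next release.

// Sketch only: wake at most one waiter per released key, in FIFO order.
func wakeFirstMatch(waiting []*Lock, key []byte) (next *Lock, rest []*Lock) {
	for i, w := range waiting {
		if bytes.Equal(w.keys[w.acquiredCount], key) {
			// Remove only this waiter; anyone behind it with the same key
			// remains queued and is woken by a later release.
			return w, append(waiting[:i], waiting[i+1:]...)
		}
	}
	return nil, waiting
}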

func (latches *Latches) acquireSlot(slotID int, lock *Lock) acquireResult {
func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
key := lock.keys[lock.acquiredCount]
slotID := lock.requiredSlots[lock.acquiredCount]
latch := &latches.slots[slotID]
latch.Lock()
defer latch.Unlock()
if latch.maxCommitTS > lock.startTS {

var find *node
for n := latch.queue; n != nil; n = n.next {
if bytes.Compare(n.key, key) == 0 {
find = n
break
}
// TODO: Invalidate old data.
Contributor:

We cannot merge this PR before fixing this TODO, because without it we will get an OOM.

}

if find == nil {
tmp := &node{
slotID: slotID,
key: key,
value: lock,
}
tmp.next = latch.queue
latch.queue = tmp
lock.acquiredCount++
return acquireSuccess
}

if find.maxCommitTS > lock.startTS {
lock.isStale = true
return acquireStale
}

if latch.isEmpty() {
latch.waitingQueueHead = lock.startTS
if find.value == nil {
find.value = lock
lock.acquiredCount++
return acquireSuccess
}

// Push the current transaction into waitingQueue.
latch.hasMoreWaiting = true
latches.Lock()
defer latches.Unlock()
latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], lock)
latch.waiting = append(latch.waiting, lock)
return acquireLocked
}
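
On the reviewer's OOM concern: nodes are appended to a slot's queue per key but never removed in this version. One possible shape for the missing cleanup, as a hedged sketch only; the name, trigger, and safe-point choice are hypothetical and not necessarily what was eventually merged.

// Hypothetical sketch: unlink nodes that hold no lock and whose recorded
// commit is older than a safe point, so idle keys stop pinning memory.
func (l *latch) recycleSketch(safePointTS uint64) {
	l.Lock()
	defer l.Unlock()
	var prev *node
	for n := l.queue; n != nil; n = n.next {
		if n.value == nil && n.maxCommitTS < safePointTS {
			if prev == nil {
				l.queue = n.next
			} else {
				prev.next = n.next
			}
			continue // prev stays put; n is no longer linked
		}
		prev = n
	}
}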
14 changes: 8 additions & 6 deletions store/tikv/latch/latch_test.go
@@ -48,7 +48,7 @@ func getTso() uint64 {

func (s *testLatchSuite) TestWakeUp(c *C) {
keysA := [][]byte{
[]byte("a"), []byte("b"), []byte("c"), []byte("c")}
[]byte("a"), []byte("b"), []byte("c")}
_, lockA := s.newLock(keysA)

keysB := [][]byte{[]byte("d"), []byte("e"), []byte("a"), []byte("c")}
@@ -65,15 +65,16 @@ func (s *testLatchSuite) TestWakeUp(c *C) {
// A release lock, and get wakeup list.
commitTSA := getTso()
wakeupList := make([]*Lock, 0)
wakeupList = s.latches.release(lockA, commitTSA, wakeupList)
lockA.SetCommitTS(commitTSA)
wakeupList = s.latches.release(lockA, wakeupList)
c.Assert(wakeupList[0].startTS, Equals, startTSB)

// B acquire failed since startTSB has stale for some keys.
result = s.latches.acquire(lockB)
c.Assert(result, Equals, acquireStale)

// B release lock since it received a stale.
wakeupList = s.latches.release(lockB, 0, wakeupList)
wakeupList = s.latches.release(lockB, wakeupList)
c.Assert(wakeupList, HasLen, 0)

// B restart:get a new startTS.
@@ -85,7 +86,7 @@ func (s *testLatchSuite) TestWakeUp(c *C) {

func (s *testLatchSuite) TestFirstAcquireFailedWithStale(c *C) {
keys := [][]byte{
[]byte("a"), []byte("b"), []byte("c"), []byte("c")}
[]byte("a"), []byte("b"), []byte("c")}
_, lockA := s.newLock(keys)
startTSB, lockB := s.newLock(keys)
// acquire lockA success
@@ -94,11 +95,12 @@ func (s *testLatchSuite) TestFirstAcquireFailedWithStale(c *C) {
// release lockA
commitTSA := getTso()
wakeupList := make([]*Lock, 0)
s.latches.release(lockA, commitTSA, wakeupList)
lockA.SetCommitTS(commitTSA)
s.latches.release(lockA, wakeupList)

c.Assert(commitTSA, Greater, startTSB)
// acquire lockB first time, should be failed with stale since commitTSA > startTSB
result = s.latches.acquire(lockB)
c.Assert(result, Equals, acquireStale)
s.latches.release(lockB, 0, wakeupList)
s.latches.release(lockB, wakeupList)
}
8 changes: 1 addition & 7 deletions store/tikv/latch/scheduler.go
@@ -43,7 +43,7 @@ func NewScheduler(size uint) *LatchesScheduler {
func (scheduler *LatchesScheduler) run() {
wakeupList := make([]*Lock, 0)
for lock := range scheduler.unlockCh {
wakeupList = scheduler.latches.release(lock, lock.commitTS, wakeupList)
wakeupList = scheduler.latches.release(lock, wakeupList)
if len(wakeupList) > 0 {
scheduler.wakeup(wakeupList)
}
@@ -91,9 +91,3 @@ func (scheduler *LatchesScheduler) UnLock(lock *Lock) {
scheduler.unlockCh <- lock
}
}

// RefreshCommitTS refreshes commitTS for keys. It could be used for the transaction not retryable,
// which would do 2PC directly and wouldn't get a lock.
func (scheduler *LatchesScheduler) RefreshCommitTS(keys [][]byte, commitTS uint64) {
scheduler.latches.refreshCommitTS(keys, commitTS)
}
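
For end-to-end context on the release path above, a hedged usage sketch of the scheduler from a commit path; Lock, IsStale, and the helper functions here are assumptions, not part of this diff.

// Hypothetical usage; only SetCommitTS and UnLock appear in this diff.
func commitWithLatches(s *LatchesScheduler, startTS uint64, keys [][]byte) error {
	lock := s.Lock(startTS, keys) // assumed: blocks until all required slots are held
	defer s.UnLock(lock)          // feeds unlockCh; run() releases slots and wakes waiters
	if lock.IsStale() {           // assumed accessor for the isStale flag
		return errors.New("stale transaction, restart with a newer startTS")
	}
	commitTS := getTSO() // assumed helper: fetch a commit timestamp
	lock.SetCommitTS(commitTS)
	return twoPhaseCommit(keys, startTS, commitTS) // assumed helper
}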
2 changes: 1 addition & 1 deletion store/tikv/latch/scheduler_test.go
@@ -29,7 +29,7 @@ func (s *testSchedulerSuite) SetUpTest(c *C) {

func (s *testSchedulerSuite) TestWithConcurrency(c *C) {
txns := [][][]byte{
{[]byte("a"), []byte("a"), []byte("b"), []byte("c")},
{[]byte("a"), []byte("b"), []byte("c")},
{[]byte("a"), []byte("d"), []byte("e"), []byte("f")},
{[]byte("e"), []byte("f"), []byte("g"), []byte("h")},
}
3 changes: 0 additions & 3 deletions store/tikv/txn.go
@@ -199,9 +199,6 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
// When bypassLatch flag is true, commit directly.
if bypassLatch {
err = committer.executeAndWriteFinishBinlog(ctx)
if err == nil {
txn.store.txnLatches.RefreshCommitTS(committer.keys, committer.commitTS)
}
log.Debug("[kv]", connID, " txnLatches enabled while txn not retryable, 2pc directly:", err)
return errors.Trace(err)
}