Skip to content

Commit

Permalink
try another way to recycle memory
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Sep 25, 2018
1 parent 65a42d1 commit f055cd3
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 20 deletions.
72 changes: 55 additions & 17 deletions store/tikv/latch/latch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type node struct {
// latch stores a key's waiting transactions information.
type latch struct {
queue *node
count int
waiting []*Lock
sync.Mutex
}
Expand Down Expand Up @@ -174,14 +175,7 @@ func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) {
latch.Lock()
defer latch.Unlock()

var find *node
for n := latch.queue; n != nil; n = n.next {
if bytes.Compare(n.key, key) == 0 {
find = n
break
}
}

find := findNode(latch.queue, key)
if find.value != lock {
panic("releaseSlot wrong")
}
Expand Down Expand Up @@ -216,15 +210,7 @@ func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
latch.Lock()
defer latch.Unlock()

var find *node
for n := latch.queue; n != nil; n = n.next {
if bytes.Compare(n.key, key) == 0 {
find = n
break
}
// TODO: Invalidate old data.
}

find := findNode(latch.queue, key)
if find == nil {
tmp := &node{
slotID: slotID,
Expand All @@ -233,10 +219,17 @@ func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
}
tmp.next = latch.queue
latch.queue = tmp
latch.count++

lock.acquiredCount++
return acquireSuccess
}

// Try to limits the memory usage.
if latch.count > 5 {
latch.recycle(lock.startTS)
}

if find.maxCommitTS > lock.startTS {
lock.isStale = true
return acquireStale
Expand All @@ -252,3 +245,48 @@ func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
latch.waiting = append(latch.waiting, lock)
return acquireLocked
}

func (l *latch) recycle(currentTS uint64) {
l.Lock()
defer l.Unlock()

if l.queue == nil {
return
}

prev := l.queue
curr := l.queue.next

// Handle list nodes.
for curr != nil {
if tsoSub(currentTS, curr.maxCommitTS) >= expireDuration {
l.count--
prev.next = curr.next
} else {
prev = curr
}
curr = curr.next
}

// Handle the head node.
if tsoSub(currentTS, l.queue.maxCommitTS) >= expireDuration {
l.queue = nil
}
return
}

func (latches *Latches) recycle(currentTS uint64) {
for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
latch.recycle(currentTS)
}
}

func findNode(list *node, key []byte) *node {
for n := list; n != nil; n = n.next {
if bytes.Compare(n.key, key) == 0 {
return n
}
}
return nil
}
43 changes: 43 additions & 0 deletions store/tikv/latch/latch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package latch
import (
"sync/atomic"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/store/tikv/oracle"
)

func TestT(t *testing.T) {
Expand Down Expand Up @@ -104,3 +106,44 @@ func (s *testLatchSuite) TestFirstAcquireFailedWithStale(c *C) {
c.Assert(result, Equals, acquireStale)
s.latches.release(lockB, wakeupList)
}

func (s *testLatchSuite) TestRecycle(c *C) {
latches := NewLatches(8)
now := time.Now()
startTS := oracle.ComposeTS(oracle.GetPhysical(now), 0)
lock := latches.genLock(startTS, [][]byte{
[]byte("a"), []byte("b"),
})
lock1 := latches.genLock(startTS, [][]byte{
[]byte("b"), []byte("c"),
})
c.Assert(latches.acquire(lock), Equals, acquireSuccess)
c.Assert(latches.acquire(lock1), Equals, acquireLocked)
lock.SetCommitTS(startTS + 1)
var wakeupList []*Lock
latches.release(lock, wakeupList)

lock2 := latches.genLock(startTS+3, [][]byte{
[]byte("b"), []byte("c"),
})
c.Assert(latches.acquire(lock2), Equals, acquireSuccess)
wakeupList = wakeupList[:0]
latches.release(lock2, wakeupList)

allEmpty := true
for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
if latch.queue != nil {
allEmpty = false
}
}
c.Assert(allEmpty, IsFalse)

currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(expireDuration)), 3)
latches.recycle(currentTS)

for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
c.Assert(latch.queue, IsNil)
}
}
33 changes: 30 additions & 3 deletions store/tikv/latch/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@ package latch

import (
"sync"
"time"

"github.com/pingcap/tidb/store/tikv/oracle"
)

const lockChanSize = 100

// LatchesScheduler is used to schedule latches for transactions.
type LatchesScheduler struct {
latches *Latches
unlockCh chan *Lock
closed bool
latches *Latches
unlockCh chan *Lock
closed bool
lastRecycleTime uint64
sync.RWMutex
}

Expand All @@ -40,13 +44,30 @@ func NewScheduler(size uint) *LatchesScheduler {
return scheduler
}

const checkInterval = 10 * time.Minute
const expireDuration = 2 * time.Hour
const checkCounter = 50000
const latchListCount = 5

func (scheduler *LatchesScheduler) run() {
var counter int
wakeupList := make([]*Lock, 0)
for lock := range scheduler.unlockCh {
wakeupList = scheduler.latches.release(lock, wakeupList)
if len(wakeupList) > 0 {
scheduler.wakeup(wakeupList)
}

if lock.commitTS > lock.startTS {
currentTS := lock.commitTS
elapsed := tsoSub(currentTS, scheduler.lastRecycleTime)
if elapsed > checkInterval && counter > checkCounter {
go scheduler.latches.recycle(lock.commitTS)
scheduler.lastRecycleTime = currentTS
counter = 0
}
}
counter++
}
}

Expand Down Expand Up @@ -91,3 +112,9 @@ func (scheduler *LatchesScheduler) UnLock(lock *Lock) {
scheduler.unlockCh <- lock
}
}

func tsoSub(ts1, ts2 uint64) time.Duration {
t1 := oracle.GetTimeFromTS(ts1)
t2 := oracle.GetTimeFromTS(ts2)
return t1.Sub(t2)
}

0 comments on commit f055cd3

Please sign in to comment.