Skip to content

Commit

Permalink
store/tikv: implement a ttlManager to update the TTL of a transacti…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Sep 26, 2019
1 parent 0f7905f commit b3f9a73
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 9 deletions.
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,5 +150,6 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVBatchClientUnavailable)
prometheus.MustRegister(TiKVRangeTaskStats)
prometheus.MustRegister(TiKVRangeTaskPushDuration)
prometheus.MustRegister(TiKVTxnHeartBeatHistogram)
prometheus.MustRegister(GRPCConnTransientFailureCounter)
}
9 changes: 9 additions & 0 deletions metrics/tikvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,13 @@ var (
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
Help: "duration to push sub tasks to range task workers",
}, []string{LblType})

TiKVTxnHeartBeatHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "txn_heart_beat",
Help: "Bucketed histogram of the txn_heartbeat request duration.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 18), // 1ms ~ 292s
}, []string{LblType})
)
7 changes: 3 additions & 4 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,13 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
syncCh <- struct{}{}
tk.MustQuery("select c from conflict where id = 1").Check(testkit.Rows("3"))

// Check outdated pessimistic lock is resolved.
// Check pessimistic lock is not resolved.
tk.MustExec("begin pessimistic")
tk.MustExec("update conflict set c = 4 where id = 1")
time.Sleep(300 * time.Millisecond)
tk2.MustExec("begin optimistic")
tk2.MustExec("update conflict set c = 5 where id = 1")
tk2.MustExec("commit")
_, err := tk.Exec("commit")
// TODO: ResolveLock block until timeout, takes about 40s, makes CI slow!
_, err := tk2.Exec("commit")
c.Check(err, NotNil)

// Update snapshotTS after a conflict, invalidate snapshot cache.
Expand Down
1 change: 0 additions & 1 deletion store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,6 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
}
// If current transaction's lock exists.
if ok && dec.lock.startTS == startTS {

// If the lock has already outdated, clean up it.
if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
Expand Down
92 changes: 91 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/execdetails"
Expand All @@ -52,11 +53,13 @@ const (
var (
tikvSecondaryLockCleanupFailureCounterCommit = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit")
tikvSecondaryLockCleanupFailureCounterRollback = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback")
tiKVTxnHeartBeatHistogramOK = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("ok")
tiKVTxnHeartBeatHistogramError = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("err")
)

// Global variable set by config file.
var (
PessimisticLockTTL uint64
PessimisticLockTTL uint64 = 15000 // 15s ~ 40s
)

func (ca twoPhaseCommitAction) String() string {
Expand Down Expand Up @@ -112,6 +115,8 @@ type twoPhaseCommitter struct {
isFirstLock bool
// regionTxnSize stores the number of keys involved in each region
regionTxnSize map[uint64]int
// Used by pessimistic transaction and large transaction.
ttlManager
}

type mutationEx struct {
Expand All @@ -128,6 +133,9 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
startTS: txn.StartTS(),
connID: connID,
regionTxnSize: map[uint64]int{},
ttlManager: ttlManager{
ch: make(chan struct{}),
},
}, nil
}

Expand Down Expand Up @@ -586,6 +594,10 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
}
keyErrs := prewriteResp.GetErrors()
if len(keyErrs) == 0 {
isPrimary := bytes.Equal(batch.keys[0], c.primary())
if isPrimary && c.isPessimistic {
c.ttlManager.run(c)
}
return nil
}
var locks []*Lock
Expand Down Expand Up @@ -628,6 +640,80 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
}
}

type ttlManagerState uint32

const (
stateUninitialized ttlManagerState = iota
stateRunning
stateClosed
)

type ttlManager struct {
state ttlManagerState
ch chan struct{}
}

func (tm *ttlManager) run(c *twoPhaseCommitter) {
// Run only once.
if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateUninitialized), uint32(stateRunning)) {
return
}
go tm.keepAlive(c)
}

func (tm *ttlManager) close() {
if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateRunning), uint32(stateClosed)) {
return
}
close(tm.ch)
}

func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
// Ticker is set to 1/3 of the PessimisticLockTTL.
ticker := time.NewTicker(time.Duration(PessimisticLockTTL) * time.Millisecond / 3)
defer ticker.Stop()
for {
select {
case <-tm.ch:
return
case <-ticker.C:
bo := NewBackoffer(context.Background(), pessimisticLockMaxBackoff)
now, err := c.store.GetOracle().GetTimestamp(bo.ctx)
if err != nil {
err1 := bo.Backoff(BoPDRPC, err)
if err1 != nil {
logutil.BgLogger().Warn("keepAlive get tso fail",
zap.Error(err))
return
}
continue
}

uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
const c10min = 10 * 60 * 1000
if uptime > c10min {
// Set a 10min maximum lifetime for the ttlManager, so when something goes wrong
// the key will not be locked forever.
logutil.BgLogger().Info("ttlManager live up to its lifetime",
zap.Uint64("txnStartTS", c.startTS))
return
}

newTTL := uptime + PessimisticLockTTL
startTime := time.Now()
_, err = sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL)
if err != nil {
tiKVTxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds())
logutil.BgLogger().Warn("send TxnHeartBeat failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return
}
tiKVTxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds())
}
}
}

func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
Expand Down Expand Up @@ -680,6 +766,10 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
}
keyErrs := lockResp.GetErrors()
if len(keyErrs) == 0 {
isPrimary := bytes.Equal(batch.keys[0], c.primary())
if isPrimary { // No need to check isPessimistic because this function is only called in that case.
c.ttlManager.run(c)
}
return nil
}
var locks []*Lock
Expand Down
27 changes: 24 additions & 3 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

Expand Down Expand Up @@ -539,9 +540,29 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
lockInfo := s.getLockInfo(c, key)
elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL
c.Assert(elapsedTTL, GreaterEqual, uint64(100))
c.Assert(elapsedTTL, Less, uint64(200))
lockInfo2 := s.getLockInfo(c, key2)
c.Assert(lockInfo2.LockTtl, Equals, lockInfo.LockTtl)

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, lockInfo.LockTtl)

// Check primary lock TTL is auto increasing while the pessimistic txn is ongoing.
for i := 0; i < 50; i++ {
lockInfoNew := s.getLockInfo(c, key)
if lockInfoNew.LockTtl > lockInfo.LockTtl {
currentTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx)
c.Assert(err, IsNil)
// Check that the TTL is update to a reasonable range.
expire := oracle.ExtractPhysical(txn.startTS) + int64(lockInfoNew.LockTtl)
now := oracle.ExtractPhysical(currentTS)
c.Assert(expire > now, IsTrue)
c.Assert(uint64(expire-now) <= PessimisticLockTTL, IsTrue)
return
}
time.Sleep(100 * time.Millisecond)
}
c.Assert(false, IsTrue, Commentf("update pessimistic ttl fail"))
}

func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
return errors.Trace(err)
}
}
defer committer.ttlManager.close()
if err := committer.initKeysAndMutations(); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -331,6 +332,7 @@ func (txn *tikvTxn) Rollback() error {
// Clean up pessimistic lock.
if txn.IsPessimistic() && txn.committer != nil {
err := txn.rollbackPessimisticLocks()
txn.committer.ttlManager.close()
if err != nil {
logutil.Logger(context.Background()).Error(err.Error())
}
Expand Down

0 comments on commit b3f9a73

Please sign in to comment.