Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: implement a ttlManager to update the TTL of a transaction #12177

Merged
merged 41 commits into from
Sep 25, 2019
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
4ce8dba
*: implement the TxnHeartBeat API for the large transaction
tiancaiamao Sep 2, 2019
e736d97
fix CI
tiancaiamao Sep 2, 2019
1ff204f
make golint happy
tiancaiamao Sep 2, 2019
c07e771
Merge branch 'master' into txn-heart-beat
tiancaiamao Sep 2, 2019
0943045
using Go1.12
tiancaiamao Sep 2, 2019
7ef4464
make golint happy
tiancaiamao Sep 2, 2019
03a2432
address comment
tiancaiamao Sep 3, 2019
f1e97df
address comment
tiancaiamao Sep 3, 2019
d49ebfc
fix CI
tiancaiamao Sep 3, 2019
d3c3178
Merge branch 'master' into txn-heart-beat
tiancaiamao Sep 5, 2019
3cfe3ea
Merge branch 'master' into txn-heart-beat
tiancaiamao Sep 5, 2019
dbe8d34
update go.mod
tiancaiamao Sep 5, 2019
a7d9bda
address comment
tiancaiamao Sep 5, 2019
5938ccf
go mod tidy
tiancaiamao Sep 5, 2019
fcffd2b
Merge branch 'master' into txn-heart-beat
tiancaiamao Sep 5, 2019
491fa33
Merge branch 'master' into txn-heart-beat
tiancaiamao Sep 5, 2019
2e36825
address comment
tiancaiamao Sep 5, 2019
c05f9cb
fix integration test
tiancaiamao Sep 5, 2019
641f514
Merge branch 'master' into ttl-keep-alive
tiancaiamao Sep 6, 2019
6401f80
store/tikv: implement a `ttlManager` to update the TTL of a transaction
tiancaiamao Sep 8, 2019
8522359
Merge branch 'master' into ttl-keep-alive
tiancaiamao Sep 12, 2019
84525f4
update test
tiancaiamao Sep 12, 2019
8f423ee
go mod tidy
tiancaiamao Sep 12, 2019
953dce6
fix data race
tiancaiamao Sep 12, 2019
5dab766
go mod tidy
tiancaiamao Sep 12, 2019
0b29c2b
address comment
tiancaiamao Sep 16, 2019
b2d7864
address comment
tiancaiamao Sep 16, 2019
f2f8e6d
clean up unused code
tiancaiamao Sep 16, 2019
bd5978d
fix ci
tiancaiamao Sep 16, 2019
9f5e3aa
address comment
tiancaiamao Sep 16, 2019
60be7b8
address comment
tiancaiamao Sep 16, 2019
e400c4d
init PessimisticLockTTL in store/tikv package
tiancaiamao Sep 16, 2019
312cb24
Merge branch 'master' into ttl-keep-alive
tiancaiamao Sep 23, 2019
2552971
tiny update
tiancaiamao Sep 23, 2019
012157f
address comment
tiancaiamao Sep 24, 2019
f50f093
address comment
tiancaiamao Sep 24, 2019
657360a
address comment
tiancaiamao Sep 24, 2019
786e65d
address comment
tiancaiamao Sep 25, 2019
ef03cee
address comment
tiancaiamao Sep 25, 2019
433b3aa
Merge branch 'master' into ttl-keep-alive
tiancaiamao Sep 25, 2019
dbfd099
Merge branch 'master' into ttl-keep-alive
tiancaiamao Sep 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,6 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVRangeTaskStats)
prometheus.MustRegister(TiKVRangeTaskPushDuration)
prometheus.MustRegister(TiKVTokenWaitDuration)
prometheus.MustRegister(TiKVTxnHeartBeatHistogram)
prometheus.MustRegister(GRPCConnTransientFailureCounter)
}
9 changes: 9 additions & 0 deletions metrics/tikvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,13 @@ var (
Buckets: prometheus.ExponentialBuckets(1, 2, 30), // 1ns ~ 1s
Help: "tidb txn token wait duration to process batches",
})

TiKVTxnHeartBeatHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "txn_heart_beat",
Help: "Bucketed histogram of the txn_heartbeat request duration.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 18), // 1ms ~ 292s
}, []string{LblType})
)
7 changes: 3 additions & 4 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,13 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
syncCh <- struct{}{}
tk.MustQuery("select c from conflict where id = 1").Check(testkit.Rows("3"))

// Check outdated pessimistic lock is resolved.
// Check pessimistic lock is not resolved.
tk.MustExec("begin pessimistic")
tk.MustExec("update conflict set c = 4 where id = 1")
time.Sleep(300 * time.Millisecond)
tk2.MustExec("begin optimistic")
tk2.MustExec("update conflict set c = 5 where id = 1")
tk2.MustExec("commit")
_, err := tk.Exec("commit")
// TODO: ResolveLock block until timeout, takes about 40s, makes CI slow!
_, err := tk2.Exec("commit")
c.Check(err, NotNil)

// Update snapshotTS after a conflict, invalidate snapshot cache.
Expand Down
1 change: 0 additions & 1 deletion store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,6 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
}
// If current transaction's lock exists.
if ok && dec.lock.startTS == startTS {

// If the lock has already outdated, clean up it.
if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
Expand Down
92 changes: 91 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/execdetails"
Expand All @@ -52,11 +53,13 @@ const (
var (
tikvSecondaryLockCleanupFailureCounterCommit = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit")
tikvSecondaryLockCleanupFailureCounterRollback = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback")
tiKVTxnHeartBeatHistogramOK = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("ok")
tiKVTxnHeartBeatHistogramError = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("err")
)

// Global variable set by config file.
var (
PessimisticLockTTL uint64
PessimisticLockTTL uint64 = 15000 // 15s ~ 40s
)

func (ca twoPhaseCommitAction) String() string {
Expand Down Expand Up @@ -113,6 +116,8 @@ type twoPhaseCommitter struct {
isFirstLock bool
// regionTxnSize stores the number of keys involved in each region
regionTxnSize map[uint64]int
// Used by pessimistic transaction and large transaction.
ttlManager
}

// batchExecutor is txn controller providing rate control like utils
Expand Down Expand Up @@ -142,6 +147,9 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
startTS: txn.StartTS(),
connID: connID,
regionTxnSize: map[uint64]int{},
ttlManager: ttlManager{
ch: make(chan struct{}),
},
}, nil
}

Expand Down Expand Up @@ -542,6 +550,10 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
prewriteResp := resp.Resp.(*pb.PrewriteResponse)
keyErrs := prewriteResp.GetErrors()
if len(keyErrs) == 0 {
isPrimary := bytes.Equal(batch.keys[0], c.primary())
if isPrimary && c.isPessimistic {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
c.ttlManager.run(c)
}
return nil
}
var locks []*Lock
Expand Down Expand Up @@ -581,6 +593,80 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
}
}

type ttlManagerState uint32

const (
stateUninitialized ttlManagerState = iota
stateRunning
stateClosed
)

type ttlManager struct {
state ttlManagerState
ch chan struct{}
}

func (tm *ttlManager) run(c *twoPhaseCommitter) {
// Run only once.
if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateUninitialized), uint32(stateRunning)) {
return
}
go tm.keepAlive(c)
}

func (tm *ttlManager) close() {
if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateRunning), uint32(stateClosed)) {
return
}
close(tm.ch)
}

func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
// Ticker is set to 1/3 of the PessimisticLockTTL.
ticker := time.NewTicker(time.Duration(PessimisticLockTTL) * time.Millisecond / 3)
defer ticker.Stop()
for {
select {
case <-tm.ch:
return
case <-ticker.C:
bo := NewBackoffer(context.Background(), pessimisticLockMaxBackoff)
now, err := c.store.GetOracle().GetTimestamp(bo.ctx)
if err != nil {
err1 := bo.Backoff(BoPDRPC, err)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
if err1 != nil {
logutil.BgLogger().Warn("keepAlive get tso fail",
zap.Error(err))
return
}
continue
}

uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
const c10min = 10 * 60 * 1000
if uptime > c10min {
// Set a 10min maximum lifetime for the ttlManager, so when something goes wrong
// the key will not be locked forever.
logutil.BgLogger().Info("ttlManager live up to its lifetime",
zap.Uint64("txnStartTS", c.startTS))
return
}

newTTL := uptime + PessimisticLockTTL
startTime := time.Now()
_, err = sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL)
lysu marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
tiKVTxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds())
logutil.BgLogger().Warn("send TxnHeartBeat failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return
}
tiKVTxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds())
}
}
}

func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
Expand Down Expand Up @@ -626,6 +712,10 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
lockResp := resp.Resp.(*pb.PessimisticLockResponse)
keyErrs := lockResp.GetErrors()
if len(keyErrs) == 0 {
isPrimary := bytes.Equal(batch.keys[0], c.primary())
if isPrimary { // No need to check isPessimistic because this function is only called in that case.
c.ttlManager.run(c)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
var locks []*Lock
Expand Down
27 changes: 24 additions & 3 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

Expand Down Expand Up @@ -538,9 +539,29 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
lockInfo := s.getLockInfo(c, key)
elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL
c.Assert(elapsedTTL, GreaterEqual, uint64(100))
c.Assert(elapsedTTL, Less, uint64(200))
lockInfo2 := s.getLockInfo(c, key2)
c.Assert(lockInfo2.LockTtl, Equals, lockInfo.LockTtl)

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, lockInfo.LockTtl)

// Check primary lock TTL is auto increasing while the pessimistic txn is ongoing.
for i := 0; i < 50; i++ {
lockInfoNew := s.getLockInfo(c, key)
if lockInfoNew.LockTtl > lockInfo.LockTtl {
currentTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx)
c.Assert(err, IsNil)
// Check that the TTL is update to a reasonable range.
expire := oracle.ExtractPhysical(txn.startTS) + int64(lockInfoNew.LockTtl)
now := oracle.ExtractPhysical(currentTS)
c.Assert(expire > now, IsTrue)
c.Assert(uint64(expire-now) <= PessimisticLockTTL, IsTrue)
return
}
time.Sleep(100 * time.Millisecond)
}
c.Assert(false, IsTrue, Commentf("update pessimistic ttl fail"))
}

func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
return errors.Trace(err)
}
}
defer committer.ttlManager.close()
if err := committer.initKeysAndMutations(); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -359,6 +360,7 @@ func (txn *tikvTxn) Rollback() error {
// Clean up pessimistic lock.
if txn.IsPessimistic() && txn.committer != nil {
err := txn.rollbackPessimisticLocks()
txn.committer.ttlManager.close()
lysu marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logutil.BgLogger().Error(err.Error())
}
Expand Down