diff --git a/go.mod b/go.mod index a9ecde49c5771..69cd503008104 100644 --- a/go.mod +++ b/go.mod @@ -42,9 +42,9 @@ require ( github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 - github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 + github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004 - github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9 + github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 github.com/prometheus/client_golang v0.9.0 @@ -67,8 +67,10 @@ require ( go.etcd.io/bbolt v1.3.3 // indirect go.uber.org/atomic v1.3.2 go.uber.org/zap v1.9.1 - golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e - golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb // indirect + golang.org/x/crypto v0.0.0-20190909091759-094676da4a83 // indirect + golang.org/x/net v0.0.0-20190909003024-a7b16738d86b + golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect + golang.org/x/sys v0.0.0-20190909082730-f460065e899a // indirect golang.org/x/text v0.3.0 golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0 google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 // indirect diff --git a/go.sum b/go.sum index 32e61f0eea591..7e712396dc556 100644 --- a/go.sum +++ b/go.sum @@ -153,7 +153,6 @@ github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FI github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 h1:KH4f4Si9XK6/IW50HtoaiLIFHGkapOM6w83za47UYik= github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM= -github.com/pingcap/errors v0.10.1/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= @@ -164,13 +163,12 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 h1:5vQV8S/8B9nE+I+0Me6vZGyASeXl/QymwqtaOL5e5ZA= github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= -github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= -github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= +github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= +github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004 h1:LaA55frHvXh8vTYcQj0xNsQiiPb8iU/JcU8cc2HA9Jg= github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= -github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9 h1:sqqiviE8oEYXJh3Aq59HO/AhxjsvcRb9ETh0ivFOHXc= -github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9/go.mod h1:3DlDlFT7EF64A1bmb/tulZb6wbPSagm5G4p1AlhaEDs= +github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd h1:bKj6hodu/ro78B0oN2yicdGn0t4yd9XjnyoW95qmWic= +github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd/go.mod h1:I7TEby5BHTYIxgHszfsOJSBsk8b2Qt8QrSIgdv5n5QQ= github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU= github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 h1:rRMLMjIMFulCX9sGKZ1hoov/iROMsKyC8Snc02nSukw= @@ -261,23 +259,31 @@ go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190909091759-094676da4a83 h1:mgAKeshyNqWKdENOnQsg+8dRTwZFIwFaO3HNl52sweA= +golang.org/x/crypto v0.0.0-20190909091759-094676da4a83/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e h1:bRhVy7zSSasaqNksaRZiA5EEI+Ei4I1nO5Jh72wfHlg= -golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc= +golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb h1:1w588/yEchbPNpa9sEvOcMZYbWHedwJjg4VOAdDHWHk= -golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190909082730-f460065e899a h1:mIzbOulag9/gXacgxKlFVwpCOWSfBT3/pDyyCwGA9as= +golang.org/x/sys v0.0.0-20190909082730-f460065e899a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM= diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index b76a5a11ed3d9..4664dda4d0251 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -157,6 +157,10 @@ func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limi } func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) { + s.mustPrewriteWithTTLOK(c, mutations, primary, startTS, 0) +} + +func (s *testMockTiKVSuite) mustPrewriteWithTTLOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64, ttl uint64) { req := &kvrpcpb.PrewriteRequest{ Mutations: mutations, PrimaryLock: []byte(primary), @@ -595,3 +599,22 @@ func (s testMarshal) TestMarshalmvccValue(c *C) { c.Assert(v.commitTS, Equals, v1.commitTS) c.Assert(string(v.value), Equals, string(v.value)) } + +func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) { + s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666) + + // Update the ttl + ttl, err := s.store.TxnHeartBeat([]byte("pk"), 5, 888) + c.Assert(err, IsNil) + c.Assert(ttl, Greater, uint64(666)) + + // Advise ttl is small + ttl, err = s.store.TxnHeartBeat([]byte("pk"), 5, 300) + c.Assert(err, IsNil) + c.Assert(ttl, Greater, uint64(300)) + + // The lock has already been clean up + c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil) + _, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000) + c.Assert(err, NotNil) +} diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index 576bfc4b744c6..912be1819c227 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -442,6 +442,7 @@ type MVCCStore interface { Rollback(keys [][]byte, startTS uint64) error Cleanup(key []byte, startTS uint64) error ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) + TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error) ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error DeleteRange(startKey, endKey []byte) error diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index be550fcb8b4a0..b6ef0424b9226 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -969,6 +969,51 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error { return mvcc.db.Write(batch, nil) } +// TxnHeartBeat implements the MVCCStore interface. +func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint64) (uint64, error) { + mvcc.mu.Lock() + defer mvcc.mu.Unlock() + + startKey := mvccEncode(key, lockVer) + iter := newIterator(mvcc.db, &util.Range{ + Start: startKey, + }) + defer iter.Release() + + if iter.Valid() { + dec := lockDecoder{ + expectKey: key, + } + ok, err := dec.Decode(iter) + if err != nil { + return 0, errors.Trace(err) + } + if ok && dec.lock.startTS == startTS { + if !bytes.Equal(dec.lock.primary, key) { + return 0, errors.New("txnHeartBeat on non-primary key, the code should not run here") + } + + lock := dec.lock + batch := &leveldb.Batch{} + // Increase the ttl of this transaction. + if adviseTTL > lock.ttl { + lock.ttl = adviseTTL + writeKey := mvccEncode(key, lockVer) + writeValue, err := lock.MarshalBinary() + if err != nil { + return 0, errors.Trace(err) + } + batch.Put(writeKey, writeValue) + if err = mvcc.db.Write(batch, nil); err != nil { + return 0, errors.Trace(err) + } + } + return lock.ttl, nil + } + } + return 0, errors.New("lock doesn't exist") +} + // ScanLock implements the MVCCStore interface. func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) { mvcc.mu.RLock() diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 10e76c6cbf16f..b07c3c36a1f0d 100755 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -368,6 +368,19 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean return &resp } +func (h *rpcHandler) handleTxnHeartBeat(req *kvrpcpb.TxnHeartBeatRequest) *kvrpcpb.TxnHeartBeatResponse { + if !h.checkKeyInRegion(req.PrimaryLock) { + panic("KvTxnHeartBeat: key not in region") + } + var resp kvrpcpb.TxnHeartBeatResponse + ttl, err := h.mvccStore.TxnHeartBeat(req.PrimaryLock, req.StartVersion, req.AdviseLockTtl) + if err != nil { + resp.Error = convertToKeyError(err) + } + resp.LockTtl = ttl + return &resp +} + func (h *rpcHandler) handleKvBatchGet(req *kvrpcpb.BatchGetRequest) *kvrpcpb.BatchGetResponse { for _, k := range req.Keys { if !h.checkKeyInRegion(k) { @@ -764,6 +777,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R return resp, nil } resp.Cleanup = handler.handleKvCleanup(r) + case tikvrpc.CmdTxnHeartBeat: + r := req.TxnHeartBeat + if err := handler.checkRequest(reqCtx, r.Size()); err != nil { + resp.TxnHeartBeat = &kvrpcpb.TxnHeartBeatResponse{RegionError: err} + return resp, nil + } + resp.TxnHeartBeat = handler.handleTxnHeartBeat(r) case tikvrpc.CmdBatchGet: r := req.BatchGet if err := handler.checkRequest(reqCtx, r.Size()); err != nil { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index e5a08af5a98c6..1af2cbd45865a 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -131,6 +131,46 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro }, nil } +func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS, ttl uint64) (uint64, error) { + req := &tikvrpc.Request{ + Type: tikvrpc.CmdTxnHeartBeat, + TxnHeartBeat: &pb.TxnHeartBeatRequest{ + PrimaryLock: primary, + StartVersion: startTS, + AdviseLockTtl: ttl, + }, + } + for { + loc, err := store.GetRegionCache().LocateKey(bo, primary) + if err != nil { + return 0, errors.Trace(err) + } + resp, err := store.SendReq(bo, req, loc.Region, readTimeoutShort) + if err != nil { + return 0, errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return 0, errors.Trace(err) + } + if regionErr != nil { + err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return 0, errors.Trace(err) + } + continue + } + if resp.TxnHeartBeat == nil { + return 0, errors.Trace(ErrBodyMissing) + } + cmdResp := resp.TxnHeartBeat + if keyErr := cmdResp.GetError(); keyErr != nil { + return 0, errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, primary, keyErr.Abort) + } + return cmdResp.GetLockTtl(), nil + } +} + func (c *twoPhaseCommitter) initKeysAndMutations() error { var ( keys [][]byte diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 3b9ed28f3fded..3861e3ed3141d 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -202,6 +202,31 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) { c.Assert(status.IsCommitted(), IsFalse) } +func (s *testLockSuite) TestTxnHeartBeat(c *C) { + txn, err := s.store.Begin() + c.Assert(err, IsNil) + txn.Set(kv.Key("key"), []byte("value")) + s.prewriteTxn(c, txn.(*tikvTxn)) + + bo := NewBackoffer(context.Background(), prewriteMaxBackoff) + newTTL, err := sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666) + c.Assert(err, IsNil) + c.Assert(newTTL, Equals, uint64(666)) + + newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 555) + c.Assert(err, IsNil) + c.Assert(newTTL, Equals, uint64(666)) + + // The getTxnStatus API is confusing, it really means rollback! + status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key")) + c.Assert(err, IsNil) + c.Assert(status, Equals, TxnStatus(0)) + + newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666) + c.Assert(err, NotNil) + c.Assert(newTTL, Equals, uint64(0)) +} + func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) { committer, err := newTwoPhaseCommitterWithInit(txn, 0) c.Assert(err, IsNil) diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index b3cf78ee5e82e..1d88e7b25bcb3 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -46,6 +46,7 @@ const ( CmdDeleteRange CmdPessimisticLock CmdPessimisticRollback + CmdTxnHeartBeat CmdRawGet CmdType = 256 + iota CmdRawBatchGet @@ -128,6 +129,8 @@ func (t CmdType) String() string { return "SplitRegion" case CmdDebugGetRegionProperties: return "DebugGetRegionProperties" + case CmdTxnHeartBeat: + return "TxnHeartBeat" } return "Unknown" } @@ -166,7 +169,8 @@ type Request struct { DebugGetRegionProperties *debugpb.GetRegionPropertiesRequest - Empty *tikvpb.BatchCommandsEmptyRequest + Empty *tikvpb.BatchCommandsEmptyRequest + TxnHeartBeat *kvrpcpb.TxnHeartBeatRequest } // ToBatchCommandsRequest converts the request to an entry in BatchCommands request. @@ -218,6 +222,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticRollback{PessimisticRollback: req.PessimisticRollback}} case CmdEmpty: return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{Empty: req.Empty}} + case CmdTxnHeartBeat: + return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_TxnHeartBeat{TxnHeartBeat: req.TxnHeartBeat}} } return nil } @@ -265,7 +271,8 @@ type Response struct { DebugGetRegionProperties *debugpb.GetRegionPropertiesResponse - Empty *tikvpb.BatchCommandsEmptyResponse + Empty *tikvpb.BatchCommandsEmptyResponse + TxnHeartBeat *kvrpcpb.TxnHeartBeatResponse } // FromBatchCommandsResponse converts a BatchCommands response to Response. @@ -317,6 +324,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp return &Response{Type: CmdPessimisticRollback, PessimisticRollback: res.PessimisticRollback} case *tikvpb.BatchCommandsResponse_Response_Empty: return &Response{Type: CmdEmpty, Empty: res.Empty} + case *tikvpb.BatchCommandsResponse_Response_TxnHeartBeat: + return &Response{Type: CmdTxnHeartBeat, TxnHeartBeat: res.TxnHeartBeat} } return nil } @@ -394,6 +403,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { case CmdSplitRegion: req.SplitRegion.Context = ctx case CmdEmpty: + case CmdTxnHeartBeat: + req.TxnHeartBeat.Context = ctx default: return fmt.Errorf("invalid request type %v", req.Type) } @@ -517,6 +528,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { RegionError: e, } case CmdEmpty: + case CmdTxnHeartBeat: + resp.TxnHeartBeat = &kvrpcpb.TxnHeartBeatResponse{ + RegionError: e, + } default: return nil, fmt.Errorf("invalid request type %v", req.Type) } @@ -582,6 +597,8 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) { case CmdSplitRegion: e = resp.SplitRegion.GetRegionError() case CmdEmpty: + case CmdTxnHeartBeat: + e = resp.TxnHeartBeat.GetRegionError() default: return nil, fmt.Errorf("invalid response type %v", resp.Type) } @@ -656,6 +673,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.SplitRegion, err = client.SplitRegion(ctx, req.SplitRegion) case CmdEmpty: resp.Empty, err = &tikvpb.BatchCommandsEmptyResponse{}, nil + case CmdTxnHeartBeat: + resp.TxnHeartBeat, err = client.KvTxnHeartBeat(ctx, req.TxnHeartBeat) default: return nil, errors.Errorf("invalid request type: %v", req.Type) }