Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement the TxnHeartBeat API for the large transaction (#11979) #12397

Merged
merged 4 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9
github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330
github.com/prometheus/client_golang v0.9.0
Expand All @@ -67,8 +67,10 @@ require (
go.etcd.io/bbolt v1.3.3 // indirect
go.uber.org/atomic v1.3.2
go.uber.org/zap v1.9.1
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb // indirect
golang.org/x/crypto v0.0.0-20190909091759-094676da4a83 // indirect
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b
golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
golang.org/x/sys v0.0.0-20190909082730-f460065e899a // indirect
golang.org/x/text v0.3.0
golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 // indirect
Expand Down
26 changes: 16 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FI
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 h1:KH4f4Si9XK6/IW50HtoaiLIFHGkapOM6w83za47UYik=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errors v0.10.1/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
Expand All @@ -164,13 +163,12 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 h1:5vQV8S/8B9nE+I+0Me6vZGyASeXl/QymwqtaOL5e5ZA=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004 h1:LaA55frHvXh8vTYcQj0xNsQiiPb8iU/JcU8cc2HA9Jg=
github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9 h1:sqqiviE8oEYXJh3Aq59HO/AhxjsvcRb9ETh0ivFOHXc=
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9/go.mod h1:3DlDlFT7EF64A1bmb/tulZb6wbPSagm5G4p1AlhaEDs=
github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd h1:bKj6hodu/ro78B0oN2yicdGn0t4yd9XjnyoW95qmWic=
github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd/go.mod h1:I7TEby5BHTYIxgHszfsOJSBsk8b2Qt8QrSIgdv5n5QQ=
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU=
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 h1:rRMLMjIMFulCX9sGKZ1hoov/iROMsKyC8Snc02nSukw=
Expand Down Expand Up @@ -261,23 +259,31 @@ go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190909091759-094676da4a83 h1:mgAKeshyNqWKdENOnQsg+8dRTwZFIwFaO3HNl52sweA=
golang.org/x/crypto v0.0.0-20190909091759-094676da4a83/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e h1:bRhVy7zSSasaqNksaRZiA5EEI+Ei4I1nO5Jh72wfHlg=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb h1:1w588/yEchbPNpa9sEvOcMZYbWHedwJjg4VOAdDHWHk=
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190909082730-f460065e899a h1:mIzbOulag9/gXacgxKlFVwpCOWSfBT3/pDyyCwGA9as=
golang.org/x/sys v0.0.0-20190909082730-f460065e899a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM=
Expand Down
23 changes: 23 additions & 0 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limi
}

func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) {
s.mustPrewriteWithTTLOK(c, mutations, primary, startTS, 0)
}

func (s *testMockTiKVSuite) mustPrewriteWithTTLOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64, ttl uint64) {
req := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: []byte(primary),
Expand Down Expand Up @@ -595,3 +599,22 @@ func (s testMarshal) TestMarshalmvccValue(c *C) {
c.Assert(v.commitTS, Equals, v1.commitTS)
c.Assert(string(v.value), Equals, string(v.value))
}

func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)

// Update the ttl
ttl, err := s.store.TxnHeartBeat([]byte("pk"), 5, 888)
c.Assert(err, IsNil)
c.Assert(ttl, Greater, uint64(666))

// Advise ttl is small
ttl, err = s.store.TxnHeartBeat([]byte("pk"), 5, 300)
c.Assert(err, IsNil)
c.Assert(ttl, Greater, uint64(300))

// The lock has already been clean up
c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil)
_, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000)
c.Assert(err, NotNil)
}
1 change: 1 addition & 0 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ type MVCCStore interface {
Rollback(keys [][]byte, startTS uint64) error
Cleanup(key []byte, startTS uint64) error
ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error)
TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error)
ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
DeleteRange(startKey, endKey []byte) error
Expand Down
45 changes: 45 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,51 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error {
return mvcc.db.Write(batch, nil)
}

// TxnHeartBeat implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint64) (uint64, error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

startKey := mvccEncode(key, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
})
defer iter.Release()

if iter.Valid() {
dec := lockDecoder{
expectKey: key,
}
ok, err := dec.Decode(iter)
if err != nil {
return 0, errors.Trace(err)
}
if ok && dec.lock.startTS == startTS {
if !bytes.Equal(dec.lock.primary, key) {
return 0, errors.New("txnHeartBeat on non-primary key, the code should not run here")
}

lock := dec.lock
batch := &leveldb.Batch{}
// Increase the ttl of this transaction.
if adviseTTL > lock.ttl {
lock.ttl = adviseTTL
writeKey := mvccEncode(key, lockVer)
writeValue, err := lock.MarshalBinary()
if err != nil {
return 0, errors.Trace(err)
}
batch.Put(writeKey, writeValue)
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, errors.Trace(err)
}
}
return lock.ttl, nil
}
}
return 0, errors.New("lock doesn't exist")
}

// ScanLock implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) {
mvcc.mu.RLock()
Expand Down
20 changes: 20 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,19 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean
return &resp
}

func (h *rpcHandler) handleTxnHeartBeat(req *kvrpcpb.TxnHeartBeatRequest) *kvrpcpb.TxnHeartBeatResponse {
if !h.checkKeyInRegion(req.PrimaryLock) {
panic("KvTxnHeartBeat: key not in region")
}
var resp kvrpcpb.TxnHeartBeatResponse
ttl, err := h.mvccStore.TxnHeartBeat(req.PrimaryLock, req.StartVersion, req.AdviseLockTtl)
if err != nil {
resp.Error = convertToKeyError(err)
}
resp.LockTtl = ttl
return &resp
}

func (h *rpcHandler) handleKvBatchGet(req *kvrpcpb.BatchGetRequest) *kvrpcpb.BatchGetResponse {
for _, k := range req.Keys {
if !h.checkKeyInRegion(k) {
Expand Down Expand Up @@ -764,6 +777,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.Cleanup = handler.handleKvCleanup(r)
case tikvrpc.CmdTxnHeartBeat:
r := req.TxnHeartBeat
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.TxnHeartBeat = &kvrpcpb.TxnHeartBeatResponse{RegionError: err}
return resp, nil
}
resp.TxnHeartBeat = handler.handleTxnHeartBeat(r)
case tikvrpc.CmdBatchGet:
r := req.BatchGet
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
Expand Down
40 changes: 40 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,46 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
}, nil
}

func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS, ttl uint64) (uint64, error) {
req := &tikvrpc.Request{
Type: tikvrpc.CmdTxnHeartBeat,
TxnHeartBeat: &pb.TxnHeartBeatRequest{
PrimaryLock: primary,
StartVersion: startTS,
AdviseLockTtl: ttl,
},
}
for {
loc, err := store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
return 0, errors.Trace(err)
}
resp, err := store.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return 0, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return 0, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return 0, errors.Trace(err)
}
continue
}
if resp.TxnHeartBeat == nil {
return 0, errors.Trace(ErrBodyMissing)
}
cmdResp := resp.TxnHeartBeat
if keyErr := cmdResp.GetError(); keyErr != nil {
return 0, errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, primary, keyErr.Abort)
}
return cmdResp.GetLockTtl(), nil
}
}

func (c *twoPhaseCommitter) initKeysAndMutations() error {
var (
keys [][]byte
Expand Down
25 changes: 25 additions & 0 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,31 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
c.Assert(status.IsCommitted(), IsFalse)
}

func (s *testLockSuite) TestTxnHeartBeat(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.Set(kv.Key("key"), []byte("value"))
s.prewriteTxn(c, txn.(*tikvTxn))

bo := NewBackoffer(context.Background(), prewriteMaxBackoff)
newTTL, err := sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
c.Assert(err, IsNil)
c.Assert(newTTL, Equals, uint64(666))

newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 555)
c.Assert(err, IsNil)
c.Assert(newTTL, Equals, uint64(666))

// The getTxnStatus API is confusing, it really means rollback!
status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"))
c.Assert(err, IsNil)
c.Assert(status, Equals, TxnStatus(0))

newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
c.Assert(err, NotNil)
c.Assert(newTTL, Equals, uint64(0))
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
Expand Down
23 changes: 21 additions & 2 deletions store/tikv/tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
CmdDeleteRange
CmdPessimisticLock
CmdPessimisticRollback
CmdTxnHeartBeat

CmdRawGet CmdType = 256 + iota
CmdRawBatchGet
Expand Down Expand Up @@ -128,6 +129,8 @@ func (t CmdType) String() string {
return "SplitRegion"
case CmdDebugGetRegionProperties:
return "DebugGetRegionProperties"
case CmdTxnHeartBeat:
return "TxnHeartBeat"
}
return "Unknown"
}
Expand Down Expand Up @@ -166,7 +169,8 @@ type Request struct {

DebugGetRegionProperties *debugpb.GetRegionPropertiesRequest

Empty *tikvpb.BatchCommandsEmptyRequest
Empty *tikvpb.BatchCommandsEmptyRequest
TxnHeartBeat *kvrpcpb.TxnHeartBeatRequest
}

// ToBatchCommandsRequest converts the request to an entry in BatchCommands request.
Expand Down Expand Up @@ -218,6 +222,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticRollback{PessimisticRollback: req.PessimisticRollback}}
case CmdEmpty:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{Empty: req.Empty}}
case CmdTxnHeartBeat:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_TxnHeartBeat{TxnHeartBeat: req.TxnHeartBeat}}
}
return nil
}
Expand Down Expand Up @@ -265,7 +271,8 @@ type Response struct {

DebugGetRegionProperties *debugpb.GetRegionPropertiesResponse

Empty *tikvpb.BatchCommandsEmptyResponse
Empty *tikvpb.BatchCommandsEmptyResponse
TxnHeartBeat *kvrpcpb.TxnHeartBeatResponse
}

// FromBatchCommandsResponse converts a BatchCommands response to Response.
Expand Down Expand Up @@ -317,6 +324,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp
return &Response{Type: CmdPessimisticRollback, PessimisticRollback: res.PessimisticRollback}
case *tikvpb.BatchCommandsResponse_Response_Empty:
return &Response{Type: CmdEmpty, Empty: res.Empty}
case *tikvpb.BatchCommandsResponse_Response_TxnHeartBeat:
return &Response{Type: CmdTxnHeartBeat, TxnHeartBeat: res.TxnHeartBeat}
}
return nil
}
Expand Down Expand Up @@ -394,6 +403,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
case CmdSplitRegion:
req.SplitRegion.Context = ctx
case CmdEmpty:
case CmdTxnHeartBeat:
req.TxnHeartBeat.Context = ctx
default:
return fmt.Errorf("invalid request type %v", req.Type)
}
Expand Down Expand Up @@ -517,6 +528,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
RegionError: e,
}
case CmdEmpty:
case CmdTxnHeartBeat:
resp.TxnHeartBeat = &kvrpcpb.TxnHeartBeatResponse{
RegionError: e,
}
default:
return nil, fmt.Errorf("invalid request type %v", req.Type)
}
Expand Down Expand Up @@ -582,6 +597,8 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
case CmdSplitRegion:
e = resp.SplitRegion.GetRegionError()
case CmdEmpty:
case CmdTxnHeartBeat:
e = resp.TxnHeartBeat.GetRegionError()
default:
return nil, fmt.Errorf("invalid response type %v", resp.Type)
}
Expand Down Expand Up @@ -656,6 +673,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.SplitRegion, err = client.SplitRegion(ctx, req.SplitRegion)
case CmdEmpty:
resp.Empty, err = &tikvpb.BatchCommandsEmptyResponse{}, nil
case CmdTxnHeartBeat:
resp.TxnHeartBeat, err = client.KvTxnHeartBeat(ctx, req.TxnHeartBeat)
default:
return nil, errors.Errorf("invalid request type: %v", req.Type)
}
Expand Down