Skip to content

Commit

Permalink
*: implement the TxnHeartBeat API for the large transaction (#11979) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and sre-bot committed Sep 30, 2019
1 parent 694e086 commit b12c4bc
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 16 deletions.
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ require (
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9
github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330
github.com/prometheus/client_golang v0.9.0
Expand All @@ -67,8 +67,10 @@ require (
go.etcd.io/bbolt v1.3.3 // indirect
go.uber.org/atomic v1.3.2
go.uber.org/zap v1.9.1
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb // indirect
golang.org/x/crypto v0.0.0-20190909091759-094676da4a83 // indirect
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b
golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect
golang.org/x/sys v0.0.0-20190909082730-f460065e899a // indirect
golang.org/x/text v0.3.0
golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 // indirect
Expand Down
26 changes: 16 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FI
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 h1:KH4f4Si9XK6/IW50HtoaiLIFHGkapOM6w83za47UYik=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errors v0.10.1/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
Expand All @@ -164,13 +163,12 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 h1:5vQV8S/8B9nE+I+0Me6vZGyASeXl/QymwqtaOL5e5ZA=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004 h1:LaA55frHvXh8vTYcQj0xNsQiiPb8iU/JcU8cc2HA9Jg=
github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9 h1:sqqiviE8oEYXJh3Aq59HO/AhxjsvcRb9ETh0ivFOHXc=
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9/go.mod h1:3DlDlFT7EF64A1bmb/tulZb6wbPSagm5G4p1AlhaEDs=
github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd h1:bKj6hodu/ro78B0oN2yicdGn0t4yd9XjnyoW95qmWic=
github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd/go.mod h1:I7TEby5BHTYIxgHszfsOJSBsk8b2Qt8QrSIgdv5n5QQ=
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU=
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 h1:rRMLMjIMFulCX9sGKZ1hoov/iROMsKyC8Snc02nSukw=
Expand Down Expand Up @@ -261,23 +259,31 @@ go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190909091759-094676da4a83 h1:mgAKeshyNqWKdENOnQsg+8dRTwZFIwFaO3HNl52sweA=
golang.org/x/crypto v0.0.0-20190909091759-094676da4a83/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e h1:bRhVy7zSSasaqNksaRZiA5EEI+Ei4I1nO5Jh72wfHlg=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb h1:1w588/yEchbPNpa9sEvOcMZYbWHedwJjg4VOAdDHWHk=
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190909082730-f460065e899a h1:mIzbOulag9/gXacgxKlFVwpCOWSfBT3/pDyyCwGA9as=
golang.org/x/sys v0.0.0-20190909082730-f460065e899a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM=
Expand Down
23 changes: 23 additions & 0 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limi
}

func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) {
s.mustPrewriteWithTTLOK(c, mutations, primary, startTS, 0)
}

func (s *testMockTiKVSuite) mustPrewriteWithTTLOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64, ttl uint64) {
req := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: []byte(primary),
Expand Down Expand Up @@ -595,3 +599,22 @@ func (s testMarshal) TestMarshalmvccValue(c *C) {
c.Assert(v.commitTS, Equals, v1.commitTS)
c.Assert(string(v.value), Equals, string(v.value))
}

func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) {
s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666)

// Update the ttl
ttl, err := s.store.TxnHeartBeat([]byte("pk"), 5, 888)
c.Assert(err, IsNil)
c.Assert(ttl, Greater, uint64(666))

// Advise ttl is small
ttl, err = s.store.TxnHeartBeat([]byte("pk"), 5, 300)
c.Assert(err, IsNil)
c.Assert(ttl, Greater, uint64(300))

// The lock has already been clean up
c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil)
_, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000)
c.Assert(err, NotNil)
}
1 change: 1 addition & 0 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ type MVCCStore interface {
Rollback(keys [][]byte, startTS uint64) error
Cleanup(key []byte, startTS uint64) error
ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error)
TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error)
ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
DeleteRange(startKey, endKey []byte) error
Expand Down
45 changes: 45 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,51 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error {
return mvcc.db.Write(batch, nil)
}

// TxnHeartBeat implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint64) (uint64, error) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

startKey := mvccEncode(key, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
})
defer iter.Release()

if iter.Valid() {
dec := lockDecoder{
expectKey: key,
}
ok, err := dec.Decode(iter)
if err != nil {
return 0, errors.Trace(err)
}
if ok && dec.lock.startTS == startTS {
if !bytes.Equal(dec.lock.primary, key) {
return 0, errors.New("txnHeartBeat on non-primary key, the code should not run here")
}

lock := dec.lock
batch := &leveldb.Batch{}
// Increase the ttl of this transaction.
if adviseTTL > lock.ttl {
lock.ttl = adviseTTL
writeKey := mvccEncode(key, lockVer)
writeValue, err := lock.MarshalBinary()
if err != nil {
return 0, errors.Trace(err)
}
batch.Put(writeKey, writeValue)
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, errors.Trace(err)
}
}
return lock.ttl, nil
}
}
return 0, errors.New("lock doesn't exist")
}

// ScanLock implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) {
mvcc.mu.RLock()
Expand Down
20 changes: 20 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,19 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean
return &resp
}

func (h *rpcHandler) handleTxnHeartBeat(req *kvrpcpb.TxnHeartBeatRequest) *kvrpcpb.TxnHeartBeatResponse {
if !h.checkKeyInRegion(req.PrimaryLock) {
panic("KvTxnHeartBeat: key not in region")
}
var resp kvrpcpb.TxnHeartBeatResponse
ttl, err := h.mvccStore.TxnHeartBeat(req.PrimaryLock, req.StartVersion, req.AdviseLockTtl)
if err != nil {
resp.Error = convertToKeyError(err)
}
resp.LockTtl = ttl
return &resp
}

func (h *rpcHandler) handleKvBatchGet(req *kvrpcpb.BatchGetRequest) *kvrpcpb.BatchGetResponse {
for _, k := range req.Keys {
if !h.checkKeyInRegion(k) {
Expand Down Expand Up @@ -764,6 +777,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.Cleanup = handler.handleKvCleanup(r)
case tikvrpc.CmdTxnHeartBeat:
r := req.TxnHeartBeat
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.TxnHeartBeat = &kvrpcpb.TxnHeartBeatResponse{RegionError: err}
return resp, nil
}
resp.TxnHeartBeat = handler.handleTxnHeartBeat(r)
case tikvrpc.CmdBatchGet:
r := req.BatchGet
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
Expand Down
40 changes: 40 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,46 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
}, nil
}

func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS, ttl uint64) (uint64, error) {
req := &tikvrpc.Request{
Type: tikvrpc.CmdTxnHeartBeat,
TxnHeartBeat: &pb.TxnHeartBeatRequest{
PrimaryLock: primary,
StartVersion: startTS,
AdviseLockTtl: ttl,
},
}
for {
loc, err := store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
return 0, errors.Trace(err)
}
resp, err := store.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return 0, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return 0, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return 0, errors.Trace(err)
}
continue
}
if resp.TxnHeartBeat == nil {
return 0, errors.Trace(ErrBodyMissing)
}
cmdResp := resp.TxnHeartBeat
if keyErr := cmdResp.GetError(); keyErr != nil {
return 0, errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, primary, keyErr.Abort)
}
return cmdResp.GetLockTtl(), nil
}
}

func (c *twoPhaseCommitter) initKeysAndMutations() error {
var (
keys [][]byte
Expand Down
25 changes: 25 additions & 0 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,31 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
c.Assert(status.IsCommitted(), IsFalse)
}

func (s *testLockSuite) TestTxnHeartBeat(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.Set(kv.Key("key"), []byte("value"))
s.prewriteTxn(c, txn.(*tikvTxn))

bo := NewBackoffer(context.Background(), prewriteMaxBackoff)
newTTL, err := sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
c.Assert(err, IsNil)
c.Assert(newTTL, Equals, uint64(666))

newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 555)
c.Assert(err, IsNil)
c.Assert(newTTL, Equals, uint64(666))

// The getTxnStatus API is confusing, it really means rollback!
status, err := newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"))
c.Assert(err, IsNil)
c.Assert(status, Equals, TxnStatus(0))

newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
c.Assert(err, NotNil)
c.Assert(newTTL, Equals, uint64(0))
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
Expand Down
23 changes: 21 additions & 2 deletions store/tikv/tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
CmdDeleteRange
CmdPessimisticLock
CmdPessimisticRollback
CmdTxnHeartBeat

CmdRawGet CmdType = 256 + iota
CmdRawBatchGet
Expand Down Expand Up @@ -128,6 +129,8 @@ func (t CmdType) String() string {
return "SplitRegion"
case CmdDebugGetRegionProperties:
return "DebugGetRegionProperties"
case CmdTxnHeartBeat:
return "TxnHeartBeat"
}
return "Unknown"
}
Expand Down Expand Up @@ -166,7 +169,8 @@ type Request struct {

DebugGetRegionProperties *debugpb.GetRegionPropertiesRequest

Empty *tikvpb.BatchCommandsEmptyRequest
Empty *tikvpb.BatchCommandsEmptyRequest
TxnHeartBeat *kvrpcpb.TxnHeartBeatRequest
}

// ToBatchCommandsRequest converts the request to an entry in BatchCommands request.
Expand Down Expand Up @@ -218,6 +222,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticRollback{PessimisticRollback: req.PessimisticRollback}}
case CmdEmpty:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{Empty: req.Empty}}
case CmdTxnHeartBeat:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_TxnHeartBeat{TxnHeartBeat: req.TxnHeartBeat}}
}
return nil
}
Expand Down Expand Up @@ -265,7 +271,8 @@ type Response struct {

DebugGetRegionProperties *debugpb.GetRegionPropertiesResponse

Empty *tikvpb.BatchCommandsEmptyResponse
Empty *tikvpb.BatchCommandsEmptyResponse
TxnHeartBeat *kvrpcpb.TxnHeartBeatResponse
}

// FromBatchCommandsResponse converts a BatchCommands response to Response.
Expand Down Expand Up @@ -317,6 +324,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp
return &Response{Type: CmdPessimisticRollback, PessimisticRollback: res.PessimisticRollback}
case *tikvpb.BatchCommandsResponse_Response_Empty:
return &Response{Type: CmdEmpty, Empty: res.Empty}
case *tikvpb.BatchCommandsResponse_Response_TxnHeartBeat:
return &Response{Type: CmdTxnHeartBeat, TxnHeartBeat: res.TxnHeartBeat}
}
return nil
}
Expand Down Expand Up @@ -394,6 +403,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
case CmdSplitRegion:
req.SplitRegion.Context = ctx
case CmdEmpty:
case CmdTxnHeartBeat:
req.TxnHeartBeat.Context = ctx
default:
return fmt.Errorf("invalid request type %v", req.Type)
}
Expand Down Expand Up @@ -517,6 +528,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
RegionError: e,
}
case CmdEmpty:
case CmdTxnHeartBeat:
resp.TxnHeartBeat = &kvrpcpb.TxnHeartBeatResponse{
RegionError: e,
}
default:
return nil, fmt.Errorf("invalid request type %v", req.Type)
}
Expand Down Expand Up @@ -582,6 +597,8 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
case CmdSplitRegion:
e = resp.SplitRegion.GetRegionError()
case CmdEmpty:
case CmdTxnHeartBeat:
e = resp.TxnHeartBeat.GetRegionError()
default:
return nil, fmt.Errorf("invalid response type %v", resp.Type)
}
Expand Down Expand Up @@ -656,6 +673,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.SplitRegion, err = client.SplitRegion(ctx, req.SplitRegion)
case CmdEmpty:
resp.Empty, err = &tikvpb.BatchCommandsEmptyResponse{}, nil
case CmdTxnHeartBeat:
resp.TxnHeartBeat, err = client.KvTxnHeartBeat(ctx, req.TxnHeartBeat)
default:
return nil, errors.Errorf("invalid request type: %v", req.Type)
}
Expand Down

0 comments on commit b12c4bc

Please sign in to comment.