From 1682ebb00abe8e83a0619aed81a3ef9b063e3a49 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 11 Nov 2019 17:31:16 +0800 Subject: [PATCH] store,kv: snapshot doesn't cache the non-exists kv entries lead to poor 'insert ignore' performance #12872 --- kv/kv.go | 2 ++ store/mockstore/mocktikv/rpc.go | 7 ++++++ store/tikv/snapshot.go | 43 +++++++++++++++++++++++++++++++++ store/tikv/snapshot_test.go | 21 ++++++++++++++++ 4 files changed, 73 insertions(+) diff --git a/kv/kv.go b/kv/kv.go index 2911b4ba0a2e5..13e1338be61f1 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -175,6 +175,8 @@ type Transaction interface { // SetAssertion sets an assertion for an operation on the key. SetAssertion(key Key, assertion AssertionType) // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage. + // Do not use len(value) == 0 or value == nil to represent non-exist. + // If a key doesn't exist, there shouldn't be any corresponding entry in the result map. BatchGet(keys []Key) (map[string][]byte, error) IsPessimistic() bool } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index b7be40913fb50..a175909862e35 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -23,6 +23,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" @@ -657,6 +658,12 @@ func (c *RPCClient) checkArgs(ctx context.Context, addr string) (*rpcHandler, er // SendRequest sends a request to mock cluster. 
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("RPCClient.SendRequest", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + failpoint.Inject("rpcServerBusy", func(val failpoint.Value) { if val.(bool) { failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}})) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index c4d5814855568..ffad344295062 100755 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -58,6 +58,15 @@ type tikvSnapshot struct { vars *kv.Variables replicaRead kv.ReplicaReadType replicaReadSeed uint32 + + // Cache the result of BatchGet. + // The invariant is that calling BatchGet multiple times with the same start ts + // must always return the same result. + // NOTE: The representation here is different from the BatchGet API. + // cached uses len(value)=0 to represent that a key-value entry doesn't exist (a reliable truth from TiKV). + // The BatchGet API represents a nonexistent key by omitting its entry from the result map. + // It's OK as long as there are no zero-byte values in the protocol. + cached map[string][]byte } // newTiKVSnapshot creates a snapshot of an TiKV store. @@ -79,6 +88,20 @@ func (s *tikvSnapshot) SetPriority(priority int) { // The map will not contain nonexistent keys. 
func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { m := make(map[string][]byte) + if s.cached != nil { + tmp := keys[:0] + for _, key := range keys { + if val, ok := s.cached[string(key)]; ok { + if len(val) > 0 { + m[string(key)] = val + } + } else { + tmp = append(tmp, key) + } + } + keys = tmp + } + if len(keys) == 0 { return m, nil } @@ -97,6 +120,7 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { if len(v) == 0 { return } + mu.Lock() m[string(k)] = v mu.Unlock() @@ -110,6 +134,14 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { return nil, errors.Trace(err) } + // Update the cache. + if s.cached == nil { + s.cached = make(map[string][]byte, len(m)) + } + for _, key := range keys { + s.cached[string(key)] = m[string(key)] + } + return m, nil } @@ -239,6 +271,17 @@ func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) { } func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { + // Check the cached values first. + if s.cached != nil { + if value, ok := s.cached[string(k)]; ok { + return value, nil + } + } + + failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) { + panic("cache miss") + }) + sender := NewRegionRequestSender(s.store.regionCache, s.store.client) req := &tikvrpc.Request{ diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 529271074d1e7..ceaf37e394b97 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -19,6 +19,7 @@ import ( "time" . 
"github.com/pingcap/check" + "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/logutil" @@ -117,6 +118,26 @@ func (s *testSnapshotSuite) TestBatchGet(c *C) { } } +func (s *testSnapshotSuite) TestSnapshotCache(c *C) { + txn := s.beginTxn(c) + c.Assert(txn.Set(kv.Key("x"), []byte("x")), IsNil) + c.Assert(txn.Commit(context.Background()), IsNil) + + txn = s.beginTxn(c) + snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}) + _, err := snapshot.BatchGet([]kv.Key{kv.Key("x"), kv.Key("y")}) + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshot-get-cache-fail", `return(true)`), IsNil) + _, err = snapshot.Get(kv.Key("x")) + c.Assert(err, IsNil) + + _, err = snapshot.Get(kv.Key("y")) + c.Assert(kv.IsErrNotFound(err), IsTrue) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshot-get-cache-fail"), IsNil) +} + func (s *testSnapshotSuite) TestBatchGetNotExist(c *C) { for _, rowNum := range s.rowNums { logutil.Logger(context.Background()).Debug("test BatchGetNotExist",