From 0102c07fdd243d43087fd017539e1f68452c223a Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Wed, 20 Feb 2019 16:51:09 +0800 Subject: [PATCH 1/3] merge statement buffer when BatchGet --- executor/write_test.go | 11 +++++++++++ kv/kv.go | 2 ++ kv/mock.go | 4 ++++ kv/txn.go | 5 +---- session/txn.go | 9 +++++++++ store/tikv/txn.go | 4 ++++ 6 files changed, 31 insertions(+), 4 deletions(-) diff --git a/executor/write_test.go b/executor/write_test.go index db8a81ff282de..d39c8bd7284db 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -303,6 +303,17 @@ func (s *testSuite2) TestInsert(c *C) { tk.MustExec("drop view v") } +func (s *testSuite) TestMultiBatch(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table t0 (i int)") + tk.MustExec("insert into t0 values (1), (1)") + tk.MustExec("create table t (i int unique key)") + tk.MustExec("set tidb_dml_batch_size = 1") + tk.MustExec("insert ignore into t select * from t0") + tk.MustExec("admin check table t") +} + func (s *testSuite2) TestInsertAutoInc(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/kv/kv.go b/kv/kv.go index 2f0efd4352291..86c026087a02f 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -150,6 +150,8 @@ type Transaction interface { GetSnapshot() Snapshot // SetVars sets variables to the transaction. SetVars(vars *Variables) + // MemGet get kv from the memory buffer. + MemGet(k Key) ([]byte, error) } // Client is used to send request to KV layer. diff --git a/kv/mock.go b/kv/mock.go index 3038c3d39b9c1..8d19c90e5766b 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -116,6 +116,10 @@ func (t *mockTxn) SetVars(vars *Variables) { } +func (t *mockTxn) MemGet(k Key) ([]byte, error) { + return nil, nil +} + // NewMockTxn new a mockTxn. func NewMockTxn() Transaction { return &mockTxn{ diff --git a/kv/txn.go b/kv/txn.go index 00ee680049ad4..5597dd266acf7 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -94,13 +94,10 @@ func BackOff(attempts uint) int { // BatchGetValues gets values in batch. // The values from buffer in transaction and the values from the storage node are merged together. func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) { - if txn.IsReadOnly() { - return txn.GetSnapshot().BatchGet(keys) - } bufferValues := make([][]byte, len(keys)) shrinkKeys := make([]Key, 0, len(keys)) for i, key := range keys { - val, err := txn.GetMemBuffer().Get(key) + val, err := txn.MemGet(key) if IsErrNotFound(err) { shrinkKeys = append(shrinkKeys, key) continue diff --git a/session/txn.go b/session/txn.go index c854dcbc270a0..802b9040f6331 100644 --- a/session/txn.go +++ b/session/txn.go @@ -213,6 +213,15 @@ func (st *TxnState) Get(k kv.Key) ([]byte, error) { return val, nil } +// MemGet overrides the Transaction interface. +func (st *TxnState) MemGet(k kv.Key) ([]byte, error) { + val, err := st.buf.Get(k) + if kv.IsErrNotFound(err) { + return st.Transaction.MemGet(k) + } + return val, err +} + // Set overrides the Transaction interface. func (st *TxnState) Set(k kv.Key, v []byte) error { return st.buf.Set(k, v) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index e0a04ccdcafe5..e80cd6bb5d8a6 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -282,3 +282,7 @@ func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer { func (txn *tikvTxn) GetSnapshot() kv.Snapshot { return txn.snapshot } + +func (txn *tikvTxn) MemGet(k kv.Key) ([]byte, error) { + return txn.GetMemBuffer().Get(k) +} From 859ccea23d8c70209c42cd97933a98b74f91eafe Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Thu, 21 Feb 2019 12:28:15 +0800 Subject: [PATCH 2/3] refactor --- ddl/index.go | 2 +- executor/admin.go | 4 ++-- executor/batch_checker.go | 4 ++-- kv/fault_injection.go | 18 ++++++++++-------- kv/kv.go | 6 ++---- kv/mock.go | 14 ++++---------- kv/txn.go | 31 ------------------------------ session/txn.go | 33 ++++++++++++++++++++++++++------ store/tikv/txn.go | 40 +++++++++++++++++++++++++++++++-------- 9 files changed, 80 insertions(+), 72 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index dd351b1ef74d7..95571d66dd18f 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -710,7 +710,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i w.distinctCheckFlags = append(w.distinctCheckFlags, distinct) } - batchVals, err := kv.BatchGetValues(txn, w.batchCheckKeys) + batchVals, err := txn.BatchGet(w.batchCheckKeys) if err != nil { return errors.Trace(err) } diff --git a/executor/admin.go b/executor/admin.go index e31597062d807..bc62de1084fda 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -376,7 +376,7 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows) distinctFlags[i] = distinct } - values, err := kv.BatchGetValues(txn, e.batchKeys) + values, err := txn.BatchGet(e.batchKeys) if err != nil { return errors.Trace(err) } @@ -500,7 +500,7 @@ func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte for handle := range e.idxValues { e.batchKeys = append(e.batchKeys, e.table.RecordKey(handle)) } - values, err := kv.BatchGetValues(txn, e.batchKeys) + values, err := txn.BatchGet(e.batchKeys) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/batch_checker.go b/executor/batch_checker.go index 6baaff8c2f235..d151c7dd85745 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -56,7 +56,7 @@ func (b *batchChecker) batchGetOldValues(ctx sessionctx.Context, batchKeys []kv. if err != nil { return errors.Trace(err) } - values, err := kv.BatchGetValues(txn, batchKeys) + values, err := txn.BatchGet(batchKeys) if err != nil { return errors.Trace(err) } @@ -213,7 +213,7 @@ func (b *batchChecker) batchGetInsertKeys(ctx sessionctx.Context, t table.Table, if err != nil { return errors.Trace(err) } - b.dupKVs, err = kv.BatchGetValues(txn, batchKeys) + b.dupKVs, err = txn.BatchGet(batchKeys) return errors.Trace(err) } diff --git a/kv/fault_injection.go b/kv/fault_injection.go index c3e913e4f86f5..6760516c3e97b 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -97,6 +97,16 @@ func (t *InjectedTransaction) Get(k Key) ([]byte, error) { return t.Transaction.Get(k) } +// BatchGet returns an error if cfg.getError is set. +func (t *InjectedTransaction) BatchGet(keys []Key) (map[string][]byte, error) { + t.cfg.RLock() + defer t.cfg.RUnlock() + if t.cfg.getError != nil { + return nil, t.cfg.getError + } + return t.Transaction.BatchGet(keys) +} + // Commit returns an error if cfg.commitError is set. func (t *InjectedTransaction) Commit(ctx context.Context) error { t.cfg.RLock() @@ -107,14 +117,6 @@ func (t *InjectedTransaction) Commit(ctx context.Context) error { return t.Transaction.Commit(ctx) } -// GetSnapshot implements Transaction GetSnapshot method. -func (t *InjectedTransaction) GetSnapshot() Snapshot { - return &InjectedSnapshot{ - Snapshot: t.Transaction.GetSnapshot(), - cfg: t.cfg, - } -} - // InjectedSnapshot wraps a Snapshot with injections. type InjectedSnapshot struct { Snapshot diff --git a/kv/kv.go b/kv/kv.go index 86c026087a02f..202cc8a1e4ffe 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -146,12 +146,10 @@ type Transaction interface { Valid() bool // GetMemBuffer return the MemBuffer binding to this transaction. GetMemBuffer() MemBuffer - // GetSnapshot returns the snapshot of this transaction. - GetSnapshot() Snapshot // SetVars sets variables to the transaction. SetVars(vars *Variables) - // MemGet get kv from the memory buffer. - MemGet(k Key) ([]byte, error) + // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage. + BatchGet(keys []Key) (map[string][]byte, error) } // Client is used to send request to KV layer. diff --git a/kv/mock.go b/kv/mock.go index 8d19c90e5766b..278c24636dc14 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -67,6 +67,10 @@ func (t *mockTxn) Get(k Key) ([]byte, error) { return nil, nil } +func (t *mockTxn) BatchGet(keys []Key) (map[string][]byte, error) { + return nil, nil +} + func (t *mockTxn) Iter(k Key, upperBound Key) (Iterator, error) { return nil, nil } @@ -98,12 +102,6 @@ func (t *mockTxn) GetMemBuffer() MemBuffer { return nil } -func (t *mockTxn) GetSnapshot() Snapshot { - return &mockSnapshot{ - store: NewMemDbBuffer(DefaultTxnMembufCap), - } -} - func (t *mockTxn) SetCap(cap int) { } @@ -116,10 +114,6 @@ func (t *mockTxn) SetVars(vars *Variables) { } -func (t *mockTxn) MemGet(k Key) ([]byte, error) { - return nil, nil -} - // NewMockTxn new a mockTxn. func NewMockTxn() Transaction { return &mockTxn{ diff --git a/kv/txn.go b/kv/txn.go index 5597dd266acf7..089edb00defb7 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -91,37 +91,6 @@ func BackOff(attempts uint) int { return int(sleep) } -// BatchGetValues gets values in batch. -// The values from buffer in transaction and the values from the storage node are merged together. -func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) { - bufferValues := make([][]byte, len(keys)) - shrinkKeys := make([]Key, 0, len(keys)) - for i, key := range keys { - val, err := txn.MemGet(key) - if IsErrNotFound(err) { - shrinkKeys = append(shrinkKeys, key) - continue - } - if err != nil { - return nil, errors.Trace(err) - } - if len(val) != 0 { - bufferValues[i] = val - } - } - storageValues, err := txn.GetSnapshot().BatchGet(shrinkKeys) - if err != nil { - return nil, errors.Trace(err) - } - for i, key := range keys { - if bufferValues[i] == nil { - continue - } - storageValues[string(key)] = bufferValues[i] - } - return storageValues, nil -} - // mockCommitErrorEnable uses to enable `mockCommitError` and only mock error once. var mockCommitErrorEnable = int64(0) diff --git a/session/txn.go b/session/txn.go index 802b9040f6331..be54a1423d9a2 100644 --- a/session/txn.go +++ b/session/txn.go @@ -213,13 +213,34 @@ func (st *TxnState) Get(k kv.Key) ([]byte, error) { return val, nil } -// MemGet overrides the Transaction interface. -func (st *TxnState) MemGet(k kv.Key) ([]byte, error) { - val, err := st.buf.Get(k) - if kv.IsErrNotFound(err) { - return st.Transaction.MemGet(k) +// BatchGet overrides the Transaction interface. +func (st *TxnState) BatchGet(keys []kv.Key) (map[string][]byte, error) { + bufferValues := make([][]byte, len(keys)) + shrinkKeys := make([]kv.Key, 0, len(keys)) + for i, key := range keys { + val, err := st.buf.Get(key) + if kv.IsErrNotFound(err) { + shrinkKeys = append(shrinkKeys, key) + continue + } + if err != nil { + return nil, errors.Trace(err) + } + if len(val) != 0 { + bufferValues[i] = val + } + } + storageValues, err := st.Transaction.BatchGet(shrinkKeys) + if err != nil { + return nil, errors.Trace(err) + } + for i, key := range keys { + if bufferValues[i] == nil { + continue + } + storageValues[string(key)] = bufferValues[i] } - return val, err + return storageValues, nil } // Set overrides the Transaction interface. diff --git a/store/tikv/txn.go b/store/tikv/txn.go index e80cd6bb5d8a6..79d24de09471b 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -108,6 +108,38 @@ func (txn *tikvTxn) Get(k kv.Key) ([]byte, error) { return ret, nil } +func (txn *tikvTxn) BatchGet(keys []kv.Key) (map[string][]byte, error) { + if txn.IsReadOnly() { + return txn.snapshot.BatchGet(keys) + } + bufferValues := make([][]byte, len(keys)) + shrinkKeys := make([]kv.Key, 0, len(keys)) + for i, key := range keys { + val, err := txn.GetMemBuffer().Get(key) + if kv.IsErrNotFound(err) { + shrinkKeys = append(shrinkKeys, key) + continue + } + if err != nil { + return nil, errors.Trace(err) + } + if len(val) != 0 { + bufferValues[i] = val + } + } + storageValues, err := txn.snapshot.BatchGet(shrinkKeys) + if err != nil { + return nil, errors.Trace(err) + } + for i, key := range keys { + if bufferValues[i] == nil { + continue + } + storageValues[string(key)] = bufferValues[i] + } + return storageValues, nil +} + func (txn *tikvTxn) Set(k kv.Key, v []byte) error { txn.setCnt++ @@ -278,11 +310,3 @@ func (txn *tikvTxn) Size() int { func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer { return txn.us.GetMemBuffer() } - -func (txn *tikvTxn) GetSnapshot() kv.Snapshot { - return txn.snapshot -} - -func (txn *tikvTxn) MemGet(k kv.Key) ([]byte, error) { - return txn.GetMemBuffer().Get(k) -} From a7e5533850243dea97b1bb1e13a80074dbf6016e Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Thu, 21 Feb 2019 17:32:11 +0800 Subject: [PATCH 3/3] address comment --- executor/write_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/write_test.go b/executor/write_test.go index d39c8bd7284db..75538075f3e93 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -309,7 +309,7 @@ func (s *testSuite) TestMultiBatch(c *C) { tk.MustExec("create table t0 (i int)") tk.MustExec("insert into t0 values (1), (1)") tk.MustExec("create table t (i int unique key)") - tk.MustExec("set tidb_dml_batch_size = 1") + tk.MustExec("set @@tidb_dml_batch_size = 1") tk.MustExec("insert ignore into t select * from t0") tk.MustExec("admin check table t") }