Skip to content

Commit

Permalink
*: merge statement buffer when BatchGetValues (#9374) (#9416)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored and zimulala committed Feb 22, 2019
1 parent 21774e3 commit 7aefd50
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 59 deletions.
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i
w.distinctCheckFlags = append(w.distinctCheckFlags, distinct)
}

batchVals, err := kv.BatchGetValues(txn, w.batchCheckKeys)
batchVals, err := txn.BatchGet(w.batchCheckKeys)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
distinctFlags[i] = distinct
}

values, err := kv.BatchGetValues(txn, e.batchKeys)
values, err := txn.BatchGet(e.batchKeys)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -495,7 +495,7 @@ func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte
for handle := range e.idxValues {
e.batchKeys = append(e.batchKeys, e.table.RecordKey(handle))
}
values, err := kv.BatchGetValues(txn, e.batchKeys)
values, err := txn.BatchGet(e.batchKeys)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ func batchGetOldValues(ctx sessionctx.Context, t table.Table, handles []int64) (
if err != nil {
return nil, errors.Trace(err)
}
values, err := kv.BatchGetValues(txn, batchKeys)
values, err := txn.BatchGet(batchKeys)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1070,7 +1070,7 @@ func batchGetInsertKeys(ctx sessionctx.Context, t table.Table, newRows [][]types
if err != nil {
return nil, nil, errors.Trace(err)
}
values, err := kv.BatchGetValues(txn, batchKeys)
values, err := txn.BatchGet(batchKeys)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down
11 changes: 11 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,17 @@ func (s *testSuite) TestInsert(c *C) {
tk.MustQuery("select * from t").Check(testkit.Rows("0 0", "0 0", "0 0", "1.1 1.1"))
}

func (s *testSuite) TestMultiBatch(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t0 (i int)")
tk.MustExec("insert into t0 values (1), (1)")
tk.MustExec("create table t (i int unique key)")
tk.MustExec("set @@tidb_dml_batch_size = 1")
tk.MustExec("insert ignore into t select * from t0")
tk.MustExec("admin check table t")
}

func (s *testSuite) TestInsertAutoInc(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
18 changes: 10 additions & 8 deletions kv/fault_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ func (t *InjectedTransaction) Get(k Key) ([]byte, error) {
return t.Transaction.Get(k)
}

// BatchGet returns an error if cfg.getError is set.
func (t *InjectedTransaction) BatchGet(keys []Key) (map[string][]byte, error) {
t.cfg.RLock()
defer t.cfg.RUnlock()
if t.cfg.getError != nil {
return nil, t.cfg.getError
}
return t.Transaction.BatchGet(keys)
}

// Commit returns an error if cfg.commitError is set.
func (t *InjectedTransaction) Commit(ctx context.Context) error {
t.cfg.RLock()
Expand All @@ -108,14 +118,6 @@ func (t *InjectedTransaction) Commit(ctx context.Context) error {
return t.Transaction.Commit(ctx)
}

// GetSnapshot implements Transaction GetSnapshot method.
func (t *InjectedTransaction) GetSnapshot() Snapshot {
return &InjectedSnapshot{
Snapshot: t.Transaction.GetSnapshot(),
cfg: t.cfg,
}
}

// InjectedSnapshot wraps a Snapshot with injections.
type InjectedSnapshot struct {
Snapshot
Expand Down
4 changes: 2 additions & 2 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ type Transaction interface {
Valid() bool
// GetMemBuffer return the MemBuffer binding to this transaction.
GetMemBuffer() MemBuffer
// GetSnapshot returns the snapshot of this transaction.
GetSnapshot() Snapshot
// BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage.
BatchGet(keys []Key) (map[string][]byte, error)
}

// Client is used to send request to KV layer.
Expand Down
10 changes: 4 additions & 6 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func (t *mockTxn) Get(k Key) ([]byte, error) {
return nil, nil
}

func (t *mockTxn) BatchGet(keys []Key) (map[string][]byte, error) {
return nil, nil
}

func (t *mockTxn) Iter(k Key, upperBound Key) (Iterator, error) {
return nil, nil
}
Expand Down Expand Up @@ -99,12 +103,6 @@ func (t *mockTxn) GetMemBuffer() MemBuffer {
return nil
}

func (t *mockTxn) GetSnapshot() Snapshot {
return &mockSnapshot{
store: NewMemDbBuffer(DefaultTxnMembufCap),
}
}

func (t *mockTxn) SetCap(cap int) {

}
Expand Down
34 changes: 0 additions & 34 deletions kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,37 +86,3 @@ func BackOff(attempts uint) int {
time.Sleep(sleep)
return int(sleep)
}

// BatchGetValues gets values in batch.
// The values from buffer in transaction and the values from the storage node are merged together.
func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) {
if txn.IsReadOnly() {
return txn.GetSnapshot().BatchGet(keys)
}
bufferValues := make([][]byte, len(keys))
shrinkKeys := make([]Key, 0, len(keys))
for i, key := range keys {
val, err := txn.GetMemBuffer().Get(key)
if IsErrNotFound(err) {
shrinkKeys = append(shrinkKeys, key)
continue
}
if err != nil {
return nil, errors.Trace(err)
}
if len(val) != 0 {
bufferValues[i] = val
}
}
storageValues, err := txn.GetSnapshot().BatchGet(shrinkKeys)
if err != nil {
return nil, errors.Trace(err)
}
for i, key := range keys {
if bufferValues[i] == nil {
continue
}
storageValues[string(key)] = bufferValues[i]
}
return storageValues, nil
}
30 changes: 30 additions & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,36 @@ func (st *TxnState) Get(k kv.Key) ([]byte, error) {
return val, nil
}

// BatchGet overrides the Transaction interface.
func (st *TxnState) BatchGet(keys []kv.Key) (map[string][]byte, error) {
bufferValues := make([][]byte, len(keys))
shrinkKeys := make([]kv.Key, 0, len(keys))
for i, key := range keys {
val, err := st.buf.Get(key)
if kv.IsErrNotFound(err) {
shrinkKeys = append(shrinkKeys, key)
continue
}
if err != nil {
return nil, errors.Trace(err)
}
if len(val) != 0 {
bufferValues[i] = val
}
}
storageValues, err := st.Transaction.BatchGet(shrinkKeys)
if err != nil {
return nil, errors.Trace(err)
}
for i, key := range keys {
if bufferValues[i] == nil {
continue
}
storageValues[string(key)] = bufferValues[i]
}
return storageValues, nil
}

// Set overrides the Transaction interface.
func (st *TxnState) Set(k kv.Key, v []byte) error {
return st.buf.Set(k, v)
Expand Down
36 changes: 32 additions & 4 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,38 @@ func (txn *tikvTxn) Get(k kv.Key) ([]byte, error) {
return ret, nil
}

func (txn *tikvTxn) BatchGet(keys []kv.Key) (map[string][]byte, error) {
if txn.IsReadOnly() {
return txn.snapshot.BatchGet(keys)
}
bufferValues := make([][]byte, len(keys))
shrinkKeys := make([]kv.Key, 0, len(keys))
for i, key := range keys {
val, err := txn.GetMemBuffer().Get(key)
if kv.IsErrNotFound(err) {
shrinkKeys = append(shrinkKeys, key)
continue
}
if err != nil {
return nil, errors.Trace(err)
}
if len(val) != 0 {
bufferValues[i] = val
}
}
storageValues, err := txn.snapshot.BatchGet(shrinkKeys)
if err != nil {
return nil, errors.Trace(err)
}
for i, key := range keys {
if bufferValues[i] == nil {
continue
}
storageValues[string(key)] = bufferValues[i]
}
return storageValues, nil
}

func (txn *tikvTxn) Set(k kv.Key, v []byte) error {
txn.setCnt++

Expand Down Expand Up @@ -237,7 +269,3 @@ func (txn *tikvTxn) Size() int {
func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer {
return txn.us.GetMemBuffer()
}

func (txn *tikvTxn) GetSnapshot() kv.Snapshot {
return txn.snapshot
}

0 comments on commit 7aefd50

Please sign in to comment.