Skip to content

Commit

Permalink
*: refactoring the code of batchChecker (#12108)
Browse files Browse the repository at this point in the history
batchChecker is difficult to maintain, we should get rid of it.
In this commit, I catch the BatchGet result into the snapshot, in this way we can
achieve the same goal as the batchChecker
  • Loading branch information
tiancaiamao authored Sep 10, 2019
1 parent 68b709e commit 64298f0
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 6 deletions.
44 changes: 44 additions & 0 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,50 @@ func (b *batchChecker) deleteDupKeys(ctx context.Context, sctx sessionctx.Contex
return nil
}

// getOldRowNew gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRowNew(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle int64,
genExprs []expression.Expression) ([]types.Datum, error) {
oldValue, err := txn.Get(ctx, t.RecordKey(handle))
if err != nil {
return nil, err
}

cols := t.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, cols, oldValue)
if err != nil {
return nil, err
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
gIdx := 0
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(sctx, col.ToInfo())
if err != nil {
return nil, err
}
}
}
if col.IsGenerated() {
// only the virtual column needs fill back.
if !col.GeneratedStored {
val, err := genExprs[gIdx].Eval(chunk.MutRowFromDatums(oldRow).ToRow())
if err != nil {
return nil, err
}
oldRow[col.Offset], err = table.CastValue(sctx, val, col.ToInfo())
if err != nil {
return nil, err
}
}
gIdx++
}
}
return oldRow, nil
}

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64,
Expand Down
158 changes: 157 additions & 1 deletion executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"context"
"encoding/hex"
"fmt"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword,
// the to-be-insert rows will be check on duplicate keys and update to the new rows.
if len(e.OnDuplicate) > 0 {
err := e.batchUpdateDupRows(ctx, rows)
err := e.batchUpdateDupRowsNew(ctx, rows)
if err != nil {
return err
}
Expand All @@ -86,6 +87,161 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return nil
}

func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchUniqueIndices", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

nKeys := 0
for _, r := range rows {
if r.handleKey != nil {
nKeys++
}
nKeys += len(r.uniqueKeys)
}
batchKeys := make([]kv.Key, 0, nKeys)
for _, r := range rows {
if r.handleKey != nil {
batchKeys = append(batchKeys, r.handleKey.newKV.key)
}
for _, k := range r.uniqueKeys {
batchKeys = append(batchKeys, k.newKV.key)
}
}
return txn.BatchGet(ctx, batchKeys)
}

func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow, values map[string][]byte) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchConflictedOldRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

batchKeys := make([]kv.Key, 0, len(rows))
for _, r := range rows {
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}
batchKeys = append(batchKeys, r.t.RecordKey(handle))
}
}
}
_, err := txn.BatchGet(ctx, batchKeys)
return err
}

func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
values, err := prefetchUniqueIndices(ctx, txn, rows)
if err != nil {
return err
}
return prefetchConflictedOldRows(ctx, txn, rows, values)
}

// updateDupRowNew updates a duplicate row to a new row.
func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRowNew(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
if err != nil {
return err
}

_, _, _, err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, e.OnDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
return err
}

func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]types.Datum) error {
// Get keys need to be checked.
toBeCheckedRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, newRows)
if err != nil {
return err
}

txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
if err == nil {
continue
}
if !kv.IsErrNotFound(err) {
return err
}
}

for _, uk := range r.uniqueKeys {
val, err := txn.Get(ctx, uk.newKV.key)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return err
}
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
// handle points to nothing.
logutil.BgLogger().Error("get old row failed when insert on dup",
zap.String("uniqueKey", hex.EncodeToString(uk.newKV.key)),
zap.Int64("handle", handle),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
}
return err
}

newRows[i] = nil
break
}

// If row was checked with no duplicate keys,
// we should do insert the row,
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
_, err := e.addRecord(ctx, newRows[i])
if err != nil {
return err
}
}
}
return nil
}

// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, newRows)
Expand Down
11 changes: 11 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,4 +561,15 @@ func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) {
tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1"))

tk.MustExec(`CREATE TABLE t3 (a int, b int, c int, d int, e int,
PRIMARY KEY (a,b),
UNIQUE KEY (b,c,d)
) PARTITION BY RANGE ( b ) (
PARTITION p0 VALUES LESS THAN (4),
PARTITION p1 VALUES LESS THAN (7),
PARTITION p2 VALUES LESS THAN (11)
)`)
tk.MustExec("insert into t3 values (1,2,3,4,5)")
tk.MustExec("insert into t3 values (1,2,3,4,5),(6,2,3,4,6) on duplicate key update e = e + values(e)")
tk.MustQuery("select * from t3").Check(testkit.Rows("1 2 3 4 16"))
}
33 changes: 33 additions & 0 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type tikvSnapshot struct {
vars *kv.Variables
replicaRead kv.ReplicaReadType
replicaReadSeed uint32

// Cache the result of BatchGet.
// The invariance is that calling BatchGet multiple times using the same start ts,
// the result should not change.
cached map[string][]byte
}

// newTiKVSnapshot creates a snapshot of an TiKV store.
Expand All @@ -78,7 +83,20 @@ func (s *tikvSnapshot) SetPriority(priority int) {
// BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
// The map will not contain nonexistent keys.
func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
// Check the cached value first.
m := make(map[string][]byte)
if s.cached != nil {
tmp := keys[:0]
for _, key := range keys {
if val, ok := s.cached[string(key)]; ok {
m[string(key)] = val
} else {
tmp = append(tmp, key)
}
}
keys = tmp
}

if len(keys) == 0 {
return m, nil
}
Expand Down Expand Up @@ -110,6 +128,14 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string]
return nil, errors.Trace(err)
}

// Update the cache.
if s.cached == nil {
s.cached = make(map[string][]byte, len(m))
}
for key, value := range m {
s.cached[key] = value
}

return m, nil
}

Expand Down Expand Up @@ -233,6 +259,13 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
}

func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
// Check the cached values first.
if s.cached != nil {
if value, ok := s.cached[string(k)]; ok {
return value, nil
}
}

sender := NewRegionRequestSender(s.store.regionCache, s.store.client)

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet,
Expand Down
6 changes: 1 addition & 5 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,11 +696,7 @@ func DecodeRawRowData(ctx sessionctx.Context, meta *model.TableInfo, h int64, co

// Row implements table.Table Row interface.
func (t *tableCommon) Row(ctx sessionctx.Context, h int64) ([]types.Datum, error) {
r, err := t.RowWithCols(ctx, h, t.Cols())
if err != nil {
return nil, err
}
return r, nil
return t.RowWithCols(ctx, h, t.Cols())
}

// RemoveRecord implements table.Table RemoveRecord interface.
Expand Down

0 comments on commit 64298f0

Please sign in to comment.