Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refactoring the code of batchChecker #12108

Merged
merged 5 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,50 @@ func (b *batchChecker) deleteDupKeys(ctx context.Context, sctx sessionctx.Contex
return nil
}

// getOldRowNew gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRowNew(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle int64,
genExprs []expression.Expression) ([]types.Datum, error) {
oldValue, err := txn.Get(ctx, t.RecordKey(handle))
if err != nil {
return nil, err
}

cols := t.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, cols, oldValue)
if err != nil {
return nil, err
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
gIdx := 0
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(sctx, col.ToInfo())
if err != nil {
return nil, err
}
}
}
if col.IsGenerated() {
// only the virtual column needs fill back.
if !col.GeneratedStored {
val, err := genExprs[gIdx].Eval(chunk.MutRowFromDatums(oldRow).ToRow())
if err != nil {
return nil, err
}
oldRow[col.Offset], err = table.CastValue(sctx, val, col.ToInfo())
if err != nil {
return nil, err
}
}
gIdx++
}
}
return oldRow, nil
}

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64,
Expand Down
158 changes: 157 additions & 1 deletion executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"context"
"encoding/hex"
"fmt"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword,
// the to-be-insert rows will be check on duplicate keys and update to the new rows.
if len(e.OnDuplicate) > 0 {
err := e.batchUpdateDupRows(ctx, rows)
err := e.batchUpdateDupRowsNew(ctx, rows)
if err != nil {
return err
}
Expand All @@ -86,6 +87,161 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return nil
}

func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchUniqueIndices", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

nKeys := 0
for _, r := range rows {
if r.handleKey != nil {
nKeys++
}
nKeys += len(r.uniqueKeys)
}
batchKeys := make([]kv.Key, 0, nKeys)
for _, r := range rows {
if r.handleKey != nil {
batchKeys = append(batchKeys, r.handleKey.newKV.key)
}
for _, k := range r.uniqueKeys {
batchKeys = append(batchKeys, k.newKV.key)
}
}
return txn.BatchGet(ctx, batchKeys)
}

func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow, values map[string][]byte) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchConflictedOldRows", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

batchKeys := make([]kv.Key, 0, len(rows))
for _, r := range rows {
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}
batchKeys = append(batchKeys, r.t.RecordKey(handle))
}
}
}
_, err := txn.BatchGet(ctx, batchKeys)
return err
}

func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
values, err := prefetchUniqueIndices(ctx, txn, rows)
if err != nil {
return err
}
return prefetchConflictedOldRows(ctx, txn, rows, values)
}

// updateDupRowNew updates a duplicate row to a new row.
func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRowNew(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
if err != nil {
return err
}

_, _, _, err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, e.OnDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
return err
}

func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]types.Datum) error {
// Get keys need to be checked.
toBeCheckedRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, newRows)
if err != nil {
return err
}

txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
if err == nil {
continue
}
if !kv.IsErrNotFound(err) {
return err
}
}

for _, uk := range r.uniqueKeys {
val, err := txn.Get(ctx, uk.newKV.key)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return err
}
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
// handle points to nothing.
logutil.BgLogger().Error("get old row failed when insert on dup",
zap.String("uniqueKey", hex.EncodeToString(uk.newKV.key)),
zap.Int64("handle", handle),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
}
return err
}

newRows[i] = nil
break
}

// If row was checked with no duplicate keys,
// we should do insert the row,
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
_, err := e.addRecord(ctx, newRows[i])
if err != nil {
return err
}
}
}
return nil
}

// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, newRows)
Expand Down
11 changes: 11 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,4 +561,15 @@ func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) {
tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`)
tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1"))

tk.MustExec(`CREATE TABLE t3 (a int, b int, c int, d int, e int,
PRIMARY KEY (a,b),
UNIQUE KEY (b,c,d)
) PARTITION BY RANGE ( b ) (
PARTITION p0 VALUES LESS THAN (4),
PARTITION p1 VALUES LESS THAN (7),
PARTITION p2 VALUES LESS THAN (11)
)`)
tk.MustExec("insert into t3 values (1,2,3,4,5)")
tk.MustExec("insert into t3 values (1,2,3,4,5),(6,2,3,4,6) on duplicate key update e = e + values(e)")
tk.MustQuery("select * from t3").Check(testkit.Rows("1 2 3 4 16"))
}
33 changes: 33 additions & 0 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type tikvSnapshot struct {
vars *kv.Variables
replicaRead kv.ReplicaReadType
replicaReadSeed uint32

// Cache the result of BatchGet.
// The invariance is that calling BatchGet multiple times using the same start ts,
// the result should not change.
cached map[string][]byte
}

// newTiKVSnapshot creates a snapshot of an TiKV store.
Expand All @@ -78,7 +83,20 @@ func (s *tikvSnapshot) SetPriority(priority int) {
// BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
// The map will not contain nonexistent keys.
func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
// Check the cached value first.
m := make(map[string][]byte)
if s.cached != nil {
tmp := keys[:0]
for _, key := range keys {
if val, ok := s.cached[string(key)]; ok {
m[string(key)] = val
} else {
tmp = append(tmp, key)
}
}
keys = tmp
}

if len(keys) == 0 {
return m, nil
}
Expand Down Expand Up @@ -110,6 +128,14 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string]
return nil, errors.Trace(err)
}

// Update the cache.
if s.cached == nil {
s.cached = make(map[string][]byte, len(m))
}
for key, value := range m {
s.cached[key] = value
}

return m, nil
}

Expand Down Expand Up @@ -233,6 +259,13 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
}

func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
// Check the cached values first.
if s.cached != nil {
if value, ok := s.cached[string(k)]; ok {
return value, nil
}
}

sender := NewRegionRequestSender(s.store.regionCache, s.store.client)

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet,
Expand Down
6 changes: 1 addition & 5 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,11 +696,7 @@ func DecodeRawRowData(ctx sessionctx.Context, meta *model.TableInfo, h int64, co

// Row implements table.Table Row interface.
func (t *tableCommon) Row(ctx sessionctx.Context, h int64) ([]types.Datum, error) {
r, err := t.RowWithCols(ctx, h, t.Cols())
if err != nil {
return nil, err
}
return r, nil
return t.RowWithCols(ctx, h, t.Cols())
}

// RemoveRecord implements table.Table RemoveRecord interface.
Expand Down