Skip to content

Commit

Permalink
executor: do not use batchChecker in 'insert ignore into ...' (#12122)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and sre-bot committed Sep 25, 2019
1 parent 1f7ba4c commit c5fdec7
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 119 deletions.
46 changes: 46 additions & 0 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package executor

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -328,6 +330,50 @@ func (b *batchChecker) getOldRowNew(sctx sessionctx.Context, txn kv.Transaction,
return oldRow, nil
}

// getOldRow fetches the stored record for handle through txn and decodes it
// into a datum row over the writable columns of t, for use by the batch check.
// t may be a normal table or a single partition; it must not be a PartitionedTable.
//
// genExprs is indexed in step with the generated columns encountered while
// walking t.WritableCols(). ctx is accepted but not used by this function body.
// Any error from the read, the decode, default-value lookup, expression
// evaluation or cast is returned as-is with a nil row.
func getOldRow(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle int64,
	genExprs []expression.Expression) ([]types.Datum, error) {
	rawValue, err := txn.Get(t.RecordKey(handle))
	if err != nil {
		return nil, err
	}

	writableCols := t.WritableCols()
	row, decoded, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, writableCols, rawValue)
	if err != nil {
		return nil, err
	}

	// genPos advances once per generated column, stored or not, so that it
	// stays aligned with genExprs.
	genPos := 0
	for _, c := range writableCols {
		// Non-public columns (write-only / write-reorg) that are missing from
		// the stored value are filled with their origin default value.
		if c.State != model.StatePublic && row[c.Offset].IsNull() {
			if _, ok := decoded[c.ID]; !ok {
				row[c.Offset], err = table.GetColOriginDefaultValue(sctx, c.ToInfo())
				if err != nil {
					return nil, err
				}
			}
		}

		if !c.IsGenerated() {
			continue
		}
		// Only the virtual (non-stored) generated column needs to be filled
		// back: evaluate its expression on the row and cast to the column type.
		if !c.GeneratedStored {
			val, evalErr := genExprs[genPos].Eval(chunk.MutRowFromDatums(row).ToRow())
			if evalErr != nil {
				return nil, evalErr
			}
			row[c.Offset], err = table.CastValue(sctx, val, c.ToInfo())
			if err != nil {
				return nil, err
			}
		}
		genPos++
	}
	return row, nil
}

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64,
Expand Down
117 changes: 10 additions & 107 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword,
// the to-be-inserted rows will be checked for duplicate keys and updated to the new rows.
if len(e.OnDuplicate) > 0 {
err := e.batchUpdateDupRowsNew(ctx, rows)
err := e.batchUpdateDupRows(ctx, rows)
if err != nil {
return err
}
Expand All @@ -79,7 +79,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return nil
}

func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
func prefetchUniqueIndices(txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
nKeys := 0
for _, r := range rows {
if r.handleKey != nil {
Expand Down Expand Up @@ -122,16 +122,16 @@ func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheck
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
values, err := prefetchUniqueIndices(ctx, txn, rows)
values, err := prefetchUniqueIndices(txn, rows)
if err != nil {
return err
}
return prefetchConflictedOldRows(ctx, txn, rows, values)
}

// updateDupRowNew updates a duplicate row to a new row.
func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRowNew(e.ctx, txn, row.t, handle, e.GenExprs)
// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := getOldRow(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
if err != nil {
return err
}
Expand All @@ -144,7 +144,8 @@ func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, ro
return err
}

func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]types.Datum) error {
// batchUpdateDupRows updates multi-rows in batch if they are duplicate with rows in table.
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
// Get the keys that need to be checked.
toBeCheckedRows, err := e.getKeysNeedCheck(e.ctx, e.Table, newRows)
if err != nil {
Expand All @@ -169,7 +170,7 @@ func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]type
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, txn, r, handle, e.OnDuplicate)
if err == nil {
continue
}
Expand All @@ -191,7 +192,7 @@ func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]type
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, txn, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provides the handle information, but the
Expand Down Expand Up @@ -222,62 +223,6 @@ func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]type
return nil
}

// batchUpdateDupRows processes newRows in one batch: rows that collide with an
// existing key recorded in e.dupKVs are updated via updateDupRow using
// e.OnDuplicate, and rows without any collision are inserted via e.addRecord.
// The first error encountered is returned unchanged.
func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
	if err := e.batchGetInsertKeys(e.ctx, e.Table, newRows); err != nil {
		return err
	}

	// Batch get the to-be-updated rows in storage.
	if err := e.initDupOldRowValue(e.ctx, e.Table, newRows); err != nil {
		return err
	}

	for idx, checked := range e.toBeCheckedRows {
		// A collision on the handle key takes precedence: update and move on
		// to the next row without looking at the unique keys.
		if hk := checked.handleKey; hk != nil {
			if _, dup := e.dupKVs[string(hk.newKV.key)]; dup {
				handle, err := tablecodec.DecodeRowKey(hk.newKV.key)
				if err != nil {
					return err
				}
				if err = e.updateDupRow(checked, handle, e.OnDuplicate); err != nil {
					return err
				}
				continue
			}
		}

		// Otherwise update on the first colliding unique key and mark the row
		// as handled by clearing its slot in newRows.
		for _, uk := range checked.uniqueKeys {
			storedVal, dup := e.dupKVs[string(uk.newKV.key)]
			if !dup {
				continue
			}
			handle, err := tables.DecodeHandle(storedVal)
			if err != nil {
				return err
			}
			if err = e.updateDupRow(checked, handle, e.OnDuplicate); err != nil {
				return err
			}
			newRows[idx] = nil
			break
		}

		if newRows[idx] == nil {
			continue
		}
		// The row was checked and has no duplicate keys, so insert it. Its
		// key-values are filled back for the checks of the following rows,
		// because the insert statement itself may contain duplicate keys.
		newHandle, err := e.addRecord(newRows[idx])
		if err != nil {
			return err
		}
		e.fillBackKeys(e.Table, checked, newHandle)
	}
	return nil
}

// Next implements the Executor Next interface.
func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
Expand Down Expand Up @@ -311,25 +256,6 @@ func (e *InsertExec) Open(ctx context.Context) error {
return nil
}

// updateDupRow applies the onDuplicate assignments to the stored row found at
// handle, which conflicts with the to-be-inserted row.
//
// A failure to load the old row is logged and returned. If the update itself
// reports kv.ErrKeyExists while the statement context has DupKeyAsWarning set,
// the error is appended as a warning and nil is returned. On success the
// duplicate-key bookkeeping is refreshed through updateDupKeyValues.
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
	oldRow, err := e.getOldRow(e.ctx, row.t, handle, e.GenExprs)
	if err != nil {
		logutil.Logger(context.Background()).Error(
			"get old row failed when insert on dup",
			zap.Int64("handle", handle),
			zap.String("toBeInsertedRow", types.DatumsToStrNoErr(row.row)),
		)
		return err
	}

	// Do update row.
	updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(handle, oldRow, row.row, onDuplicate)
	stmtCtx := e.ctx.GetSessionVars().StmtCtx
	switch {
	case stmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err):
		// Downgrade the duplicate-key error to a warning.
		stmtCtx.AppendWarning(err)
		return nil
	case err != nil:
		return err
	}
	return e.updateDupKeyValues(handle, newHandle, handleChanged, oldRow, updatedRow)
}

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(handle int64, oldRow []types.Datum, newRow []types.Datum,
cols []*expression.Assignment) ([]types.Datum, bool, int64, error) {
Expand Down Expand Up @@ -362,29 +288,6 @@ func (e *InsertExec) doDupRowUpdate(handle int64, oldRow []types.Datum, newRow [
return newData, handleChanged, newHandle, nil
}

// updateDupKeyValues refreshes the duplicate-key bookkeeping after one row has
// been updated, so that later rows are checked against the updated state.
//
// The keys of oldRow are removed via deleteDupKeys and the keys computed for
// updatedRow are filled back. When handleChanged is true, the cached old row
// value stored under oldHandle's record key is dropped and the keys are filled
// back under newHandle; otherwise they are filled back under oldHandle.
func (e *InsertExec) updateDupKeyValues(oldHandle int64, newHandle int64,
	handleChanged bool, oldRow []types.Datum, updatedRow []types.Datum) error {
	// There is only one row per update, so only element 0 is used below.
	refreshed, err := e.getKeysNeedCheck(e.ctx, e.Table, [][]types.Datum{updatedRow})
	if err != nil {
		return err
	}

	// Delete old keys before filling back the new key-values of the updated row.
	if err := e.deleteDupKeys(e.ctx, e.Table, [][]types.Datum{oldRow}); err != nil {
		return err
	}

	targetHandle := oldHandle
	if handleChanged {
		delete(e.dupOldRowValues, string(e.Table.RecordKey(oldHandle)))
		targetHandle = newHandle
	}
	e.fillBackKeys(e.Table, refreshed[0], targetHandle)
	return nil
}

// setMessage sets info message(ERR_INSERT_INFO) generated by INSERT statement
func (e *InsertExec) setMessage() {
stmtCtx := e.ctx.GetSessionVars().StmtCtx
Expand Down
38 changes: 26 additions & 12 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,26 +551,46 @@ func (e *InsertValues) handleWarning(err error) {
func (e *InsertValues) batchCheckAndInsert(rows [][]types.Datum, addRecord func(row []types.Datum) (int64, error)) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
err := e.batchGetInsertKeys(e.ctx, e.Table, rows)

// Get the keys that need to be checked.
toBeCheckedRows, err := e.getKeysNeedCheck(e.ctx, e.Table, rows)
if err != nil {
return err
}
// append warnings and get no duplicated error rows
for i, r := range e.toBeCheckedRows {

txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(txn, toBeCheckedRows); err != nil {
return err
}

for i, r := range toBeCheckedRows {
// skip := false
if r.handleKey != nil {
if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
rows[i] = nil
_, err := txn.Get(r.handleKey.newKV.key)
if err == nil {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
continue
}
if !kv.IsErrNotFound(err) {
return err
}
}
for _, uk := range r.uniqueKeys {
if _, found := e.dupKVs[string(uk.newKV.key)]; found {
_, err := txn.Get(uk.newKV.key)
if err == nil {
// If duplicate keys were found in BatchGet, mark row = nil.
rows[i] = nil
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
break
}
if !kv.IsErrNotFound(err) {
return err
}
}
// If the row was checked and has no duplicate keys,
// it should be added to the values map for further row checks.
Expand All @@ -581,12 +601,6 @@ func (e *InsertValues) batchCheckAndInsert(rows [][]types.Datum, addRecord func(
if err != nil {
return err
}
if r.handleKey != nil {
e.dupKVs[string(r.handleKey.newKV.key)] = r.handleKey.newKV.value
}
for _, uk := range r.uniqueKeys {
e.dupKVs[string(uk.newKV.key)] = []byte{}
}
}
}
return nil
Expand Down

0 comments on commit c5fdec7

Please sign in to comment.