executor: do not use batchChecker in 'insert ignore into ...' (#12122)
tiancaiamao authored Sep 10, 2019
1 parent add1023 commit 213b783
Showing 3 changed files with 77 additions and 121 deletions.
44 changes: 44 additions & 0 deletions executor/batch_checker.go
@@ -336,6 +336,50 @@ func (b *batchChecker) getOldRowNew(ctx context.Context, sctx sessionctx.Context
return oldRow, nil
}

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func getOldRow(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle int64,
genExprs []expression.Expression) ([]types.Datum, error) {
oldValue, err := txn.Get(ctx, t.RecordKey(handle))
if err != nil {
return nil, err
}

cols := t.WritableCols()
oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, cols, oldValue)
if err != nil {
return nil, err
}
// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
gIdx := 0
for _, col := range cols {
if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
_, found := oldRowMap[col.ID]
if !found {
oldRow[col.Offset], err = table.GetColOriginDefaultValue(sctx, col.ToInfo())
if err != nil {
return nil, err
}
}
}
if col.IsGenerated() {
// Only virtual columns need to be filled back.
if !col.GeneratedStored {
val, err := genExprs[gIdx].Eval(chunk.MutRowFromDatums(oldRow).ToRow())
if err != nil {
return nil, err
}
oldRow[col.Offset], err = table.CastValue(sctx, val, col.ToInfo())
if err != nil {
return nil, err
}
}
gIdx++
}
}
return oldRow, nil
}

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64,
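As an aside, here is a much-simplified, self-contained sketch of the virtual generated-column backfill that the new getOldRow above performs: columns that are generated but not stored are absent from the raw row value, so they are recomputed from the decoded row before the old row is returned. The column struct, expr closure, and fillVirtualColumns helper are illustrative stand-ins, not TiDB's actual types.

package main

import (
	"fmt"
	"strings"
)

// column is a hypothetical stand-in for table.Column; expr plays the role of
// the generated-column expression (expression.Expression) evaluated in getOldRow.
type column struct {
	name      string
	generated bool
	stored    bool
	expr      func(row map[string]string) string
}

// fillVirtualColumns recomputes non-stored generated columns from the decoded
// row, mirroring the Eval + CastValue loop in getOldRow in spirit only.
func fillVirtualColumns(cols []column, row map[string]string) {
	for _, c := range cols {
		if c.generated && !c.stored {
			// Stored generated columns are already materialized in the row value;
			// only virtual ones need to be filled back.
			row[c.name] = c.expr(row)
		}
	}
}

func main() {
	cols := []column{
		{name: "name"},
		{
			name: "upper_name", generated: true, stored: false,
			expr: func(r map[string]string) string { return strings.ToUpper(r["name"]) },
		},
	}
	// upper_name is missing from the stored value, just as a virtual column
	// is missing from the raw row data decoded by DecodeRawRowData.
	row := map[string]string{"name": "tidb"}
	fillVirtualColumns(cols, row)
	fmt.Println(row["upper_name"]) // TIDB
}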
119 changes: 8 additions & 111 deletions executor/insert.go
@@ -68,7 +68,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If `ON DUPLICATE KEY UPDATE` is specified, and there is no `IGNORE` keyword,
// the to-be-inserted rows will be checked for duplicate keys and updated to the new rows.
if len(e.OnDuplicate) > 0 {
err := e.batchUpdateDupRowsNew(ctx, rows)
err := e.batchUpdateDupRows(ctx, rows)
if err != nil {
return err
}
@@ -149,9 +149,9 @@ func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheck
return prefetchConflictedOldRows(ctx, txn, rows, values)
}

// updateDupRowNew updates a duplicate row to a new row.
func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := e.getOldRowNew(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
oldRow, err := getOldRow(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
if err != nil {
return err
}
@@ -164,7 +164,8 @@ func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, ro
return err
}

func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]types.Datum) error {
// batchUpdateDupRows updates multiple rows in batch if they duplicate existing rows in the table.
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
// Get the keys that need to be checked.
toBeCheckedRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, newRows)
if err != nil {
@@ -189,7 +190,7 @@ func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]type
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, txn, r, handle, e.OnDuplicate)
if err == nil {
continue
}
@@ -211,7 +212,7 @@ func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]type
return err
}

err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, txn, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provides the handle information, but the
@@ -242,62 +243,6 @@ func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]type
return nil
}

// batchUpdateDupRows updates multiple rows in batch if they duplicate existing rows in the table.
func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, newRows)
if err != nil {
return err
}

// Batch get the to-be-updated rows in storage.
err = e.initDupOldRowValue(ctx, e.ctx, e.Table, newRows)
if err != nil {
return err
}

for i, r := range e.toBeCheckedRows {
if r.handleKey != nil {
if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return err
}
err = e.updateDupRow(ctx, r, handle, e.OnDuplicate)
if err != nil {
return err
}
continue
}
}
for _, uk := range r.uniqueKeys {
if val, found := e.dupKVs[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return err
}
err = e.updateDupRow(ctx, r, handle, e.OnDuplicate)
if err != nil {
return err
}
newRows[i] = nil
break
}
}
// If the row was checked and has no duplicate keys,
// we should insert the row,
// and its key-values should be filled back into dupOldRowValues for further row checks,
// because there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
newHandle, err := e.addRecord(ctx, newRows[i])
if err != nil {
return err
}
e.fillBackKeys(e.Table, r, newHandle)
}
}
return nil
}

// Next implements the Executor Next interface.
func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
@@ -328,31 +273,6 @@ func (e *InsertExec) Open(ctx context.Context) error {
return nil
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("InsertExec.updateDupRow", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

oldRow, err := e.getOldRow(e.ctx, row.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when insert on dup", zap.Int64("handle", handle), zap.String("toBeInsertedRow", types.DatumsToStrNoErr(row.row)))
return err
}
// Do update row.
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(ctx, handle, oldRow, row.row, onDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
if err != nil {
return err
}
return e.updateDupKeyValues(ctx, handle, newHandle, handleChanged, oldRow, updatedRow)
}

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []types.Datum, newRow []types.Datum,
cols []*expression.Assignment) ([]types.Datum, bool, int64, error) {
@@ -385,29 +305,6 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []
return newData, handleChanged, newHandle, nil
}

// updateDupKeyValues updates the dupKeyValues for further duplicate key check.
func (e *InsertExec) updateDupKeyValues(ctx context.Context, oldHandle int64, newHandle int64,
handleChanged bool, oldRow []types.Datum, updatedRow []types.Datum) error {
// There is only one row per update.
fillBackKeysInRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, [][]types.Datum{updatedRow})
if err != nil {
return err
}
// Delete old keys and fill back new key-values of the updated row.
err = e.deleteDupKeys(ctx, e.ctx, e.Table, [][]types.Datum{oldRow})
if err != nil {
return err
}

if handleChanged {
delete(e.dupOldRowValues, string(e.Table.RecordKey(oldHandle)))
e.fillBackKeys(e.Table, fillBackKeysInRows[0], newHandle)
} else {
e.fillBackKeys(e.Table, fillBackKeysInRows[0], oldHandle)
}
return nil
}

// setMessage sets the info message (ERR_INSERT_INFO) generated by the INSERT statement
func (e *InsertExec) setMessage() {
stmtCtx := e.ctx.GetSessionVars().StmtCtx
35 changes: 25 additions & 10 deletions executor/insert_common.go
@@ -618,26 +618,47 @@ func (e *InsertValues) handleWarning(err error) {
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) (int64, error)) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, rows)

// Get the keys that need to be checked.
toBeCheckedRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, rows)
if err != nil {
return err
}

txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

// Fill the cache using BatchGet so that the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
}

// Append warnings for rows with duplicate keys and insert the rest.
for i, r := range e.toBeCheckedRows {
for i, r := range toBeCheckedRows {
skip := false
if r.handleKey != nil {
if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
_, err := txn.Get(ctx, r.handleKey.newKV.key)
if err == nil {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
continue
}
if !kv.IsErrNotFound(err) {
return err
}
}
for _, uk := range r.uniqueKeys {
if _, found := e.dupKVs[string(uk.newKV.key)]; found {
_, err := txn.Get(ctx, uk.newKV.key)
if err == nil {
// A duplicate key was found (served from the BatchGet cache): append a warning and skip this row.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
skip = true
break
}
if !kv.IsErrNotFound(err) {
return err
}
}
// If the row was checked and has no duplicate keys,
// it should be added to the values map for the further row check.
@@ -648,12 +669,6 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
if err != nil {
return err
}
if r.handleKey != nil {
e.dupKVs[string(r.handleKey.newKV.key)] = r.handleKey.newKV.value
}
for _, uk := range r.uniqueKeys {
e.dupKVs[string(uk.newKV.key)] = []byte{}
}
}
}
return nil
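
To summarize the new INSERT IGNORE flow in one self-contained picture: conflicting keys are prefetched once with a BatchGet-style call, then each candidate key is probed with Get, and a "not found" result means the row can be inserted, while a hit produces a warning and skips the row. The memStore type below is a hypothetical in-memory stand-in for a kv.Transaction snapshot, not TiDB's API; it only sketches the prefetch-then-probe shape of batchCheckAndInsert.

package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for kv.ErrNotFound.
var errNotFound = errors.New("key not found")

// memStore is a hypothetical, in-memory stand-in for a transaction snapshot:
// BatchGet warms a local cache and Get answers from that cache, so the probes
// after the prefetch never touch the (pretend) remote store again.
type memStore struct {
	backend map[string][]byte
	cache   map[string][]byte
}

func (s *memStore) BatchGet(keys []string) {
	s.cache = make(map[string][]byte, len(keys))
	for _, k := range keys {
		if v, ok := s.backend[k]; ok {
			s.cache[k] = v
		}
	}
}

func (s *memStore) Get(key string) ([]byte, error) {
	if v, ok := s.cache[key]; ok {
		return v, nil
	}
	return nil, errNotFound
}

// checkAndInsert mirrors the shape of batchCheckAndInsert: prefetch candidate
// keys once, probe each key, warn and skip on a hit, insert on "not found".
func checkAndInsert(s *memStore, rows map[string][]byte) error {
	keys := make([]string, 0, len(rows))
	for k := range rows {
		keys = append(keys, k)
	}
	s.BatchGet(keys) // one round trip; the per-key Gets below hit the cache
	for k, v := range rows {
		_, err := s.Get(k)
		if err == nil {
			fmt.Printf("duplicate key %q: row skipped with a warning\n", k)
			continue
		}
		if !errors.Is(err, errNotFound) {
			return err
		}
		s.backend[k] = v // no conflict: insert the row
		fmt.Printf("inserted key %q\n", k)
	}
	return nil
}

func main() {
	store := &memStore{backend: map[string][]byte{"uk-1": []byte("old")}}
	_ = checkAndInsert(store, map[string][]byte{
		"uk-1": []byte("dup"), // conflicts with an existing unique key
		"uk-2": []byte("new"),
	})
}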
