Skip to content

Commit

Permalink
executor: refactoring the 'replace' implementation, remove its depend…
Browse files Browse the repository at this point in the history
…ency of batch checker (#12319) (#13600)
  • Loading branch information
tiancaiamao authored and jackysp committed Nov 20, 2019
1 parent c700fd2 commit dc3ec0b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 35 deletions.
93 changes: 59 additions & 34 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -53,13 +55,19 @@ func (e *ReplaceExec) Open(ctx context.Context) error {

// removeRow removes the duplicate row and cleans up its keys in the key-value map,
// but if the to-be-removed row equals the to-be-added row, there is nothing to remove or add.
func (e *ReplaceExec) removeRow(handle int64, r toBeCheckedRow) (bool, error) {
func (e *ReplaceExec) removeRow(ctx context.Context, txn kv.Transaction, handle int64, r toBeCheckedRow) (bool, error) {
newRow := r.row
oldRow, err := e.batchChecker.getOldRow(e.ctx, r.t, handle, e.GenExprs)
oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs)
if err != nil {
logutil.Logger(context.Background()).Error("get old row failed when replace", zap.Int64("handle", handle), zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
logutil.Logger(context.Background()).Error("get old row failed when replace",
zap.Int64("handle", handle),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
if kv.IsErrNotFound(err) {
err = errors.NotFoundf("can not be duplicated row, due to old row not found. handle %d", handle)
}
return false, err
}

rowUnchanged, err := types.EqualDatums(e.ctx.GetSessionVars().StmtCtx, oldRow, newRow)
if err != nil {
return false, err
Expand All @@ -74,36 +82,40 @@ func (e *ReplaceExec) removeRow(handle int64, r toBeCheckedRow) (bool, error) {
return false, err
}
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)

// Cleanup keys map, because the record was removed.
err = e.deleteDupKeys(e.ctx, r.t, [][]types.Datum{oldRow})
if err != nil {
return false, err
}
return false, nil
}

// replaceRow removes all duplicate rows for one row, then inserts it.
func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

if r.handleKey != nil {
if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return err
}
rowUnchanged, err := e.removeRow(handle, r)
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return err
}

if _, err := txn.Get(r.handleKey.newKV.key); err == nil {
rowUnchanged, err := e.removeRow(ctx, txn, handle, r)
if err != nil {
return err
}
if rowUnchanged {
return nil
}
} else {
if !kv.IsErrNotFound(err) {
return err
}
}
}

// Keep on removing duplicated rows.
for {
rowUnchanged, foundDupKey, err := e.removeIndexRow(r)
rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, txn, r)
if err != nil {
return err
}
Expand All @@ -117,11 +129,10 @@ func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
}

// No duplicated rows now, insert the row.
newHandle, err := e.addRecord(r.row)
_, err = e.addRecord(r.row)
if err != nil {
return err
}
e.fillBackKeys(r.t, r, newHandle)
return nil
}

Expand All @@ -131,19 +142,25 @@ func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
// 2. bool: true when a duplicated key was found. This only means that a duplicated key was found,
// and the row was removed.
// 3. error: the error.
func (e *ReplaceExec) removeIndexRow(r toBeCheckedRow) (bool, bool, error) {
func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) {
for _, uk := range r.uniqueKeys {
if val, found := e.dupKVs[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return false, found, err
}
rowUnchanged, err := e.removeRow(handle, r)
if err != nil {
return false, found, err
val, err := txn.Get(uk.newKV.key)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return rowUnchanged, found, nil
return false, false, err
}

handle, err := tables.DecodeHandle(val)
if err != nil {
return false, true, err
}
rowUnchanged, err := e.removeRow(ctx, txn, handle, r)
if err != nil {
return false, true, err
}
return rowUnchanged, true, nil
}
return false, false, nil
}
Expand All @@ -161,19 +178,27 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
* because in this case, one row was inserted after the duplicate was deleted.
* See http://dev.mysql.com/doc/refman/5.7/en/mysql-affected-rows.html
*/
err := e.batchGetInsertKeys(e.ctx, e.Table, newRows)

// Get keys need to be checked.
toBeCheckedRows, err := e.getKeysNeedCheck(e.ctx, e.Table, newRows)
if err != nil {
return err
}

// Batch get the to-be-replaced rows in storage.
err = e.initDupOldRowValue(e.ctx, e.Table, newRows)
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
for _, r := range e.toBeCheckedRows {
err = e.replaceRow(r)
for _, r := range toBeCheckedRows {
err = e.replaceRow(ctx, r)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFd
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7 h1:FUL3b97ZY2EPqg2NbXKuMHs5pXJB9hjj1fDHnF2vl28=
github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.18.10+incompatible h1:cy84jW6EVRPa5g9HAHrlbxMSIjBhDSX0OFYyMYminYs=
github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
Expand Down
2 changes: 1 addition & 1 deletion planner/core/rule_partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *partitionProcessor) prune(ds *DataSource) (LogicalPlan, error) {
children := make([]LogicalPlan, 0, len(pi.Definitions))
for i, expr := range partitionExprs {
// If the select condition would never be satisfied, prune that partition.
pruned, err := s.canBePruned(ds.context(), col, expr, ds.allConds)
pruned, err := s.canBePruned(ds.context(), col, expr.Clone(), ds.allConds)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit dc3ec0b

Please sign in to comment.