Skip to content

Commit

Permalink
(cherry-pick) executer: optimization for insert ignore (#5738)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored and coocood committed Jan 29, 2018
1 parent 4913f64 commit 9588cec
Show file tree
Hide file tree
Showing 15 changed files with 230 additions and 31 deletions.
3 changes: 2 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,8 @@ func (w *worker) doBackfillIndexTaskInTxn(t table.Table, txn kv.Transaction, col
}

// Create the index.
handle, err := w.index.Create(txn, idxRecord.vals, idxRecord.handle)
handle, err := w.index.Create(w.ctx, txn, idxRecord.vals,
idxRecord.handle)
if err != nil {
if kv.ErrKeyExists.Equal(err) && idxRecord.handle == handle {
// Index already exists, skip it.
Expand Down
4 changes: 3 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -189,7 +190,8 @@ func (s *testSuite) TestAdmin(c *C) {
tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("admin_test"))
c.Assert(err, IsNil)
c.Assert(tb.Indices(), HasLen, 1)
_, err = tb.Indices()[0].Create(txn, types.MakeDatums(int64(10)), 1)
_, err = tb.Indices()[0].Create(mock.NewContext(), txn,
types.MakeDatums(int64(10)), 1)
c.Assert(err, IsNil)
err = txn.Commit()
c.Assert(err, IsNil)
Expand Down
152 changes: 145 additions & 7 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,24 @@ func (e *InsertExec) Next() (Row, error) {

txn := e.ctx.Txn()
rowCount := 0

// If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored.
// For example, without IGNORE, a row that duplicates an existing UNIQUE index or PRIMARY KEY value in
// the table causes a duplicate-key error and the statement is aborted. With IGNORE, the row is discarded and no error occurs.
// Using BatchGet in insert ignore to mark rows as duplicated before we add records to the table.
if e.IgnoreErr {
var err error
rows, err = e.batchMarkDupRows(rows)
if err != nil {
return nil, errors.Trace(err)
}
}
for _, row := range rows {
// duplicate row will be marked as nil in batchMarkDupRows if
// IgnoreErr is true. For IgnoreErr is false, it is a protection.
if row == nil {
continue
}
if batchInsert && rowCount >= batchSize {
if err := e.ctx.NewTxn(); err != nil {
// We should return a special error for batch insert.
Expand All @@ -770,13 +787,6 @@ func (e *InsertExec) Next() (Row, error) {
}

if kv.ErrKeyExists.Equal(err) {
// If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored.
// For example, without IGNORE, a row that duplicates an existing UNIQUE index or PRIMARY KEY value in
// the table causes a duplicate-key error and the statement is aborted. With IGNORE, the row is discarded and no error occurs.
if e.IgnoreErr {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
if len(e.OnDuplicate) > 0 {
if err = e.onDuplicateUpdate(row, h, e.OnDuplicate); err != nil {
return nil, errors.Trace(err)
Expand All @@ -795,6 +805,134 @@ func (e *InsertExec) Next() (Row, error) {
return nil, nil
}

type keyWithDupError struct {
key kv.Key
dupErr error
}

func (e *InsertExec) getRecordIDs(rows [][]types.Datum) ([]int64, error) {
recordIDs := make([]int64, 0, len(rows))
if e.Table.Meta().PKIsHandle {
var handleCol *table.Column
for _, col := range e.Table.Cols() {
if col.IsPKHandleColumn(e.Table.Meta()) {
handleCol = col
break
}
}
for _, row := range rows {
recordIDs = append(recordIDs, row[handleCol.Offset].GetInt64())
}
} else {
for range rows {
recordID, err := e.Table.AllocAutoID(e.ctx)
if err != nil {
return nil, errors.Trace(err)
}
recordIDs = append(recordIDs, recordID)
}
}
return recordIDs, nil
}

// getKeysNeedCheck gets keys converted from to-be-insert rows to record keys and unique index keys,
// which need to be checked whether they are duplicate keys.
func (e *InsertExec) getKeysNeedCheck(rows [][]types.Datum) ([][]keyWithDupError, error) {
nUnique := 0
for _, v := range e.Table.WritableIndices() {
if v.Meta().Unique {
nUnique++
}
}
rowWithKeys := make([][]keyWithDupError, 0, len(rows))

// get recordIDs
recordIDs, err := e.getRecordIDs(rows)
if err != nil {
return nil, errors.Trace(err)
}

for i, row := range rows {
keysWithErr := make([]keyWithDupError, 0, nUnique+1)
// append record keys and errors
if e.Table.Meta().PKIsHandle {
keysWithErr = append(keysWithErr, keyWithDupError{e.Table.RecordKey(recordIDs[i]), kv.ErrKeyExists.FastGen("Duplicate entry '%d' for key 'PRIMARY'", recordIDs[i])})
}

// append unique keys and errors
for _, v := range e.Table.WritableIndices() {
if !v.Meta().Unique {
continue
}
var colVals []types.Datum
colVals, err = v.FetchValues(row)
if err != nil {
return nil, errors.Trace(err)
}
var key []byte
var distinct bool
key, distinct, err = v.GenIndexKey(colVals, recordIDs[i])
if err != nil {
return nil, errors.Trace(err)
}
if !distinct {
continue
}
keysWithErr = append(keysWithErr, keyWithDupError{key, kv.ErrKeyExists.FastGen("Duplicate entry '%d' for key '%s'", recordIDs[i], v.Meta().Name)})
}
rowWithKeys = append(rowWithKeys, keysWithErr)
}
return rowWithKeys, nil
}

// batchMarkDupRows marks rows with duplicate errors as nil.
// All duplicate rows were marked and appended as duplicate warnings
// to the statement context in batch.
func (e *InsertExec) batchMarkDupRows(rows [][]types.Datum) ([][]types.Datum, error) {
// get keys need to be checked
rowWithKeys, err := e.getKeysNeedCheck(rows)

// batch get values
nKeys := 0
for _, v := range rowWithKeys {
nKeys += len(v)
}
batchKeys := make([]kv.Key, 0, nKeys)
for _, v := range rowWithKeys {
for _, k := range v {
batchKeys = append(batchKeys, k.key)
}
}
values, err := e.ctx.Txn().GetSnapshot().BatchGet(batchKeys)
if err != nil {
return nil, errors.Trace(err)
}

// append warnings and get no duplicated error rows
for i, v := range rowWithKeys {
for _, k := range v {
if _, found := values[string(k.key)]; found {
// If duplicate keys were found in BatchGet, mark row = nil.
rows[i] = nil
e.ctx.GetSessionVars().StmtCtx.AppendWarning(k.dupErr)
break
}
}
// If row was checked with no duplicate keys,
// it should be add to values map for the further row check.
// There may be duplicate keys inside the insert statement.
if rows[i] != nil {
for _, k := range v {
values[string(k.key)] = []byte{}
}
}
}

// this statement was already been checked
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
return rows, nil
}

// Close implements the Executor Close interface.
func (e *InsertExec) Close() error {
e.ctx.GetSessionVars().CurrInsertValues = nil
Expand Down
16 changes: 13 additions & 3 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (s *testSuite) TestInsertIgnore(c *C) {
tk := testkit.NewTestKit(c, kv.NewInjectedStore(s.store, &cfg))
tk.MustExec("use test")
testSQL := `drop table if exists t;
create table t (id int PRIMARY KEY AUTO_INCREMENT, c1 int);`
create table t (id int PRIMARY KEY AUTO_INCREMENT, c1 int unique key);`
tk.MustExec(testSQL)
testSQL = `insert into t values (1, 2);`
tk.MustExec(testSQL)
Expand All @@ -356,12 +356,22 @@ func (s *testSuite) TestInsertIgnore(c *C) {
r.Check(testkit.Rows(rowStr))

tk.MustExec("insert ignore into t values (1, 3), (2, 3)")

r = tk.MustQuery("select * from t;")
rowStr = fmt.Sprintf("%v %v", "1", "2")
rowStr1 := fmt.Sprintf("%v %v", "2", "3")
r.Check(testkit.Rows(rowStr, rowStr1))

tk.MustExec("insert ignore into t values (3, 4), (3, 4)")
r = tk.MustQuery("select * from t;")
rowStr2 := fmt.Sprintf("%v %v", "3", "4")
r.Check(testkit.Rows(rowStr, rowStr1, rowStr2))

tk.MustExec("begin")
tk.MustExec("insert ignore into t values (4, 4), (4, 5), (4, 6)")
r = tk.MustQuery("select * from t;")
rowStr3 := fmt.Sprintf("%v %v", "4", "5")
r.Check(testkit.Rows(rowStr, rowStr1, rowStr2, rowStr3))
tk.MustExec("commit")

cfg.SetGetError(errors.New("foo"))
_, err := tk.Exec("insert ignore into t values (1, 3)")
c.Assert(err, NotNil)
Expand Down
4 changes: 2 additions & 2 deletions inspectkv/inspectkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
// set data to:
// index data (handle, data): (1, 10), (2, 20), (3, 30)
// table data (handle, data): (1, 10), (2, 20), (4, 40)
_, err = idx.Create(txn, types.MakeDatums(int64(30)), 3)
_, err = idx.Create(s.ctx, txn, types.MakeDatums(int64(30)), 3)
c.Assert(err, IsNil)
key := tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4))
setColValue(c, txn, key, types.NewDatum(int64(40)))
Expand All @@ -426,7 +426,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
// set data to:
// index data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40)
// table data (handle, data): (1, 10), (2, 20), (4, 40), (3, 31)
_, err = idx.Create(txn, types.MakeDatums(int64(40)), 4)
_, err = idx.Create(s.ctx, txn, types.MakeDatums(int64(40)), 4)
c.Assert(err, IsNil)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3))
setColValue(c, txn, key, types.NewDatum(int64(31)))
Expand Down
18 changes: 18 additions & 0 deletions kv/fault_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ func (t *InjectedTransaction) Commit() error {
return t.Transaction.Commit()
}

// GetSnapshot implements Transaction GetSnapshot method.
func (t *InjectedTransaction) GetSnapshot() Snapshot {
return &InjectedSnapshot{
Snapshot: t.Transaction.GetSnapshot(),
cfg: t.cfg,
}
}

// InjectedSnapshot wraps a Snapshot with injections.
type InjectedSnapshot struct {
Snapshot
Expand All @@ -119,3 +127,13 @@ func (t *InjectedSnapshot) Get(k Key) ([]byte, error) {
}
return t.Snapshot.Get(k)
}

// BatchGet returns an error if cfg.getError is set.
func (t *InjectedSnapshot) BatchGet(keys []Key) (map[string][]byte, error) {
t.cfg.RLock()
defer t.cfg.RUnlock()
if t.cfg.getError != nil {
return nil, t.cfg.getError
}
return t.Snapshot.BatchGet(keys)
}
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type Transaction interface {
// Valid returns if the transaction is valid.
// A transaction become invalid after commit or rollback.
Valid() bool
// GetSnapshot returns the snapshot of this transaction.
GetSnapshot() Snapshot
}

// Client is used to send request to KV layer.
Expand Down
6 changes: 6 additions & 0 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (t *mockTxn) Size() int {
return 0
}

func (t *mockTxn) GetSnapshot() Snapshot {
return &mockSnapshot{
store: NewMemDbBuffer(),
}
}

// mockStorage is used to start a must commit-failed txn.
type mockStorage struct {
}
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ type StatementContext struct {
TimeZone *time.Location
Priority mysql.PriorityEnum
NotFillCache bool
BatchCheck bool
}

// AddAffectedRows adds affected rows.
Expand Down
4 changes: 4 additions & 0 deletions store/localstore/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func (txn *dbTxn) Commit() error {
return errors.Trace(txn.doCommit())
}

func (txn *dbTxn) GetSnapshot() kv.Snapshot {
return newSnapshot(txn.store, kv.Version{Ver: txn.tid})
}

type schemaLeaseChecker interface {
Check(txnTS uint64) error
}
Expand Down
4 changes: 4 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,7 @@ func (txn *tikvTxn) Len() int {
func (txn *tikvTxn) Size() int {
return txn.us.Size()
}

func (txn *tikvTxn) GetSnapshot() kv.Snapshot {
return txn.snapshot
}
4 changes: 3 additions & 1 deletion table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package table

import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/util/types"
Expand All @@ -30,7 +31,8 @@ type Index interface {
// Meta returns IndexInfo.
Meta() *model.IndexInfo
// Create supports insert into statement.
Create(rm kv.RetrieverMutator, indexedValues []types.Datum, h int64) (int64, error)
Create(ctx context.Context, rm kv.RetrieverMutator,
indexedValues []types.Datum, h int64) (int64, error)
// Delete supports delete from statement.
Delete(m kv.Mutator, indexedValues []types.Datum, h int64) error
// Drop supports drop table, drop index statements.
Expand Down
13 changes: 10 additions & 3 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"

"github.com/juju/errors"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -177,7 +178,9 @@ func (c *index) GenIndexKey(indexedValues []types.Datum, h int64) (key []byte, d
// Create creates a new entry in the kvIndex data.
// If the index is unique and there is an existing entry with the same key,
// Create will return the existing entry's handle as the first return value, ErrKeyExists as the second return value.
func (c *index) Create(rm kv.RetrieverMutator, indexedValues []types.Datum, h int64) (int64, error) {
func (c *index) Create(ctx context.Context, rm kv.RetrieverMutator,
indexedValues []types.Datum, h int64) (int64, error) {
skipCheck := ctx.GetSessionVars().StmtCtx.BatchCheck
key, distinct, err := c.GenIndexKey(indexedValues, h)
if err != nil {
return 0, errors.Trace(err)
Expand All @@ -188,8 +191,12 @@ func (c *index) Create(rm kv.RetrieverMutator, indexedValues []types.Datum, h in
return 0, errors.Trace(err)
}

value, err := rm.Get(key)
if kv.IsErrNotFound(err) {
var value []byte
if !skipCheck {
value, err = rm.Get(key)
}

if skipCheck || kv.IsErrNotFound(err) {
err = rm.Set(key, encodeHandle(h))
return 0, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 9588cec

Please sign in to comment.