diff --git a/ddl/index.go b/ddl/index.go index d46d8dbe8b3dc..e1f4e896e9d81 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -689,7 +689,8 @@ func (w *worker) doBackfillIndexTaskInTxn(t table.Table, txn kv.Transaction, col } // Create the index. - handle, err := w.index.Create(txn, idxRecord.vals, idxRecord.handle) + handle, err := w.index.Create(w.ctx, txn, idxRecord.vals, + idxRecord.handle) if err != nil { if kv.ErrKeyExists.Equal(err) && idxRecord.handle == handle { // Index already exists, skip it. diff --git a/executor/executor_test.go b/executor/executor_test.go index 9bb4ad288eecf..8115930a42e1d 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/testutil" @@ -189,7 +190,8 @@ func (s *testSuite) TestAdmin(c *C) { tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("admin_test")) c.Assert(err, IsNil) c.Assert(tb.Indices(), HasLen, 1) - _, err = tb.Indices()[0].Create(txn, types.MakeDatums(int64(10)), 1) + _, err = tb.Indices()[0].Create(mock.NewContext(), txn, + types.MakeDatums(int64(10)), 1) c.Assert(err, IsNil) err = txn.Commit() c.Assert(err, IsNil) diff --git a/executor/write.go b/executor/write.go index 5e7340bbfeb5d..af5d5cd589314 100644 --- a/executor/write.go +++ b/executor/write.go @@ -749,7 +749,24 @@ func (e *InsertExec) Next() (Row, error) { txn := e.ctx.Txn() rowCount := 0 + + // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. + // For example, without IGNORE, a row that duplicates an existing UNIQUE index or PRIMARY KEY value in + // the table causes a duplicate-key error and the statement is aborted. With IGNORE, the row is discarded and no error occurs. + // Using BatchGet in insert ignore to mark rows as duplicated before we add records to the table. + if e.IgnoreErr { + var err error + rows, err = e.batchMarkDupRows(rows) + if err != nil { + return nil, errors.Trace(err) + } + } for _, row := range rows { + // duplicate row will be marked as nil in batchMarkDupRows if + // IgnoreErr is true. For IgnoreErr is false, it is a protection. + if row == nil { + continue + } if batchInsert && rowCount >= batchSize { if err := e.ctx.NewTxn(); err != nil { // We should return a special error for batch insert. @@ -770,13 +787,6 @@ func (e *InsertExec) Next() (Row, error) { } if kv.ErrKeyExists.Equal(err) { - // If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored. - // For example, without IGNORE, a row that duplicates an existing UNIQUE index or PRIMARY KEY value in - // the table causes a duplicate-key error and the statement is aborted. With IGNORE, the row is discarded and no error occurs. - if e.IgnoreErr { - e.ctx.GetSessionVars().StmtCtx.AppendWarning(err) - continue - } if len(e.OnDuplicate) > 0 { if err = e.onDuplicateUpdate(row, h, e.OnDuplicate); err != nil { return nil, errors.Trace(err) @@ -795,6 +805,134 @@ func (e *InsertExec) Next() (Row, error) { return nil, nil } +type keyWithDupError struct { + key kv.Key + dupErr error +} + +func (e *InsertExec) getRecordIDs(rows [][]types.Datum) ([]int64, error) { + recordIDs := make([]int64, 0, len(rows)) + if e.Table.Meta().PKIsHandle { + var handleCol *table.Column + for _, col := range e.Table.Cols() { + if col.IsPKHandleColumn(e.Table.Meta()) { + handleCol = col + break + } + } + for _, row := range rows { + recordIDs = append(recordIDs, row[handleCol.Offset].GetInt64()) + } + } else { + for range rows { + recordID, err := e.Table.AllocAutoID(e.ctx) + if err != nil { + return nil, errors.Trace(err) + } + recordIDs = append(recordIDs, recordID) + } + } + return recordIDs, nil +} + +// getKeysNeedCheck gets keys converted from to-be-insert rows to record keys and unique index keys, +// which need to be checked whether they are duplicate keys. +func (e *InsertExec) getKeysNeedCheck(rows [][]types.Datum) ([][]keyWithDupError, error) { + nUnique := 0 + for _, v := range e.Table.WritableIndices() { + if v.Meta().Unique { + nUnique++ + } + } + rowWithKeys := make([][]keyWithDupError, 0, len(rows)) + + // get recordIDs + recordIDs, err := e.getRecordIDs(rows) + if err != nil { + return nil, errors.Trace(err) + } + + for i, row := range rows { + keysWithErr := make([]keyWithDupError, 0, nUnique+1) + // append record keys and errors + if e.Table.Meta().PKIsHandle { + keysWithErr = append(keysWithErr, keyWithDupError{e.Table.RecordKey(recordIDs[i]), kv.ErrKeyExists.FastGen("Duplicate entry '%d' for key 'PRIMARY'", recordIDs[i])}) + } + + // append unique keys and errors + for _, v := range e.Table.WritableIndices() { + if !v.Meta().Unique { + continue + } + var colVals []types.Datum + colVals, err = v.FetchValues(row) + if err != nil { + return nil, errors.Trace(err) + } + var key []byte + var distinct bool + key, distinct, err = v.GenIndexKey(colVals, recordIDs[i]) + if err != nil { + return nil, errors.Trace(err) + } + if !distinct { + continue + } + keysWithErr = append(keysWithErr, keyWithDupError{key, kv.ErrKeyExists.FastGen("Duplicate entry '%d' for key '%s'", recordIDs[i], v.Meta().Name)}) + } + rowWithKeys = append(rowWithKeys, keysWithErr) + } + return rowWithKeys, nil +} + +// batchMarkDupRows marks rows with duplicate errors as nil. +// All duplicate rows were marked and appended as duplicate warnings +// to the statement context in batch. +func (e *InsertExec) batchMarkDupRows(rows [][]types.Datum) ([][]types.Datum, error) { + // get keys need to be checked + rowWithKeys, err := e.getKeysNeedCheck(rows) + + // batch get values + nKeys := 0 + for _, v := range rowWithKeys { + nKeys += len(v) + } + batchKeys := make([]kv.Key, 0, nKeys) + for _, v := range rowWithKeys { + for _, k := range v { + batchKeys = append(batchKeys, k.key) + } + } + values, err := e.ctx.Txn().GetSnapshot().BatchGet(batchKeys) + if err != nil { + return nil, errors.Trace(err) + } + + // append warnings and get no duplicated error rows + for i, v := range rowWithKeys { + for _, k := range v { + if _, found := values[string(k.key)]; found { + // If duplicate keys were found in BatchGet, mark row = nil. + rows[i] = nil + e.ctx.GetSessionVars().StmtCtx.AppendWarning(k.dupErr) + break + } + } + // If row was checked with no duplicate keys, + // it should be add to values map for the further row check. + // There may be duplicate keys inside the insert statement. + if rows[i] != nil { + for _, k := range v { + values[string(k.key)] = []byte{} + } + } + } + + // this statement was already been checked + e.ctx.GetSessionVars().StmtCtx.BatchCheck = true + return rows, nil +} + // Close implements the Executor Close interface. func (e *InsertExec) Close() error { e.ctx.GetSessionVars().CurrInsertValues = nil diff --git a/executor/write_test.go b/executor/write_test.go index 36e2286549c48..534ef53829bd4 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -346,7 +346,7 @@ func (s *testSuite) TestInsertIgnore(c *C) { tk := testkit.NewTestKit(c, kv.NewInjectedStore(s.store, &cfg)) tk.MustExec("use test") testSQL := `drop table if exists t; - create table t (id int PRIMARY KEY AUTO_INCREMENT, c1 int);` + create table t (id int PRIMARY KEY AUTO_INCREMENT, c1 int unique key);` tk.MustExec(testSQL) testSQL = `insert into t values (1, 2);` tk.MustExec(testSQL) @@ -356,12 +356,22 @@ func (s *testSuite) TestInsertIgnore(c *C) { r.Check(testkit.Rows(rowStr)) tk.MustExec("insert ignore into t values (1, 3), (2, 3)") - r = tk.MustQuery("select * from t;") - rowStr = fmt.Sprintf("%v %v", "1", "2") rowStr1 := fmt.Sprintf("%v %v", "2", "3") r.Check(testkit.Rows(rowStr, rowStr1)) + tk.MustExec("insert ignore into t values (3, 4), (3, 4)") + r = tk.MustQuery("select * from t;") + rowStr2 := fmt.Sprintf("%v %v", "3", "4") + r.Check(testkit.Rows(rowStr, rowStr1, rowStr2)) + + tk.MustExec("begin") + tk.MustExec("insert ignore into t values (4, 4), (4, 5), (4, 6)") + r = tk.MustQuery("select * from t;") + rowStr3 := fmt.Sprintf("%v %v", "4", "5") + r.Check(testkit.Rows(rowStr, rowStr1, rowStr2, rowStr3)) + tk.MustExec("commit") + cfg.SetGetError(errors.New("foo")) _, err := tk.Exec("insert ignore into t values (1, 3)") c.Assert(err, NotNil) diff --git a/inspectkv/inspectkv_test.go b/inspectkv/inspectkv_test.go index 7af9cc7560fce..9e91a2523c654 100644 --- a/inspectkv/inspectkv_test.go +++ b/inspectkv/inspectkv_test.go @@ -408,7 +408,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) { // set data to: // index data (handle, data): (1, 10), (2, 20), (3, 30) // table data (handle, data): (1, 10), (2, 20), (4, 40) - _, err = idx.Create(txn, types.MakeDatums(int64(30)), 3) + _, err = idx.Create(s.ctx, txn, types.MakeDatums(int64(30)), 3) c.Assert(err, IsNil) key := tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4)) setColValue(c, txn, key, types.NewDatum(int64(40))) @@ -426,7 +426,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) { // set data to: // index data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40) // table data (handle, data): (1, 10), (2, 20), (4, 40), (3, 31) - _, err = idx.Create(txn, types.MakeDatums(int64(40)), 4) + _, err = idx.Create(s.ctx, txn, types.MakeDatums(int64(40)), 4) c.Assert(err, IsNil) key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3)) setColValue(c, txn, key, types.NewDatum(int64(31))) diff --git a/kv/fault_injection.go b/kv/fault_injection.go index c6aafe9f5e8e5..e7af1250a3dbc 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -104,6 +104,14 @@ func (t *InjectedTransaction) Commit() error { return t.Transaction.Commit() } +// GetSnapshot implements Transaction GetSnapshot method. +func (t *InjectedTransaction) GetSnapshot() Snapshot { + return &InjectedSnapshot{ + Snapshot: t.Transaction.GetSnapshot(), + cfg: t.cfg, + } +} + // InjectedSnapshot wraps a Snapshot with injections. type InjectedSnapshot struct { Snapshot @@ -119,3 +127,13 @@ func (t *InjectedSnapshot) Get(k Key) ([]byte, error) { } return t.Snapshot.Get(k) } + +// BatchGet returns an error if cfg.getError is set. +func (t *InjectedSnapshot) BatchGet(keys []Key) (map[string][]byte, error) { + t.cfg.RLock() + defer t.cfg.RUnlock() + if t.cfg.getError != nil { + return nil, t.cfg.getError + } + return t.Snapshot.BatchGet(keys) +} diff --git a/kv/kv.go b/kv/kv.go index cb3d05714e5cb..118ed9c55d0a5 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -135,6 +135,8 @@ type Transaction interface { // Valid returns if the transaction is valid. // A transaction become invalid after commit or rollback. Valid() bool + // GetSnapshot returns the snapshot of this transaction. + GetSnapshot() Snapshot } // Client is used to send request to KV layer. diff --git a/kv/mock.go b/kv/mock.go index 85e3c01897725..c7b53d14f8418 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -94,6 +94,12 @@ func (t *mockTxn) Size() int { return 0 } +func (t *mockTxn) GetSnapshot() Snapshot { + return &mockSnapshot{ + store: NewMemDbBuffer(), + } +} + // mockStorage is used to start a must commit-failed txn. type mockStorage struct { } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 705bdb83c68b7..0bd253aa67e3b 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -360,6 +360,7 @@ type StatementContext struct { TimeZone *time.Location Priority mysql.PriorityEnum NotFillCache bool + BatchCheck bool } // AddAffectedRows adds affected rows. diff --git a/store/localstore/txn.go b/store/localstore/txn.go index 8e1104582ba89..94226de16873f 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -125,6 +125,10 @@ func (txn *dbTxn) Commit() error { return errors.Trace(txn.doCommit()) } +func (txn *dbTxn) GetSnapshot() kv.Snapshot { + return newSnapshot(txn.store, kv.Version{Ver: txn.tid}) +} + type schemaLeaseChecker interface { Check(txnTS uint64) error } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index c011729a04aa2..41451c2d2c203 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -214,3 +214,7 @@ func (txn *tikvTxn) Len() int { func (txn *tikvTxn) Size() int { return txn.us.Size() } + +func (txn *tikvTxn) GetSnapshot() kv.Snapshot { + return txn.snapshot +} diff --git a/table/index.go b/table/index.go index 8112920ca8240..f43aa8d9a2427 100644 --- a/table/index.go +++ b/table/index.go @@ -14,6 +14,7 @@ package table import ( + "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/util/types" @@ -30,7 +31,8 @@ type Index interface { // Meta returns IndexInfo. Meta() *model.IndexInfo // Create supports insert into statement. - Create(rm kv.RetrieverMutator, indexedValues []types.Datum, h int64) (int64, error) + Create(ctx context.Context, rm kv.RetrieverMutator, + indexedValues []types.Datum, h int64) (int64, error) // Delete supports delete from statement. Delete(m kv.Mutator, indexedValues []types.Datum, h int64) error // Drop supports drop table, drop index statements. diff --git a/table/tables/index.go b/table/tables/index.go index 34324359c1a6f..d5736faa9f84b 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -19,6 +19,7 @@ import ( "io" "github.com/juju/errors" + "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/table" @@ -177,7 +178,9 @@ func (c *index) GenIndexKey(indexedValues []types.Datum, h int64) (key []byte, d // Create creates a new entry in the kvIndex data. // If the index is unique and there is an existing entry with the same key, // Create will return the existing entry's handle as the first return value, ErrKeyExists as the second return value. -func (c *index) Create(rm kv.RetrieverMutator, indexedValues []types.Datum, h int64) (int64, error) { +func (c *index) Create(ctx context.Context, rm kv.RetrieverMutator, + indexedValues []types.Datum, h int64) (int64, error) { + skipCheck := ctx.GetSessionVars().StmtCtx.BatchCheck key, distinct, err := c.GenIndexKey(indexedValues, h) if err != nil { return 0, errors.Trace(err) @@ -188,8 +191,12 @@ func (c *index) Create(rm kv.RetrieverMutator, indexedValues []types.Datum, h in return 0, errors.Trace(err) } - value, err := rm.Get(key) - if kv.IsErrNotFound(err) { + var value []byte + if !skipCheck { + value, err = rm.Get(key) + } + + if skipCheck || kv.IsErrNotFound(err) { err = rm.Set(key, encodeHandle(h)) return 0, errors.Trace(err) } diff --git a/table/tables/index_test.go b/table/tables/index_test.go index 6c6cf1511d904..1f947d0cac5fe 100644 --- a/table/tables/index_test.go +++ b/table/tables/index_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/terror" + "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/types" ) @@ -66,7 +67,8 @@ func (s *testIndexSuite) TestIndex(c *C) { c.Assert(err, IsNil) values := types.MakeDatums(1, 2) - _, err = index.Create(txn, values, 1) + mockCtx := mock.NewContext() + _, err = index.Create(mockCtx, txn, values, 1) c.Assert(err, IsNil) it, err := index.SeekFirst(txn) @@ -98,7 +100,7 @@ func (s *testIndexSuite) TestIndex(c *C) { c.Assert(terror.ErrorEqual(err, io.EOF), IsTrue) it.Close() - _, err = index.Create(txn, values, 0) + _, err = index.Create(mockCtx, txn, values, 0) c.Assert(err, IsNil) _, err = index.SeekFirst(txn) @@ -149,10 +151,10 @@ func (s *testIndexSuite) TestIndex(c *C) { txn, err = s.s.Begin() c.Assert(err, IsNil) - _, err = index.Create(txn, values, 1) + _, err = index.Create(mockCtx, txn, values, 1) c.Assert(err, IsNil) - _, err = index.Create(txn, values, 2) + _, err = index.Create(mockCtx, txn, values, 2) c.Assert(err, NotNil) it, err = index.SeekFirst(txn) @@ -203,7 +205,7 @@ func (s *testIndexSuite) TestCombineIndexSeek(c *C) { c.Assert(err, IsNil) values := types.MakeDatums("abc", "def") - _, err = index.Create(txn, values, 1) + _, err = index.Create(mock.NewContext(), txn, values, 1) c.Assert(err, IsNil) index2 := tables.NewIndex(tblInfo, tblInfo.Indices[0]) diff --git a/table/tables/tables.go b/table/tables/tables.go index 289e0892b093a..36b88df566e1c 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -217,7 +217,7 @@ func (t *Table) UpdateRecord(ctx context.Context, h int64, oldData, newData []ty bs := kv.NewBufferStore(txn) // rebuild index - err := t.rebuildIndices(bs, h, touched, oldData, newData) + err := t.rebuildIndices(ctx, bs, h, touched, oldData, newData) if err != nil { return errors.Trace(err) } @@ -282,7 +282,8 @@ func (t *Table) UpdateRecord(ctx context.Context, h int64, oldData, newData []ty return nil } -func (t *Table) rebuildIndices(rm kv.RetrieverMutator, h int64, touched []bool, oldData []types.Datum, newData []types.Datum) error { +func (t *Table) rebuildIndices(ctx context.Context, rm kv.RetrieverMutator, + h int64, touched []bool, oldData []types.Datum, newData []types.Datum) error { for _, idx := range t.DeletableIndices() { for _, ic := range idx.Meta().Columns { if !touched[ic.Offset] { @@ -307,7 +308,7 @@ func (t *Table) rebuildIndices(rm kv.RetrieverMutator, h int64, touched []bool, if err != nil { return errors.Trace(err) } - if err := t.buildIndexForRow(rm, h, newVs, idx); err != nil { + if err := t.buildIndexForRow(ctx, rm, h, newVs, idx); err != nil { return errors.Trace(err) } break @@ -336,7 +337,7 @@ func (t *Table) AddRecord(ctx context.Context, r []types.Datum, skipHandleCheck txn := ctx.Txn() bs := kv.NewBufferStore(txn) - skipCheck := ctx.GetSessionVars().SkipConstraintCheck + skipCheck := ctx.GetSessionVars().SkipConstraintCheck || ctx.GetSessionVars().StmtCtx.BatchCheck if skipCheck { txn.SetOption(kv.SkipCheckForWrite, true) } @@ -417,7 +418,7 @@ func (t *Table) addIndices(ctx context.Context, recordID int64, r []types.Datum, txn := ctx.Txn() // Clean up lazy check error environment defer txn.DelOption(kv.PresumeKeyNotExistsError) - skipCheck := ctx.GetSessionVars().SkipConstraintCheck + skipCheck := ctx.GetSessionVars().SkipConstraintCheck || ctx.GetSessionVars().StmtCtx.BatchCheck if t.meta.PKIsHandle && !skipCheck && !skipHandleCheck { // Check key exists. recordKey := t.RecordKey(recordID) @@ -446,7 +447,7 @@ func (t *Table) addIndices(ctx context.Context, recordID int64, r []types.Datum, dupKeyErr = kv.ErrKeyExists.FastGen("Duplicate entry '%s' for key '%s'", entryKey, v.Meta().Name) txn.SetOption(kv.PresumeKeyNotExistsError, dupKeyErr) } - if dupHandle, err := v.Create(bs, colVals, recordID); err != nil { + if dupHandle, err := v.Create(ctx, bs, colVals, recordID); err != nil { if kv.ErrKeyExists.Equal(err) { return dupHandle, errors.Trace(dupKeyErr) } @@ -617,8 +618,9 @@ func (t *Table) removeRowIndex(rm kv.RetrieverMutator, h int64, vals []types.Dat } // buildIndexForRow implements table.Table BuildIndexForRow interface. -func (t *Table) buildIndexForRow(rm kv.RetrieverMutator, h int64, vals []types.Datum, idx table.Index) error { - if _, err := idx.Create(rm, vals, h); err != nil { +func (t *Table) buildIndexForRow(ctx context.Context, rm kv.RetrieverMutator, + h int64, vals []types.Datum, idx table.Index) error { + if _, err := idx.Create(ctx, rm, vals, h); err != nil { return errors.Trace(err) } return nil