From 64298f027841f1f9e869b99f15f02745222e3dd3 Mon Sep 17 00:00:00 2001
From: tiancaiamao
Date: Tue, 10 Sep 2019 16:36:49 +0800
Subject: [PATCH] *: refactoring the code of batchChecker (#12108)

batchChecker is difficult to maintain, and we should get rid of it. In this
commit, I cache the BatchGet result in the snapshot; in this way we can
achieve the same goal as the batchChecker.
---
 executor/batch_checker.go |  44 +++++++++++
 executor/insert.go        | 158 +++++++++++++++++++++++++++++++++++++-
 executor/insert_test.go   |  11 +++
 store/tikv/snapshot.go    |  33 ++++++++
 table/tables/tables.go    |   6 +-
 5 files changed, 246 insertions(+), 6 deletions(-)

diff --git a/executor/batch_checker.go b/executor/batch_checker.go
index 65272865b8daf..607815e8c0087 100644
--- a/executor/batch_checker.go
+++ b/executor/batch_checker.go
@@ -292,6 +292,50 @@ func (b *batchChecker) deleteDupKeys(ctx context.Context, sctx sessionctx.Contex
 	return nil
 }
 
+// getOldRowNew gets the table record row from storage for batch check.
+// t could be a normal table or a partition, but it must not be a PartitionedTable.
+func (b *batchChecker) getOldRowNew(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle int64,
+	genExprs []expression.Expression) ([]types.Datum, error) {
+	oldValue, err := txn.Get(ctx, t.RecordKey(handle))
+	if err != nil {
+		return nil, err
+	}
+
+	cols := t.WritableCols()
+	oldRow, oldRowMap, err := tables.DecodeRawRowData(sctx, t.Meta(), handle, cols, oldValue)
+	if err != nil {
+		return nil, err
+	}
+	// Fill write-only and write-reorg columns with originDefaultValue if not found in oldValue.
+	gIdx := 0
+	for _, col := range cols {
+		if col.State != model.StatePublic && oldRow[col.Offset].IsNull() {
+			_, found := oldRowMap[col.ID]
+			if !found {
+				oldRow[col.Offset], err = table.GetColOriginDefaultValue(sctx, col.ToInfo())
+				if err != nil {
+					return nil, err
+				}
+			}
+		}
+		if col.IsGenerated() {
+			// Only the virtual column needs to be filled back.
+			if !col.GeneratedStored {
+				val, err := genExprs[gIdx].Eval(chunk.MutRowFromDatums(oldRow).ToRow())
+				if err != nil {
+					return nil, err
+				}
+				oldRow[col.Offset], err = table.CastValue(sctx, val, col.ToInfo())
+				if err != nil {
+					return nil, err
+				}
+			}
+			gIdx++
+		}
+	}
+	return oldRow, nil
+}
+
 // getOldRow gets the table record row from storage for batch check.
 // t could be a normal table or a partition, but it must not be a PartitionedTable.
 func (b *batchChecker) getOldRow(ctx sessionctx.Context, t table.Table, handle int64,
diff --git a/executor/insert.go b/executor/insert.go
index 1b22adc7a91e9..b6543474fe5b0 100644
--- a/executor/insert.go
+++ b/executor/insert.go
@@ -15,6 +15,7 @@ package executor
 
 import (
 	"context"
+	"encoding/hex"
 	"fmt"
 
 	"github.com/opentracing/opentracing-go"
@@ -67,7 +68,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
 	// If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword,
 	// the to-be-inserted rows will be checked for duplicate keys and updated to the new rows.
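 	// batchUpdateDupRowsNew prefetches all conflicting keys with a single
 	// BatchGet; the snapshot caches the returned values, so the per-row
 	// lookups it performs are served locally instead of hitting TiKV again.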
 	if len(e.OnDuplicate) > 0 {
-		err := e.batchUpdateDupRows(ctx, rows)
+		err := e.batchUpdateDupRowsNew(ctx, rows)
 		if err != nil {
 			return err
 		}
@@ -86,6 +87,161 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
 	return nil
 }
 
+func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
+	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("prefetchUniqueIndices", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		ctx = opentracing.ContextWithSpan(ctx, span1)
+	}
+
+	nKeys := 0
+	for _, r := range rows {
+		if r.handleKey != nil {
+			nKeys++
+		}
+		nKeys += len(r.uniqueKeys)
+	}
+	batchKeys := make([]kv.Key, 0, nKeys)
+	for _, r := range rows {
+		if r.handleKey != nil {
+			batchKeys = append(batchKeys, r.handleKey.newKV.key)
+		}
+		for _, k := range r.uniqueKeys {
+			batchKeys = append(batchKeys, k.newKV.key)
+		}
+	}
+	return txn.BatchGet(ctx, batchKeys)
+}
+
+func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow, values map[string][]byte) error {
+	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("prefetchConflictedOldRows", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		ctx = opentracing.ContextWithSpan(ctx, span1)
+	}
+
+	batchKeys := make([]kv.Key, 0, len(rows))
+	for _, r := range rows {
+		for _, uk := range r.uniqueKeys {
+			if val, found := values[string(uk.newKV.key)]; found {
+				handle, err := tables.DecodeHandle(val)
+				if err != nil {
+					return err
+				}
+				batchKeys = append(batchKeys, r.t.RecordKey(handle))
+			}
+		}
+	}
+	_, err := txn.BatchGet(ctx, batchKeys)
+	return err
+}
+
+func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error {
+	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context()))
+		defer span1.Finish()
+		ctx = opentracing.ContextWithSpan(ctx, span1)
+	}
+	values, err := prefetchUniqueIndices(ctx, txn, rows)
+	if err != nil {
+		return err
+	}
+	return prefetchConflictedOldRows(ctx, txn, rows, values)
+}
+
+// updateDupRowNew updates a duplicate row to a new row.
+func (e *InsertExec) updateDupRowNew(ctx context.Context, txn kv.Transaction, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
+	oldRow, err := e.getOldRowNew(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
+	if err != nil {
+		return err
+	}
+
+	_, _, _, err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, e.OnDuplicate)
+	if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
+		e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
+		return nil
+	}
+	return err
+}
+
+func (e *InsertExec) batchUpdateDupRowsNew(ctx context.Context, newRows [][]types.Datum) error {
+	// Get the keys that need to be checked.
+	toBeCheckedRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, newRows)
+	if err != nil {
+		return err
+	}
+
+	txn, err := e.ctx.Txn(true)
+	if err != nil {
+		return err
+	}
+
+	// Use BatchGet to fill the cache.
+	// It's an optimization and could be removed without affecting correctness.
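+	// prefetchDataCache issues two BatchGets: the first fetches the handle and
+	// unique-index keys of the to-be-inserted rows; the second decodes the
+	// conflicting handles from those index values and fetches the old rows.
+	// Both results land in the snapshot cache, so the txn.Get and getOldRowNew
+	// calls below are served from memory instead of issuing extra RPCs.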
+	if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
+		return err
+	}
+
+	for i, r := range toBeCheckedRows {
+		if r.handleKey != nil {
+			handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
+			if err != nil {
+				return err
+			}
+
+			err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
+			if err == nil {
+				continue
+			}
+			if !kv.IsErrNotFound(err) {
+				return err
+			}
+		}
+
+		for _, uk := range r.uniqueKeys {
+			val, err := txn.Get(ctx, uk.newKV.key)
+			if err != nil {
+				if kv.IsErrNotFound(err) {
+					continue
+				}
+				return err
+			}
+			handle, err := tables.DecodeHandle(val)
+			if err != nil {
+				return err
+			}
+
+			err = e.updateDupRowNew(ctx, txn, r, handle, e.OnDuplicate)
+			if err != nil {
+				if kv.IsErrNotFound(err) {
+					// Data-index inconsistency? The unique key provides the handle
+					// information, but the handle points to nothing.
+					logutil.BgLogger().Error("get old row failed when insert on dup",
+						zap.String("uniqueKey", hex.EncodeToString(uk.newKV.key)),
+						zap.Int64("handle", handle),
+						zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
+				}
+				return err
+			}
+
+			newRows[i] = nil
+			break
+		}
+
+		// If no duplicate key was found for this row, insert it. The written
+		// key-values stay in the transaction buffer, so subsequent rows in the
+		// same insert statement can still detect duplicates against them.
+		if newRows[i] != nil {
+			_, err := e.addRecord(ctx, newRows[i])
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 // batchUpdateDupRows updates multiple rows in batch if they duplicate existing rows in the table.
 func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.Datum) error {
 	err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, newRows)
diff --git a/executor/insert_test.go b/executor/insert_test.go
index b9ac781e5ea93..1ad3ab5dae696 100644
--- a/executor/insert_test.go
+++ b/executor/insert_test.go
@@ -561,4 +561,15 @@ func (s *testSuite3) TestPartitionInsertOnDuplicate(c *C) {
 	tk.MustExec(`insert into t2 set a=1,b=1 on duplicate key update a=1,b=1`)
 	tk.MustQuery(`select * from t2`).Check(testkit.Rows("1 1"))
+	tk.MustExec(`CREATE TABLE t3 (a int, b int, c int, d int, e int,
+  PRIMARY KEY (a,b),
+  UNIQUE KEY (b,c,d)
+) PARTITION BY RANGE ( b ) (
+  PARTITION p0 VALUES LESS THAN (4),
+  PARTITION p1 VALUES LESS THAN (7),
+  PARTITION p2 VALUES LESS THAN (11)
+)`)
+	tk.MustExec("insert into t3 values (1,2,3,4,5)")
+	tk.MustExec("insert into t3 values (1,2,3,4,5),(6,2,3,4,6) on duplicate key update e = e + values(e)")
+	tk.MustQuery("select * from t3").Check(testkit.Rows("1 2 3 4 16"))
 }
diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go
index 703471e468351..8ee3144b94029 100644
--- a/store/tikv/snapshot.go
+++ b/store/tikv/snapshot.go
@@ -58,6 +58,11 @@ type tikvSnapshot struct {
 	vars            *kv.Variables
 	replicaRead     kv.ReplicaReadType
 	replicaReadSeed uint32
+
+	// Cache the result of BatchGet.
+	// The invariant is that calling BatchGet multiple times with the same
+	// start ts always returns the same result.
+	cached map[string][]byte
 }
 
 // newTiKVSnapshot creates a snapshot of a TiKV store.
@@ -78,7 +83,20 @@ func (s *tikvSnapshot) SetPriority(priority int) {
 
 // BatchGet gets all the keys' values from kv-server and returns a map containing key/value pairs.
 // The map will not contain nonexistent keys.
 func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
+	// Check the cached values first.
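+	// Keys found in s.cached are answered locally; the remainder are collected
+	// into tmp (reusing the keys slice's backing array via keys[:0]) and
+	// fetched from the store, then merged into the cache below.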
 	m := make(map[string][]byte)
+	if s.cached != nil {
+		tmp := keys[:0]
+		for _, key := range keys {
+			if val, ok := s.cached[string(key)]; ok {
+				m[string(key)] = val
+			} else {
+				tmp = append(tmp, key)
+			}
+		}
+		keys = tmp
+	}
+
 	if len(keys) == 0 {
 		return m, nil
 	}
@@ -110,6 +128,14 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string]
 		return nil, errors.Trace(err)
 	}
 
+	// Update the cache.
+	if s.cached == nil {
+		s.cached = make(map[string][]byte, len(m))
+	}
+	for key, value := range m {
+		s.cached[key] = value
+	}
+
 	return m, nil
 }
 
@@ -233,6 +259,13 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
 }
 
 func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
+	// Check the cached values first.
+	if s.cached != nil {
+		if value, ok := s.cached[string(k)]; ok {
+			return value, nil
+		}
+	}
+
 	sender := NewRegionRequestSender(s.store.regionCache, s.store.client)
 	req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet,
diff --git a/table/tables/tables.go b/table/tables/tables.go
index a2bbd20f524c9..7dc9f5be0abd1 100644
--- a/table/tables/tables.go
+++ b/table/tables/tables.go
@@ -696,11 +696,7 @@ func DecodeRawRowData(ctx sessionctx.Context, meta *model.TableInfo, h int64, co
 
 // Row implements table.Table Row interface.
 func (t *tableCommon) Row(ctx sessionctx.Context, h int64) ([]types.Datum, error) {
-	r, err := t.RowWithCols(ctx, h, t.Cols())
-	if err != nil {
-		return nil, err
-	}
-	return r, nil
+	return t.RowWithCols(ctx, h, t.Cols())
 }
 
 // RemoveRecord implements table.Table RemoveRecord interface.
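
Below is a minimal sketch (not part of the patch) of the caller-visible behavior
of the snapshot cache. The helper name and keys are illustrative assumptions;
the kv.Storage and kv.Snapshot calls are the ones this TiDB version exposes,
but treat the snippet as a sketch rather than tested code.

	// demoSnapshotCache is a hypothetical helper showing the BatchGet cache.
	// Assumed imports: "context", "github.com/pingcap/tidb/kv".
	func demoSnapshotCache(store kv.Storage) error {
		ver, err := store.CurrentVersion()
		if err != nil {
			return err
		}
		snapshot, err := store.GetSnapshot(ver)
		if err != nil {
			return err
		}
		ctx := context.Background()
		keys := []kv.Key{kv.Key("k1"), kv.Key("k2")}
		// The first call fetches from TiKV and populates the snapshot cache.
		if _, err = snapshot.BatchGet(ctx, keys); err != nil {
			return err
		}
		// A second call with the same start ts is served from the cache; by the
		// invariant stated in snapshot.go, the result must be identical.
		if _, err = snapshot.BatchGet(ctx, keys); err != nil {
			return err
		}
		// Single-key Gets consult the cache first as well; a key that was never
		// written is not cached and still reports kv.ErrNotExist.
		if _, err = snapshot.Get(ctx, kv.Key("k1")); err != nil && !kv.IsErrNotFound(err) {
			return err
		}
		return nil
	}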