From 65de0e3c43592829c6703dfee3d618b1a3347065 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 28 Mar 2023 18:59:46 +0800 Subject: [PATCH 1/5] fix kvstore import cycle --- executor/importer/import.go | 4 ++++ store/driver/tikv_driver.go | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/executor/importer/import.go b/executor/importer/import.go index 893f87744bf7f..2cc5e3fe0c2c3 100644 --- a/executor/importer/import.go +++ b/executor/importer/import.go @@ -30,6 +30,7 @@ import ( litlog "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/storage" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" @@ -42,6 +43,7 @@ import ( "github.com/pingcap/tidb/util/intest" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/stringutil" + kvconfig "github.com/tikv/client-go/v2/config" "go.uber.org/zap" ) @@ -103,6 +105,8 @@ var ( LoadDataReadBlockSize = int64(config.ReadBlockSize) ) +var GetKVStore func(path string, tls kvconfig.Security) (tidbkv.Storage, error) + // FieldMapping indicates the relationship between input field and table column or user variable type FieldMapping struct { Column *table.Column diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index 497984d1a0309..a457e8f2f54be 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/errors" deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/executor/importer" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/copr" @@ -59,6 +60,8 @@ func init() { // Setup the Hooks to dynamic control global resource controller. variable.EnableGlobalResourceControlFunc = tikv.EnableResourceControl variable.DisableGlobalResourceControlFunc = tikv.DisableResourceControl + // cannot use this package directly, it causes import cycle + importer.GetKVStore = getKVStore } // Option is a function that changes some config of Driver @@ -111,6 +114,10 @@ func TrySetupGlobalResourceController(ctx context.Context, serverID uint64, s kv return nil } +func getKVStore(path string, tls config.Security) (kv.Storage, error) { + return TiKVDriver{}.OpenWithOptions(path, WithSecurity(tls)) +} + // TiKVDriver implements engine TiKV. type TiKVDriver struct { pdConfig config.PDClient From 3589c93fc01d2cfed16a38a522ebc11e97c38b57 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 29 Mar 2023 15:33:55 +0800 Subject: [PATCH 2/5] refactor --- br/pkg/lightning/backend/kv/base.go | 228 +++++++++++++++++ br/pkg/lightning/backend/kv/kv2sql.go | 6 +- br/pkg/lightning/backend/kv/sql2kv.go | 269 ++++----------------- br/pkg/lightning/backend/kv/sql2kv_test.go | 4 +- br/pkg/lightning/backend/tidb/tidb.go | 6 +- 5 files changed, 285 insertions(+), 228 deletions(-) create mode 100644 br/pkg/lightning/backend/kv/base.go diff --git a/br/pkg/lightning/backend/kv/base.go b/br/pkg/lightning/backend/kv/base.go new file mode 100644 index 0000000000000..32521ba7eeb0b --- /dev/null +++ b/br/pkg/lightning/backend/kv/base.go @@ -0,0 +1,228 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "math/rand" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/redact" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// ExtraHandleColumnInfo is the column info of extra handle column. +var ExtraHandleColumnInfo = model.NewExtraHandleColInfo() + +// GeneratedCol generated column info. +type GeneratedCol struct { + // index of the column in the table + Index int + Expr expression.Expression +} + +// AutoIDConverterFn is a function to convert auto id. +type AutoIDConverterFn func(int64) int64 + +// RowArrayMarshaller wraps a slice of types.Datum for logging the content into zap. +type RowArrayMarshaller []types.Datum + +var kindStr = [...]string{ + types.KindNull: "null", + types.KindInt64: "int64", + types.KindUint64: "uint64", + types.KindFloat32: "float32", + types.KindFloat64: "float64", + types.KindString: "string", + types.KindBytes: "bytes", + types.KindBinaryLiteral: "binary", + types.KindMysqlDecimal: "decimal", + types.KindMysqlDuration: "duration", + types.KindMysqlEnum: "enum", + types.KindMysqlBit: "bit", + types.KindMysqlSet: "set", + types.KindMysqlTime: "time", + types.KindInterface: "interface", + types.KindMinNotNull: "min", + types.KindMaxValue: "max", + types.KindRaw: "raw", + types.KindMysqlJSON: "json", +} + +// MarshalLogArray implements the zapcore.ArrayMarshaler interface +func (row RowArrayMarshaller) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + for _, datum := range row { + kind := datum.Kind() + var str string + var err error + switch kind { + case types.KindNull: + str = "NULL" + case types.KindMinNotNull: + str = "-inf" + case types.KindMaxValue: + str = "+inf" + default: + str, err = datum.ToString() + if err != nil { + return err + } + } + if err := encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { + enc.AddString("kind", kindStr[kind]) + enc.AddString("val", redact.String(str)) + return nil + })); err != nil { + return err + } + } + return nil +} + +// BaseKVEncoder encodes a row into a KV pair. +type BaseKVEncoder struct { + GenCols []GeneratedCol + SessionCtx *Session + Table table.Table + AutoRandomColID int64 + // convert auto id for shard rowid or auto random id base on row id generated by lightning + AutoIDFn AutoIDConverterFn + logger *zap.Logger +} + +// NewBaseKVEncoder creates a new BaseKVEncoder. +func NewBaseKVEncoder(config *encode.EncodingConfig) (*BaseKVEncoder, error) { + meta := config.Table.Meta() + cols := config.Table.Cols() + se := NewSession(&config.SessionOptions, config.Logger) + // Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord + recordCtx := tables.NewCommonAddRecordCtx(len(cols)) + tables.SetAddRecordCtx(se, recordCtx) + + var autoRandomColID int64 + autoIDFn := func(id int64) int64 { return id } + if meta.ContainsAutoRandomBits() { + col := common.GetAutoRandomColumn(meta) + autoRandomColID = col.ID + + shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits) + shard := rand.New(rand.NewSource(config.AutoRandomSeed)).Int63() + autoIDFn = func(id int64) int64 { + return shardFmt.Compose(shard, id) + } + } else if meta.ShardRowIDBits > 0 { + rd := rand.New(rand.NewSource(config.AutoRandomSeed)) // nolint:gosec + mask := int64(1)< 0 { - rd := rand.New(rand.NewSource(config.AutoRandomSeed)) // nolint:gosec - mask := int64(1)<= 0 && j < len(row) { - value, err = table.CastValue(kvcodec.se, row[j], ExtraHandleColumnInfo, false, false) + value, err = table.CastValue(kvcodec.SessionCtx, row[j], ExtraHandleColumnInfo, false, false) rowValue = value.GetInt64() } else { - rowID := kvcodec.autoIDFn(rowID) + rowID := kvcodec.AutoIDFn(rowID) value, err = types.NewIntDatum(rowID), nil } if err != nil { - return nil, logKVConvertFailed(kvcodec.logger, row, j, ExtraHandleColumnInfo, err) + return nil, kvcodec.LogKVConvertFailed(row, j, ExtraHandleColumnInfo, err) } record = append(record, value) - alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.RowIDAllocType) + alloc := kvcodec.Table.Allocators(kvcodec.SessionCtx).Get(autoid.RowIDAllocType) if err := alloc.Rebase(context.Background(), rowValue, false); err != nil { return nil, errors.Trace(err) } } - if len(kvcodec.genCols) > 0 { - if errCol, err := evaluateGeneratedColumns(kvcodec.se, record, cols, kvcodec.genCols); err != nil { - return nil, logEvalGenExprFailed(kvcodec.logger, row, errCol, err) + if len(kvcodec.GenCols) > 0 { + if errCol, err := kvcodec.EvalGeneratedColumns(record, cols); err != nil { + return nil, kvcodec.LogEvalGenExprFailed(row, errCol, err) } } - _, err = kvcodec.tbl.AddRecord(kvcodec.se, record) + _, err = kvcodec.Table.AddRecord(kvcodec.SessionCtx, record) if err != nil { kvcodec.logger.Error("kv encode failed", - zap.Array("originalRow", RowArrayMarshaler(row)), - zap.Array("convertedRow", RowArrayMarshaler(record)), + zap.Array("originalRow", RowArrayMarshaller(row)), + zap.Array("convertedRow", RowArrayMarshaller(record)), log.ShortError(err), ) return nil, errors.Trace(err) } - kvPairs := kvcodec.se.TakeKvPairs() + kvPairs := kvcodec.SessionCtx.TakeKvPairs() for i := 0; i < len(kvPairs.Pairs); i++ { var encoded [9]byte // The max length of encoded int64 is 9. kvPairs.Pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID) @@ -435,22 +268,18 @@ func (kvcodec *tableKVEncoder) Encode(row []types.Datum, rowID int64, columnPerm return kvPairs, nil } -func (kvcodec *tableKVEncoder) isAutoRandomCol(col *model.ColumnInfo) bool { - return kvcodec.tbl.Meta().ContainsAutoRandomBits() && col.ID == kvcodec.autoRandomColID -} - -func isAutoIncCol(colInfo *model.ColumnInfo) bool { +func IsAutoIncCol(colInfo *model.ColumnInfo) bool { return mysql.HasAutoIncrementFlag(colInfo.GetFlag()) } // GetEncoderIncrementalID return Auto increment id. func GetEncoderIncrementalID(encoder encode.Encoder, id int64) int64 { - return encoder.(*tableKVEncoder).autoIDFn(id) + return encoder.(*tableKVEncoder).AutoIDFn(id) } // GetEncoderSe return session. func GetEncoderSe(encoder encode.Encoder) *Session { - return encoder.(*tableKVEncoder).se + return encoder.(*tableKVEncoder).SessionCtx } // GetActualDatum export getActualDatum function. @@ -464,7 +293,7 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa err error ) - cols := kvcodec.tbl.Cols() + cols := kvcodec.Table.Cols() // Since this method is only called when iterating the columns in the `Encode()` method, // we can assume that the `colIndex` always have a valid input @@ -472,7 +301,7 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa isBadNullValue := false if inputDatum != nil { - value, err = table.CastValue(kvcodec.se, *inputDatum, col.ToInfo(), false, false) + value, err = table.CastValue(kvcodec.SessionCtx, *inputDatum, col.ToInfo(), false, false) if err != nil { return value, err } @@ -483,26 +312,26 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa } // handle special values switch { - case isAutoIncCol(col.ToInfo()): + case IsAutoIncCol(col.ToInfo()): // we still need a conversion, e.g. to catch overflow with a TINYINT column. - value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false) - case kvcodec.isAutoRandomCol(col.ToInfo()): + value, err = table.CastValue(kvcodec.SessionCtx, types.NewIntDatum(rowID), col.ToInfo(), false, false) + case kvcodec.IsAutoRandomCol(col.ToInfo()): var val types.Datum - realRowID := kvcodec.autoIDFn(rowID) + realRowID := kvcodec.AutoIDFn(rowID) if mysql.HasUnsignedFlag(col.GetFlag()) { val = types.NewUintDatum(uint64(realRowID)) } else { val = types.NewIntDatum(realRowID) } - value, err = table.CastValue(kvcodec.se, val, col.ToInfo(), false, false) + value, err = table.CastValue(kvcodec.SessionCtx, val, col.ToInfo(), false, false) case col.IsGenerated(): // inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil. // if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic. value = types.GetMinValue(&col.FieldType) case isBadNullValue: - err = col.HandleBadNull(&value, kvcodec.se.Vars.StmtCtx, 0) + err = col.HandleBadNull(&value, kvcodec.SessionCtx.Vars.StmtCtx, 0) default: - value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo()) + value, err = table.GetColDefaultValue(kvcodec.SessionCtx, col.ToInfo()) } return value, err } @@ -510,7 +339,7 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa // get record value for auto-increment field // // See: https://github.com/pingcap/tidb/blob/47f0f15b14ed54fc2222f3e304e29df7b05e6805/executor/insert_common.go#L781-L852 -func getAutoRecordID(d types.Datum, target *types.FieldType) int64 { +func GetAutoRecordID(d types.Datum, target *types.FieldType) int64 { switch target.GetType() { case mysql.TypeFloat, mysql.TypeDouble: return int64(math.Round(d.GetFloat64())) diff --git a/br/pkg/lightning/backend/kv/sql2kv_test.go b/br/pkg/lightning/backend/kv/sql2kv_test.go index 203d92487c625..d9ca8015c2858 100644 --- a/br/pkg/lightning/backend/kv/sql2kv_test.go +++ b/br/pkg/lightning/backend/kv/sql2kv_test.go @@ -49,7 +49,7 @@ func TestMarshal(t *testing.T) { minNotNull := types.Datum{} minNotNull.SetMinNotNull() encoder := zapcore.NewMapObjectEncoder() - err := encoder.AddArray("test", lkv.RowArrayMarshaler{types.NewStringDatum("1"), nullDatum, minNotNull, types.MaxValueDatum()}) + err := encoder.AddArray("test", lkv.RowArrayMarshaller{types.NewStringDatum("1"), nullDatum, minNotNull, types.MaxValueDatum()}) require.NoError(t, err) require.Equal(t, encoder.Fields["test"], []interface{}{ map[string]interface{}{"kind": "string", "val": "1"}, @@ -60,7 +60,7 @@ func TestMarshal(t *testing.T) { invalid := types.Datum{} invalid.SetInterface(1) - err = encoder.AddArray("bad-test", lkv.RowArrayMarshaler{minNotNull, invalid}) + err = encoder.AddArray("bad-test", lkv.RowArrayMarshaller{minNotNull, invalid}) require.Regexp(t, "cannot convert.*", err) require.Equal(t, encoder.Fields["bad-test"], []interface{}{ map[string]interface{}{"kind": "min", "val": "-inf"}, diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index d833e3fb619d6..338400dac6e11 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -475,7 +475,7 @@ func (enc *tidbEncoder) Encode(row []types.Datum, rowID int64, columnPermutation // 1. if len(row) < enc.columnCnt: data in row cannot populate the insert statement, because // there are enc.columnCnt elements to insert but fewer columns in row enc.logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation), - zap.Array("data", kv.RowArrayMarshaler(row))) + zap.Array("data", kv.RowArrayMarshaller(row))) return emptyTiDBRow, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row)) } @@ -483,7 +483,7 @@ func (enc *tidbEncoder) Encode(row []types.Datum, rowID int64, columnPermutation // 2. if len(row) > len(columnIdx): raw row data has more columns than those // in the table enc.logger.Error("column count mismatch", zap.Ints("column_count", enc.columnIdx), - zap.Array("data", kv.RowArrayMarshaler(row))) + zap.Array("data", kv.RowArrayMarshaller(row))) return emptyTiDBRow, errors.Errorf("column count mismatch, at most %d but got %d", len(enc.columnIdx), len(row)) } @@ -501,7 +501,7 @@ func (enc *tidbEncoder) Encode(row []types.Datum, rowID int64, columnPermutation datum := field if err := enc.appendSQL(&encoded, &datum, getColumnByIndex(cols, enc.columnIdx[i])); err != nil { enc.logger.Error("tidb encode failed", - zap.Array("original", kv.RowArrayMarshaler(row)), + zap.Array("original", kv.RowArrayMarshaller(row)), zap.Int("originalCol", i), log.ShortError(err), ) From a8d19a4b5c4e191ca77cc79c86bb47810edac124 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 29 Mar 2023 15:53:10 +0800 Subject: [PATCH 3/5] refactor --- br/pkg/lightning/backend/kv/base.go | 72 ++++++++++++++++++++++++ br/pkg/lightning/backend/kv/sql2kv.go | 79 ++------------------------- 2 files changed, 78 insertions(+), 73 deletions(-) diff --git a/br/pkg/lightning/backend/kv/base.go b/br/pkg/lightning/backend/kv/base.go index 32521ba7eeb0b..d11019cf7a591 100644 --- a/br/pkg/lightning/backend/kv/base.go +++ b/br/pkg/lightning/backend/kv/base.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" @@ -111,6 +112,8 @@ type BaseKVEncoder struct { // convert auto id for shard rowid or auto random id base on row id generated by lightning AutoIDFn AutoIDConverterFn logger *zap.Logger + + RecordCache []types.Datum } // NewBaseKVEncoder creates a new BaseKVEncoder. @@ -159,6 +162,75 @@ func NewBaseKVEncoder(config *encode.EncodingConfig) (*BaseKVEncoder, error) { }, nil } +func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64) (*KvPairs, error) { + _, err := e.Table.AddRecord(e.SessionCtx, record) + if err != nil { + e.logger.Error("kv encode failed", + zap.Array("originalRow", RowArrayMarshaller(originalRow)), + zap.Array("convertedRow", RowArrayMarshaller(record)), + log.ShortError(err), + ) + return nil, errors.Trace(err) + } + kvPairs := e.SessionCtx.TakeKvPairs() + for i := 0; i < len(kvPairs.Pairs); i++ { + var encoded [9]byte // The max length of encoded int64 is 9. + kvPairs.Pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID) + } + e.RecordCache = record[:0] + return kvPairs, nil +} + +// GetActualDatum returns the actual datum for the column. +func (e *BaseKVEncoder) GetActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) { + var ( + value types.Datum + err error + ) + + cols := e.Table.Cols() + + // Since this method is only called when iterating the columns in the `Encode()` method, + // we can assume that the `colIndex` always have a valid input + col := cols[colIndex] + + isBadNullValue := false + if inputDatum != nil { + value, err = table.CastValue(e.SessionCtx, *inputDatum, col.ToInfo(), false, false) + if err != nil { + return value, err + } + if err := col.CheckNotNull(&value, 0); err == nil { + return value, nil // the most normal case + } + isBadNullValue = true + } + // handle special values + switch { + case IsAutoIncCol(col.ToInfo()): + // we still need a conversion, e.g. to catch overflow with a TINYINT column. + value, err = table.CastValue(e.SessionCtx, types.NewIntDatum(rowID), col.ToInfo(), false, false) + case e.IsAutoRandomCol(col.ToInfo()): + var val types.Datum + realRowID := e.AutoIDFn(rowID) + if mysql.HasUnsignedFlag(col.GetFlag()) { + val = types.NewUintDatum(uint64(realRowID)) + } else { + val = types.NewIntDatum(realRowID) + } + value, err = table.CastValue(e.SessionCtx, val, col.ToInfo(), false, false) + case col.IsGenerated(): + // inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil. + // if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic. + value = types.GetMinValue(&col.FieldType) + case isBadNullValue: + err = col.HandleBadNull(&value, e.SessionCtx.Vars.StmtCtx, 0) + default: + value, err = table.GetColDefaultValue(e.SessionCtx, col.ToInfo()) + } + return value, err +} + // IsAutoRandomCol checks if the column is auto random column. func (e *BaseKVEncoder) IsAutoRandomCol(col *model.ColumnInfo) bool { return e.Table.Meta().ContainsAutoRandomBits() && col.ID == e.AutoRandomColID diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index cd374e9ae73cc..4551f623871bc 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/common" - "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/expression" @@ -36,14 +35,12 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" - "go.uber.org/zap" "golang.org/x/exp/slices" ) type tableKVEncoder struct { *BaseKVEncoder - recordCache []types.Datum - metrics *metric.Metrics + metrics *metric.Metrics } func GetSession4test(encoder encode.Encoder) sessionctx.Context { @@ -188,8 +185,8 @@ func (kvcodec *tableKVEncoder) Encode(row []types.Datum, rowID int64, columnPerm //nolint: prealloc var record []types.Datum - if kvcodec.recordCache != nil { - record = kvcodec.recordCache + if kvcodec.RecordCache != nil { + record = kvcodec.RecordCache } else { record = make([]types.Datum, 0, len(cols)+1) } @@ -201,7 +198,7 @@ func (kvcodec *tableKVEncoder) Encode(row []types.Datum, rowID int64, columnPerm if j >= 0 && j < len(row) { theDatum = &row[j] } - value, err = kvcodec.getActualDatum(rowID, i, theDatum) + value, err = kvcodec.GetActualDatum(rowID, i, theDatum) if err != nil { return nil, kvcodec.LogKVConvertFailed(row, j, col.ToInfo(), err) } @@ -250,22 +247,7 @@ func (kvcodec *tableKVEncoder) Encode(row []types.Datum, rowID int64, columnPerm } } - _, err = kvcodec.Table.AddRecord(kvcodec.SessionCtx, record) - if err != nil { - kvcodec.logger.Error("kv encode failed", - zap.Array("originalRow", RowArrayMarshaller(row)), - zap.Array("convertedRow", RowArrayMarshaller(record)), - log.ShortError(err), - ) - return nil, errors.Trace(err) - } - kvPairs := kvcodec.SessionCtx.TakeKvPairs() - for i := 0; i < len(kvPairs.Pairs); i++ { - var encoded [9]byte // The max length of encoded int64 is 9. - kvPairs.Pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID) - } - kvcodec.recordCache = record[:0] - return kvPairs, nil + return kvcodec.Record2KV(record, row, rowID) } func IsAutoIncCol(colInfo *model.ColumnInfo) bool { @@ -284,56 +266,7 @@ func GetEncoderSe(encoder encode.Encoder) *Session { // GetActualDatum export getActualDatum function. func GetActualDatum(encoder encode.Encoder, rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) { - return encoder.(*tableKVEncoder).getActualDatum(70, 0, inputDatum) -} - -func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) { - var ( - value types.Datum - err error - ) - - cols := kvcodec.Table.Cols() - - // Since this method is only called when iterating the columns in the `Encode()` method, - // we can assume that the `colIndex` always have a valid input - col := cols[colIndex] - - isBadNullValue := false - if inputDatum != nil { - value, err = table.CastValue(kvcodec.SessionCtx, *inputDatum, col.ToInfo(), false, false) - if err != nil { - return value, err - } - if err := col.CheckNotNull(&value, 0); err == nil { - return value, nil // the most normal case - } - isBadNullValue = true - } - // handle special values - switch { - case IsAutoIncCol(col.ToInfo()): - // we still need a conversion, e.g. to catch overflow with a TINYINT column. - value, err = table.CastValue(kvcodec.SessionCtx, types.NewIntDatum(rowID), col.ToInfo(), false, false) - case kvcodec.IsAutoRandomCol(col.ToInfo()): - var val types.Datum - realRowID := kvcodec.AutoIDFn(rowID) - if mysql.HasUnsignedFlag(col.GetFlag()) { - val = types.NewUintDatum(uint64(realRowID)) - } else { - val = types.NewIntDatum(realRowID) - } - value, err = table.CastValue(kvcodec.SessionCtx, val, col.ToInfo(), false, false) - case col.IsGenerated(): - // inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil. - // if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic. - value = types.GetMinValue(&col.FieldType) - case isBadNullValue: - err = col.HandleBadNull(&value, kvcodec.SessionCtx.Vars.StmtCtx, 0) - default: - value, err = table.GetColDefaultValue(kvcodec.SessionCtx, col.ToInfo()) - } - return value, err + return encoder.(*tableKVEncoder).GetActualDatum(70, 0, inputDatum) } // get record value for auto-increment field From 44650261b3f7396c90c6ac3bcb19a3a8b21c4a15 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 29 Mar 2023 16:44:35 +0800 Subject: [PATCH 4/5] refactor --- br/pkg/lightning/backend/kv/base.go | 52 +++++++++++++++++----- br/pkg/lightning/backend/kv/sql2kv.go | 41 ++++------------- br/pkg/lightning/backend/kv/sql2kv_test.go | 6 +-- 3 files changed, 52 insertions(+), 47 deletions(-) diff --git a/br/pkg/lightning/backend/kv/base.go b/br/pkg/lightning/backend/kv/base.go index d11019cf7a591..4683e78eed43c 100644 --- a/br/pkg/lightning/backend/kv/base.go +++ b/br/pkg/lightning/backend/kv/base.go @@ -15,6 +15,7 @@ package kv import ( + "context" "math/rand" "github.com/pingcap/errors" @@ -108,12 +109,13 @@ type BaseKVEncoder struct { GenCols []GeneratedCol SessionCtx *Session Table table.Table + Columns []*table.Column AutoRandomColID int64 // convert auto id for shard rowid or auto random id base on row id generated by lightning AutoIDFn AutoIDConverterFn - logger *zap.Logger - RecordCache []types.Datum + logger *zap.Logger + recordCache []types.Datum } // NewBaseKVEncoder creates a new BaseKVEncoder. @@ -156,12 +158,22 @@ func NewBaseKVEncoder(config *encode.EncodingConfig) (*BaseKVEncoder, error) { GenCols: genCols, SessionCtx: se, Table: config.Table, + Columns: cols, AutoRandomColID: autoRandomColID, AutoIDFn: autoIDFn, logger: config.Logger.Logger, }, nil } +// GetOrCreateRecord returns a record slice from the cache if possible, otherwise creates a new one. +func (e *BaseKVEncoder) GetOrCreateRecord() []types.Datum { + if e.recordCache != nil { + return e.recordCache + } + return make([]types.Datum, 0, len(e.Columns)+1) +} + +// Record2KV converts a row into a KV pair. func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64) (*KvPairs, error) { _, err := e.Table.AddRecord(e.SessionCtx, record) if err != nil { @@ -177,23 +189,41 @@ func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64 var encoded [9]byte // The max length of encoded int64 is 9. kvPairs.Pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID) } - e.RecordCache = record[:0] + e.recordCache = record[:0] return kvPairs, nil } -// GetActualDatum returns the actual datum for the column. -func (e *BaseKVEncoder) GetActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) { +// ProcessColDatum processes the datum of a column. +func (e *BaseKVEncoder) ProcessColDatum(col *table.Column, rowID int64, inputDatum *types.Datum) (types.Datum, error) { + value, err := e.getActualDatum(col, rowID, inputDatum) + if err != nil { + return value, err + } + + if e.IsAutoRandomCol(col.ToInfo()) { + meta := e.Table.Meta() + shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits) + // this allocator is the same as the allocator in table importer, i.e. PanickingAllocators. below too. + alloc := e.Table.Allocators(e.SessionCtx).Get(autoid.AutoRandomType) + if err := alloc.Rebase(context.Background(), value.GetInt64()&shardFmt.IncrementalMask(), false); err != nil { + return value, errors.Trace(err) + } + } + if IsAutoIncCol(col.ToInfo()) { + alloc := e.Table.Allocators(e.SessionCtx).Get(autoid.AutoIncrementType) + if err := alloc.Rebase(context.Background(), GetAutoRecordID(value, &col.FieldType), false); err != nil { + return value, errors.Trace(err) + } + } + return value, nil +} + +func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatum *types.Datum) (types.Datum, error) { var ( value types.Datum err error ) - cols := e.Table.Cols() - - // Since this method is only called when iterating the columns in the `Encode()` method, - // we can assume that the `colIndex` always have a valid input - col := cols[colIndex] - isBadNullValue := false if inputDatum != nil { value, err = table.CastValue(e.SessionCtx, *inputDatum, col.ToInfo(), false, false) diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 4551f623871bc..e31586f3f3966 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -178,52 +178,27 @@ func KvPairsFromRow(row encode.Row) []common.KvPair { // See comments in `(*TableRestore).initializeColumns` for the meaning of the // `columnPermutation` parameter. func (kvcodec *tableKVEncoder) Encode(row []types.Datum, rowID int64, columnPermutation []int, offset int64) (encode.Row, error) { - cols := kvcodec.Table.Cols() - var value types.Datum var err error - //nolint: prealloc - var record []types.Datum - if kvcodec.RecordCache != nil { - record = kvcodec.RecordCache - } else { - record = make([]types.Datum, 0, len(cols)+1) - } - - meta := kvcodec.Table.Meta() - for i, col := range cols { + record := kvcodec.GetOrCreateRecord() + for i, col := range kvcodec.Columns { var theDatum *types.Datum = nil j := columnPermutation[i] if j >= 0 && j < len(row) { theDatum = &row[j] } - value, err = kvcodec.GetActualDatum(rowID, i, theDatum) + value, err = kvcodec.ProcessColDatum(col, rowID, theDatum) if err != nil { return nil, kvcodec.LogKVConvertFailed(row, j, col.ToInfo(), err) } record = append(record, value) - - if kvcodec.IsAutoRandomCol(col.ToInfo()) { - shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits) - // this allocator is the same as the allocator in table importer, i.e. PanickingAllocators. below too. - alloc := kvcodec.Table.Allocators(kvcodec.SessionCtx).Get(autoid.AutoRandomType) - if err := alloc.Rebase(context.Background(), value.GetInt64()&shardFmt.IncrementalMask(), false); err != nil { - return nil, errors.Trace(err) - } - } - if IsAutoIncCol(col.ToInfo()) { - alloc := kvcodec.Table.Allocators(kvcodec.SessionCtx).Get(autoid.AutoIncrementType) - if err := alloc.Rebase(context.Background(), GetAutoRecordID(value, &col.FieldType), false); err != nil { - return nil, errors.Trace(err) - } - } } - if common.TableHasAutoRowID(meta) { + if common.TableHasAutoRowID(kvcodec.Table.Meta()) { rowValue := rowID - j := columnPermutation[len(cols)] + j := columnPermutation[len(kvcodec.Columns)] if j >= 0 && j < len(row) { value, err = table.CastValue(kvcodec.SessionCtx, row[j], ExtraHandleColumnInfo, false, false) rowValue = value.GetInt64() @@ -242,7 +217,7 @@ func (kvcodec *tableKVEncoder) Encode(row []types.Datum, rowID int64, columnPerm } if len(kvcodec.GenCols) > 0 { - if errCol, err := kvcodec.EvalGeneratedColumns(record, cols); err != nil { + if errCol, err := kvcodec.EvalGeneratedColumns(record, kvcodec.Columns); err != nil { return nil, kvcodec.LogEvalGenExprFailed(row, errCol, err) } } @@ -265,8 +240,8 @@ func GetEncoderSe(encoder encode.Encoder) *Session { } // GetActualDatum export getActualDatum function. -func GetActualDatum(encoder encode.Encoder, rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) { - return encoder.(*tableKVEncoder).GetActualDatum(70, 0, inputDatum) +func GetActualDatum(encoder encode.Encoder, col *table.Column, rowID int64, inputDatum *types.Datum) (types.Datum, error) { + return encoder.(*tableKVEncoder).getActualDatum(col, rowID, inputDatum) } // get record value for auto-increment field diff --git a/br/pkg/lightning/backend/kv/sql2kv_test.go b/br/pkg/lightning/backend/kv/sql2kv_test.go index d9ca8015c2858..ca9144977df52 100644 --- a/br/pkg/lightning/backend/kv/sql2kv_test.go +++ b/br/pkg/lightning/backend/kv/sql2kv_test.go @@ -357,7 +357,7 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) { require.NoError(t, err) strDatumForID := types.NewStringDatum("1") - actualDatum, err := lkv.GetActualDatum(encoder, 70, 0, &strDatumForID) + actualDatum, err := lkv.GetActualDatum(encoder, tbl.Cols()[0], 70, &strDatumForID) require.NoError(t, err) require.Equal(t, types.NewFloat64Datum(1.0), actualDatum) @@ -426,11 +426,11 @@ func TestEncodeMissingAutoValue(t *testing.T) { nullDatum.SetNull() expectIDDatum := types.NewIntDatum(realRowID) - actualIDDatum, err := lkv.GetActualDatum(encoder, rowID, 0, nil) + actualIDDatum, err := lkv.GetActualDatum(encoder, tbl.Cols()[0], rowID, nil) require.NoError(t, err) require.Equal(t, expectIDDatum, actualIDDatum) - actualIDDatum, err = lkv.GetActualDatum(encoder, rowID, 0, &nullDatum) + actualIDDatum, err = lkv.GetActualDatum(encoder, tbl.Cols()[0], rowID, &nullDatum) require.NoError(t, err) require.Equal(t, expectIDDatum, actualIDDatum) From ac595402ff845af9cc70108c7069aa71de2632e6 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 29 Mar 2023 18:49:29 +0800 Subject: [PATCH 5/5] change --- br/pkg/lightning/backend/kv/BUILD.bazel | 1 + executor/importer/BUILD.bazel | 2 ++ executor/importer/import.go | 2 ++ store/driver/BUILD.bazel | 1 + 4 files changed, 6 insertions(+) diff --git a/br/pkg/lightning/backend/kv/BUILD.bazel b/br/pkg/lightning/backend/kv/BUILD.bazel index 6bbfc0d35d18e..e51c4f261002e 100644 --- a/br/pkg/lightning/backend/kv/BUILD.bazel +++ b/br/pkg/lightning/backend/kv/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "kv", srcs = [ "allocator.go", + "base.go", "kv2sql.go", "session.go", "sql2kv.go", diff --git a/executor/importer/BUILD.bazel b/executor/importer/BUILD.bazel index 190398c7c5d82..a28bcf8dec855 100644 --- a/executor/importer/BUILD.bazel +++ b/executor/importer/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "//br/pkg/lightning/log", "//br/pkg/lightning/mydump", "//br/pkg/storage", + "//kv", "//parser/ast", "//parser/mysql", "//parser/terror", @@ -25,6 +26,7 @@ go_library( "//util/stringutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//config", "@org_uber_go_zap//:zap", ], ) diff --git a/executor/importer/import.go b/executor/importer/import.go index 2cc5e3fe0c2c3..d732f50de42ec 100644 --- a/executor/importer/import.go +++ b/executor/importer/import.go @@ -105,6 +105,8 @@ var ( LoadDataReadBlockSize = int64(config.ReadBlockSize) ) +// GetKVStore returns a kv.Storage. +// kv encoder of physical mode needs it. var GetKVStore func(path string, tls kvconfig.Security) (tidbkv.Storage, error) // FieldMapping indicates the relationship between input field and table column or user variable diff --git a/store/driver/BUILD.bazel b/store/driver/BUILD.bazel index 6ce308b71e824..e56417cc419ce 100644 --- a/store/driver/BUILD.bazel +++ b/store/driver/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/store/driver", visibility = ["//visibility:public"], deps = [ + "//executor/importer", "//kv", "//sessionctx/variable", "//store/copr",