diff --git a/br/pkg/lightning/backend/kv/BUILD.bazel b/br/pkg/lightning/backend/kv/BUILD.bazel index 6bbfc0d35d18e..e51c4f261002e 100644 --- a/br/pkg/lightning/backend/kv/BUILD.bazel +++ b/br/pkg/lightning/backend/kv/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "kv", srcs = [ "allocator.go", + "base.go", "kv2sql.go", "session.go", "sql2kv.go", diff --git a/br/pkg/lightning/backend/kv/base.go b/br/pkg/lightning/backend/kv/base.go new file mode 100644 index 0000000000000..4683e78eed43c --- /dev/null +++ b/br/pkg/lightning/backend/kv/base.go @@ -0,0 +1,330 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "context" + "math/rand" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/redact" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// ExtraHandleColumnInfo is the column info of extra handle column. +var ExtraHandleColumnInfo = model.NewExtraHandleColInfo() + +// GeneratedCol generated column info. +type GeneratedCol struct { + // index of the column in the table + Index int + Expr expression.Expression +} + +// AutoIDConverterFn is a function to convert auto id. +type AutoIDConverterFn func(int64) int64 + +// RowArrayMarshaller wraps a slice of types.Datum for logging the content into zap. +type RowArrayMarshaller []types.Datum + +var kindStr = [...]string{ + types.KindNull: "null", + types.KindInt64: "int64", + types.KindUint64: "uint64", + types.KindFloat32: "float32", + types.KindFloat64: "float64", + types.KindString: "string", + types.KindBytes: "bytes", + types.KindBinaryLiteral: "binary", + types.KindMysqlDecimal: "decimal", + types.KindMysqlDuration: "duration", + types.KindMysqlEnum: "enum", + types.KindMysqlBit: "bit", + types.KindMysqlSet: "set", + types.KindMysqlTime: "time", + types.KindInterface: "interface", + types.KindMinNotNull: "min", + types.KindMaxValue: "max", + types.KindRaw: "raw", + types.KindMysqlJSON: "json", +} + +// MarshalLogArray implements the zapcore.ArrayMarshaler interface +func (row RowArrayMarshaller) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + for _, datum := range row { + kind := datum.Kind() + var str string + var err error + switch kind { + case types.KindNull: + str = "NULL" + case types.KindMinNotNull: + str = "-inf" + case types.KindMaxValue: + str = "+inf" + default: + str, err = datum.ToString() + if err != nil { + return err + } + } + if err := encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { + enc.AddString("kind", kindStr[kind]) + enc.AddString("val", redact.String(str)) + return nil + })); err != nil { + return err + } + } + return nil +} + +// BaseKVEncoder encodes a row into a KV pair. +type BaseKVEncoder struct { + GenCols []GeneratedCol + SessionCtx *Session + Table table.Table + Columns []*table.Column + AutoRandomColID int64 + // convert auto id for shard rowid or auto random id base on row id generated by lightning + AutoIDFn AutoIDConverterFn + + logger *zap.Logger + recordCache []types.Datum +} + +// NewBaseKVEncoder creates a new BaseKVEncoder. +func NewBaseKVEncoder(config *encode.EncodingConfig) (*BaseKVEncoder, error) { + meta := config.Table.Meta() + cols := config.Table.Cols() + se := NewSession(&config.SessionOptions, config.Logger) + // Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord + recordCtx := tables.NewCommonAddRecordCtx(len(cols)) + tables.SetAddRecordCtx(se, recordCtx) + + var autoRandomColID int64 + autoIDFn := func(id int64) int64 { return id } + if meta.ContainsAutoRandomBits() { + col := common.GetAutoRandomColumn(meta) + autoRandomColID = col.ID + + shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits) + shard := rand.New(rand.NewSource(config.AutoRandomSeed)).Int63() + autoIDFn = func(id int64) int64 { + return shardFmt.Compose(shard, id) + } + } else if meta.ShardRowIDBits > 0 { + rd := rand.New(rand.NewSource(config.AutoRandomSeed)) // nolint:gosec + mask := int64(1)< 0 { - rd := rand.New(rand.NewSource(config.AutoRandomSeed)) // nolint:gosec - mask := int64(1)<= 0 && j < len(row) { theDatum = &row[j] } - value, err = kvcodec.getActualDatum(rowID, i, theDatum) + value, err = kvcodec.ProcessColDatum(col, rowID, theDatum) if err != nil { - return nil, logKVConvertFailed(kvcodec.logger, row, j, col.ToInfo(), err) + return nil, kvcodec.LogKVConvertFailed(row, j, col.ToInfo(), err) } record = append(record, value) - - if kvcodec.isAutoRandomCol(col.ToInfo()) { - shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits) - // this allocator is the same as the allocator in table importer, i.e. PanickingAllocators. below too. - alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType) - if err := alloc.Rebase(context.Background(), value.GetInt64()&shardFmt.IncrementalMask(), false); err != nil { - return nil, errors.Trace(err) - } - } - if isAutoIncCol(col.ToInfo()) { - alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoIncrementType) - if err := alloc.Rebase(context.Background(), getAutoRecordID(value, &col.FieldType), false); err != nil { - return nil, errors.Trace(err) - } - } } - if common.TableHasAutoRowID(meta) { + if common.TableHasAutoRowID(kvcodec.Table.Meta()) { rowValue := rowID - j := columnPermutation[len(cols)] + j := columnPermutation[len(kvcodec.Columns)] if j >= 0 && j < len(row) { - value, err = table.CastValue(kvcodec.se, row[j], ExtraHandleColumnInfo, false, false) + value, err = table.CastValue(kvcodec.SessionCtx, row[j], ExtraHandleColumnInfo, false, false) rowValue = value.GetInt64() } else { - rowID := kvcodec.autoIDFn(rowID) + rowID := kvcodec.AutoIDFn(rowID) value, err = types.NewIntDatum(rowID), nil } if err != nil { - return nil, logKVConvertFailed(kvcodec.logger, row, j, ExtraHandleColumnInfo, err) + return nil, kvcodec.LogKVConvertFailed(row, j, ExtraHandleColumnInfo, err) } record = append(record, value) - alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.RowIDAllocType) + alloc := kvcodec.Table.Allocators(kvcodec.SessionCtx).Get(autoid.RowIDAllocType) if err := alloc.Rebase(context.Background(), rowValue, false); err != nil { return nil, errors.Trace(err) } } - if len(kvcodec.genCols) > 0 { - if errCol, err := evaluateGeneratedColumns(kvcodec.se, record, cols, kvcodec.genCols); err != nil { - return nil, logEvalGenExprFailed(kvcodec.logger, row, errCol, err) + if len(kvcodec.GenCols) > 0 { + if errCol, err := kvcodec.EvalGeneratedColumns(record, kvcodec.Columns); err != nil { + return nil, kvcodec.LogEvalGenExprFailed(row, errCol, err) } } - _, err = kvcodec.tbl.AddRecord(kvcodec.se, record) - if err != nil { - kvcodec.logger.Error("kv encode failed", - zap.Array("originalRow", RowArrayMarshaler(row)), - zap.Array("convertedRow", RowArrayMarshaler(record)), - log.ShortError(err), - ) - return nil, errors.Trace(err) - } - kvPairs := kvcodec.se.TakeKvPairs() - for i := 0; i < len(kvPairs.Pairs); i++ { - var encoded [9]byte // The max length of encoded int64 is 9. - kvPairs.Pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID) - } - kvcodec.recordCache = record[:0] - return kvPairs, nil + return kvcodec.Record2KV(record, row, rowID) } -func (kvcodec *tableKVEncoder) isAutoRandomCol(col *model.ColumnInfo) bool { - return kvcodec.tbl.Meta().ContainsAutoRandomBits() && col.ID == kvcodec.autoRandomColID -} - -func isAutoIncCol(colInfo *model.ColumnInfo) bool { +func IsAutoIncCol(colInfo *model.ColumnInfo) bool { return mysql.HasAutoIncrementFlag(colInfo.GetFlag()) } // GetEncoderIncrementalID return Auto increment id. func GetEncoderIncrementalID(encoder encode.Encoder, id int64) int64 { - return encoder.(*tableKVEncoder).autoIDFn(id) + return encoder.(*tableKVEncoder).AutoIDFn(id) } // GetEncoderSe return session. func GetEncoderSe(encoder encode.Encoder) *Session { - return encoder.(*tableKVEncoder).se + return encoder.(*tableKVEncoder).SessionCtx } // GetActualDatum export getActualDatum function. -func GetActualDatum(encoder encode.Encoder, rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) { - return encoder.(*tableKVEncoder).getActualDatum(70, 0, inputDatum) -} - -func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) { - var ( - value types.Datum - err error - ) - - cols := kvcodec.tbl.Cols() - - // Since this method is only called when iterating the columns in the `Encode()` method, - // we can assume that the `colIndex` always have a valid input - col := cols[colIndex] - - isBadNullValue := false - if inputDatum != nil { - value, err = table.CastValue(kvcodec.se, *inputDatum, col.ToInfo(), false, false) - if err != nil { - return value, err - } - if err := col.CheckNotNull(&value, 0); err == nil { - return value, nil // the most normal case - } - isBadNullValue = true - } - // handle special values - switch { - case isAutoIncCol(col.ToInfo()): - // we still need a conversion, e.g. to catch overflow with a TINYINT column. - value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false) - case kvcodec.isAutoRandomCol(col.ToInfo()): - var val types.Datum - realRowID := kvcodec.autoIDFn(rowID) - if mysql.HasUnsignedFlag(col.GetFlag()) { - val = types.NewUintDatum(uint64(realRowID)) - } else { - val = types.NewIntDatum(realRowID) - } - value, err = table.CastValue(kvcodec.se, val, col.ToInfo(), false, false) - case col.IsGenerated(): - // inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil. - // if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic. - value = types.GetMinValue(&col.FieldType) - case isBadNullValue: - err = col.HandleBadNull(&value, kvcodec.se.Vars.StmtCtx, 0) - default: - value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo()) - } - return value, err +func GetActualDatum(encoder encode.Encoder, col *table.Column, rowID int64, inputDatum *types.Datum) (types.Datum, error) { + return encoder.(*tableKVEncoder).getActualDatum(col, rowID, inputDatum) } // get record value for auto-increment field // // See: https://github.com/pingcap/tidb/blob/47f0f15b14ed54fc2222f3e304e29df7b05e6805/executor/insert_common.go#L781-L852 -func getAutoRecordID(d types.Datum, target *types.FieldType) int64 { +func GetAutoRecordID(d types.Datum, target *types.FieldType) int64 { switch target.GetType() { case mysql.TypeFloat, mysql.TypeDouble: return int64(math.Round(d.GetFloat64())) diff --git a/br/pkg/lightning/backend/kv/sql2kv_test.go b/br/pkg/lightning/backend/kv/sql2kv_test.go index 203d92487c625..ca9144977df52 100644 --- a/br/pkg/lightning/backend/kv/sql2kv_test.go +++ b/br/pkg/lightning/backend/kv/sql2kv_test.go @@ -49,7 +49,7 @@ func TestMarshal(t *testing.T) { minNotNull := types.Datum{} minNotNull.SetMinNotNull() encoder := zapcore.NewMapObjectEncoder() - err := encoder.AddArray("test", lkv.RowArrayMarshaler{types.NewStringDatum("1"), nullDatum, minNotNull, types.MaxValueDatum()}) + err := encoder.AddArray("test", lkv.RowArrayMarshaller{types.NewStringDatum("1"), nullDatum, minNotNull, types.MaxValueDatum()}) require.NoError(t, err) require.Equal(t, encoder.Fields["test"], []interface{}{ map[string]interface{}{"kind": "string", "val": "1"}, @@ -60,7 +60,7 @@ func TestMarshal(t *testing.T) { invalid := types.Datum{} invalid.SetInterface(1) - err = encoder.AddArray("bad-test", lkv.RowArrayMarshaler{minNotNull, invalid}) + err = encoder.AddArray("bad-test", lkv.RowArrayMarshaller{minNotNull, invalid}) require.Regexp(t, "cannot convert.*", err) require.Equal(t, encoder.Fields["bad-test"], []interface{}{ map[string]interface{}{"kind": "min", "val": "-inf"}, @@ -357,7 +357,7 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) { require.NoError(t, err) strDatumForID := types.NewStringDatum("1") - actualDatum, err := lkv.GetActualDatum(encoder, 70, 0, &strDatumForID) + actualDatum, err := lkv.GetActualDatum(encoder, tbl.Cols()[0], 70, &strDatumForID) require.NoError(t, err) require.Equal(t, types.NewFloat64Datum(1.0), actualDatum) @@ -426,11 +426,11 @@ func TestEncodeMissingAutoValue(t *testing.T) { nullDatum.SetNull() expectIDDatum := types.NewIntDatum(realRowID) - actualIDDatum, err := lkv.GetActualDatum(encoder, rowID, 0, nil) + actualIDDatum, err := lkv.GetActualDatum(encoder, tbl.Cols()[0], rowID, nil) require.NoError(t, err) require.Equal(t, expectIDDatum, actualIDDatum) - actualIDDatum, err = lkv.GetActualDatum(encoder, rowID, 0, &nullDatum) + actualIDDatum, err = lkv.GetActualDatum(encoder, tbl.Cols()[0], rowID, &nullDatum) require.NoError(t, err) require.Equal(t, expectIDDatum, actualIDDatum) diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index d833e3fb619d6..338400dac6e11 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -475,7 +475,7 @@ func (enc *tidbEncoder) Encode(row []types.Datum, rowID int64, columnPermutation // 1. if len(row) < enc.columnCnt: data in row cannot populate the insert statement, because // there are enc.columnCnt elements to insert but fewer columns in row enc.logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation), - zap.Array("data", kv.RowArrayMarshaler(row))) + zap.Array("data", kv.RowArrayMarshaller(row))) return emptyTiDBRow, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row)) } @@ -483,7 +483,7 @@ func (enc *tidbEncoder) Encode(row []types.Datum, rowID int64, columnPermutation // 2. if len(row) > len(columnIdx): raw row data has more columns than those // in the table enc.logger.Error("column count mismatch", zap.Ints("column_count", enc.columnIdx), - zap.Array("data", kv.RowArrayMarshaler(row))) + zap.Array("data", kv.RowArrayMarshaller(row))) return emptyTiDBRow, errors.Errorf("column count mismatch, at most %d but got %d", len(enc.columnIdx), len(row)) } @@ -501,7 +501,7 @@ func (enc *tidbEncoder) Encode(row []types.Datum, rowID int64, columnPermutation datum := field if err := enc.appendSQL(&encoded, &datum, getColumnByIndex(cols, enc.columnIdx[i])); err != nil { enc.logger.Error("tidb encode failed", - zap.Array("original", kv.RowArrayMarshaler(row)), + zap.Array("original", kv.RowArrayMarshaller(row)), zap.Int("originalCol", i), log.ShortError(err), ) diff --git a/executor/importer/BUILD.bazel b/executor/importer/BUILD.bazel index 190398c7c5d82..a28bcf8dec855 100644 --- a/executor/importer/BUILD.bazel +++ b/executor/importer/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "//br/pkg/lightning/log", "//br/pkg/lightning/mydump", "//br/pkg/storage", + "//kv", "//parser/ast", "//parser/mysql", "//parser/terror", @@ -25,6 +26,7 @@ go_library( "//util/stringutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//config", "@org_uber_go_zap//:zap", ], ) diff --git a/executor/importer/import.go b/executor/importer/import.go index 893f87744bf7f..d732f50de42ec 100644 --- a/executor/importer/import.go +++ b/executor/importer/import.go @@ -30,6 +30,7 @@ import ( litlog "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/storage" + tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" @@ -42,6 +43,7 @@ import ( "github.com/pingcap/tidb/util/intest" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/stringutil" + kvconfig "github.com/tikv/client-go/v2/config" "go.uber.org/zap" ) @@ -103,6 +105,10 @@ var ( LoadDataReadBlockSize = int64(config.ReadBlockSize) ) +// GetKVStore returns a kv.Storage. +// kv encoder of physical mode needs it. +var GetKVStore func(path string, tls kvconfig.Security) (tidbkv.Storage, error) + // FieldMapping indicates the relationship between input field and table column or user variable type FieldMapping struct { Column *table.Column diff --git a/store/driver/BUILD.bazel b/store/driver/BUILD.bazel index 6ce308b71e824..e56417cc419ce 100644 --- a/store/driver/BUILD.bazel +++ b/store/driver/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/store/driver", visibility = ["//visibility:public"], deps = [ + "//executor/importer", "//kv", "//sessionctx/variable", "//store/copr", diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index 497984d1a0309..a457e8f2f54be 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/errors" deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/executor/importer" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/copr" @@ -59,6 +60,8 @@ func init() { // Setup the Hooks to dynamic control global resource controller. variable.EnableGlobalResourceControlFunc = tikv.EnableResourceControl variable.DisableGlobalResourceControlFunc = tikv.DisableResourceControl + // cannot use this package directly, it causes import cycle + importer.GetKVStore = getKVStore } // Option is a function that changes some config of Driver @@ -111,6 +114,10 @@ func TrySetupGlobalResourceController(ctx context.Context, serverID uint64, s kv return nil } +func getKVStore(path string, tls config.Security) (kv.Storage, error) { + return TiKVDriver{}.OpenWithOptions(path, WithSecurity(tls)) +} + // TiKVDriver implements engine TiKV. type TiKVDriver struct { pdConfig config.PDClient