From 2a283fb0e07c243f8bfb7b937915f994057eab35 Mon Sep 17 00:00:00 2001
From: glorv
Date: Sat, 31 Oct 2020 00:17:58 +0800
Subject: [PATCH 1/5] fix multiple errors in column permutation

The column permutation maps table column index to data file field
offset, so tidbEncoder.Encode and getColumnNames must invert it before
indexing with it. Also fix intSlice2Int32Slice, which allocated its
result at full length and then appended, leaving zero padding at the
front.
---
 lightning/backend/tidb.go            | 19 +++++++++++++++----
 lightning/checkpoints/checkpoints.go |  2 +-
 lightning/restore/restore.go         | 21 +++++++++++++++++++--
 3 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/lightning/backend/tidb.go b/lightning/backend/tidb.go
index 165387cd3..b959ed8fe 100644
--- a/lightning/backend/tidb.go
+++ b/lightning/backend/tidb.go
@@ -51,9 +51,10 @@ func (row tidbRows) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
 }
 
 type tidbEncoder struct {
-	mode mysql.SQLMode
-	tbl  table.Table
-	se   *session
+	mode      mysql.SQLMode
+	tbl       table.Table
+	se        *session
+	columnIdx []int
 }
 
 type tidbBackend struct {
@@ -230,6 +231,16 @@ func (*tidbEncoder) Close() {}
 
 func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, columnPermutation []int) (Row, error) {
 	cols := enc.tbl.Cols()
+	if len(enc.columnIdx) == 0 {
+		columnIdx := make([]int, len(columnPermutation))
+		for i, idx := range columnPermutation {
+			if idx >= 0 {
+				columnIdx[idx] = i
+			}
+		}
+		enc.columnIdx = columnIdx
+	}
+
 	var encoded strings.Builder
 	encoded.Grow(8 * len(row))
 	encoded.WriteByte('(')
@@ -237,7 +248,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
 		if i != 0 {
 			encoded.WriteByte(',')
 		}
-		if err := enc.appendSQL(&encoded, &field, cols[columnPermutation[i]]); err != nil {
+		if err := enc.appendSQL(&encoded, &field, cols[enc.columnIdx[i]]); err != nil {
 			logger.Error("tidb encode failed",
 				zap.Array("original", rowArrayMarshaler(row)),
 				zap.Int("originalCol", i),
diff --git a/lightning/checkpoints/checkpoints.go b/lightning/checkpoints/checkpoints.go
index c05d4199a..2424334eb 100644
--- a/lightning/checkpoints/checkpoints.go
+++ b/lightning/checkpoints/checkpoints.go
@@ -1441,7 +1441,7 @@ func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error {
 }
 
 func intSlice2Int32Slice(s []int) []int32 {
-	res := make([]int32, len(s))
+	res := make([]int32, 0, len(s))
 	for _, i := range s {
 		res = append(res, int32(i))
 	}
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index d8d172ada..4461682e5 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -20,6 +20,7 @@ import (
 	"io"
 	"math"
 	"os"
+	"sort"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -1576,8 +1577,24 @@ func (t *TableRestore) parseColumnPermutations(columns []string) ([]int, error)
 }
 
 func getColumnNames(tableInfo *model.TableInfo, permutation []int) []string {
-	names := make([]string, 0, len(permutation))
-	for _, idx := range permutation {
+	colIndexes := make([]int, 0, len(permutation))
+	for i := 0; i < len(permutation); i++ {
+		colIndexes = append(colIndexes, -1)
+	}
+	colCnt := 0
+	for i, p := range permutation {
+		if p >= 0 {
+			colIndexes[p] = i
+			colCnt++
+		}
+	}
+
+	sort.Slice(colIndexes, func(i, j int) bool {
+		return colIndexes[i] < colIndexes[j]
+	})
+
+	names := make([]string, 0, colCnt)
+	for _, idx := range colIndexes {
 		// skip columns with index -1
 		if idx >= 0 {
 			names = append(names, tableInfo.Columns[idx].Name.O)
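Note: the intSlice2Int32Slice change above fixes a classic
make-then-append slip: allocating the slice at full length and then
appending doubles it, leaving zeros at the front. A minimal standalone
sketch (illustrative code, not part of the patch):

    package main

    import "fmt"

    func main() {
        s := []int{7, 8, 9}

        bad := make([]int32, len(s)) // length 3, already zero-filled
        for _, i := range s {
            bad = append(bad, int32(i)) // appends after the zeros
        }
        fmt.Println(bad) // [0 0 0 7 8 9]

        good := make([]int32, 0, len(s)) // length 0, capacity 3
        for _, i := range s {
            good = append(good, int32(i))
        }
        fmt.Println(good) // [7 8 9]
    }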
From 73a6bd11d868baabb6bc3629b1f016e1f60d9f15 Mon Sep 17 00:00:00 2001
From: glorv
Date: Sat, 31 Oct 2020 15:50:36 +0800
Subject: [PATCH 2/5] add unit test

---
 lightning/backend/tidb_test.go    |   3 +-
 lightning/restore/restore.go      |   5 -
 lightning/restore/restore_test.go | 203 ++++++++++++++++++++++++++++++
 3 files changed, 205 insertions(+), 6 deletions(-)

diff --git a/lightning/backend/tidb_test.go b/lightning/backend/tidb_test.go
index 5718c160c..762f06d27 100644
--- a/lightning/backend/tidb_test.go
+++ b/lightning/backend/tidb_test.go
@@ -197,8 +197,9 @@ func (s *mysqlSuite) TestStrictMode(c *C) {
 	c.Assert(err, ErrorMatches, `.*incorrect utf8 value .* for column s0`)
 
 	_, err = encoder.Encode(logger, []types.Datum{
+		types.NewStringDatum(""),
 		types.NewStringDatum("非 ASCII 字符串"),
-	}, 1, []int{1, 0, -1})
+	}, 1, []int{0, 1, -1})
 	c.Assert(err, ErrorMatches, ".*incorrect ascii value .* for column s1")
 }
 
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index 4461682e5..7df375da8 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -20,7 +20,6 @@ import (
 	"io"
 	"math"
 	"os"
-	"sort"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -1589,10 +1588,6 @@ func getColumnNames(tableInfo *model.TableInfo, permutation []int) []string {
 		}
 	}
 
-	sort.Slice(colIndexes, func(i, j int) bool {
-		return colIndexes[i] < colIndexes[j]
-	})
-
 	names := make([]string, 0, colCnt)
 	for _, idx := range colIndexes {
 		// skip columns with index -1
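Note: the sort.Slice call removed above would reorder the values of
colIndexes and sever the field-offset -> column-index mapping built
just before it; the inverse array is already in field order. A
standalone sketch of the difference (illustrative values, not patch
code):

    package main

    import (
        "fmt"
        "sort"
    )

    func main() {
        // Permutation for columns (a, b, c) laid out in the file as (c, a, b):
        // perm[colIdx] = field offset, or -1 when the column is absent.
        perm := []int{1, 2, 0, -1}

        // Inverse mapping: colIndexes[fieldOffset] = colIdx.
        colIndexes := []int{-1, -1, -1, -1}
        for col, field := range perm {
            if field >= 0 {
                colIndexes[field] = col
            }
        }
        fmt.Println(colIndexes) // [2 0 1 -1] -> names c, a, b (correct)

        sort.Ints(colIndexes)
        fmt.Println(colIndexes) // [-1 0 1 2] -> names a, b, c (wrong order)
    }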
diff --git a/lightning/restore/restore_test.go b/lightning/restore/restore_test.go
index bef2beb45..e5afbd676 100644
--- a/lightning/restore/restore_test.go
+++ b/lightning/restore/restore_test.go
@@ -409,6 +409,209 @@ func (s *tableRestoreSuite) TestPopulateChunks(c *C) {
 	s.cfg.Mydumper.CSV.Header = false
 }
 
+func (s *tableRestoreSuite) TestPopulateChunksCSVHeader(c *C) {
+	fakeDataDir := c.MkDir()
+	store, err := storage.NewLocalStorage(fakeDataDir)
+	c.Assert(err, IsNil)
+
+	fakeDataFiles := make([]mydump.FileInfo, 0)
+
+	fakeCsvContents := []string{
+		// small full header
+		"a,b,c\r\n1,2,3\r\n",
+		// small partial header
+		"b,c\r\n2,3\r\n",
+		// big full header
+		"a,b,c\r\n90000,80000,700000\r\n1000,2000,3000\r\n11,22,33\r\n3,4,5\r\n",
+		// big full header unordered
+		"c,a,b\r\n,1000,2000,3000\r\n11,22,33\r\n1000,2000,404\r\n3,4,5\r\n90000,80000,700000\r\n7999999,89999999,9999999\r\n",
+		// big partial header
+		"b,c\r\n2000001,30000001\r\n35231616,462424626\r\n62432,434898934\r\n",
+	}
+	total := 0
+	for i, s := range fakeCsvContents {
+		csvName := fmt.Sprintf("db.table.%02d.csv", i)
+		err := ioutil.WriteFile(filepath.Join(fakeDataDir, csvName), []byte(s), 0644)
+		c.Assert(err, IsNil)
+		fakeDataFiles = append(fakeDataFiles, mydump.FileInfo{
+			TableName: filter.Table{"db", "table"},
+			FileMeta:  mydump.SourceFileMeta{Path: csvName, Type: mydump.SourceTypeCSV, SortKey: fmt.Sprintf("%02d", i)},
+			Size:      int64(len(s)),
+		})
+		total += len(s)
+	}
+	tableMeta := &mydump.MDTableMeta{
+		DB:         "db",
+		Name:       "table",
+		TotalSize:  int64(total),
+		SchemaFile: mydump.FileInfo{TableName: filter.Table{Schema: "db", Name: "table"}, FileMeta: mydump.SourceFileMeta{Path: "db.table-schema.sql", Type: mydump.SourceTypeTableSchema}},
+		DataFiles:  fakeDataFiles,
+	}
+
+	failpoint.Enable("github.com/pingcap/tidb-lightning/lightning/restore/PopulateChunkTimestamp", "return(1234567897)")
+	defer failpoint.Disable("github.com/pingcap/tidb-lightning/lightning/restore/PopulateChunkTimestamp")
+
+	cp := &TableCheckpoint{
+		Engines: make(map[int32]*EngineCheckpoint),
+	}
+
+	cfg := config.NewConfig()
+	cfg.Mydumper.BatchSize = 100
+	cfg.Mydumper.MaxRegionSize = 40
+
+	cfg.Mydumper.CSV.Header = true
+	cfg.Mydumper.StrictFormat = true
+	rc := &RestoreController{cfg: cfg, ioWorkers: worker.NewPool(context.Background(), 1, "io"), store: store}
+
+	tr, err := NewTableRestore("`db`.`table`", tableMeta, s.dbInfo, s.tableInfo, &TableCheckpoint{})
+	c.Assert(err, IsNil)
+	c.Assert(tr.populateChunks(context.Background(), rc, cp), IsNil)
+
+	c.Assert(cp.Engines, DeepEquals, map[int32]*EngineCheckpoint{
+		-1: {
+			Status: CheckpointStatusLoaded,
+		},
+		0: {
+			Status: CheckpointStatusLoaded,
+			Chunks: []*ChunkCheckpoint{
+				{
+					Key:      ChunkCheckpointKey{Path: tableMeta.DataFiles[0].FileMeta.Path, Offset: 0},
+					FileMeta: tableMeta.DataFiles[0].FileMeta,
+					Chunk: mydump.Chunk{
+						Offset:       0,
+						EndOffset:    14,
+						PrevRowIDMax: 0,
+						RowIDMax:     4, // a row-count estimate derived from the 14-byte file size
+					},
+					Timestamp: 1234567897,
+				},
+				{
+					Key:      ChunkCheckpointKey{Path: tableMeta.DataFiles[1].FileMeta.Path, Offset: 0},
+					FileMeta: tableMeta.DataFiles[1].FileMeta,
+					Chunk: mydump.Chunk{
+						Offset:       0,
+						EndOffset:    10,
+						PrevRowIDMax: 4,
+						RowIDMax:     7,
+					},
+					Timestamp: 1234567897,
+				},
+				{
+					Key:               ChunkCheckpointKey{Path: tableMeta.DataFiles[2].FileMeta.Path, Offset: 6},
+					FileMeta:          tableMeta.DataFiles[2].FileMeta,
+					ColumnPermutation: []int{0, 1, 2, -1},
+					Chunk: mydump.Chunk{
+						Offset:       6,
+						EndOffset:    52,
+						PrevRowIDMax: 7,
+						RowIDMax:     20,
+						Columns:      []string{"a", "b", "c"},
+					},
+
+					Timestamp: 1234567897,
+				},
+				{
+					Key:               ChunkCheckpointKey{Path: tableMeta.DataFiles[2].FileMeta.Path, Offset: 52},
+					FileMeta:          tableMeta.DataFiles[2].FileMeta,
+					ColumnPermutation: []int{0, 1, 2, -1},
+					Chunk: mydump.Chunk{
+						Offset:       52,
+						EndOffset:    60,
+						PrevRowIDMax: 20,
+						RowIDMax:     22,
+						Columns:      []string{"a", "b", "c"},
+					},
+					Timestamp: 1234567897,
+				},
+				{
+					Key:               ChunkCheckpointKey{Path: tableMeta.DataFiles[3].FileMeta.Path, Offset: 6},
+					FileMeta:          tableMeta.DataFiles[3].FileMeta,
+					ColumnPermutation: []int{1, 2, 0, -1},
+					Chunk: mydump.Chunk{
+						Offset:       6,
+						EndOffset:    48,
+						PrevRowIDMax: 22,
+						RowIDMax:     35,
+						Columns:      []string{"c", "a", "b"},
+					},
+					Timestamp: 1234567897,
+				},
+			},
+		},
+		1: {
+			Status: CheckpointStatusLoaded,
+			Chunks: []*ChunkCheckpoint{
+				{
+					Key:               ChunkCheckpointKey{Path: tableMeta.DataFiles[3].FileMeta.Path, Offset: 48},
+					FileMeta:          tableMeta.DataFiles[3].FileMeta,
+					ColumnPermutation: []int{1, 2, 0, -1},
+					Chunk: mydump.Chunk{
+						Offset:       48,
+						EndOffset:    101,
+						PrevRowIDMax: 35,
+						RowIDMax:     48,
+						Columns:      []string{"c", "a", "b"},
+					},
+					Timestamp: 1234567897,
+				},
+				{
+					Key:               ChunkCheckpointKey{Path: tableMeta.DataFiles[3].FileMeta.Path, Offset: 101},
+					FileMeta:          tableMeta.DataFiles[3].FileMeta,
+					ColumnPermutation: []int{1, 2, 0, -1},
+					Chunk: mydump.Chunk{
+						Offset:       101,
+						EndOffset:    102,
+						PrevRowIDMax: 48,
+						RowIDMax:     48,
+						Columns:      []string{"c", "a", "b"},
+					},
+					Timestamp: 1234567897,
+				},
+				{
+					Key:               ChunkCheckpointKey{Path: tableMeta.DataFiles[4].FileMeta.Path, Offset: 4},
+					FileMeta:          tableMeta.DataFiles[4].FileMeta,
+					ColumnPermutation: []int{-1, 0, 1, -1},
+					Chunk: mydump.Chunk{
+						Offset:       4,
+						EndOffset:    59,
+						PrevRowIDMax: 48,
+						RowIDMax:     61,
+						Columns:      []string{"b", "c"},
+					},
+					Timestamp: 1234567897,
+				},
+			},
+		},
+		2: {
+			Status: CheckpointStatusLoaded,
+			Chunks: []*ChunkCheckpoint{
+				{
+					Key:               ChunkCheckpointKey{Path: tableMeta.DataFiles[4].FileMeta.Path, Offset: 59},
+					FileMeta:          tableMeta.DataFiles[4].FileMeta,
+					ColumnPermutation: []int{-1, 0, 1, -1},
+					Chunk: mydump.Chunk{
+						Offset:       59,
+						EndOffset:    60,
+						PrevRowIDMax: 61,
+						RowIDMax:     61,
+						Columns:      []string{"b", "c"},
+					},
+					Timestamp: 1234567897,
+				},
+			},
+		},
+	})
+}
+
+func (s *tableRestoreSuite) TestGetColumnsNames(c *C) {
+	c.Assert(getColumnNames(s.tableInfo.Core, []int{0, 1, 2, -1}), DeepEquals, []string{"a", "b", "c"})
+	c.Assert(getColumnNames(s.tableInfo.Core, []int{1, 0, 2, -1}), DeepEquals, []string{"b", "a", "c"})
+	c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 0, 1, -1}), DeepEquals, []string{"b", "c"})
+	c.Assert(getColumnNames(s.tableInfo.Core, []int{0, 1, -1, -1}), DeepEquals, []string{"a", "b"})
+	c.Assert(getColumnNames(s.tableInfo.Core, []int{1, -1, 0, -1}), DeepEquals, []string{"c", "a"})
+	c.Assert(getColumnNames(s.tableInfo.Core, []int{-1, 0, -1, -1}), DeepEquals, []string{"b"})
+}
+
 func (s *tableRestoreSuite) TestInitializeColumns(c *C) {
 	ccp := &ChunkCheckpoint{}
 	c.Assert(s.tr.initializeColumns(nil, ccp), IsNil)
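Note: to read the TestGetColumnsNames cases above, remember that the
permutation is indexed by table column: {1, -1, 0, -1} says column a
sits at field offset 1, column b is absent, and column c sits at field
offset 0, so the file's field order is c, a, matching the expected
[]string{"c", "a"}.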
From e9143ac53eb767dbd713c975ad36575cfe5281b3 Mon Sep 17 00:00:00 2001
From: glorv
Date: Sat, 31 Oct 2020 16:14:32 +0800
Subject: [PATCH 3/5] add an integration test

---
 tests/column_permutation/config.toml |  3 +++
 .../data/test-schema-create.sql      |  1 +
 .../data/test.test_perm-schema.sql   | 22 +++++++++++++++++++
 .../data/test.test_perm.0.csv        |  6 +++++
 tests/column_permutation/run.sh      | 18 +++++++++++++++
 5 files changed, 50 insertions(+)
 create mode 100644 tests/column_permutation/config.toml
 create mode 100644 tests/column_permutation/data/test-schema-create.sql
 create mode 100644 tests/column_permutation/data/test.test_perm-schema.sql
 create mode 100644 tests/column_permutation/data/test.test_perm.0.csv
 create mode 100644 tests/column_permutation/run.sh

diff --git a/tests/column_permutation/config.toml b/tests/column_permutation/config.toml
new file mode 100644
index 000000000..7bbff7f80
--- /dev/null
+++ b/tests/column_permutation/config.toml
@@ -0,0 +1,3 @@
+[mydumper]
+strict-format = true
+max-region-size = 200
diff --git a/tests/column_permutation/data/test-schema-create.sql b/tests/column_permutation/data/test-schema-create.sql
new file mode 100644
index 000000000..e913277a5
--- /dev/null
+++ b/tests/column_permutation/data/test-schema-create.sql
@@ -0,0 +1 @@
+CREATE DATABASE IF NOT EXISTS `test`;
diff --git a/tests/column_permutation/data/test.test_perm-schema.sql b/tests/column_permutation/data/test.test_perm-schema.sql
new file mode 100644
index 000000000..85176eeee
--- /dev/null
+++ b/tests/column_permutation/data/test.test_perm-schema.sql
@@ -0,0 +1,22 @@
+CREATE TABLE `test` (
+  `id` int(11) NOT NULL,
+  `contract_no` varchar(64) DEFAULT NULL,
+  `fund_seq_no` varchar(64) DEFAULT NULL,
+  `term_no` int(11) DEFAULT NULL,
+  `contract_type` varchar(8) DEFAULT NULL,
+  `internal_transfer_tag` varchar(8) DEFAULT NULL,
+  `prin_amt` int(11) DEFAULT NULL,
+  `start_date` varchar(8) DEFAULT NULL,
+  `end_date` varchar(8) DEFAULT NULL,
+  `batch_date` varchar(8) DEFAULT NULL,
+  `crt_time` timestamp DEFAULT CURRENT_TIMESTAMP,
+  `region_code` varchar(8) DEFAULT NULL,
+  `credit_code` varchar(64) DEFAULT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
+PARTITION BY RANGE COLUMNS(batch_date) (
+  PARTITION `P20200224` VALUES LESS THAN ("2020-02-05 00:00:00"),
+  PARTITION `P20200324` VALUES LESS THAN ("2020-03-05 00:00:00"),
+  PARTITION `P20200424` VALUES LESS THAN ("2020-04-05 00:00:00"),
+  PARTITION `P20200524` VALUES LESS THAN ("2020-05-05 00:00:00"),
+  PARTITION `P_MAXVALUE` VALUES LESS THAN ("MAXVALUE")
+);
\ No newline at end of file
diff --git a/tests/column_permutation/data/test.test_perm.0.csv b/tests/column_permutation/data/test.test_perm.0.csv
new file mode 100644
index 000000000..f27b9fd5b
--- /dev/null
+++ b/tests/column_permutation/data/test.test_perm.0.csv
@@ -0,0 +1,6 @@
+contract_no,fund_seq_no,term_no,contract_type,internal_transfer_tag,prin_amt,start_date,end_date,region_code,credit_code
+2020061000019011020164030595,202006100001901102016403059520200629,1,01,N,356,20200210,20200720,000000,
+2020061000019011020164030596,202006100001901102016403059520200628,1,01,N,3561,20200310,20200720,000001,
+2020061000019011020164030597,202006100001901102016403059520200627,1,01,N,3562,20200410,20200720,000002,33
+2020061000019011020164030598,108319xx0185-202006100001901102016403059520200626,12,02,Y,26368,20200510,20200620,000003,
+2020061000019011020164030599,202006100001901102016403059520200625,1,01,N,3960,20200610,20200720,000005,999
\ No newline at end of file
diff --git a/tests/column_permutation/run.sh b/tests/column_permutation/run.sh
new file mode 100644
index 000000000..7e6767923
--- /dev/null
+++ b/tests/column_permutation/run.sh
@@ -0,0 +1,18 @@
+set -eu
+
+for BACKEND in local importer tidb; do
+    if [ "$BACKEND" = 'local' ]; then
+        check_cluster_version 4 0 0 'local backend' || continue
+    fi
+    run_sql 'DROP DATABASE IF EXISTS test'
+
+    run_lightning --backend $BACKEND
+
+    run_sql 'select count(*) from test.test_perm;'
+    check_contains "count(*): 5"
+
+    run_sql "SELECT fund_seq_no, region_code, credit_code FROM test.test_perm WHERE contract_no = '2020061000019011020164030597';"
+    check_contains "fund_seq_no: 202006100001901102016403059520200627"
+    check_contains "region_code: 000002"
+    check_contains "credit_code: 33"
+done
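Note: the CSV header above omits three of the schema's columns (id,
batch_date, crt_time), so field offsets and column indexes diverge and
the resulting column permutation is non-trivial. With strict-format =
true and max-region-size = 200, the data file is split into several
chunks and only the first one contains the header line; later chunks
must rely on the ColumnPermutation recorded when the chunks were
populated, which is exactly the code path fixed by patches 1 and 2.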
From 150720ba50df8d2df36d5470c07cbceb6a5beec2 Mon Sep 17 00:00:00 2001
From: glorv
Date: Sat, 31 Oct 2020 23:30:58 +0800
Subject: [PATCH 4/5] rename test db

---
 tests/column_permutation/data/perm-schema-create.sql        | 1 +
 ...{test.test_perm-schema.sql => perm.test_perm-schema.sql} | 4 ++--
 .../data/{test.test_perm.0.csv => perm.test_perm.0.csv}     | 0
 tests/column_permutation/data/test-schema-create.sql        | 1 -
 tests/column_permutation/run.sh                             | 6 +++---
 5 files changed, 6 insertions(+), 6 deletions(-)
 create mode 100644 tests/column_permutation/data/perm-schema-create.sql
 rename tests/column_permutation/data/{test.test_perm-schema.sql => perm.test_perm-schema.sql} (94%)
 rename tests/column_permutation/data/{test.test_perm.0.csv => perm.test_perm.0.csv} (100%)
 delete mode 100644 tests/column_permutation/data/test-schema-create.sql

diff --git a/tests/column_permutation/data/perm-schema-create.sql b/tests/column_permutation/data/perm-schema-create.sql
new file mode 100644
index 000000000..fe9a5be60
--- /dev/null
+++ b/tests/column_permutation/data/perm-schema-create.sql
@@ -0,0 +1 @@
+CREATE DATABASE IF NOT EXISTS `perm`;
diff --git a/tests/column_permutation/data/test.test_perm-schema.sql b/tests/column_permutation/data/perm.test_perm-schema.sql
similarity index 94%
rename from tests/column_permutation/data/test.test_perm-schema.sql
rename to tests/column_permutation/data/perm.test_perm-schema.sql
index 85176eeee..9bd9f113f 100644
--- a/tests/column_permutation/data/test.test_perm-schema.sql
+++ b/tests/column_permutation/data/perm.test_perm-schema.sql
@@ -18,5 +18,5 @@ PARTITION BY RANGE COLUMNS(batch_date) (
   PARTITION `P20200324` VALUES LESS THAN ("2020-03-05 00:00:00"),
   PARTITION `P20200424` VALUES LESS THAN ("2020-04-05 00:00:00"),
   PARTITION `P20200524` VALUES LESS THAN ("2020-05-05 00:00:00"),
-  PARTITION `P_MAXVALUE` VALUES LESS THAN ("MAXVALUE")
-);
\ No newline at end of file
+  PARTITION `P_MAXVALUE` VALUES LESS THAN MAXVALUE
+);
diff --git a/tests/column_permutation/data/test.test_perm.0.csv
b/tests/column_permutation/data/perm.test_perm.0.csv
similarity index 100%
rename from tests/column_permutation/data/test.test_perm.0.csv
rename to tests/column_permutation/data/perm.test_perm.0.csv
diff --git a/tests/column_permutation/data/test-schema-create.sql b/tests/column_permutation/data/test-schema-create.sql
deleted file mode 100644
index e913277a5..000000000
--- a/tests/column_permutation/data/test-schema-create.sql
+++ /dev/null
@@ -1 +0,0 @@
-CREATE DATABASE IF NOT EXISTS `test`;
diff --git a/tests/column_permutation/run.sh b/tests/column_permutation/run.sh
index 7e6767923..db69f6448 100644
--- a/tests/column_permutation/run.sh
+++ b/tests/column_permutation/run.sh
@@ -4,14 +4,14 @@ for BACKEND in local importer tidb; do
     if [ "$BACKEND" = 'local' ]; then
         check_cluster_version 4 0 0 'local backend' || continue
     fi
-    run_sql 'DROP DATABASE IF EXISTS test'
+    run_sql 'DROP DATABASE IF EXISTS perm'
 
     run_lightning --backend $BACKEND
 
-    run_sql 'select count(*) from test.test_perm;'
+    run_sql 'select count(*) from perm.test_perm;'
     check_contains "count(*): 5"
 
-    run_sql "SELECT fund_seq_no, region_code, credit_code FROM test.test_perm WHERE contract_no = '2020061000019011020164030597';"
+    run_sql "SELECT fund_seq_no, region_code, credit_code FROM perm.test_perm WHERE contract_no = '2020061000019011020164030597';"
     check_contains "fund_seq_no: 202006100001901102016403059520200627"
     check_contains "region_code: 000002"
    check_contains "credit_code: 33"

From 8f78f16b73c2ff0b609c5a08d56114a2539b76bc Mon Sep 17 00:00:00 2001
From: glorv
Date: Mon, 2 Nov 2020 16:02:06 +0800
Subject: [PATCH 5/5] change log field order

---
 lightning/backend/tidb.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lightning/backend/tidb.go b/lightning/backend/tidb.go
index b959ed8fe..f4559b88d 100644
--- a/lightning/backend/tidb.go
+++ b/lightning/backend/tidb.go
@@ -358,8 +358,8 @@ func (be *tidbBackend) WriteRows(ctx context.Context, _ uuid.UUID, tableName str
 	// Retry will be done externally, so we're not going to retry here.
 	_, err := be.db.ExecContext(ctx, insertStmt.String())
 	if err != nil {
-		log.L().Error("execute statement failed",
-			zap.Array("rows", rows), zap.String("stmt", insertStmt.String()), zap.Error(err))
+		log.L().Error("execute statement failed", zap.String("stmt", insertStmt.String()),
+			zap.Array("rows", rows), zap.Error(err))
 	}
 	failpoint.Inject("FailIfImportedSomeRows", func() {
 		panic("forcing failure due to FailIfImportedSomeRows, before saving checkpoint")