Skip to content

Commit

Permalink
lightning: "error" strategy outputs duplicate error in terminal, log …
Browse files Browse the repository at this point in the history
…and table (pingcap#45471)

ref pingcap#41629
  • Loading branch information
lance6716 authored Jul 20, 2023
1 parent ca12156 commit 9662254
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 12 deletions.
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/tidb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ go_test(
timeout = "short",
srcs = ["tidb_test.go"],
flaky = True,
shard_count = 13,
shard_count = 14,
deps = [
":tidb",
"//br/pkg/lightning/backend",
Expand All @@ -48,13 +48,15 @@ go_test(
"//br/pkg/lightning/errormanager",
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
"//errno",
"//parser/charset",
"//parser/model",
"//parser/mysql",
"//table",
"//table/tables",
"//types",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_stretchr_testify//require",
"@org_uber_go_atomic//:atomic",
],
Expand Down
26 changes: 23 additions & 3 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ func (*targetInfoGetter) CheckRequirements(ctx context.Context, _ *backend.Check

// tidbBackend is the backend state assembled by NewTiDBBackend.
type tidbBackend struct {
	// db is the SQL handle given to NewTiDBBackend.
	db *sql.DB
	// conflictCfg is the conflict configuration given to NewTiDBBackend. Its
	// Strategy field is checked when a write fails, e.g. to detect the
	// "error" strategy (config.ErrorOnDup).
	conflictCfg config.Conflict
	// onDuplicate is the type of INSERT SQL. It may differ from
	// conflictCfg.Strategy in order to implement other features, but the
	// behaviour from the caller's point of view should be the same.
	onDuplicate string
	// errorMgr records the errors met while writing rows and reports how many
	// type/conflict errors can still be tolerated.
	errorMgr *errormanager.ErrorManager
}
Expand Down Expand Up @@ -302,6 +306,7 @@ func NewTiDBBackend(
}
return &tidbBackend{
db: db,
conflictCfg: conflict,
onDuplicate: onDuplicate,
errorMgr: errorMgr,
}
Expand Down Expand Up @@ -615,12 +620,14 @@ rowLoop:
continue rowLoop
case common.IsRetryableError(err):
// retry next loop
case be.errorMgr.TypeErrorsRemain() > 0 || be.errorMgr.ConflictErrorsRemain() > 0:
case be.errorMgr.TypeErrorsRemain() > 0 ||
be.errorMgr.ConflictErrorsRemain() > 0 ||
(be.conflictCfg.Strategy == config.ErrorOnDup && !be.errorMgr.RecordErrorOnce()):
// WriteBatchRowsToDB failed in the batch mode and can not be retried,
// we need to redo the writing row-by-row to find where the error locates (and skip it correctly in future).
if err = be.WriteRowsToDB(ctx, tableName, columnNames, r); err != nil {
// If the error is not nil, it means we reach the max error count in the non-batch mode.
// For now, we will treat like maxErrorCount is always 0. So we will just return if any error occurs.
// If the error is not nil, it means we reach the max error count in the
// non-batch mode or this is "error" conflict strategy.
return errors.Annotatef(err, "[%s] write rows exceed conflict threshold", tableName)
}
continue rowLoop
Expand Down Expand Up @@ -762,6 +769,19 @@ stmtLoop:

if isDupEntryError(err) {
// rowID is ignored in tidb backend
if be.conflictCfg.Strategy == config.ErrorOnDup {
be.errorMgr.RecordDuplicateOnce(
ctx,
log.FromContext(ctx),
tableName,
firstRow.path,
firstRow.offset,
err.Error(),
0,
firstRow.insertStmt,
)
return err
}
err = be.errorMgr.RecordDuplicate(
ctx,
log.FromContext(ctx),
Expand Down
44 changes: 44 additions & 0 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
gmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -672,6 +674,48 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
require.Nil(t, st)
}

func TestWriteRowsRecordOneError(t *testing.T) {
dupErr := &gmysql.MySQLError{Number: errno.ErrDupEntry, Message: "Duplicate entry '2' for key 'PRIMARY'"}
s := createMysqlSuite(t)
defer s.TearDownTest(t)
// First, batch insert, fail and rollback.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(3),(4),(5)\\E").
WillReturnError(dupErr)
// Then, insert row-by-row due to the non-retryable error.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E").
WillReturnResult(sqlmock.NewResult(1, 1))
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E").
WillReturnError(dupErr)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.duplicate_records.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), dupErr.Error(), 0, "(2)").
WillReturnResult(driver.ResultNoRows)

cfg := config.NewConfig()
cfg.Conflict.Strategy = config.ErrorOnDup
cfg.Conflict.Threshold = 0
cfg.Conflict.MaxRecordRows = 0
cfg.App.TaskInfoSchemaName = "tidb_lightning_errors"
ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg.Conflict,
errormanager.New(s.dbHandle, cfg, log.L()),
)
encBuilder := tidb.NewEncodingBuilder()
dataRows := encodeRowsTiDB(t, encBuilder, s.tbl)
ctx := context.Background()
engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, &backend.LocalWriterConfig{TableName: "`foo`.`bar`"})
require.NoError(t, err)
err = writer.AppendRows(ctx, []string{"a"}, dataRows)
require.ErrorContains(t, err, "Duplicate entry '2' for key 'PRIMARY'")
st, err := writer.Close(ctx)
require.NoError(t, err)
require.Nil(t, st)
}

func TestDuplicateThreshold(t *testing.T) {
s := createMysqlSuite(t)
defer s.TearDownTest(t)
Expand Down
49 changes: 47 additions & 2 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type ErrorManager struct {
conflictV1Enabled bool
conflictV2Enabled bool
logger log.Logger
recordErrorOnce *atomic.Bool
}

// TypeErrorsRemain returns the number of type errors that can be recorded.
Expand All @@ -169,6 +170,12 @@ func (em *ErrorManager) ConflictRecordsRemain() int64 {
return em.conflictRecordsRemain.Load()
}

// RecordErrorOnce reports whether RecordDuplicateOnce has already been called.
// Note that this method is not atomic with RecordDuplicateOnce: the flag may
// change between this load and a later call to RecordDuplicateOnce.
func (em *ErrorManager) RecordErrorOnce() bool {
	recorded := em.recordErrorOnce.Load()
	return recorded
}

// New creates a new error manager.
func New(db *sql.DB, cfg *config.Config, logger log.Logger) *ErrorManager {
conflictErrRemain := atomic.NewInt64(cfg.Conflict.Threshold)
Expand All @@ -182,10 +189,11 @@ func New(db *sql.DB, cfg *config.Config, logger log.Logger) *ErrorManager {
conflictErrRemain: conflictErrRemain,
conflictRecordsRemain: conflictRecordsRemain,
logger: logger,
recordErrorOnce: atomic.NewBool(false),
}
switch cfg.TikvImporter.Backend {
case config.BackendLocal:
if cfg.TikvImporter.OnDuplicate != "" {
if cfg.Conflict.Strategy != "" {
em.conflictV2Enabled = true
}
case config.BackendTiDB:
Expand Down Expand Up @@ -220,7 +228,7 @@ func (em *ErrorManager) Init(ctx context.Context) error {
if em.conflictV1Enabled {
sqls = append(sqls, [2]string{"create conflict error v1 table", createConflictErrorTable})
}
if em.conflictV2Enabled && em.conflictErrRemain.Load() > 0 {
if em.conflictV2Enabled {
sqls = append(sqls, [2]string{"create duplicate records table", createDupRecordTable})
}

Expand Down Expand Up @@ -532,6 +540,19 @@ func (em *ErrorManager) RecordDuplicate(
return nil
}

return em.recordDuplicate(ctx, logger, tableName, path, offset, errMsg, rowID, rowData)
}

func (em *ErrorManager) recordDuplicate(
ctx context.Context,
logger log.Logger,
tableName string,
path string,
offset int64,
errMsg string,
rowID int64,
rowData string,
) error {
exec := common.SQLWithRetry{
DB: em.db,
Logger: logger,
Expand All @@ -549,6 +570,30 @@ func (em *ErrorManager) RecordDuplicate(
)
}

// RecordDuplicateOnce stores a "duplicate entry" error so the user can query it
// later. Unlike RecordDuplicate, it is meant for conflict.strategy = "error":
// only the first call writes to the table and every later call is a no-op.
// Currently the record is not shared between multiple lightning instances.
//
// A failure to write the record is logged as a warning and not returned.
func (em *ErrorManager) RecordDuplicateOnce(
	ctx context.Context,
	logger log.Logger,
	tableName string,
	path string,
	offset int64,
	errMsg string,
	rowID int64,
	rowData string,
) {
	// Claim the single slot first; a caller that loses the swap has nothing to do.
	if !em.recordErrorOnce.CompareAndSwap(false, true) {
		return
	}
	if recErr := em.recordDuplicate(ctx, logger, tableName, path, offset, errMsg, rowID, rowData); recErr != nil {
		logger.Warn("meet error when record duplicate entry error", zap.Error(recErr))
	}
}

func (em *ErrorManager) errorCount(typeVal func(*config.MaxError) int64) int64 {
cfgVal := typeVal(em.configError)
val := typeVal(&em.remainingError)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/errormanager/errormanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestInit(t *testing.T) {
cfg := config.NewConfig()
cfg.TikvImporter.Backend = config.BackendLocal
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone
cfg.TikvImporter.OnDuplicate = config.ReplaceOnDup
cfg.Conflict.Strategy = config.ReplaceOnDup
cfg.App.MaxError.Type.Store(10)
cfg.Conflict.Threshold = 20
cfg.App.TaskInfoSchemaName = "lightning_errors"
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/dup_detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (d *dupDetector) run(
return errors.Trace(err)
}

handlerConstructor := makeDupHandlerConstructor(ignoreRows, d.rc.cfg.TikvImporter.OnDuplicate)
handlerConstructor := makeDupHandlerConstructor(ignoreRows, d.rc.cfg.Conflict.Strategy)
numDups, err = detector.Detect(ctx, &duplicate.DetectOptions{
Concurrency: d.rc.cfg.App.RegionConcurrency,
HandlerConstructor: handlerConstructor,
Expand Down
18 changes: 15 additions & 3 deletions br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (tr *TableImporter) importTable(
}

// 2. Do duplicate detection if needed
if isLocalBackend(rc.cfg) && rc.cfg.TikvImporter.OnDuplicate != "" {
if isLocalBackend(rc.cfg) && rc.cfg.Conflict.Strategy != "" {
_, uuid := backend.MakeUUID(tr.tableName, common.IndexEngineID)
workingDir := filepath.Join(rc.cfg.TikvImporter.SortedKVDir, uuid.String()+local.DupDetectDirSuffix)
resultDir := filepath.Join(rc.cfg.TikvImporter.SortedKVDir, uuid.String()+local.DupResultDirSuffix)
Expand Down Expand Up @@ -1616,8 +1616,12 @@ func (tr *TableImporter) preDeduplicate(
return errors.Trace(originalErr)
}
if !rc.cfg.Checkpoint.Enable {
return errors.Errorf("duplicate key in table %s caused by index `%s`, you can turn on checkpoint and re-run to see the conflicting rows",
err := errors.Errorf("duplicate key in table %s caused by index `%s`, but because checkpoint is off we can't have more details",
tr.tableName, idxName)
rc.errorMgr.RecordDuplicateOnce(
ctx, tr.logger, tr.tableName, "<unknown-path>", -1, err.Error(), -1, "<unknown-data>",
)
return err
}
conflictEncodedRowIDs := dupErr.Args()[1].([][]byte)
if len(conflictEncodedRowIDs) < 2 {
Expand Down Expand Up @@ -1648,6 +1652,9 @@ func (tr *TableImporter) preDeduplicate(
tr.logger.Error("failed to get table checkpoint", zap.Error(err))
return errors.Trace(err)
}
var (
secondConflictPath string
)
for _, engineCp := range tableCp.Engines {
for _, chunkCp := range engineCp.Chunks {
if chunkCp.Chunk.PrevRowIDMax <= rowID[0] && rowID[0] < chunkCp.Chunk.RowIDMax {
Expand All @@ -1657,6 +1664,7 @@ func (tr *TableImporter) preDeduplicate(
chunkCp.FileMeta.Path)
}
if chunkCp.Chunk.PrevRowIDMax <= rowID[1] && rowID[1] < chunkCp.Chunk.RowIDMax {
secondConflictPath = chunkCp.FileMeta.Path
otherConflictMsg = fmt.Sprintf("row %d counting from offset %d in file %s",
rowID[1]-chunkCp.Chunk.PrevRowIDMax,
chunkCp.Chunk.Offset,
Expand All @@ -1670,6 +1678,10 @@ func (tr *TableImporter) preDeduplicate(
zap.Int64("rowID[1]", rowID[1]))
return errors.Trace(originalErr)
}
return errors.Errorf("duplicate entry for key '%s', a pair of conflicting rows are (%s, %s)",
err = errors.Errorf("duplicate entry for key '%s', a pair of conflicting rows are (%s, %s)",
idxName, oneConflictMsg, otherConflictMsg)
rc.errorMgr.RecordDuplicateOnce(
ctx, tr.logger, tr.tableName, secondConflictPath, -1, err.Error(), rowID[1], "<unknown-data>",
)
return err
}
12 changes: 12 additions & 0 deletions br/tests/lightning_config_max_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,15 @@ run_sql 'DROP DATABASE IF EXISTS mytest'
cp "${mydir}/tidb-limit-record.toml" "${TEST_DIR}/tidb-limit-record.toml"
sed -i.bak "s/threshold = 5/threshold = 4/g" "${TEST_DIR}/tidb-limit-record.toml"
run_lightning --backend tidb --config "${TEST_DIR}/tidb-limit-record.toml" 2>&1 | grep -q "The number of conflict errors exceeds the threshold"

# Check that when strategy is "error", the stderr, log and duplicate record table all contain the error message
run_sql 'DROP DATABASE IF EXISTS lightning_task_info'
run_sql 'DROP DATABASE IF EXISTS mytest'
rm "${TEST_DIR}/lightning.log"
run_lightning --backend tidb --config "${mydir}/tidb-error.toml" 2>&1 | grep -q "Error 1062 (23000): Duplicate entry '1' for key 'testtbl.PRIMARY'"
check_contains "Error 1062 (23000): Duplicate entry '1' for key 'testtbl.PRIMARY'" "${TEST_DIR}/lightning.log"
run_sql 'SELECT COUNT(*) FROM lightning_task_info.duplicate_records'
check_contains "COUNT(*): 1"
run_sql 'SELECT * FROM lightning_task_info.duplicate_records'
check_contains "error: Error 1062 (23000): Duplicate entry '1' for key 'testtbl.PRIMARY'"
check_contains "row_data: ('1','bbb01')"
5 changes: 5 additions & 0 deletions br/tests/lightning_config_max_error/tidb-error.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[conflict]
strategy = "error"

[mydumper.csv]
header = true
8 changes: 7 additions & 1 deletion br/tests/lightning_duplicate_detection_new/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,14 @@ check_contains "count(*): 228"

# 3. Test error strategy.
cleanup
run_lightning --backend local --config "$CUR/local-error.toml" --log-file "$LOG_FILE" 2>&1 | grep -q "duplicate key in table \`test\`.\`dup_detect\` caused by index .*, you can turn on checkpoint and re-run to see the conflicting rows"
run_lightning --backend local --config "$CUR/local-error.toml" --log-file "$LOG_FILE" 2>&1 | grep -q "duplicate key in table \`test\`.\`dup_detect\` caused by index .*, but because checkpoint is off we can't have more details"
grep -q "duplicate key in table \`test\`.\`dup_detect\` caused by index .*, but because checkpoint is off we can't have more details" "$LOG_FILE"
run_sql "SELECT * FROM lightning_task_info.duplicate_records"
check_contains "error: duplicate key in table \`test\`.\`dup_detect\`"
run_lightning --backend local --config "$CUR/local-error.toml" --log-file "$LOG_FILE" --enable-checkpoint=1 2>&1 | grep -q "duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)"
grep -q "duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)" "$LOG_FILE"
run_sql "SELECT * FROM lightning_task_info.duplicate_records"
check_contains "error: duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are"
check_contains "restore table \`test\`.\`dup_detect\` failed: duplicate entry for key 'uniq_col6_col7', a pair of conflicting rows are (row 1 counting from offset 0 in file test.dup_detect.1.sql, row 101 counting from offset 0 in file test.dup_detect.4.sql)" "$LOG_FILE"
run_lightning_ctl --enable-checkpoint=1 --backend local --config "$CUR/local-error.toml" --checkpoint-error-destroy="\`test\`.\`dup_detect\`"
files_left=$(ls "$TEST_DIR/$TEST_NAME.sorted" | wc -l)
Expand Down

0 comments on commit 9662254

Please sign in to comment.