syncer(dm): Add unit test and integration test for multi-schema change when encountering "invalid connection" error (#7104)

ref #4689
lyzx2001 authored Oct 27, 2022
1 parent 864ccb2 commit b34ecff
Showing 4 changed files with 159 additions and 2 deletions.
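
The unit test below extends TestGetDDLStatusFromTiDB in dm/syncer/util_test.go, and the integration coverage lives in dm/tests/gbk/run.sh. A minimal sketch for running the unit test locally, assuming the package needs no special build tags beyond the standard Go toolchain:

# Run only the extended unit test; the package path and test name come from the diff,
# the flags are standard go test options.
cd dm/syncer
go test -run TestGetDDLStatusFromTiDB -v
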
2 changes: 1 addition & 1 deletion dm/syncer/error.go
@@ -231,7 +231,7 @@ func (s *Syncer) handleSpecialDDLError(tctx *tcontext.Context, err error, ddls [
 	for {
 		status, err2 := getDDLStatusFromTiDB(tctx, conn, ddls[index], createTime)
 		if err2 != nil {
-			s.tctx.L().Warn("error when getting DDL status fromTiDB", zap.Error(err2))
+			s.tctx.L().Warn("error when getting DDL status from TiDB", zap.Error(err2))
 		}
 		failpoint.Inject("TestStatus", func(val failpoint.Value) {
 			status = val.(string)
5 changes: 4 additions & 1 deletion dm/syncer/syncer.go
@@ -1401,7 +1401,10 @@ func (s *Syncer) syncDDL(queueBucket string, db *dbconn.DBConn, ddlJobChan chan
 	affected, err = db.ExecuteSQLWithIgnore(s.syncCtx, s.metricsProxies, errorutil.IsIgnorableMySQLDDLError, ddlJob.ddls)
 	failpoint.Inject("TestHandleSpecialDDLError", func() {
 		err = mysql2.ErrInvalidConn
-		affected = len(ddlJob.ddls) / 2
+		// simulate the affected value that accompanies the injected error, accounting for the SET SQLs added for the time zone and timestamp
+		if affected == 0 {
+			affected++
+		}
 	})
 	if err != nil {
 		err = s.handleSpecialDDLError(s.syncCtx, err, ddlJob.ddls, affected, db, ddlCreateTime)
34 changes: 34 additions & 0 deletions dm/syncer/util_test.go
@@ -296,4 +296,38 @@ func TestGetDDLStatusFromTiDB(t *testing.T) {

err = mock.ExpectationsWereMet()
require.NoError(t, err)

// multi-schema change tests
// test 5 (for manual operation in TiDB)
mock.ExpectQuery(adminShowDDLJobsSQL1).WillReturnRows(sqlmock.NewRows([]string{"JOB_ID", "DB_NAME", "TABLE_NAME", "JOB_TYPE", "SCHEMA_STATE", "SCHEMA_ID", "TABLE_ID", "ROW_COUNT", "CREATE_TIME", "START_TIME", "END_TIME", "STATE"}).
AddRow(59, "many_tables_test", "t4", "alter table multi-schema change", "public", 1, 59, 0, "2022-08-02 2:51:39", "2022-08-02 2:51:39", "NULL", "running").
AddRow(59, "many_tables_test", "t4", "add column /* subjob */", "public", 1, 59, 0, "NULL", "NULL", "NULL", "done").
AddRow(59, "many_tables_test", "t4", "add column /* subjob */", "public", 1, 59, 0, "NULL", "NULL", "NULL", "done").
AddRow(58, "many_tables_test", "t3", "alter table", "public", 1, 58, 0, "2022-08-02 2:50:12", "2022-08-02 2:50:12", "2022-08-02 2:50:12", "synced").
AddRow(57, "many_tables_test", "t2", "alter table", "public", 1, 57, 0, "2022-08-02 2:49:39", "2022-08-02 2:49:39", "2022-08-02 2:49:39", "synced").
AddRow(56, "many_tables_test", "t1", "alter table", "public", 1, 56, 0, "2022-08-02 2:49:09", "2022-08-02 2:49:09", "2022-08-02 2:49:09", "synced").
AddRow(55, "many_tables_test", "t6", "create table", "public", 1, 55, 0, "2022-08-02 2:48:38", "2022-08-02 2:48:38", "2022-08-02 2:48:38", "synced").
AddRow(54, "many_tables_test", "t5", "create table", "public", 1, 54, 0, "2022-08-02 2:48:19", "2022-08-02 2:48:19", "2022-08-02 2:48:19", "synced").
AddRow(53, "many_tables_test", "t4", "create table", "public", 1, 53, 0, "2022-08-02 2:47:55", "2022-08-02 2:47:55", "2022-08-02 2:47:55", "synced").
AddRow(52, "many_tables_test", "t3", "create table", "public", 1, 52, 0, "2022-08-02 2:47:24", "2022-08-02 2:47:24", "2022-08-02 2:47:24", "synced").
AddRow(51, "many_tables_test", "t2", "create table", "public", 1, 51, 0, "2022-08-02 2:46:43", "2022-08-02 2:46:43", "2022-08-02 2:46:43", "synced").
AddRow(50, "many_tables_test", "t1", "create table", "public", 1, 50, 0, "2022-08-02 2:46:14", "2022-08-02 2:46:14", "2022-08-02 2:46:14", "synced"))

mock.ExpectQuery(adminShowDDLJobsLimitSQL1).WillReturnRows(sqlmock.NewRows([]string{"JOB_ID", "QUERY"}).
AddRow(59, "ALTER TABLE many_tables_test.t4 ADD y INT, ADD z INT").
AddRow(58, "ALTER TABLE many_tables_test.t3 ADD x timestamp DEFAULT current_timestamp").
AddRow(57, "ALTER TABLE many_tables_test.t2 ADD x timestamp DEFAULT current_timestamp").
AddRow(56, "ALTER TABLE many_tables_test.t1 ADD x timestamp DEFAULT current_timestamp").
AddRow(55, "CREATE TABLE IF NOT EXISTS many_tables_test.t6(i TINYINT, j INT UNIQUE KEY)").
AddRow(54, "CREATE TABLE IF NOT EXISTS many_tables_test.t5(i TINYINT, j INT UNIQUE KEY)").
AddRow(53, "CREATE TABLE IF NOT EXISTS many_tables_test.t4(i TINYINT, j INT UNIQUE KEY)").
AddRow(52, "CREATE TABLE IF NOT EXISTS many_tables_test.t3(i TINYINT, j INT UNIQUE KEY)").
AddRow(51, "CREATE TABLE IF NOT EXISTS many_tables_test.t2(i TINYINT, j INT UNIQUE KEY)").
AddRow(50, "CREATE TABLE IF NOT EXISTS many_tables_test.t1(i TINYINT, j INT UNIQUE KEY)"))

createTime, err = time.Parse(timeLayout, "2022-08-02 2:50:36")
require.NoError(t, err)
status, err = getDDLStatusFromTiDB(tctx, dbConn, "ALTER TABLE many_tables_test.t4 ADD y INT, ADD z INT", createTime.Unix())
require.NoError(t, err)
require.Equal(t, "running", status)
}
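
The mocked rows above stand in for TiDB's DDL job views: the parent multi-schema-change job (JOB_ID 59) is still "running" while its "/* subjob */" rows are already "done", and the assertion checks that getDDLStatusFromTiDB reports the parent job's status for the whole ALTER. A hedged sketch of the statements the sqlmock expectations emulate, assuming a local TiDB on the default port (output columns can vary by TiDB version):

# Inspect DDL job status and the corresponding SQL text on a real TiDB instance
# (4000 is TiDB's default port; adjust host and credentials for your deployment).
mysql -h 127.0.0.1 -P 4000 -u root -e 'ADMIN SHOW DDL JOBS 10;'
mysql -h 127.0.0.1 -P 4000 -u root -e 'ADMIN SHOW DDL JOB QUERIES LIMIT 10 OFFSET 0;'
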
120 changes: 120 additions & 0 deletions dm/tests/gbk/run.sh
@@ -173,7 +173,127 @@ function run() {
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status gbk" \
"origin SQL: \[ALTER TABLE gbk.invalid_conn_test1 ADD UNIQUE(i)\]: DDL ALTER TABLE \`gbk\`.\`invalid_conn_test1\` ADD UNIQUE(\`i\`) executed in background and met error" 1
# manually synchronize upstream and downstream
run_sql_tidb "DELETE FROM gbk.invalid_conn_test1 WHERE j=4"
# manually resume
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task gbk" \
"\"result\": true" 3
echo "check test adding UNIQUE on column with duplicate data successfully"

kill_dm_worker

# multi-schema change tests
# test invalid connection with status running (multi-schema change)
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/syncer/TestHandleSpecialDDLError=return();github.com/pingcap/tiflow/dm/syncer/TestStatus=1*return(\"running\");github.com/pingcap/tiflow/dm/syncer/ChangeDuration=return()"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

echo "start test invalid connection with status running (multi-schema change)"
run_sql_source1 "ALTER TABLE gbk.invalid_conn_test1 MODIFY COLUMN i INT(4) NOT NULL DEFAULT _UTF8MB4'0', MODIFY COLUMN j INT(4) NOT NULL DEFAULT _UTF8MB4'0'"
run_sql_source1 "ALTER TABLE gbk.invalid_conn_test2 MODIFY COLUMN i INT(4) NOT NULL DEFAULT _UTF8MB4'0', MODIFY COLUMN j INT(4) NOT NULL DEFAULT _UTF8MB4'0'"

run_sql_tidb_with_retry "ADMIN SHOW DDL JOB QUERIES LIMIT 10 OFFSET 0" "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`i\` INT(4) NOT NULL DEFAULT '0'"
echo "check count 1"
check_count "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`i\` INT(4) NOT NULL DEFAULT '0'" 1
run_sql_tidb_with_retry "ADMIN SHOW DDL JOB QUERIES LIMIT 10 OFFSET 0" "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`j\` INT(4) NOT NULL DEFAULT '0'"
echo "check count 2"
check_count "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`j\` INT(4) NOT NULL DEFAULT '0'" 1
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
echo "check test invalid connection with status running (multi-schema change) successfully"

kill_dm_worker

# test invalid connection with status queueing (multi-schema change)
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/syncer/TestHandleSpecialDDLError=return();github.com/pingcap/tiflow/dm/syncer/TestStatus=1*return(\"queueing\");github.com/pingcap/tiflow/dm/syncer/ChangeDuration=return()"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

echo "start test invalid connection with status queueing (multi-schema change)"

run_sql_source1 "ALTER TABLE gbk.invalid_conn_test1 MODIFY COLUMN k INT(4) NOT NULL DEFAULT _UTF8MB4'0', MODIFY COLUMN m INT(4) NOT NULL DEFAULT _UTF8MB4'0'"
run_sql_source1 "ALTER TABLE gbk.invalid_conn_test2 MODIFY COLUMN k INT(4) NOT NULL DEFAULT _UTF8MB4'0', MODIFY COLUMN m INT(4) NOT NULL DEFAULT _UTF8MB4'0'"

run_sql_tidb_with_retry "ADMIN SHOW DDL JOB QUERIES LIMIT 10 OFFSET 0" "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`k\` INT(4) NOT NULL DEFAULT '0'"
echo "check count 1"
check_count "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`k\` INT(4) NOT NULL DEFAULT '0'" 1
run_sql_tidb_with_retry "ADMIN SHOW DDL JOB QUERIES LIMIT 10 OFFSET 0" "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`m\` INT(4) NOT NULL DEFAULT '0'"
echo "check count 2"
check_count "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`m\` INT(4) NOT NULL DEFAULT '0'" 1
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
echo "check test invalid connection with status queueing (multi-schema change) successfully"

kill_dm_worker

# test invalid connection with status none (multi-schema change)
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/syncer/TestHandleSpecialDDLError=return();github.com/pingcap/tiflow/dm/syncer/TestStatus=1*return(\"none\");github.com/pingcap/tiflow/dm/syncer/ChangeDuration=return()"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

echo "start test invalid connection with status none (multi-schema change)"

run_sql_source1 "ALTER TABLE gbk.invalid_conn_test1 MODIFY COLUMN n INT(4) NOT NULL DEFAULT _UTF8MB4'0', MODIFY COLUMN h INT(4) NOT NULL DEFAULT _UTF8MB4'0'"
run_sql_source1 "ALTER TABLE gbk.invalid_conn_test2 MODIFY COLUMN n INT(4) NOT NULL DEFAULT _UTF8MB4'0', MODIFY COLUMN h INT(4) NOT NULL DEFAULT _UTF8MB4'0'"

run_sql_tidb_with_retry "ADMIN SHOW DDL JOB QUERIES LIMIT 10 OFFSET 0" "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`n\` INT(4) NOT NULL DEFAULT '0'"
echo "check count 1"
check_count "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`n\` INT(4) NOT NULL DEFAULT '0'" 1
run_sql_tidb_with_retry "ADMIN SHOW DDL JOB QUERIES LIMIT 10 OFFSET 0" "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`h\` INT(4) NOT NULL DEFAULT '0'"
echo "check count 2"
check_count "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`h\` INT(4) NOT NULL DEFAULT '0'" 1
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
echo "check test invalid connection with status none (multi-schema change) successfully"

kill_dm_worker

# test inserting data after invalid connection (multi-schema change)
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/syncer/TestHandleSpecialDDLError=return();github.com/pingcap/tiflow/dm/syncer/ChangeDuration=return()"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

echo "start test inserting data after invalid connection (multi-schema change)"

run_sql_source1 "ALTER TABLE gbk.invalid_conn_test1 MODIFY COLUMN i TINYINT NOT NULL DEFAULT _UTF8MB4'0', MODIFY COLUMN j TINYINT NOT NULL DEFAULT _UTF8MB4'0'"
run_sql_source1 "INSERT INTO gbk.invalid_conn_test1 VALUES (5,5,5,5,5,5)"
run_sql_source1 "INSERT INTO gbk.invalid_conn_test1 VALUES (6,6,6,6,6,6)"

run_sql_tidb_with_retry "ADMIN SHOW DDL JOB QUERIES LIMIT 10 OFFSET 0" "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`i\` TINYINT NOT NULL DEFAULT '0'"
echo "check count 1"
check_count "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`i\` TINYINT NOT NULL DEFAULT '0'" 1
run_sql_tidb_with_retry "ADMIN SHOW DDL JOB QUERIES LIMIT 10 OFFSET 0" "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`j\` TINYINT NOT NULL DEFAULT '0'"
echo "check count 2"
check_count "ALTER TABLE \`gbk\`.\`invalid_conn_test1\` MODIFY COLUMN \`j\` TINYINT NOT NULL DEFAULT '0'" 1
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
echo "check test inserting data after invalid connection (multi-schema change) successfully"

kill_dm_worker

# test adding UNIQUE on column with duplicate data (multi-schema change)
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/syncer/TestHandleSpecialDDLError=return();github.com/pingcap/tiflow/dm/syncer/ChangeDuration=return()"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

echo "start test adding UNIQUE on column with duplicate data (multi-schema change)"

run_sql_source1 "INSERT INTO gbk.invalid_conn_test1 VALUES (7,7,7,7,7,7)"
run_sql_tidb "INSERT INTO gbk.invalid_conn_test1 VALUES (8,8,7,7,8,8)"
run_sql_source1 "ALTER TABLE gbk.invalid_conn_test1 ADD UNIQUE(k), ADD UNIQUE(m)"

echo "check cancelled error"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status gbk" \
"origin SQL: \[ALTER TABLE gbk.invalid_conn_test1 ADD UNIQUE(k), ADD UNIQUE(m)\]: DDL ALTER TABLE \`gbk\`.\`invalid_conn_test1\` ADD UNIQUE(\`k\`) executed in background and met error" 1
echo "check test adding UNIQUE on column with duplicate data (multi-schema change) successfully"
}

cleanup_data gbk gbk2 gbk3
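
A note on the failpoint setup used throughout the script: GO_FAILPOINTS only takes effect on binaries built with failpoint instrumentation, and a term such as TestStatus=1*return("running") makes that failpoint fire exactly once with the given return value, so getDDLStatusFromTiDB reports the multi-schema change as still running on the first check. A hedged sketch of preparing a manual run, assuming the tiflow Makefile's dm_integration_test_build target (the target name is an assumption; the failpoint terms are copied from the script above):

# Build DM binaries with failpoint instrumentation compiled in; the make target
# name is an assumption about the tiflow Makefile, not something this commit adds.
make dm_integration_test_build

# Activate the failpoints used by the "status running" case (terms copied from run.sh above),
# then drive the gbk case through the DM integration test harness under dm/tests.
export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/TestHandleSpecialDDLError=return();github.com/pingcap/tiflow/dm/syncer/TestStatus=1*return("running");github.com/pingcap/tiflow/dm/syncer/ChangeDuration=return()'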
