From 2449b3bcaff423f157b421fa9052526b4bfd7c96 Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Sun, 8 Oct 2023 11:25:53 +0800
Subject: [PATCH] scheduler (ticdc): does not return error when resolvedTs less
 than checkpoint (#9833)

ref pingcap/tiflow#9830
---
 cdc/scheduler/internal/v3/agent/table.go      |  4 +--
 .../v3/replication/replication_set.go         | 34 ++++++-------------
 .../v3/replication/replication_set_test.go    | 26 ++++++++++++++
 3 files changed, 39 insertions(+), 25 deletions(-)

diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go
index be1f7ec929b..506bef29a54 100644
--- a/cdc/scheduler/internal/v3/agent/table.go
+++ b/cdc/scheduler/internal/v3/agent/table.go
@@ -75,7 +75,7 @@ func (t *tableSpan) getTableSpanStatus(collectStat bool) tablepb.TableStatus {
 
 func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message {
 	if status.Checkpoint.ResolvedTs < status.Checkpoint.CheckpointTs {
-		log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
+		log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
 			zap.Any("tableStatus", status),
 			zap.Any("checkpoint", status.Checkpoint.CheckpointTs),
 			zap.Any("resolved", status.Checkpoint.ResolvedTs))
@@ -100,7 +100,7 @@ func newRemoveTableResponseMessage(status tablepb.TableStatus) *schedulepb.Messa
 		// Advance resolved ts to checkpoint ts if table is removed.
 		status.Checkpoint.ResolvedTs = status.Checkpoint.CheckpointTs
 	} else {
-		log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
+		log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
 			zap.Any("tableStatus", status),
 			zap.Any("checkpoint", status.Checkpoint.CheckpointTs),
 			zap.Any("resolved", status.Checkpoint.ResolvedTs))
diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go
index ae05321c823..cdcf933597c 100644
--- a/cdc/scheduler/internal/v3/replication/replication_set.go
+++ b/cdc/scheduler/internal/v3/replication/replication_set.go
@@ -162,9 +162,7 @@ func NewReplicationSet(
 			return nil, r.inconsistentError(table, captureID,
 				"schedulerv3: table id inconsistent")
 		}
-		if err := r.updateCheckpointAndStats(table.Checkpoint, table.Stats); err != nil {
-			return nil, errors.Trace(err)
-		}
+		r.updateCheckpointAndStats(table.Checkpoint, table.Stats)
 
 		switch table.State {
 		case tablepb.TableStateReplicating:
@@ -485,8 +483,8 @@ func (r *ReplicationSet) pollOnPrepare(
 		}
 	case tablepb.TableStateReplicating:
 		if r.Primary == captureID {
-			err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
-			return nil, false, err
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
+			return nil, false, nil
 		}
 	case tablepb.TableStateStopping, tablepb.TableStateStopped:
 		if r.Primary == captureID {
@@ -589,9 +587,7 @@ func (r *ReplicationSet) pollOnCommit(
 
 	case tablepb.TableStateStopped, tablepb.TableStateAbsent:
 		if r.Primary == captureID {
-			if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
-				return nil, false, errors.Trace(err)
-			}
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
 			original := r.Primary
 			r.clearPrimary()
 			if !r.hasRole(RoleSecondary) {
@@ -655,9 +651,7 @@ func (r *ReplicationSet) pollOnCommit(
 
 	case tablepb.TableStateReplicating:
 		if r.Primary == captureID {
-			if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
-				return nil, false, errors.Trace(err)
-			}
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
 			if r.hasRole(RoleSecondary) {
 				// Original primary is not stopped, ask for stopping.
 				return &schedulepb.Message{
@@ -692,8 +686,8 @@ func (r *ReplicationSet) pollOnCommit(
 
 	case tablepb.TableStateStopping:
 		if r.Primary == captureID && r.hasRole(RoleSecondary) {
-			err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
-			return nil, false, err
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
+			return nil, false, nil
 		} else if r.isInRole(captureID, RoleUndetermined) {
 			log.Info("schedulerv3: capture is stopping during Commit",
 				zap.Stringer("tableState", input),
@@ -718,8 +712,8 @@ func (r *ReplicationSet) pollOnReplicating(
 	switch input.State {
 	case tablepb.TableStateReplicating:
 		if r.Primary == captureID {
-			err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
-			return nil, false, err
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
+			return nil, false, nil
 		}
 		return nil, false, r.multiplePrimaryError(
 			input, captureID, "schedulerv3: multiple primary")
@@ -730,10 +724,7 @@ func (r *ReplicationSet) pollOnReplicating(
 	case tablepb.TableStateStopping:
 	case tablepb.TableStateStopped:
 		if r.Primary == captureID {
-			if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
-				return nil, false, errors.Trace(err)
-			}
-
+			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
 			// Primary is stopped, but we still has secondary.
 			// Clear primary and promote secondary when it's prepared.
 			log.Info("schedulerv3: primary is stopped during Replicating",
@@ -925,7 +916,7 @@ func (r *ReplicationSet) handleCaptureShutdown(
 
 func (r *ReplicationSet) updateCheckpointAndStats(
 	checkpoint tablepb.Checkpoint, stats tablepb.Stats,
-) error {
+) {
 	if checkpoint.ResolvedTs < checkpoint.CheckpointTs {
 		log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
 			zap.Any("replicationSet", r),
@@ -947,11 +938,8 @@ func (r *ReplicationSet) updateCheckpointAndStats(
 			zap.Any("replicationSet", r),
 			zap.Any("checkpointTs", r.Checkpoint.CheckpointTs),
 			zap.Any("resolvedTs", r.Checkpoint.ResolvedTs))
-		return errors.ErrInvalidCheckpointTs.GenWithStackByArgs(r.Checkpoint.CheckpointTs,
-			r.Checkpoint.ResolvedTs)
 	}
 	r.Stats = stats
-	return nil
 }
 
 // SetHeap is a max-heap, it implements heap.Interface.
diff --git a/cdc/scheduler/internal/v3/replication/replication_set_test.go b/cdc/scheduler/internal/v3/replication/replication_set_test.go
index bf1b67bd498..da417438bc0 100644
--- a/cdc/scheduler/internal/v3/replication/replication_set_test.go
+++ b/cdc/scheduler/internal/v3/replication/replication_set_test.go
@@ -1455,3 +1455,29 @@ func TestReplicationSetHeap_MinK(t *testing.T) {
 	require.Equal(t, expectedTables, tables)
 	require.Equal(t, 0, h.Len())
 }
+
+func TestUpdateCheckpointAndStats(t *testing.T) {
+	cases := []struct {
+		checkpoint tablepb.Checkpoint
+		stats      tablepb.Stats
+	}{
+		{
+			checkpoint: tablepb.Checkpoint{
+				CheckpointTs: 1,
+				ResolvedTs:   2,
+			},
+			stats: tablepb.Stats{},
+		},
+		{
+			checkpoint: tablepb.Checkpoint{
+				CheckpointTs: 2,
+				ResolvedTs:   1,
+			},
+			stats: tablepb.Stats{},
+		},
+	}
+	r := &ReplicationSet{}
+	for _, c := range cases {
+		r.updateCheckpointAndStats(c.checkpoint, c.stats)
+	}
+}
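Reviewer note: the net effect of this patch is that a checkpoint report whose resolvedTs lags
its checkpointTs is logged at Warn level and tolerated, instead of panicking the agent
(agent/table.go) or failing the poll with ErrInvalidCheckpointTs (replication_set.go); callers
such as pollOnReplicating can then return nil on the happy path, as the diff shows. Below is a
minimal, self-contained sketch of the new semantics for reference. It is not the actual tiflow
code: Checkpoint and ReplicationSet are simplified stand-ins for the tablepb/scheduler types,
and the monotonic-advance step is an assumption about how the scheduler keeps timestamps from
regressing, not a verbatim copy of updateCheckpointAndStats.

package main

import "log"

// Checkpoint is a simplified stand-in for tablepb.Checkpoint.
type Checkpoint struct {
	CheckpointTs uint64
	ResolvedTs   uint64
}

// ReplicationSet is a simplified stand-in for the scheduler's ReplicationSet.
type ReplicationSet struct {
	Checkpoint Checkpoint
}

// updateCheckpoint mirrors the patched behavior: an inconsistent report
// (ResolvedTs < CheckpointTs) is logged and tolerated rather than being
// escalated into a panic or an error that fails the poll.
func (r *ReplicationSet) updateCheckpoint(c Checkpoint) {
	if c.ResolvedTs < c.CheckpointTs {
		log.Printf("warn: resolved ts %d should not be less than checkpoint ts %d",
			c.ResolvedTs, c.CheckpointTs)
	}
	// Assumed invariant: stored timestamps only move forward, so a stale
	// or inconsistent report cannot regress the replication set.
	if r.Checkpoint.CheckpointTs < c.CheckpointTs {
		r.Checkpoint.CheckpointTs = c.CheckpointTs
	}
	if r.Checkpoint.ResolvedTs < c.ResolvedTs {
		r.Checkpoint.ResolvedTs = c.ResolvedTs
	}
}

func main() {
	r := &ReplicationSet{Checkpoint: Checkpoint{CheckpointTs: 5, ResolvedTs: 5}}
	r.updateCheckpoint(Checkpoint{CheckpointTs: 2, ResolvedTs: 1}) // warned, no regression
	r.updateCheckpoint(Checkpoint{CheckpointTs: 6, ResolvedTs: 7}) // advances normally
	log.Printf("checkpoint=%d resolved=%d",
		r.Checkpoint.CheckpointTs, r.Checkpoint.ResolvedTs) // checkpoint=6 resolved=7
}

The trade-off, presumably the motivation behind pingcap/tiflow#9830, is availability: a
transient out-of-order heartbeat is now a log line to investigate rather than an error that
tears down scheduling, and the new TestUpdateCheckpointAndStats exercises both the ordered and
the inverted case to confirm neither one errors or panics.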