Skip to content

Commit

Permalink
scheduler(ticdc): revert 3b8d55 and do not return error when resolved…
Browse files Browse the repository at this point in the history
…Ts less than checkpoint (pingcap#9953)

ref pingcap#9830, ref pingcap#9926
  • Loading branch information
CharlesCheung96 committed Oct 27, 2023
1 parent 5600f6b commit 81f029e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 25 deletions.
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/agent/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (t *tableSpan) getTableSpanStatus(collectStat bool) tablepb.TableStatus {

func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message {
if status.Checkpoint.ResolvedTs < status.Checkpoint.CheckpointTs {
log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
zap.Any("tableStatus", status),
zap.Any("checkpoint", status.Checkpoint.CheckpointTs),
zap.Any("resolved", status.Checkpoint.ResolvedTs))
Expand All @@ -100,7 +100,7 @@ func newRemoveTableResponseMessage(status tablepb.TableStatus) *schedulepb.Messa
// Advance resolved ts to checkpoint ts if table is removed.
status.Checkpoint.ResolvedTs = status.Checkpoint.CheckpointTs
} else {
log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
zap.Any("tableStatus", status),
zap.Any("checkpoint", status.Checkpoint.CheckpointTs),
zap.Any("resolved", status.Checkpoint.ResolvedTs))
Expand Down
34 changes: 11 additions & 23 deletions cdc/scheduler/internal/v3/replication/replication_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,7 @@ func NewReplicationSet(
return nil, r.inconsistentError(table, captureID,
"schedulerv3: table id inconsistent")
}
if err := r.updateCheckpointAndStats(table.Checkpoint, table.Stats); err != nil {
return nil, errors.Trace(err)
}
r.updateCheckpointAndStats(table.Checkpoint, table.Stats)

switch table.State {
case tablepb.TableStateReplicating:
Expand Down Expand Up @@ -500,8 +498,8 @@ func (r *ReplicationSet) pollOnPrepare(
}
case tablepb.TableStateReplicating:
if r.Primary == captureID {
err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, err
r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, nil
}
case tablepb.TableStateStopping, tablepb.TableStateStopped:
if r.Primary == captureID {
Expand Down Expand Up @@ -614,9 +612,7 @@ func (r *ReplicationSet) pollOnCommit(

case tablepb.TableStateStopped, tablepb.TableStateAbsent:
if r.Primary == captureID {
if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
return nil, false, errors.Trace(err)
}
r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
original := r.Primary
r.clearPrimary()
if !r.hasRole(RoleSecondary) {
Expand Down Expand Up @@ -688,9 +684,7 @@ func (r *ReplicationSet) pollOnCommit(

case tablepb.TableStateReplicating:
if r.Primary == captureID {
if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
return nil, false, errors.Trace(err)
}
r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
if r.hasRole(RoleSecondary) {
// Original primary is not stopped, ask for stopping.
return &schedulepb.Message{
Expand Down Expand Up @@ -725,8 +719,8 @@ func (r *ReplicationSet) pollOnCommit(

case tablepb.TableStateStopping:
if r.Primary == captureID && r.hasRole(RoleSecondary) {
err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, err
r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, nil
} else if r.isInRole(captureID, RoleUndetermined) {
log.Info("schedulerv3: capture is stopping during Commit",
zap.String("namespace", r.Changefeed.Namespace),
Expand Down Expand Up @@ -755,8 +749,8 @@ func (r *ReplicationSet) pollOnReplicating(
switch input.State {
case tablepb.TableStateReplicating:
if r.Primary == captureID {
err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, err
r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, nil
}
return nil, false, r.multiplePrimaryError(
input, captureID, "schedulerv3: multiple primary")
Expand All @@ -767,10 +761,7 @@ func (r *ReplicationSet) pollOnReplicating(
case tablepb.TableStateStopping:
case tablepb.TableStateStopped:
if r.Primary == captureID {
if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
return nil, false, errors.Trace(err)
}

r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
// Primary is stopped, but we still has secondary.
// Clear primary and promote secondary when it's prepared.
log.Info("schedulerv3: primary is stopped during Replicating",
Expand Down Expand Up @@ -1000,7 +991,7 @@ func (r *ReplicationSet) handleCaptureShutdown(

func (r *ReplicationSet) updateCheckpointAndStats(
checkpoint tablepb.Checkpoint, stats tablepb.Stats,
) error {
) {
if checkpoint.ResolvedTs < checkpoint.CheckpointTs {
log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
zap.String("namespace", r.Changefeed.Namespace),
Expand Down Expand Up @@ -1028,11 +1019,8 @@ func (r *ReplicationSet) updateCheckpointAndStats(
zap.Any("replicationSet", r),
zap.Any("checkpointTs", r.Checkpoint.CheckpointTs),
zap.Any("resolvedTs", r.Checkpoint.ResolvedTs))
return errors.ErrInvalidCheckpointTs.GenWithStackByArgs(r.Checkpoint.CheckpointTs,
r.Checkpoint.ResolvedTs)
}
r.Stats = stats
return nil
}

// SetHeap is a max-heap, it implements heap.Interface.
Expand Down
26 changes: 26 additions & 0 deletions cdc/scheduler/internal/v3/replication/replication_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,3 +1455,29 @@ func TestReplicationSetHeap_MinK(t *testing.T) {
require.Equal(t, expectedTables, tables)
require.Equal(t, 0, h.Len())
}

func TestUpdateCheckpointAndStats(t *testing.T) {
cases := []struct {
checkpoint tablepb.Checkpoint
stats tablepb.Stats
}{
{
checkpoint: tablepb.Checkpoint{
CheckpointTs: 1,
ResolvedTs: 2,
},
stats: tablepb.Stats{},
},
{
checkpoint: tablepb.Checkpoint{
CheckpointTs: 2,
ResolvedTs: 1,
},
stats: tablepb.Stats{},
},
}
r := &ReplicationSet{}
for _, c := range cases {
r.updateCheckpointAndStats(c.checkpoint, c.stats)
}
}

0 comments on commit 81f029e

Please sign in to comment.