scheduler(ticdc): revert 3b8d55 and do not return error when resolvedTs less than checkpoint #9953

Merged (2 commits) on Oct 26, 2023
Changes from all commits

6 changes: 3 additions & 3 deletions cdc/owner/changefeed.go
@@ -481,7 +481,9 @@ LOOP2:
 	}
 
 	checkpointTs := c.latestStatus.CheckpointTs
-	c.resolvedTs = checkpointTs
+	if c.resolvedTs == 0 {
+		c.resolvedTs = checkpointTs
+	}
 	minTableBarrierTs := c.latestStatus.MinTableBarrierTs
 
 	failpoint.Inject("NewChangefeedNoRetryError", func() {
@@ -630,7 +632,6 @@ LOOP2:
 		return err
 	}
 	if c.redoMetaMgr.Enabled() {
-		c.resolvedTs = c.redoMetaMgr.GetFlushedMeta().ResolvedTs
 		c.wg.Add(1)
 		go func() {
 			defer c.wg.Done()
@@ -748,7 +749,6 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
 	c.barriers = nil
 	c.initialized = false
 	c.isReleased = true
-	c.resolvedTs = 0
 
 	log.Info("changefeed closed",
 		zap.String("namespace", c.id.Namespace),
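
Taken together, the three hunks above stop the owner from overwriting `c.resolvedTs`: it is seeded from the checkpoint only when it is still unset, the explicit assignment from the flushed redo meta is dropped, and the value is no longer zeroed in `releaseResources`. A minimal runnable sketch of that seed-if-unset rule, using plain `uint64` timestamps rather than the repository's types:

```go
package main

import "fmt"

// seedResolvedTs mirrors the guard added above: keep a resolvedTs that was
// already set, and fall back to the checkpoint only when it is still zero.
func seedResolvedTs(current, checkpointTs uint64) uint64 {
	if current == 0 {
		return checkpointTs
	}
	return current
}

func main() {
	fmt.Println(seedResolvedTs(0, 437))   // 437: first initialization uses the checkpoint
	fmt.Println(seedResolvedTs(440, 437)) // 440: an existing resolvedTs is preserved
}
```
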
5 changes: 2 additions & 3 deletions cdc/processor/processor.go
@@ -166,13 +166,12 @@ func (p *processor) AddTableSpan(
 	// table is `prepared`, and a `isPrepare = false` request indicate that old table should
 	// be stopped on original capture already, it's safe to start replicating data now.
 	if !isPrepare {
-		redoStartTs := checkpoint.ResolvedTs
 		if p.redo.r.Enabled() {
 			// ResolvedTs is store in external storage when redo log is enabled, so we need to
 			// start table with ResolvedTs in redoDMLManager.
-			p.redo.r.StartTable(span, redoStartTs)
+			p.redo.r.StartTable(span, checkpoint.ResolvedTs)
 		}
-		if err := p.sinkManager.r.StartTable(span, startTs, redoStartTs); err != nil {
+		if err := p.sinkManager.r.StartTable(span, startTs); err != nil {
 			return false, errors.Trace(err)
 		}
 	}

8 changes: 2 additions & 6 deletions cdc/processor/sinkmanager/manager.go
@@ -821,11 +821,7 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
 }
 
 // StartTable sets the table(TableSink) state to replicating.
-func (m *SinkManager) StartTable(
-	span tablepb.Span,
-	startTs model.Ts,
-	redoStartTs model.Ts,
-) error {
+func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error {
 	log.Info("Start table sink",
 		zap.String("namespace", m.changefeedID.Namespace),
 		zap.String("changefeed", m.changefeedID.ID),
@@ -852,7 +848,7 @@ func (m *SinkManager) StartTable(
 	if m.redoDMLMgr != nil {
 		m.redoProgressHeap.push(&progress{
 			span: span,
-			nextLowerBoundPos: engine.Position{StartTs: 0, CommitTs: redoStartTs + 1},
+			nextLowerBoundPos: engine.Position{StartTs: 0, CommitTs: startTs + 1},
 			version: tableSink.(*tableSinkWrapper).version,
 		})
 	}
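
With the extra `redoStartTs` parameter gone, the redo progress heap is primed from the same `startTs` that starts the table sink. A standalone sketch of the new lower bound; the `position` struct here only imitates the shape of `engine.Position` from the hunk above and is not the repository type:

```go
package main

import "fmt"

// position imitates the shape of engine.Position: a sort-engine key made of
// a start ts and a commit ts.
type position struct {
	StartTs  uint64
	CommitTs uint64
}

// redoLowerBound computes the bound pushed onto the redo progress heap after
// this change: scanning begins just above the table's startTs instead of
// above a separately tracked redo start ts.
func redoLowerBound(startTs uint64) position {
	return position{StartTs: 0, CommitTs: startTs + 1}
}

func main() {
	fmt.Printf("%+v\n", redoLowerBound(100)) // {StartTs:0 CommitTs:101}
}
```
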
12 changes: 6 additions & 6 deletions cdc/processor/sinkmanager/manager_test.go
@@ -117,7 +117,7 @@ func TestAddTable(t *testing.T) {
 	require.True(t, ok)
 	require.NotNil(t, tableSink)
 	require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap")
-	err := manager.StartTable(span, 1, 1)
+	err := manager.StartTable(span, 1)
 	require.NoError(t, err)
 	require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)
 
@@ -144,7 +144,7 @@ func TestRemoveTable(t *testing.T) {
 	tableSink, ok := manager.tableSinks.Load(span)
 	require.True(t, ok)
 	require.NotNil(t, tableSink)
-	err := manager.StartTable(span, 0, 0)
+	err := manager.StartTable(span, 0)
 	require.NoError(t, err)
 	addTableAndAddEventsToSortEngine(t, e, span)
 	manager.UpdateBarrierTs(4, nil)
@@ -191,7 +191,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
 	manager.UpdateBarrierTs(4, nil)
 	manager.UpdateReceivedSorterResolvedTs(span, 5)
 	manager.schemaStorage.AdvanceResolvedTs(5)
-	err := manager.StartTable(span, 0, 0)
+	err := manager.StartTable(span, 0)
 	require.NoError(t, err)
 
 	require.Eventually(t, func() bool {
@@ -222,7 +222,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
 	manager.UpdateBarrierTs(4, nil)
 	manager.UpdateReceivedSorterResolvedTs(span, 3)
 	manager.schemaStorage.AdvanceResolvedTs(4)
-	err := manager.StartTable(span, 0, 0)
+	err := manager.StartTable(span, 0)
 	require.NoError(t, err)
 
 	require.Eventually(t, func() bool {
@@ -252,7 +252,7 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) {
 	manager.UpdateBarrierTs(4, nil)
 	manager.UpdateReceivedSorterResolvedTs(span, 5)
 	manager.schemaStorage.AdvanceResolvedTs(5)
-	err := manager.StartTable(span, 0, 0)
+	err := manager.StartTable(span, 0)
 	require.NoError(t, err)
 
 	require.Eventually(t, func() bool {
@@ -341,7 +341,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
 
 	source.AddTable(span, "test", 100)
 	manager.AddTable(span, 100, math.MaxUint64)
-	manager.StartTable(span, 100, 0)
+	manager.StartTable(span, 100)
 	source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
 	manager.UpdateReceivedSorterResolvedTs(span, 101)
 	manager.UpdateBarrierTs(101, nil)

24 changes: 0 additions & 24 deletions cdc/scheduler/internal/v3/agent/main_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/agent/table.go
@@ -75,7 +75,7 @@ func (t *tableSpan) getTableSpanStatus(collectStat bool) tablepb.TableStatus {
 
 func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message {
 	if status.Checkpoint.ResolvedTs < status.Checkpoint.CheckpointTs {
-		log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
+		log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
 			zap.Any("tableStatus", status),
 			zap.Any("checkpoint", status.Checkpoint.CheckpointTs),
 			zap.Any("resolved", status.Checkpoint.ResolvedTs))
@@ -100,7 +100,7 @@ func newRemoveTableResponseMessage(status tablepb.TableStatus) *schedulepb.Messa
 		// Advance resolved ts to checkpoint ts if table is removed.
 		status.Checkpoint.ResolvedTs = status.Checkpoint.CheckpointTs
 	} else {
-		log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
+		log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
 			zap.Any("tableStatus", status),
 			zap.Any("checkpoint", status.Checkpoint.CheckpointTs),
 			zap.Any("resolved", status.Checkpoint.ResolvedTs))
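
Both hunks replace `log.Panic` with `log.Warn`, so a table whose resolved ts has fallen behind its checkpoint ts is now reported instead of treated as fatal; on table removal the resolved ts is still advanced to the checkpoint ts, as the surrounding context shows. A small self-contained illustration of the relaxed check, using the standard-library `log` package and an illustrative struct rather than the tablepb types:

```go
package main

import "log"

// checkpoint imitates the two fields compared above (not the tablepb type).
type checkpoint struct {
	CheckpointTs uint64
	ResolvedTs   uint64
}

// validate reproduces the relaxed invariant: a resolved ts lagging behind the
// checkpoint ts is logged as a warning instead of aborting the process.
func validate(cp checkpoint) {
	if cp.ResolvedTs < cp.CheckpointTs {
		log.Printf("warn: resolved ts should not be less than checkpoint ts, checkpoint=%d resolved=%d",
			cp.CheckpointTs, cp.ResolvedTs)
	}
}

func main() {
	validate(checkpoint{CheckpointTs: 200, ResolvedTs: 150}) // warns, no panic
}
```
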
24 changes: 0 additions & 24 deletions cdc/scheduler/internal/v3/compat/main_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/coordinator.go
@@ -334,7 +334,7 @@ func (c *coordinator) poll(
 	currentSpans := c.reconciler.Reconcile(
 		ctx, &c.tableRanges, replications, c.captureM.Captures, c.compat)
 	allTasks := c.schedulerM.Schedule(
-		checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks, c.redoMetaManager)
+		checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks)
 
 	// Handle generated schedule tasks.
 	msgs, err = c.replicationM.HandleTasks(allTasks)

2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/coordinator_bench_test.go
@@ -149,7 +149,7 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
 		captureID := fmt.Sprint(i % captureCount)
 		span := tablepb.Span{TableID: tableID}
 		rep, err := replication.NewReplicationSet(
-			span, tablepb.Checkpoint{}, map[string]*tablepb.TableStatus{
+			span, 0, map[string]*tablepb.TableStatus{
 				captureID: {
 					Span: tablepb.Span{TableID: tableID},
 					State: tablepb.TableStateReplicating,

24 changes: 0 additions & 24 deletions cdc/scheduler/internal/v3/keyspan/main_test.go

This file was deleted.

24 changes: 0 additions & 24 deletions cdc/scheduler/internal/v3/member/main_test.go

This file was deleted.

24 changes: 0 additions & 24 deletions cdc/scheduler/internal/v3/replication/main_test.go

This file was deleted.

19 changes: 7 additions & 12 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
@@ -78,14 +78,14 @@ func (t MoveTable) String() string {
 
 // AddTable is a schedule task for adding a table.
 type AddTable struct {
-	Span       tablepb.Span
-	CaptureID  model.CaptureID
-	Checkpoint tablepb.Checkpoint
+	Span         tablepb.Span
+	CaptureID    model.CaptureID
+	CheckpointTs model.Ts
 }
 
 func (t AddTable) String() string {
-	return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d, resolvedTs: %d",
-		t.Span.String(), t.CaptureID, t.Checkpoint.CheckpointTs, t.Checkpoint.ResolvedTs)
+	return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d",
+		t.Span.String(), t.CaptureID, t.CheckpointTs)
 }
 
 // RemoveTable is a schedule task for removing a table.
@@ -200,12 +200,7 @@ func (r *Manager) HandleCaptureChanges(
 	}
 	var err error
 	spanStatusMap.Ascend(func(span tablepb.Span, status map[string]*tablepb.TableStatus) bool {
-		checkpoint := tablepb.Checkpoint{
-			CheckpointTs: checkpointTs,
-			// Note that the real resolved ts is stored in the status.
-			ResolvedTs: checkpointTs,
-		}
-		table, err1 := NewReplicationSet(span, checkpoint, status, r.changefeedID)
+		table, err1 := NewReplicationSet(span, checkpointTs, status, r.changefeedID)
 		if err1 != nil {
 			err = errors.Trace(err1)
 			return false
@@ -442,7 +437,7 @@ func (r *Manager) handleAddTableTask(
 	var err error
 	table, ok := r.spans.Get(task.Span)
 	if !ok {
-		table, err = NewReplicationSet(task.Span, task.Checkpoint, nil, r.changefeedID)
+		table, err = NewReplicationSet(task.Span, task.CheckpointTs, nil, r.changefeedID)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
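
After this change the `AddTable` schedule task carries only a checkpoint ts, and `NewReplicationSet` takes a plain ts; the per-table resolved ts is tracked through the reported table status rather than the schedule task, as the removed comment in `HandleCaptureChanges` noted. A sketch of building the task with the new layout, assuming the surrounding `replication` package context and the imports shown in the diff; the concrete values are placeholders:

```go
// Sketch only: constructing the schedule task with the new fields.
task := AddTable{
	Span:         tablepb.Span{TableID: 42},
	CaptureID:    "capture-1",
	CheckpointTs: 437,
}
// String() now reports just the checkpoint ts, e.g.
// "AddTable, span: ..., capture: capture-1, checkpointTs: 437".
fmt.Println(task.String())
```
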