Skip to content

Commit

Permalink
Revert "scheduler(ticdc): fix invlaid checkpoint when redo enabled (p…
Browse files Browse the repository at this point in the history
…ingcap#9851)"

This reverts commit 3b8d55b.
  • Loading branch information
CharlesCheung96 committed Oct 27, 2023
1 parent 4b2529f commit 5600f6b
Show file tree
Hide file tree
Showing 30 changed files with 132 additions and 395 deletions.
7 changes: 4 additions & 3 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,10 @@ LOOP2:
}

checkpointTs := c.state.Status.CheckpointTs
c.resolvedTs = checkpointTs
if c.resolvedTs == 0 {
c.resolvedTs = checkpointTs
}

minTableBarrierTs := c.state.Status.MinTableBarrierTs

failpoint.Inject("NewChangefeedNoRetryError", func() {
Expand Down Expand Up @@ -601,7 +604,6 @@ LOOP2:
return err
}
if c.redoMetaMgr.Enabled() {
c.resolvedTs = c.redoMetaMgr.GetFlushedMeta().ResolvedTs
c.wg.Add(1)
go func() {
defer c.wg.Done()
Expand Down Expand Up @@ -719,7 +721,6 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
c.barriers = nil
c.initialized = false
c.isReleased = true
c.resolvedTs = 0

log.Info("changefeed closed",
zap.String("namespace", c.id.Namespace),
Expand Down
5 changes: 2 additions & 3 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,12 @@ func (p *processor) AddTableSpan(
// table is `prepared`, and a `isPrepare = false` request indicate that old table should
// be stopped on original capture already, it's safe to start replicating data now.
if !isPrepare {
redoStartTs := checkpoint.ResolvedTs
if p.redo.r.Enabled() {
// ResolvedTs is store in external storage when redo log is enabled, so we need to
// start table with ResolvedTs in redoDMLManager.
p.redo.r.StartTable(span, redoStartTs)
p.redo.r.StartTable(span, checkpoint.ResolvedTs)
}
if err := p.sinkManager.r.StartTable(span, startTs, redoStartTs); err != nil {
if err := p.sinkManager.r.StartTable(span, startTs); err != nil {
return false, errors.Trace(err)
}
}
Expand Down
8 changes: 2 additions & 6 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,11 +821,7 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
}

// StartTable sets the table(TableSink) state to replicating.
func (m *SinkManager) StartTable(
span tablepb.Span,
startTs model.Ts,
redoStartTs model.Ts,
) error {
func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error {
log.Info("Start table sink",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
Expand All @@ -852,7 +848,7 @@ func (m *SinkManager) StartTable(
if m.redoDMLMgr != nil {
m.redoProgressHeap.push(&progress{
span: span,
nextLowerBoundPos: engine.Position{StartTs: 0, CommitTs: redoStartTs + 1},
nextLowerBoundPos: engine.Position{StartTs: 0, CommitTs: startTs + 1},
version: tableSink.(*tableSinkWrapper).version,
})
}
Expand Down
12 changes: 6 additions & 6 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestAddTable(t *testing.T) {
require.True(t, ok)
require.NotNil(t, tableSink)
require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap")
err := manager.StartTable(span, 1, 1)
err := manager.StartTable(span, 1)
require.NoError(t, err)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)

Expand All @@ -144,7 +144,7 @@ func TestRemoveTable(t *testing.T) {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
require.NotNil(t, tableSink)
err := manager.StartTable(span, 0, 0)
err := manager.StartTable(span, 0)
require.NoError(t, err)
addTableAndAddEventsToSortEngine(t, e, span)
manager.UpdateBarrierTs(4, nil)
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
manager.UpdateBarrierTs(4, nil)
manager.UpdateReceivedSorterResolvedTs(span, 5)
manager.schemaStorage.AdvanceResolvedTs(5)
err := manager.StartTable(span, 0, 0)
err := manager.StartTable(span, 0)
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
manager.UpdateBarrierTs(4, nil)
manager.UpdateReceivedSorterResolvedTs(span, 3)
manager.schemaStorage.AdvanceResolvedTs(4)
err := manager.StartTable(span, 0, 0)
err := manager.StartTable(span, 0)
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) {
manager.UpdateBarrierTs(4, nil)
manager.UpdateReceivedSorterResolvedTs(span, 5)
manager.schemaStorage.AdvanceResolvedTs(5)
err := manager.StartTable(span, 0, 0)
err := manager.StartTable(span, 0)
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) {

source.AddTable(span, "test", 100)
manager.AddTable(span, 100, math.MaxUint64)
manager.StartTable(span, 100, 0)
manager.StartTable(span, 100)
source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
manager.UpdateReceivedSorterResolvedTs(span, 101)
manager.UpdateBarrierTs(101, nil)
Expand Down
24 changes: 0 additions & 24 deletions cdc/scheduler/internal/v3/agent/main_test.go

This file was deleted.

24 changes: 0 additions & 24 deletions cdc/scheduler/internal/v3/compat/main_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (c *coordinator) poll(
currentSpans := c.reconciler.Reconcile(
ctx, &c.tableRanges, replications, c.captureM.Captures, c.compat)
allTasks := c.schedulerM.Schedule(
checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks, c.redoMetaManager)
checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks)

// Handle generated schedule tasks.
msgs, err = c.replicationM.HandleTasks(allTasks)
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/coordinator_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
captureID := fmt.Sprint(i % captureCount)
span := tablepb.Span{TableID: tableID}
rep, err := replication.NewReplicationSet(
span, tablepb.Checkpoint{}, map[string]*tablepb.TableStatus{
span, 0, map[string]*tablepb.TableStatus{
captureID: {
Span: tablepb.Span{TableID: tableID},
State: tablepb.TableStateReplicating,
Expand Down
24 changes: 0 additions & 24 deletions cdc/scheduler/internal/v3/keyspan/main_test.go

This file was deleted.

24 changes: 0 additions & 24 deletions cdc/scheduler/internal/v3/member/main_test.go

This file was deleted.

24 changes: 0 additions & 24 deletions cdc/scheduler/internal/v3/replication/main_test.go

This file was deleted.

19 changes: 7 additions & 12 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (t MoveTable) String() string {

// AddTable is a schedule task for adding a table.
type AddTable struct {
Span tablepb.Span
CaptureID model.CaptureID
Checkpoint tablepb.Checkpoint
Span tablepb.Span
CaptureID model.CaptureID
CheckpointTs model.Ts
}

func (t AddTable) String() string {
return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d, resolvedTs: %d",
t.Span.String(), t.CaptureID, t.Checkpoint.CheckpointTs, t.Checkpoint.ResolvedTs)
return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d",
t.Span.String(), t.CaptureID, t.CheckpointTs)
}

// RemoveTable is a schedule task for removing a table.
Expand Down Expand Up @@ -200,12 +200,7 @@ func (r *Manager) HandleCaptureChanges(
}
var err error
spanStatusMap.Ascend(func(span tablepb.Span, status map[string]*tablepb.TableStatus) bool {
checkpoint := tablepb.Checkpoint{
CheckpointTs: checkpointTs,
// Note that the real resolved ts is stored in the status.
ResolvedTs: checkpointTs,
}
table, err1 := NewReplicationSet(span, checkpoint, status, r.changefeedID)
table, err1 := NewReplicationSet(span, checkpointTs, status, r.changefeedID)
if err1 != nil {
err = errors.Trace(err1)
return false
Expand Down Expand Up @@ -442,7 +437,7 @@ func (r *Manager) handleAddTableTask(
var err error
table, ok := r.spans.Get(task.Span)
if !ok {
table, err = NewReplicationSet(task.Span, task.Checkpoint, nil, r.changefeedID)
table, err = NewReplicationSet(task.Span, task.CheckpointTs, nil, r.changefeedID)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 5600f6b

Please sign in to comment.