diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 9b6e0413738..d3ed1e99c34 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -448,10 +448,7 @@ LOOP2:
     }
     checkpointTs := c.state.Status.CheckpointTs
-    if c.resolvedTs == 0 {
-        c.resolvedTs = checkpointTs
-    }
-
+    c.resolvedTs = checkpointTs
     minTableBarrierTs := c.state.Status.MinTableBarrierTs
     failpoint.Inject("NewChangefeedNoRetryError", func() {
@@ -600,6 +597,7 @@ LOOP2:
         return err
     }
     if c.redoMetaMgr.Enabled() {
+        c.resolvedTs = c.redoMetaMgr.GetFlushedMeta().ResolvedTs
         c.wg.Add(1)
         go func() {
             defer c.wg.Done()
@@ -715,6 +713,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
     c.barriers = nil
     c.initialized = false
     c.isReleased = true
+    c.resolvedTs = 0
 
     log.Info("changefeed closed",
         zap.String("namespace", c.id.Namespace),
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 52b3fdc229f..8a6806f00f4 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -145,12 +145,13 @@ func (p *processor) AddTableSpan(
     // table is `prepared`, and a `isPrepare = false` request indicate that old table should
     // be stopped on original capture already, it's safe to start replicating data now.
     if !isPrepare {
+        redoStartTs := checkpoint.ResolvedTs
         if p.redo.r.Enabled() {
             // ResolvedTs is store in external storage when redo log is enabled, so we need to
             // start table with ResolvedTs in redoDMLManager.
-            p.redo.r.StartTable(span, checkpoint.ResolvedTs)
+            p.redo.r.StartTable(span, redoStartTs)
         }
-        if err := p.sinkManager.r.StartTable(span, startTs); err != nil {
+        if err := p.sinkManager.r.StartTable(span, startTs, redoStartTs); err != nil {
             return false, errors.Trace(err)
         }
     }
diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 35391e1f44e..9393751f18e 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -820,7 +820,11 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
     }
 
 // StartTable sets the table(TableSink) state to replicating.
-func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error {
+func (m *SinkManager) StartTable(
+    span tablepb.Span,
+    startTs model.Ts,
+    redoStartTs model.Ts,
+) error {
     log.Info("Start table sink",
         zap.String("namespace", m.changefeedID.Namespace),
         zap.String("changefeed", m.changefeedID.ID),
@@ -847,7 +851,7 @@ func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error {
     if m.redoDMLMgr != nil {
         m.redoProgressHeap.push(&progress{
             span:              span,
-            nextLowerBoundPos: engine.Position{StartTs: 0, CommitTs: startTs + 1},
+            nextLowerBoundPos: engine.Position{StartTs: 0, CommitTs: redoStartTs + 1},
             version:           tableSink.(*tableSinkWrapper).version,
         })
     }
diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go
index b70e17fc3ab..16806580c02 100644
--- a/cdc/processor/sinkmanager/manager_test.go
+++ b/cdc/processor/sinkmanager/manager_test.go
@@ -117,7 +117,7 @@ func TestAddTable(t *testing.T) {
     require.True(t, ok)
     require.NotNil(t, tableSink)
     require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap")
-    err := manager.StartTable(span, 1)
+    err := manager.StartTable(span, 1, 1)
     require.NoError(t, err)
     require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)
@@ -144,7 +144,7 @@ func TestRemoveTable(t *testing.T) {
     tableSink, ok := manager.tableSinks.Load(span)
     require.True(t, ok)
     require.NotNil(t, tableSink)
-    err := manager.StartTable(span, 0)
+    err := manager.StartTable(span, 0, 0)
     require.NoError(t, err)
     addTableAndAddEventsToSortEngine(t, e, span)
     manager.UpdateBarrierTs(4, nil)
@@ -191,7 +191,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
     manager.UpdateBarrierTs(4, nil)
     manager.UpdateReceivedSorterResolvedTs(span, 5)
     manager.schemaStorage.AdvanceResolvedTs(5)
-    err := manager.StartTable(span, 0)
+    err := manager.StartTable(span, 0, 0)
     require.NoError(t, err)
 
     require.Eventually(t, func() bool {
@@ -222,7 +222,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
     manager.UpdateBarrierTs(4, nil)
     manager.UpdateReceivedSorterResolvedTs(span, 3)
     manager.schemaStorage.AdvanceResolvedTs(4)
-    err := manager.StartTable(span, 0)
+    err := manager.StartTable(span, 0, 0)
     require.NoError(t, err)
 
     require.Eventually(t, func() bool {
@@ -252,7 +252,7 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) {
     manager.UpdateBarrierTs(4, nil)
     manager.UpdateReceivedSorterResolvedTs(span, 5)
     manager.schemaStorage.AdvanceResolvedTs(5)
-    err := manager.StartTable(span, 0)
+    err := manager.StartTable(span, 0, 0)
     require.NoError(t, err)
 
     require.Eventually(t, func() bool {
@@ -341,7 +341,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
     source.AddTable(span, "test", 100)
     manager.AddTable(span, 100, math.MaxUint64)
-    manager.StartTable(span, 100)
+    manager.StartTable(span, 100, 0)
     source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
     manager.UpdateReceivedSorterResolvedTs(span, 101)
     manager.UpdateBarrierTs(101, nil)
diff --git a/cdc/scheduler/internal/v3/agent/main_test.go b/cdc/scheduler/internal/v3/agent/main_test.go
new file mode 100644
index 00000000000..cb7775cac71
--- /dev/null
+++ b/cdc/scheduler/internal/v3/agent/main_test.go
@@ -0,0 +1,24 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package agent
+
+import (
+    "testing"
+
+    "github.com/pingcap/tiflow/pkg/leakutil"
+)
+
+func TestMain(m *testing.M) {
+    leakutil.SetUpLeakTest(m)
+}
diff --git a/cdc/scheduler/internal/v3/compat/main_test.go b/cdc/scheduler/internal/v3/compat/main_test.go
new file mode 100644
index 00000000000..d8ad350ef4d
--- /dev/null
+++ b/cdc/scheduler/internal/v3/compat/main_test.go
@@ -0,0 +1,24 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compat
+
+import (
+    "testing"
+
+    "github.com/pingcap/tiflow/pkg/leakutil"
+)
+
+func TestMain(m *testing.M) {
+    leakutil.SetUpLeakTest(m)
+}
diff --git a/cdc/scheduler/internal/v3/coordinator.go b/cdc/scheduler/internal/v3/coordinator.go
index 2a98c7be959..c09fc76acdd 100644
--- a/cdc/scheduler/internal/v3/coordinator.go
+++ b/cdc/scheduler/internal/v3/coordinator.go
@@ -338,7 +338,7 @@ func (c *coordinator) poll(
     currentSpans := c.reconciler.Reconcile(
         ctx, &c.tableRanges, replications, c.captureM.Captures, c.compat)
     allTasks := c.schedulerM.Schedule(
-        checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks)
+        checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks, c.redoMetaManager)
 
     // Handle generated schedule tasks.
     msgs, err = c.replicationM.HandleTasks(allTasks)
diff --git a/cdc/scheduler/internal/v3/coordinator_bench_test.go b/cdc/scheduler/internal/v3/coordinator_bench_test.go
index 75094ba1d91..cdf3adc86d3 100644
--- a/cdc/scheduler/internal/v3/coordinator_bench_test.go
+++ b/cdc/scheduler/internal/v3/coordinator_bench_test.go
@@ -149,7 +149,7 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
         captureID := fmt.Sprint(i % captureCount)
         span := tablepb.Span{TableID: tableID}
         rep, err := replication.NewReplicationSet(
-            span, 0, map[string]*tablepb.TableStatus{
+            span, tablepb.Checkpoint{}, map[string]*tablepb.TableStatus{
                 captureID: {
                     Span:  tablepb.Span{TableID: tableID},
                     State: tablepb.TableStateReplicating,
diff --git a/cdc/scheduler/internal/v3/keyspan/main_test.go b/cdc/scheduler/internal/v3/keyspan/main_test.go
new file mode 100644
index 00000000000..da122358394
--- /dev/null
+++ b/cdc/scheduler/internal/v3/keyspan/main_test.go
@@ -0,0 +1,24 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package keyspan
+
+import (
+    "testing"
+
+    "github.com/pingcap/tiflow/pkg/leakutil"
+)
+
+func TestMain(m *testing.M) {
+    leakutil.SetUpLeakTest(m)
+}
diff --git a/cdc/scheduler/internal/v3/member/main_test.go b/cdc/scheduler/internal/v3/member/main_test.go
new file mode 100644
index 00000000000..02b77a22186
--- /dev/null
+++ b/cdc/scheduler/internal/v3/member/main_test.go
@@ -0,0 +1,24 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package member
+
+import (
+    "testing"
+
+    "github.com/pingcap/tiflow/pkg/leakutil"
+)
+
+func TestMain(m *testing.M) {
+    leakutil.SetUpLeakTest(m)
+}
diff --git a/cdc/scheduler/internal/v3/replication/main_test.go b/cdc/scheduler/internal/v3/replication/main_test.go
new file mode 100644
index 00000000000..cefa385ed62
--- /dev/null
+++ b/cdc/scheduler/internal/v3/replication/main_test.go
@@ -0,0 +1,24 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package replication
+
+import (
+    "testing"
+
+    "github.com/pingcap/tiflow/pkg/leakutil"
+)
+
+func TestMain(m *testing.M) {
+    leakutil.SetUpLeakTest(m)
+}
diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go
index 0ec7c7a671b..416aaf4b29d 100644
--- a/cdc/scheduler/internal/v3/replication/replication_manager.go
+++ b/cdc/scheduler/internal/v3/replication/replication_manager.go
@@ -78,14 +78,14 @@ func (t MoveTable) String() string {
 
 // AddTable is a schedule task for adding a table.
 type AddTable struct {
-    Span         tablepb.Span
-    CaptureID    model.CaptureID
-    CheckpointTs model.Ts
+    Span       tablepb.Span
+    CaptureID  model.CaptureID
+    Checkpoint tablepb.Checkpoint
 }
 
 func (t AddTable) String() string {
-    return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d",
-        t.Span.String(), t.CaptureID, t.CheckpointTs)
+    return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d, resolvedTs: %d",
+        t.Span.String(), t.CaptureID, t.Checkpoint.CheckpointTs, t.Checkpoint.ResolvedTs)
 }
 
 // RemoveTable is a schedule task for removing a table.
@@ -200,7 +200,12 @@ func (r *Manager) HandleCaptureChanges(
     }
     var err error
     spanStatusMap.Ascend(func(span tablepb.Span, status map[string]*tablepb.TableStatus) bool {
-        table, err1 := NewReplicationSet(span, checkpointTs, status, r.changefeedID)
+        checkpoint := tablepb.Checkpoint{
+            CheckpointTs: checkpointTs,
+            // Note that the real resolved ts is stored in the status.
+            ResolvedTs: checkpointTs,
+        }
+        table, err1 := NewReplicationSet(span, checkpoint, status, r.changefeedID)
         if err1 != nil {
             err = errors.Trace(err1)
             return false
@@ -437,7 +442,7 @@ func (r *Manager) handleAddTableTask(
     var err error
     table, ok := r.spans.Get(task.Span)
     if !ok {
-        table, err = NewReplicationSet(task.Span, task.CheckpointTs, nil, r.changefeedID)
+        table, err = NewReplicationSet(task.Span, task.Checkpoint, nil, r.changefeedID)
         if err != nil {
             return nil, errors.Trace(err)
         }
diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go
index e8a7f8c6431..5acceb3a9c1 100644
--- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go
+++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go
@@ -35,7 +35,12 @@ func TestReplicationManagerHandleAddTableTask(t *testing.T) {
     // Absent -> Prepare
     msgs, err := r.HandleTasks([]*ScheduleTask{{
         AddTable: &AddTable{
-            Span: spanz.TableIDToComparableSpan(1), CaptureID: "1", CheckpointTs: 1,
+            Span:      spanz.TableIDToComparableSpan(1),
+            CaptureID: "1",
+            Checkpoint: tablepb.Checkpoint{
+                CheckpointTs: 1,
+                ResolvedTs:   1,
+            },
         },
         Accept: func() {
             addTableCh <- 1
@@ -150,9 +155,15 @@ func TestReplicationManagerRemoveTable(t *testing.T) {
 
     // Add the table.
     span := spanz.TableIDToComparableSpan(1)
-    tbl, err := NewReplicationSet(span, 0, map[string]*tablepb.TableStatus{
-        "1": {Span: span, State: tablepb.TableStateReplicating},
-    }, model.ChangeFeedID{})
+    tbl, err := NewReplicationSet(span,
+        tablepb.Checkpoint{
+            CheckpointTs: 0,
+            ResolvedTs:   0,
+        },
+        map[string]*tablepb.TableStatus{
+            "1": {Span: span, State: tablepb.TableStateReplicating},
+        },
+        model.ChangeFeedID{})
     require.Nil(t, err)
     require.Equal(t, ReplicationSetStateReplicating, tbl.State)
     r.spans.ReplaceOrInsert(spanz.TableIDToComparableSpan(1), tbl)
@@ -246,9 +257,15 @@ func TestReplicationManagerMoveTable(t *testing.T) {
 
     // Add the table.
     span := spanz.TableIDToComparableSpan(1)
-    tbl, err := NewReplicationSet(span, 0, map[string]*tablepb.TableStatus{
-        source: {Span: span, State: tablepb.TableStateReplicating},
-    }, model.ChangeFeedID{})
+    tbl, err := NewReplicationSet(span,
+        tablepb.Checkpoint{
+            CheckpointTs: 0,
+            ResolvedTs:   0,
+        },
+        map[string]*tablepb.TableStatus{
+            source: {Span: span, State: tablepb.TableStateReplicating},
+        },
+        model.ChangeFeedID{})
     require.Nil(t, err)
     require.Equal(t, ReplicationSetStateReplicating, tbl.State)
     r.spans.ReplaceOrInsert(spanz.TableIDToComparableSpan(1), tbl)
@@ -377,19 +394,23 @@ func TestReplicationManagerBurstBalance(t *testing.T) {
     r := NewReplicationManager(1, model.ChangeFeedID{})
     balanceTableCh := make(chan int, 1)
 
+    checkpoint := tablepb.Checkpoint{
+        CheckpointTs: 1,
+        ResolvedTs:   1,
+    }
     // Burst balance is not limited by maxTaskConcurrency.
     msgs, err := r.HandleTasks([]*ScheduleTask{{
         AddTable: &AddTable{
-            Span: spanz.TableIDToComparableSpan(1), CaptureID: "0", CheckpointTs: 1,
+            Span: spanz.TableIDToComparableSpan(1), CaptureID: "0", Checkpoint: checkpoint,
         },
     }, {
         BurstBalance: &BurstBalance{
             AddTables: []AddTable{{
-                Span: spanz.TableIDToComparableSpan(1), CaptureID: "1", CheckpointTs: 1,
+                Span: spanz.TableIDToComparableSpan(1), CaptureID: "1", Checkpoint: checkpoint,
             }, {
-                Span: spanz.TableIDToComparableSpan(2), CaptureID: "2", CheckpointTs: 1,
+                Span: spanz.TableIDToComparableSpan(2), CaptureID: "2", Checkpoint: checkpoint,
             }, {
-                Span: spanz.TableIDToComparableSpan(3), CaptureID: "3", CheckpointTs: 1,
+                Span: spanz.TableIDToComparableSpan(3), CaptureID: "3", Checkpoint: checkpoint,
             }},
         },
         Accept: func() {
@@ -424,19 +445,25 @@ func TestReplicationManagerBurstBalance(t *testing.T) {
 
     // Add a new table.
     span := spanz.TableIDToComparableSpan(5)
-    table5, err := NewReplicationSet(span, 0, map[string]*tablepb.TableStatus{
-        "5": {Span: span, State: tablepb.TableStateReplicating},
-    }, model.ChangeFeedID{})
+    table5, err := NewReplicationSet(span,
+        tablepb.Checkpoint{},
+        map[string]*tablepb.TableStatus{
+            "5": {Span: span, State: tablepb.TableStateReplicating},
+        }, model.ChangeFeedID{})
     require.Nil(t, err)
     r.spans.ReplaceOrInsert(span, table5)
 
+    checkpoint = tablepb.Checkpoint{
+        CheckpointTs: 2,
+        ResolvedTs:   2,
+    }
     // More burst balance is still allowed.
     msgs, err = r.HandleTasks([]*ScheduleTask{{
         BurstBalance: &BurstBalance{
             AddTables: []AddTable{{
-                Span: spanz.TableIDToComparableSpan(4), CaptureID: "4", CheckpointTs: 2,
+                Span: spanz.TableIDToComparableSpan(4), CaptureID: "4", Checkpoint: checkpoint,
             }, {
-                Span: spanz.TableIDToComparableSpan(1), CaptureID: "0", CheckpointTs: 2,
+                Span: spanz.TableIDToComparableSpan(1), CaptureID: "0", Checkpoint: checkpoint,
             }},
             RemoveTables: []RemoveTable{{
                 Span: spanz.TableIDToComparableSpan(5), CaptureID: "5",
@@ -489,13 +516,13 @@ func TestReplicationManagerBurstBalanceMoveTables(t *testing.T) {
     var err error
     // Two tables in "1".
     span := spanz.TableIDToComparableSpan(1)
-    table, err := NewReplicationSet(span, 0, map[string]*tablepb.TableStatus{
+    table, err := NewReplicationSet(span, tablepb.Checkpoint{}, map[string]*tablepb.TableStatus{
         "1": {Span: span, State: tablepb.TableStateReplicating},
     }, model.ChangeFeedID{})
     require.Nil(t, err)
     r.spans.ReplaceOrInsert(span, table)
     span2 := spanz.TableIDToComparableSpan(2)
-    table2, err := NewReplicationSet(span2, 0, map[string]*tablepb.TableStatus{
+    table2, err := NewReplicationSet(span2, tablepb.Checkpoint{}, map[string]*tablepb.TableStatus{
         "1": {
             Span: span2, State: tablepb.TableStateReplicating,
             Checkpoint: tablepb.Checkpoint{CheckpointTs: 1, ResolvedTs: 1},
@@ -609,7 +636,11 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
     r := NewReplicationManager(1, model.ChangeFeedID{})
     span := spanz.TableIDToComparableSpan(1)
-    rs, err := NewReplicationSet(span, model.Ts(10),
+    rs, err := NewReplicationSet(span,
+        tablepb.Checkpoint{
+            CheckpointTs: 10,
+            ResolvedTs:   10,
+        },
         map[model.CaptureID]*tablepb.TableStatus{
             "1": {
                 Span: spanz.TableIDToComparableSpan(1),
@@ -624,7 +655,11 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
     r.spans.ReplaceOrInsert(span, rs)
 
     span2 := spanz.TableIDToComparableSpan(2)
-    rs, err = NewReplicationSet(span2, model.Ts(15),
+    rs, err = NewReplicationSet(span2,
+        tablepb.Checkpoint{
+            CheckpointTs: 15,
+            ResolvedTs:   15,
+        },
         map[model.CaptureID]*tablepb.TableStatus{
             "2": {
                 Span: spanz.TableIDToComparableSpan(2),
@@ -660,7 +695,11 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
     require.Equal(t, checkpointCannotProceed, resolved)
 
     span3 := spanz.TableIDToComparableSpan(3)
-    rs, err = NewReplicationSet(span3, model.Ts(5),
+    rs, err = NewReplicationSet(span3,
+        tablepb.Checkpoint{
+            CheckpointTs: 5,
+            ResolvedTs:   5,
+        },
         map[model.CaptureID]*tablepb.TableStatus{
             "1": {
                 Span: spanz.TableIDToComparableSpan(3),
@@ -687,7 +726,11 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
     currentTables.UpdateTables([]model.TableID{1, 2, 3, 4})
     span4 := spanz.TableIDToComparableSpan(4)
-    rs, err = NewReplicationSet(span4, model.Ts(3),
+    rs, err = NewReplicationSet(span4,
+        tablepb.Checkpoint{
+            CheckpointTs: 3,
+            ResolvedTs:   3,
+        },
         map[model.CaptureID]*tablepb.TableStatus{
             "1": {
                 Span: spanz.TableIDToComparableSpan(4),
@@ -711,7 +754,11 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
     span5_2 := spanz.TableIDToComparableSpan(5)
     span5_2.StartKey = append(span5_2.StartKey, 0)
     for _, span := range []tablepb.Span{span5_1, span5_2} {
-        rs, err = NewReplicationSet(span, model.Ts(3),
+        rs, err = NewReplicationSet(span,
+            tablepb.Checkpoint{
+                CheckpointTs: 3,
+                ResolvedTs:   3,
+            },
             map[model.CaptureID]*tablepb.TableStatus{
                 "1": {
                     Span: span,
@@ -745,7 +792,11 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
     // redo is enabled
     currentTables.UpdateTables([]model.TableID{4})
     spanRedo := spanz.TableIDToComparableSpan(4)
-    rs, err = NewReplicationSet(spanRedo, model.Ts(3),
+    rs, err = NewReplicationSet(spanRedo,
+        tablepb.Checkpoint{
+            CheckpointTs: 3,
+            ResolvedTs:   3,
+        },
         map[model.CaptureID]*tablepb.TableStatus{
             "1": {
                 Span: spanz.TableIDToComparableSpan(4),
@@ -771,7 +822,11 @@ func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) {
     t.Parallel()
     r := NewReplicationManager(1, model.ChangeFeedID{})
     span := spanz.TableIDToComparableSpan(1)
-    rs, err := NewReplicationSet(span, model.Ts(10),
+    rs, err := NewReplicationSet(span,
+        tablepb.Checkpoint{
+            CheckpointTs: 10,
+            ResolvedTs:   10,
+        },
         map[model.CaptureID]*tablepb.TableStatus{
             "1": {
                 Span: spanz.TableIDToComparableSpan(1),
@@ -786,7 +841,11 @@ func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) {
     r.spans.ReplaceOrInsert(span, rs)
 
     span2 := spanz.TableIDToComparableSpan(2)
-    rs, err = NewReplicationSet(span2, model.Ts(15),
+    rs, err = NewReplicationSet(span2,
+        tablepb.Checkpoint{
+            CheckpointTs: 15,
+            ResolvedTs:   15,
+        },
         map[model.CaptureID]*tablepb.TableStatus{
             "2": {
                 Span: spanz.TableIDToComparableSpan(2),
diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go
index fec432499a8..4096efc127e 100644
--- a/cdc/scheduler/internal/v3/replication/replication_set.go
+++ b/cdc/scheduler/internal/v3/replication/replication_set.go
@@ -141,7 +141,7 @@ type ReplicationSet struct { //nolint:revive
 // NewReplicationSet returns a new replication set.
 func NewReplicationSet(
     span tablepb.Span,
-    checkpoint model.Ts,
+    checkpoint tablepb.Checkpoint,
     tableStatus map[model.CaptureID]*tablepb.TableStatus,
     changefeed model.ChangeFeedID,
 ) (*ReplicationSet, error) {
@@ -149,10 +149,7 @@ func NewReplicationSet(
         Changefeed: changefeed,
         Span:       span,
         Captures:   make(map[string]Role),
-        Checkpoint: tablepb.Checkpoint{
-            CheckpointTs: checkpoint,
-            ResolvedTs:   checkpoint,
-        },
+        Checkpoint: checkpoint,
     }
     // Count of captures that is in Stopping states.
     stoppingCount := 0
diff --git a/cdc/scheduler/internal/v3/replication/replication_set_test.go b/cdc/scheduler/internal/v3/replication/replication_set_test.go
index da417438bc0..a786535656a 100644
--- a/cdc/scheduler/internal/v3/replication/replication_set_test.go
+++ b/cdc/scheduler/internal/v3/replication/replication_set_test.go
@@ -220,7 +220,10 @@ func TestNewReplicationSet(t *testing.T) {
     for id, tc := range testcases {
         set := tc.set
         status := tc.tableStatus
-        checkpoint := tc.checkpoint
+        checkpoint := tablepb.Checkpoint{
+            CheckpointTs: tc.checkpoint,
+            ResolvedTs:   tc.checkpoint,
+        }
         span := tablepb.Span{TableID: 0}
         output, err := NewReplicationSet(span, checkpoint, status, model.ChangeFeedID{})
@@ -268,7 +271,7 @@ func TestReplicationSetPoll(t *testing.T) {
         }
     }
     span := tablepb.Span{TableID: 1}
-    r, _ := NewReplicationSet(span, 0, status, model.ChangeFeedID{})
+    r, _ := NewReplicationSet(span, tablepb.Checkpoint{}, status, model.ChangeFeedID{})
     var tableStates []int
     for state := range tablepb.TableState_name {
         tableStates = append(tableStates, int(state))
@@ -300,7 +303,7 @@ func TestReplicationSetPollUnknownCapture(t *testing.T) {
     tableID := model.TableID(1)
     span := tablepb.Span{TableID: tableID}
-    r, err := NewReplicationSet(span, 0, map[model.CaptureID]*tablepb.TableStatus{
+    r, err := NewReplicationSet(span, tablepb.Checkpoint{}, map[model.CaptureID]*tablepb.TableStatus{
         "1": {
             Span:  tablepb.Span{TableID: tableID},
             State: tablepb.TableStateReplicating,
@@ -337,7 +340,7 @@ func TestReplicationSetAddTable(t *testing.T) {
     from := "1"
     tableID := model.TableID(1)
     span := tablepb.Span{TableID: tableID}
-    r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{})
+    r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{})
     require.Nil(t, err)
 
     // Absent -> Prepare
@@ -482,7 +485,7 @@ func TestReplicationSetRemoveTable(t *testing.T) {
     from := "1"
     tableID := model.TableID(1)
     span := tablepb.Span{TableID: tableID}
-    r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{})
+    r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{})
     require.Nil(t, err)
 
     // Ignore removing table if it's not in replicating.
@@ -563,7 +566,7 @@ func TestReplicationSetMoveTable(t *testing.T) {
     tableID := model.TableID(1)
     span := tablepb.Span{TableID: tableID}
-    r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{})
+    r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{})
     require.Nil(t, err)
 
     source := "1"
@@ -795,7 +798,7 @@ func TestReplicationSetCaptureShutdown(t *testing.T) {
     from := "1"
     tableID := model.TableID(1)
     span := tablepb.Span{TableID: tableID}
-    r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{})
+    r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{})
     require.Nil(t, err)
 
     // Add table, Absent -> Prepare
@@ -1101,7 +1104,7 @@ func TestReplicationSetCaptureShutdownAfterReconstructCommitState(t *testing.T)
         from: {Span: tablepb.Span{TableID: tableID}, State: tablepb.TableStatePrepared},
     }
     span := tablepb.Span{TableID: tableID}
-    r, err := NewReplicationSet(span, 0, tableStatus, model.ChangeFeedID{})
+    r, err := NewReplicationSet(span, tablepb.Checkpoint{}, tableStatus, model.ChangeFeedID{})
     require.Nil(t, err)
     require.Equal(t, ReplicationSetStateCommit, r.State)
     require.Equal(t, "", r.Primary)
@@ -1122,7 +1125,7 @@ func TestReplicationSetMoveTableWithHeartbeatResponse(t *testing.T) {
     tableID := model.TableID(1)
     span := tablepb.Span{TableID: tableID}
-    r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{})
+    r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{})
     require.Nil(t, err)
 
     source := "1"
@@ -1210,7 +1213,7 @@ func TestReplicationSetMoveTableSameDestCapture(t *testing.T) {
     tableID := model.TableID(1)
     span := tablepb.Span{TableID: tableID}
-    r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{})
+    r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{})
     require.Nil(t, err)
 
     source := "1"
@@ -1243,7 +1246,7 @@ func TestReplicationSetCommitRestart(t *testing.T) {
         },
     }
     span := tablepb.Span{TableID: 0}
-    r, err := NewReplicationSet(span, 0, tableStatus, model.ChangeFeedID{})
+    r, err := NewReplicationSet(span, tablepb.Checkpoint{}, tableStatus, model.ChangeFeedID{})
     require.Nil(t, err)
     require.Equal(t, ReplicationSetStateCommit, r.State)
     require.EqualValues(t, RoleSecondary, r.Captures["1"])
@@ -1326,7 +1329,7 @@ func TestReplicationSetRemoveRestart(t *testing.T) {
         },
     }
     span := tablepb.Span{TableID: 0}
-    r, err := NewReplicationSet(span, 0, tableStatus, model.ChangeFeedID{})
+    r, err := NewReplicationSet(span, tablepb.Checkpoint{}, tableStatus, model.ChangeFeedID{})
     require.Nil(t, err)
     require.Equal(t, ReplicationSetStateRemoving, r.State)
     require.False(t, r.hasRole(RoleSecondary))
diff --git a/cdc/scheduler/internal/v3/scheduler/main_test.go b/cdc/scheduler/internal/v3/scheduler/main_test.go
new file mode 100644
index 00000000000..47da658bfe6
--- /dev/null
+++ b/cdc/scheduler/internal/v3/scheduler/main_test.go
@@ -0,0 +1,24 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package scheduler
+
+import (
+    "testing"
+
+    "github.com/pingcap/tiflow/pkg/leakutil"
+)
+
+func TestMain(m *testing.M) {
+    leakutil.SetUpLeakTest(m)
+}
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler.go b/cdc/scheduler/internal/v3/scheduler/scheduler.go
index 96088ec52ff..fd117849bc3 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler.go
@@ -24,7 +24,7 @@ import (
 type scheduler interface {
     Name() string
     Schedule(
-        checkpointTs model.Ts,
+        checkpoint tablepb.Checkpoint,
         currentSpans []tablepb.Span,
         aliveCaptures map[model.CaptureID]*member.CaptureStatus,
         replications *spanz.BtreeMap[*replication.ReplicationSet],
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_balance.go b/cdc/scheduler/internal/v3/scheduler/scheduler_balance.go
index 21c642a9178..26dce7da249 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_balance.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_balance.go
@@ -56,7 +56,7 @@ func (b *balanceScheduler) Name() string {
 }
 
 func (b *balanceScheduler) Schedule(
-    _ model.Ts,
+    _ tablepb.Checkpoint,
     _ []tablepb.Span,
     captures map[model.CaptureID]*member.CaptureStatus,
     replications *spanz.BtreeMap[*replication.ReplicationSet],
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_balance_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_balance_test.go
index a8efb07ad35..a66b52b328e 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_balance_test.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_balance_test.go
@@ -18,6 +18,7 @@ import (
     "time"
 
     "github.com/pingcap/tiflow/cdc/model"
+    "github.com/pingcap/tiflow/cdc/processor/tablepb"
     "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member"
     "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication"
     "github.com/pingcap/tiflow/pkg/spanz"
@@ -27,6 +28,7 @@ import (
 func TestSchedulerBalanceCaptureOnline(t *testing.T) {
     t.Parallel()
 
+    var checkpoint tablepb.Checkpoint
     sched := newBalanceScheduler(time.Duration(0), 3)
     sched.random = nil
 
@@ -37,14 +39,14 @@ func TestSchedulerBalanceCaptureOnline(t *testing.T) {
         1: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
         2: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
     })
-    tasks := sched.Schedule(0, currentTables, captures, replications)
+    tasks := sched.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 1)
     require.NotNil(t, tasks[0].MoveTable)
     require.Equal(t, tasks[0].MoveTable.Span.TableID, model.TableID(1))
 
     // New capture "b" online, but this time has capture is stopping
     captures["a"].State = member.CaptureStateStopping
-    tasks = sched.Schedule(0, currentTables, captures, replications)
+    tasks = sched.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 
     // New capture "b" online, it keeps balancing, even though it has not pass
@@ -56,7 +58,7 @@ func TestSchedulerBalanceCaptureOnline(t *testing.T) {
         1: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
         2: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
     })
-    tasks = sched.Schedule(0, currentTables, captures, replications)
+    tasks = sched.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 1)
 
     // New capture "b" online, but this time it not pass check balance interval.
@@ -68,13 +70,14 @@ func TestSchedulerBalanceCaptureOnline(t *testing.T) {
         1: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
         2: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
     })
-    tasks = sched.Schedule(0, currentTables, captures, replications)
+    tasks = sched.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 }
 
 func TestSchedulerBalanceTaskLimit(t *testing.T) {
     t.Parallel()
 
+    var checkpoint tablepb.Checkpoint
     sched := newBalanceScheduler(time.Duration(0), 2)
     sched.random = nil
 
@@ -87,10 +90,10 @@ func TestSchedulerBalanceTaskLimit(t *testing.T) {
         3: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
         4: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
     })
-    tasks := sched.Schedule(0, currentTables, captures, replications)
+    tasks := sched.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 2)
 
     sched = newBalanceScheduler(time.Duration(0), 1)
-    tasks = sched.Schedule(0, currentTables, captures, replications)
+    tasks = sched.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 1)
 }
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go
index 2efffdbda7a..bb6e613f9de 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go
@@ -49,7 +49,7 @@ func (b *basicScheduler) Name() string {
 }
 
 func (b *basicScheduler) Schedule(
-    checkpointTs model.Ts,
+    checkpoint tablepb.Checkpoint,
     currentSpans []tablepb.Span,
     captures map[model.CaptureID]*member.CaptureStatus,
     replications *spanz.BtreeMap[*replication.ReplicationSet],
@@ -102,7 +102,7 @@ func (b *basicScheduler) Schedule(
             return tasks
         }
         tasks = append(
-            tasks, newBurstAddTables(b.changefeedID, checkpointTs, newSpans, captureIDs))
+            tasks, newBurstAddTables(b.changefeedID, checkpoint, newSpans, captureIDs))
     }
 
     // Build remove table tasks.
@@ -140,16 +140,16 @@ func (b *basicScheduler) Schedule(
 // newBurstAddTables add each new table to captures in a round-robin way.
 func newBurstAddTables(
     changefeedID model.ChangeFeedID,
-    checkpointTs model.Ts, newSpans []tablepb.Span, captureIDs []model.CaptureID,
+    checkpoint tablepb.Checkpoint, newSpans []tablepb.Span, captureIDs []model.CaptureID,
 ) *replication.ScheduleTask {
     idx := 0
     tables := make([]replication.AddTable, 0, len(newSpans))
     for _, span := range newSpans {
         targetCapture := captureIDs[idx]
         tables = append(tables, replication.AddTable{
-            Span:         span,
-            CaptureID:    targetCapture,
-            CheckpointTs: checkpointTs,
+            Span:       span,
+            CaptureID:  targetCapture,
+            Checkpoint: checkpoint,
         })
         log.Info("schedulerv3: burst add table",
             zap.String("namespace", changefeedID.Namespace),
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_basic_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_basic_test.go
index 83d00ed36b0..ad8ecd63a3d 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_basic_test.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_basic_test.go
@@ -36,6 +36,7 @@ func mapToSpanMap[T any](in map[model.TableID]T) *spanz.BtreeMap[T] {
 func TestSchedulerBasic(t *testing.T) {
     t.Parallel()
 
+    var checkpoint tablepb.Checkpoint
     captures := map[model.CaptureID]*member.CaptureStatus{"a": {}, "b": {}}
     currentTables := spanz.ArrayToSpan([]model.TableID{1, 2, 3, 4})
 
@@ -46,7 +47,7 @@ func TestSchedulerBasic(t *testing.T) {
 
     // one capture stopping, another one is initialized
     captures["a"].State = member.CaptureStateStopping
-    tasks := b.Schedule(0, currentTables, captures, replications)
+    tasks := b.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 1)
     require.Len(t, tasks[0].BurstBalance.AddTables, 2)
     require.Equal(t, tasks[0].BurstBalance.AddTables[0].CaptureID, "b")
@@ -54,12 +55,12 @@ func TestSchedulerBasic(t *testing.T) {
 
     // all capture's stopping, cannot add table
     captures["b"].State = member.CaptureStateStopping
-    tasks = b.Schedule(0, currentTables, captures, replications)
+    tasks = b.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 
     captures["a"].State = member.CaptureStateInitialized
     captures["b"].State = member.CaptureStateInitialized
-    tasks = b.Schedule(0, currentTables, captures, replications)
+    tasks = b.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 1)
     require.Len(t, tasks[0].BurstBalance.AddTables, 2)
     require.Equal(t, tasks[0].BurstBalance.AddTables[0].Span.TableID, model.TableID(1))
@@ -88,10 +89,11 @@ func TestSchedulerBasic(t *testing.T) {
         },
         4: {State: replication.ReplicationSetStateAbsent},
     })
-    tasks = b.Schedule(1, currentTables, captures, replications)
+    checkpoint1 := tablepb.Checkpoint{CheckpointTs: 1, ResolvedTs: 1}
+    tasks = b.Schedule(checkpoint1, currentTables, captures, replications)
     require.Len(t, tasks, 1)
     require.Equal(t, tasks[0].BurstBalance.AddTables[0].Span.TableID, model.TableID(4))
-    require.Equal(t, tasks[0].BurstBalance.AddTables[0].CheckpointTs, model.Ts(1))
+    require.Equal(t, tasks[0].BurstBalance.AddTables[0].Checkpoint, checkpoint1)
 
     // DDL CREATE/DROP/TRUNCATE TABLE.
     // AddTable 4, and RemoveTable 5.
@@ -121,15 +123,16 @@ func TestSchedulerBasic(t *testing.T) {
             },
         },
     })
-    tasks = b.Schedule(2, currentTables, captures, replications)
+    checkpoint2 := tablepb.Checkpoint{CheckpointTs: 2, ResolvedTs: 2}
+    tasks = b.Schedule(checkpoint2, currentTables, captures, replications)
     require.Len(t, tasks, 2)
     if tasks[0].BurstBalance.AddTables != nil {
         require.Equal(t, tasks[0].BurstBalance.AddTables[0].Span.TableID, model.TableID(4))
-        require.Equal(t, tasks[0].BurstBalance.AddTables[0].CheckpointTs, model.Ts(2))
+        require.Equal(t, tasks[0].BurstBalance.AddTables[0].Checkpoint, checkpoint2)
         require.Equal(t, tasks[1].BurstBalance.RemoveTables[0].Span.TableID, model.TableID(5))
     } else {
         require.Equal(t, tasks[1].BurstBalance.AddTables[0].Span.TableID, model.TableID(4))
-        require.Equal(t, tasks[0].BurstBalance.AddTables[0].CheckpointTs, model.Ts(2))
+        require.Equal(t, tasks[0].BurstBalance.AddTables[0].Checkpoint, checkpoint2)
         require.Equal(t, tasks[0].BurstBalance.RemoveTables[0].Span.TableID, model.TableID(5))
     }
 
@@ -166,7 +169,8 @@ func TestSchedulerBasic(t *testing.T) {
             },
         },
     })
-    tasks = b.Schedule(3, currentTables, captures, replications)
+    checkpoint3 := tablepb.Checkpoint{CheckpointTs: 3, ResolvedTs: 3}
+    tasks = b.Schedule(checkpoint3, currentTables, captures, replications)
     require.Len(t, tasks, 1)
     require.Equal(t, tasks[0].BurstBalance.RemoveTables[0].Span.TableID, model.TableID(5))
 }
@@ -193,12 +197,13 @@ func benchmarkSchedulerBalance(
     ),
 ) {
     size := 16384
+    var checkpoint tablepb.Checkpoint
     for total := 1; total <= size; total *= 2 {
         name, currentTables, captures, replications, sched := factory(total)
         b.ResetTimer()
         b.Run(name, func(b *testing.B) {
             for i := 0; i < b.N; i++ {
-                sched.Schedule(0, currentTables, captures, replications)
+                sched.Schedule(checkpoint, currentTables, captures, replications)
             }
         })
         b.StopTimer()
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go b/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go
index 66de141b7a4..a1995a58506 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go
@@ -71,7 +71,7 @@ func (d *drainCaptureScheduler) setTarget(target model.CaptureID) bool {
 }
 
 func (d *drainCaptureScheduler) Schedule(
-    _ model.Ts,
+    _ tablepb.Checkpoint,
     _ []tablepb.Span,
     captures map[model.CaptureID]*member.CaptureStatus,
     replications *spanz.BtreeMap[*replication.ReplicationSet],
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture_test.go
index 03f1a99197e..81af850e20a 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture_test.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture_test.go
@@ -30,18 +30,18 @@ func TestDrainCapture(t *testing.T) {
     scheduler := newDrainCaptureScheduler(10, model.ChangeFeedID{})
     require.Equal(t, "drain-capture-scheduler", scheduler.Name())
 
-    var checkpointTs model.Ts
+    var checkpoint tablepb.Checkpoint
     captures := make(map[model.CaptureID]*member.CaptureStatus)
     currentTables := make([]tablepb.Span, 0)
     replications := mapToSpanMap(make(map[model.TableID]*replication.ReplicationSet))
 
-    tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 
     ok := scheduler.setTarget("a")
     require.True(t, ok)
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
     // the target capture has no table at the beginning, so reset the target
     require.Equal(t, captureIDNotDraining, scheduler.target)
@@ -50,7 +50,7 @@ func TestDrainCapture(t *testing.T) {
     ok = scheduler.setTarget("b")
     require.True(t, ok)
 
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
     // the target capture cannot be found in the latest captures
     require.Equal(t, captureIDNotDraining, scheduler.target)
@@ -105,7 +105,7 @@ func TestDrainCapture(t *testing.T) {
     ok = scheduler.setTarget("a")
     require.True(t, ok)
     // not all table is replicating, skip this tick.
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Equal(t, "a", scheduler.target)
     require.Len(t, tasks, 0)
 
@@ -118,13 +118,13 @@ func TestDrainCapture(t *testing.T) {
         7: {State: replication.ReplicationSetStateReplicating, Primary: "b"},
     })
 
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Equal(t, "a", scheduler.target)
     require.Len(t, tasks, 3)
 
     scheduler = newDrainCaptureScheduler(1, model.ChangeFeedID{})
     require.True(t, scheduler.setTarget("a"))
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Equal(t, "a", scheduler.target)
     require.Len(t, tasks, 1)
 }
@@ -132,13 +132,13 @@ func TestDrainCapture(t *testing.T) {
 func TestDrainStoppingCapture(t *testing.T) {
     t.Parallel()
 
-    var checkpointTs model.Ts
+    var checkpoint tablepb.Checkpoint
     captures := make(map[model.CaptureID]*member.CaptureStatus)
     currentTables := make([]tablepb.Span, 0)
     replications := mapToSpanMap(make(map[model.TableID]*replication.ReplicationSet))
 
     scheduler := newDrainCaptureScheduler(10, model.ChangeFeedID{})
-    tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Empty(t, tasks)
 
     captures["a"] = &member.CaptureStatus{}
@@ -147,7 +147,7 @@ func TestDrainStoppingCapture(t *testing.T) {
         1: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
         2: {State: replication.ReplicationSetStateReplicating, Primary: "b"},
     })
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 1)
     require.EqualValues(t, 2, tasks[0].MoveTable.Span.TableID)
     require.EqualValues(t, "a", tasks[0].MoveTable.DestCapture)
@@ -157,7 +157,7 @@ func TestDrainSkipOwner(t *testing.T) {
     t.Parallel()
 
-    var checkpointTs model.Ts
+    var checkpoint tablepb.Checkpoint
     currentTables := make([]tablepb.Span, 0)
     captures := map[model.CaptureID]*member.CaptureStatus{
         "a": {},
@@ -168,7 +168,7 @@ func TestDrainSkipOwner(t *testing.T) {
         2: {State: replication.ReplicationSetStateReplicating, Primary: "b"},
     })
     scheduler := newDrainCaptureScheduler(10, model.ChangeFeedID{})
-    tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
     require.EqualValues(t, captureIDNotDraining, scheduler.getTarget())
 }
@@ -176,7 +176,7 @@ func TestDrainImbalanceCluster(t *testing.T) {
     t.Parallel()
 
-    var checkpointTs model.Ts
+    var checkpoint tablepb.Checkpoint
     currentTables := make([]tablepb.Span, 0)
     captures := map[model.CaptureID]*member.CaptureStatus{
         "a": {State: member.CaptureStateInitialized},
@@ -188,7 +188,7 @@ func TestDrainImbalanceCluster(t *testing.T) {
     })
     scheduler := newDrainCaptureScheduler(10, model.ChangeFeedID{})
     scheduler.setTarget("a")
-    tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 2)
     require.EqualValues(t, "a", scheduler.getTarget())
 }
@@ -196,7 +196,7 @@ func TestDrainEvenlyDistributedTables(t *testing.T) {
     t.Parallel()
 
-    var checkpointTs model.Ts
+    var checkpoint tablepb.Checkpoint
     currentTables := make([]tablepb.Span, 0)
     captures := map[model.CaptureID]*member.CaptureStatus{
         "a": {State: member.CaptureStateInitialized},
@@ -211,7 +211,7 @@ func TestDrainEvenlyDistributedTables(t *testing.T) {
     })
     scheduler := newDrainCaptureScheduler(10, model.ChangeFeedID{})
     scheduler.setTarget("a")
-    tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 3)
     taskMap := make(map[model.CaptureID]int)
     for _, t := range tasks {
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go b/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go
index 76817b344f6..21ab13b3ef5 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go
@@ -20,6 +20,7 @@ import (
     "github.com/pingcap/log"
     "github.com/pingcap/tiflow/cdc/model"
     "github.com/pingcap/tiflow/cdc/processor/tablepb"
+    "github.com/pingcap/tiflow/cdc/redo"
     "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member"
     "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication"
     "github.com/pingcap/tiflow/pkg/config"
@@ -69,7 +70,22 @@ func (sm *Manager) Schedule(
     aliveCaptures map[model.CaptureID]*member.CaptureStatus,
     replications *spanz.BtreeMap[*replication.ReplicationSet],
     runTasking *spanz.BtreeMap[*replication.ScheduleTask],
+    redoMetaManager redo.MetaManager,
 ) []*replication.ScheduleTask {
+    checkpoint := tablepb.Checkpoint{
+        CheckpointTs: checkpointTs,
+        ResolvedTs:   checkpointTs,
+    }
+    if redoMetaManager != nil && redoMetaManager.Enabled() {
+        flushedMeta := redoMetaManager.GetFlushedMeta()
+        if flushedMeta.ResolvedTs < checkpointTs {
+            log.Panic("schedulerv3: flushed resolved ts is less than checkpoint ts",
+                zap.Uint64("checkpointTs", checkpointTs),
+                zap.Any("flushedMeta", flushedMeta))
+        }
+        checkpoint.ResolvedTs = flushedMeta.ResolvedTs
+    }
+
     for sid, scheduler := range sm.schedulers {
         // Basic scheduler bypasses max task check, because it handles the most
         // critical scheduling, e.g. add table via CREATE TABLE DDL.
@@ -80,7 +96,7 @@ func (sm *Manager) Schedule(
                 return nil
             }
         }
-        tasks := scheduler.Schedule(checkpointTs, currentSpans, aliveCaptures, replications)
+        tasks := scheduler.Schedule(checkpoint, currentSpans, aliveCaptures, replications)
         for _, t := range tasks {
             name := struct {
                 scheduler, task string
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_manager_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_manager_test.go
index e7b3fb83a64..613273b81c6 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_manager_test.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_manager_test.go
@@ -18,6 +18,7 @@ import (
 
     "github.com/pingcap/tiflow/cdc/model"
     "github.com/pingcap/tiflow/cdc/processor/tablepb"
+    "github.com/pingcap/tiflow/cdc/redo"
     "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member"
     "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication"
     "github.com/pingcap/tiflow/pkg/config"
@@ -53,14 +54,16 @@ func TestSchedulerManagerScheduler(t *testing.T) {
     // schedulerPriorityBasic bypasses task check.
     replications := mapToSpanMap(map[model.TableID]*replication.ReplicationSet{})
     runningTasks := mapToSpanMap(map[model.TableID]*replication.ScheduleTask{1: {}})
-    tasks := m.Schedule(0, currentSpans, captures, replications, runningTasks)
+
+    redoMetaManager := redo.NewDisabledMetaManager()
+    tasks := m.Schedule(0, currentSpans, captures, replications, runningTasks, redoMetaManager)
     require.Len(t, tasks, 1)
 
     // No more task.
     replications = mapToSpanMap(map[model.TableID]*replication.ReplicationSet{
         1: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
     })
-    tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks)
+    tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks, redoMetaManager)
     require.Len(t, tasks, 0)
 
     // Move table is dropped because of running tasks.
@@ -68,7 +71,7 @@ func TestSchedulerManagerScheduler(t *testing.T) {
     replications = mapToSpanMap(map[model.TableID]*replication.ReplicationSet{
         1: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
     })
-    tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks)
+    tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks, redoMetaManager)
     require.Len(t, tasks, 0)
 
     // Move table can proceed after clean up tasks.
@@ -77,6 +80,6 @@ func TestSchedulerManagerScheduler(t *testing.T) {
         1: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
     })
     runningTasks = spanz.NewBtreeMap[*replication.ScheduleTask]()
-    tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks)
+    tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks, redoMetaManager)
     require.Len(t, tasks, 1)
 }
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_move_table.go b/cdc/scheduler/internal/v3/scheduler/scheduler_move_table.go
index 10b44de77e5..bf9ef491f57 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_move_table.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_move_table.go
@@ -67,7 +67,7 @@ func (m *moveTableScheduler) addTask(span tablepb.Span, target model.CaptureID)
 }
 
 func (m *moveTableScheduler) Schedule(
-    _ model.Ts,
+    _ tablepb.Checkpoint,
     currentSpans []tablepb.Span,
     captures map[model.CaptureID]*member.CaptureStatus,
     replications *spanz.BtreeMap[*replication.ReplicationSet],
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_move_table_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_move_table_test.go
index e485db9ebed..3adc047bfbc 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_move_table_test.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_move_table_test.go
@@ -27,7 +27,7 @@ import (
 func TestSchedulerMoveTable(t *testing.T) {
     t.Parallel()
 
-    var checkpointTs model.Ts
+    var checkpoint tablepb.Checkpoint
     captures := map[model.CaptureID]*member.CaptureStatus{"a": {
         State: member.CaptureStateInitialized,
     }, "b": {
@@ -43,38 +43,38 @@ func TestSchedulerMoveTable(t *testing.T) {
     require.Equal(t, "move-table-scheduler", scheduler.Name())
 
     tasks := scheduler.Schedule(
-        checkpointTs, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
+        checkpoint, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
     require.Len(t, tasks, 0)
 
     scheduler.addTask(tablepb.Span{TableID: 0}, "a")
     tasks = scheduler.Schedule(
-        checkpointTs, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
+        checkpoint, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
     require.Len(t, tasks, 0)
 
     // move a not exist table
     scheduler.addTask(tablepb.Span{TableID: 0}, "a")
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 
     // move table to a not exist capture
     scheduler.addTask(tablepb.Span{TableID: 1}, "c")
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 
     // move table not replicating
     scheduler.addTask(tablepb.Span{TableID: 1}, "b")
     tasks = scheduler.Schedule(
-        checkpointTs, currentTables, captures, spanz.NewBtreeMap[*replication.ReplicationSet]())
+        checkpoint, currentTables, captures, spanz.NewBtreeMap[*replication.ReplicationSet]())
     require.Len(t, tasks, 0)
 
     scheduler.addTask(tablepb.Span{TableID: 1}, "b")
     replications.GetV(tablepb.Span{TableID: 1}).State = replication.ReplicationSetStatePrepare
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 
     scheduler.addTask(tablepb.Span{TableID: 1}, "b")
     replications.GetV(tablepb.Span{TableID: 1}).State = replication.ReplicationSetStateReplicating
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 1)
     require.Equal(t, model.TableID(1), tasks[0].MoveTable.Span.TableID)
     require.Equal(t, "b", tasks[0].MoveTable.DestCapture)
@@ -83,7 +83,7 @@ func TestSchedulerMoveTable(t *testing.T) {
     // the target capture is stopping
     scheduler.addTask(tablepb.Span{TableID: 1}, "b")
     captures["b"].State = member.CaptureStateStopping
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
     require.False(t, scheduler.tasks.Has(tablepb.Span{TableID: 1}))
 }
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance.go b/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance.go
index ecc02d37c20..2bb4a4ecbda 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance.go
@@ -51,7 +51,7 @@ func (r *rebalanceScheduler) Name() string {
 }
 
 func (r *rebalanceScheduler) Schedule(
-    _ model.Ts,
+    _ tablepb.Checkpoint,
     currentSpans []tablepb.Span,
     captures map[model.CaptureID]*member.CaptureStatus,
     replications *spanz.BtreeMap[*replication.ReplicationSet],
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance_test.go
index e6b1ff53d83..c2b3ea9c878 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance_test.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance_test.go
@@ -28,7 +28,7 @@ import (
 func TestSchedulerRebalance(t *testing.T) {
     t.Parallel()
 
-    var checkpointTs model.Ts
+    var checkpoint tablepb.Checkpoint
     captures := map[model.CaptureID]*member.CaptureStatus{"a": {}, "b": {}}
     currentTables := spanz.ArrayToSpan([]model.TableID{1, 2, 3, 4})
 
@@ -57,22 +57,22 @@ func TestSchedulerRebalance(t *testing.T) {
     scheduler := newRebalanceScheduler(model.ChangeFeedID{})
     require.Equal(t, "rebalance-scheduler", scheduler.Name())
     // rebalance is not triggered
-    tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 
     atomic.StoreInt32(&scheduler.rebalance, 1)
     // no captures
     tasks = scheduler.Schedule(
-        checkpointTs, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
+        checkpoint, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
     require.Len(t, tasks, 0)
 
     // table not in the replication set,
     tasks = scheduler.Schedule(
-        checkpointTs, spanz.ArrayToSpan([]model.TableID{0}), captures, replications)
+        checkpoint, spanz.ArrayToSpan([]model.TableID{0}), captures, replications)
     require.Len(t, tasks, 0)
 
     // not all tables are replicating,
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 
     // table distribution is balanced, should have no task.
@@ -82,7 +82,7 @@ func TestSchedulerRebalance(t *testing.T) {
         3: {State: replication.ReplicationSetStateReplicating, Primary: "b"},
         4: {State: replication.ReplicationSetStateReplicating, Primary: "b"},
     })
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 
     // Imbalance.
@@ -97,14 +97,14 @@ func TestSchedulerRebalance(t *testing.T) {
 
     // capture is stopping, ignore the request
     captures["a"].State = member.CaptureStateStopping
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
     require.Equal(t, atomic.LoadInt32(&scheduler.rebalance), int32(0))
 
     captures["a"].State = member.CaptureStateInitialized
     atomic.StoreInt32(&scheduler.rebalance, 1)
     scheduler.random = nil // disable random to make test easier.
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 1)
     require.Contains(t, tasks[0].BurstBalance.MoveTables, replication.MoveTable{
         Span: tablepb.Span{TableID: 1}, DestCapture: "b",
@@ -115,6 +115,6 @@ func TestSchedulerRebalance(t *testing.T) {
     require.EqualValues(t, 0, atomic.LoadInt32(&scheduler.rebalance))
 
     // pending task is not consumed yet, this turn should have no tasks.
-    tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+    tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
     require.Len(t, tasks, 0)
 }
diff --git a/cdc/scheduler/internal/v3/transport/main_test.go b/cdc/scheduler/internal/v3/transport/main_test.go
new file mode 100644
index 00000000000..bdb388e4cbc
--- /dev/null
+++ b/cdc/scheduler/internal/v3/transport/main_test.go
@@ -0,0 +1,24 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package transport
+
+import (
+    "testing"
+
+    "github.com/pingcap/tiflow/pkg/leakutil"
+)
+
+func TestMain(m *testing.M) {
+    leakutil.SetUpLeakTest(m)
+}