Skip to content

Commit

Permalink
scheduler(ticdc): fix invalid checkpoint when redo enabled (#9851) (#…
Browse files Browse the repository at this point in the history
…9907)

close #9774, close #9830
  • Loading branch information
ti-chi-bot authored Oct 17, 2023
1 parent ff869a6 commit bef3924
Show file tree
Hide file tree
Showing 30 changed files with 395 additions and 132 deletions.
7 changes: 3 additions & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,10 +448,7 @@ LOOP2:
}

checkpointTs := c.state.Status.CheckpointTs
if c.resolvedTs == 0 {
c.resolvedTs = checkpointTs
}

c.resolvedTs = checkpointTs
minTableBarrierTs := c.state.Status.MinTableBarrierTs

failpoint.Inject("NewChangefeedNoRetryError", func() {
Expand Down Expand Up @@ -600,6 +597,7 @@ LOOP2:
return err
}
if c.redoMetaMgr.Enabled() {
c.resolvedTs = c.redoMetaMgr.GetFlushedMeta().ResolvedTs
c.wg.Add(1)
go func() {
defer c.wg.Done()
Expand Down Expand Up @@ -715,6 +713,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
c.barriers = nil
c.initialized = false
c.isReleased = true
c.resolvedTs = 0

log.Info("changefeed closed",
zap.String("namespace", c.id.Namespace),
Expand Down
5 changes: 3 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,13 @@ func (p *processor) AddTableSpan(
// table is `prepared`, and an `isPrepare = false` request indicates that old table should
// be stopped on original capture already, it's safe to start replicating data now.
if !isPrepare {
redoStartTs := checkpoint.ResolvedTs
if p.redo.r.Enabled() {
// ResolvedTs is stored in external storage when redo log is enabled, so we need to
// start table with ResolvedTs in redoDMLManager.
p.redo.r.StartTable(span, checkpoint.ResolvedTs)
p.redo.r.StartTable(span, redoStartTs)
}
if err := p.sinkManager.r.StartTable(span, startTs); err != nil {
if err := p.sinkManager.r.StartTable(span, startTs, redoStartTs); err != nil {
return false, errors.Trace(err)
}
}
Expand Down
8 changes: 6 additions & 2 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,11 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
}

// StartTable sets the table(TableSink) state to replicating.
func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error {
func (m *SinkManager) StartTable(
span tablepb.Span,
startTs model.Ts,
redoStartTs model.Ts,
) error {
log.Info("Start table sink",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
Expand All @@ -847,7 +851,7 @@ func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error {
if m.redoDMLMgr != nil {
m.redoProgressHeap.push(&progress{
span: span,
nextLowerBoundPos: engine.Position{StartTs: 0, CommitTs: startTs + 1},
nextLowerBoundPos: engine.Position{StartTs: 0, CommitTs: redoStartTs + 1},
version: tableSink.(*tableSinkWrapper).version,
})
}
Expand Down
12 changes: 6 additions & 6 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestAddTable(t *testing.T) {
require.True(t, ok)
require.NotNil(t, tableSink)
require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap")
err := manager.StartTable(span, 1)
err := manager.StartTable(span, 1, 1)
require.NoError(t, err)
require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)

Expand All @@ -144,7 +144,7 @@ func TestRemoveTable(t *testing.T) {
tableSink, ok := manager.tableSinks.Load(span)
require.True(t, ok)
require.NotNil(t, tableSink)
err := manager.StartTable(span, 0)
err := manager.StartTable(span, 0, 0)
require.NoError(t, err)
addTableAndAddEventsToSortEngine(t, e, span)
manager.UpdateBarrierTs(4, nil)
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
manager.UpdateBarrierTs(4, nil)
manager.UpdateReceivedSorterResolvedTs(span, 5)
manager.schemaStorage.AdvanceResolvedTs(5)
err := manager.StartTable(span, 0)
err := manager.StartTable(span, 0, 0)
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
manager.UpdateBarrierTs(4, nil)
manager.UpdateReceivedSorterResolvedTs(span, 3)
manager.schemaStorage.AdvanceResolvedTs(4)
err := manager.StartTable(span, 0)
err := manager.StartTable(span, 0, 0)
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) {
manager.UpdateBarrierTs(4, nil)
manager.UpdateReceivedSorterResolvedTs(span, 5)
manager.schemaStorage.AdvanceResolvedTs(5)
err := manager.StartTable(span, 0)
err := manager.StartTable(span, 0, 0)
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) {

source.AddTable(span, "test", 100)
manager.AddTable(span, 100, math.MaxUint64)
manager.StartTable(span, 100)
manager.StartTable(span, 100, 0)
source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
manager.UpdateReceivedSorterResolvedTs(span, 101)
manager.UpdateBarrierTs(101, nil)
Expand Down
24 changes: 24 additions & 0 deletions cdc/scheduler/internal/v3/agent/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package agent

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

// TestMain is the test entry point for the agent package. It hands the
// testing.M handle to leakutil.SetUpLeakTest and does not call m.Run itself,
// so running the package's tests is delegated to that helper.
// NOTE(review): presumably the helper wraps the run with goroutine-leak
// detection — confirm against pkg/leakutil.
func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
24 changes: 24 additions & 0 deletions cdc/scheduler/internal/v3/compat/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package compat

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

// TestMain is the test entry point for the compat package. It hands the
// testing.M handle to leakutil.SetUpLeakTest and does not call m.Run itself,
// so running the package's tests is delegated to that helper.
// NOTE(review): presumably the helper wraps the run with goroutine-leak
// detection — confirm against pkg/leakutil.
func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (c *coordinator) poll(
currentSpans := c.reconciler.Reconcile(
ctx, &c.tableRanges, replications, c.captureM.Captures, c.compat)
allTasks := c.schedulerM.Schedule(
checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks)
checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks, c.redoMetaManager)

// Handle generated schedule tasks.
msgs, err = c.replicationM.HandleTasks(allTasks)
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/coordinator_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
captureID := fmt.Sprint(i % captureCount)
span := tablepb.Span{TableID: tableID}
rep, err := replication.NewReplicationSet(
span, 0, map[string]*tablepb.TableStatus{
span, tablepb.Checkpoint{}, map[string]*tablepb.TableStatus{
captureID: {
Span: tablepb.Span{TableID: tableID},
State: tablepb.TableStateReplicating,
Expand Down
24 changes: 24 additions & 0 deletions cdc/scheduler/internal/v3/keyspan/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package keyspan

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

// TestMain is the test entry point for the keyspan package. It hands the
// testing.M handle to leakutil.SetUpLeakTest and does not call m.Run itself,
// so running the package's tests is delegated to that helper.
// NOTE(review): presumably the helper wraps the run with goroutine-leak
// detection — confirm against pkg/leakutil.
func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
24 changes: 24 additions & 0 deletions cdc/scheduler/internal/v3/member/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package member

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

// TestMain is the test entry point for the member package. It hands the
// testing.M handle to leakutil.SetUpLeakTest and does not call m.Run itself,
// so running the package's tests is delegated to that helper.
// NOTE(review): presumably the helper wraps the run with goroutine-leak
// detection — confirm against pkg/leakutil.
func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
24 changes: 24 additions & 0 deletions cdc/scheduler/internal/v3/replication/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package replication

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

// TestMain is the test entry point for the replication package. It hands the
// testing.M handle to leakutil.SetUpLeakTest and does not call m.Run itself,
// so running the package's tests is delegated to that helper.
// NOTE(review): presumably the helper wraps the run with goroutine-leak
// detection — confirm against pkg/leakutil.
func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
19 changes: 12 additions & 7 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (t MoveTable) String() string {

// AddTable is a schedule task for adding a table.
type AddTable struct {
Span tablepb.Span
CaptureID model.CaptureID
CheckpointTs model.Ts
Span tablepb.Span
CaptureID model.CaptureID
Checkpoint tablepb.Checkpoint
}

func (t AddTable) String() string {
return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d",
t.Span.String(), t.CaptureID, t.CheckpointTs)
return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d, resolvedTs: %d",
t.Span.String(), t.CaptureID, t.Checkpoint.CheckpointTs, t.Checkpoint.ResolvedTs)
}

// RemoveTable is a schedule task for removing a table.
Expand Down Expand Up @@ -200,7 +200,12 @@ func (r *Manager) HandleCaptureChanges(
}
var err error
spanStatusMap.Ascend(func(span tablepb.Span, status map[string]*tablepb.TableStatus) bool {
table, err1 := NewReplicationSet(span, checkpointTs, status, r.changefeedID)
checkpoint := tablepb.Checkpoint{
CheckpointTs: checkpointTs,
// Note that the real resolved ts is stored in the status.
ResolvedTs: checkpointTs,
}
table, err1 := NewReplicationSet(span, checkpoint, status, r.changefeedID)
if err1 != nil {
err = errors.Trace(err1)
return false
Expand Down Expand Up @@ -437,7 +442,7 @@ func (r *Manager) handleAddTableTask(
var err error
table, ok := r.spans.Get(task.Span)
if !ok {
table, err = NewReplicationSet(task.Span, task.CheckpointTs, nil, r.changefeedID)
table, err = NewReplicationSet(task.Span, task.Checkpoint, nil, r.changefeedID)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit bef3924

Please sign in to comment.