Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor(ticdc): remove ctx from multiple place in the processor. #11006

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type changefeed struct {
downstreamObserver observer.Observer
observerLastTick *atomic.Time

newDDLPuller func(ctx context.Context,
newDDLPuller func(
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
Expand Down Expand Up @@ -233,7 +233,7 @@ func newChangefeed4Test(
cfInfo *model.ChangeFeedInfo,
cfStatus *model.ChangeFeedStatus,
cfstateManager FeedStateManager, up *upstream.Upstream,
newDDLPuller func(ctx context.Context,
newDDLPuller func(
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
Expand Down Expand Up @@ -676,7 +676,7 @@ LOOP2:
})
c.ddlSink.run(cancelCtx)

c.ddlPuller = c.newDDLPuller(cancelCtx, c.upstream, ddlStartTs, c.id, c.schema, filter)
c.ddlPuller = c.newDDLPuller(c.upstream, ddlStartTs, c.id, c.schema, filter)
c.wg.Add(1)
go func() {
defer c.wg.Done()
Expand Down
12 changes: 6 additions & 6 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (m *mockDDLSink) run(ctx context.Context) {
}()
}

func (m *mockDDLSink) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error) {
func (m *mockDDLSink) emitDDLEvent(_ context.Context, ddl *model.DDLEvent) (bool, error) {
m.ddlExecuting = ddl
defer func() {
if m.resetDDLDone {
Expand All @@ -126,7 +126,7 @@ func (m *mockDDLSink) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
return m.ddlDone, nil
}

func (m *mockDDLSink) emitSyncPoint(ctx context.Context, checkpointTs uint64) error {
func (m *mockDDLSink) emitSyncPoint(_ context.Context, checkpointTs uint64) error {
if checkpointTs == m.syncPoint {
return nil
}
Expand Down Expand Up @@ -184,18 +184,18 @@ func (m *mockScheduler) Tick(
}

// MoveTable is used to trigger manual table moves.
func (m *mockScheduler) MoveTable(tableID model.TableID, target model.CaptureID) {}
func (m *mockScheduler) MoveTable(_ model.TableID, _ model.CaptureID) {}

// Rebalance is used to trigger manual workload rebalances.
func (m *mockScheduler) Rebalance() {}

// DrainCapture implement scheduler interface
func (m *mockScheduler) DrainCapture(target model.CaptureID) (int, error) {
func (m *mockScheduler) DrainCapture(_ model.CaptureID) (int, error) {
return 0, nil
}

// Close closes the scheduler and releases resources.
func (m *mockScheduler) Close(ctx context.Context) {}
func (m *mockScheduler) Close(_ context.Context) {}

func newMockDDLSink(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
Expand All @@ -212,7 +212,7 @@ func newMockDDLSinkWithBootstrapError(_ model.ChangeFeedID, _ *model.ChangeFeedI
}
}

func newMockPuller(_ context.Context, _ *upstream.Upstream, startTs uint64, _ model.ChangeFeedID, schemaStorage entry.SchemaStorage, _ filter.Filter) puller.DDLPuller {
func newMockPuller(_ *upstream.Upstream, startTs uint64, _ model.ChangeFeedID, schemaStorage entry.SchemaStorage, _ filter.Filter) puller.DDLPuller {
return &mockDDLPuller{resolvedTs: startTs, schemaStorage: schemaStorage}
}

Expand Down
5 changes: 2 additions & 3 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var _ gc.Manager = (*mockManager)(nil)

// newOwner4Test creates a new Owner for test
func newOwner4Test(
newDDLPuller func(ctx context.Context,
newDDLPuller func(
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
Expand Down Expand Up @@ -108,8 +108,7 @@ func createOwner4Test(globalVars *vars.GlobalVars, t *testing.T) (*ownerImpl, *o

owner := newOwner4Test(
// new ddl puller
func(ctx context.Context,
up *upstream.Upstream,
func(up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
Expand Down
13 changes: 4 additions & 9 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@
return errors.Trace(err)
}

if err = p.initDDLHandler(ctx); err != nil {
if err = p.initDDLHandler(); err != nil {

Check warning on line 623 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L623

Added line #L623 was not covered by tests
return err
}
p.ddlHandler.name = "ddlHandler"
Expand Down Expand Up @@ -735,7 +735,7 @@
return cerror.ErrReactorFinished
}

func (p *processor) initDDLHandler(ctx context.Context) error {
func (p *processor) initDDLHandler() error {

Check warning on line 738 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L738

Added line #L738 was not covered by tests
checkpointTs := p.latestInfo.GetCheckpointTs(p.latestStatus)
minTableBarrierTs := p.latestStatus.MinTableBarrierTs
forceReplicate := p.latestInfo.Config.ForceReplicate
Expand All @@ -748,21 +748,16 @@
} else {
ddlStartTs = checkpointTs - 1
}

f, err := filter.NewFilter(p.latestInfo.Config, "")
if err != nil {
return errors.Trace(err)
}
schemaStorage, err := entry.NewSchemaStorage(p.upstream.KVStorage, ddlStartTs,
forceReplicate, p.changefeedID, util.RoleProcessor, f)
forceReplicate, p.changefeedID, util.RoleProcessor, p.filter)

Check warning on line 752 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L752

Added line #L752 was not covered by tests
if err != nil {
return errors.Trace(err)
}

serverCfg := config.GetGlobalServerConfig()
changefeedID := model.DefaultChangeFeedID(p.changefeedID.ID + "_processor_ddl_puller")
ddlPuller := puller.NewDDLJobPuller(
ctx, p.upstream, ddlStartTs, serverCfg, changefeedID, schemaStorage, p.filter,
p.upstream, ddlStartTs, serverCfg, changefeedID, schemaStorage, p.filter,

Check warning on line 760 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L760

Added line #L760 was not covered by tests
)
p.ddlHandler.r = &ddlHandler{puller: ddlPuller, schemaStorage: schemaStorage}
return nil
Expand Down
6 changes: 2 additions & 4 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
// NewDDLJobPuller creates a new NewDDLJobPuller,
// which fetches ddl events starting from checkpointTs.
func NewDDLJobPuller(
ctx context.Context,
up *upstream.Upstream,
checkpointTs uint64,
cfg *config.ServerConfig,
Expand Down Expand Up @@ -628,7 +627,7 @@
}

// NewDDLPuller return a puller for DDL Event
func NewDDLPuller(ctx context.Context,
func NewDDLPuller(
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
Expand All @@ -639,8 +638,7 @@
// storage can be nil only in the test
if up.KVStorage != nil {
changefeed.ID += "_owner_ddl_puller"
puller = NewDDLJobPuller(
ctx, up, startTs, config.GetGlobalServerConfig(),
puller = NewDDLJobPuller(up, startTs, config.GetGlobalServerConfig(),

Check warning on line 641 in cdc/puller/ddl_puller.go

View check run for this annotation

Codecov / codecov/patch

cdc/puller/ddl_puller.go#L641

Added line #L641 was not covered by tests
changefeed, schemaStorage, filter)
}

Expand Down
4 changes: 2 additions & 2 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func TestDDLPuller(t *testing.T) {
f,
)
require.Nil(t, err)
p := NewDDLPuller(ctx, up, startTs, model.DefaultChangeFeedID(changefeedInfo.ID), schemaStorage, f)
p := NewDDLPuller(up, startTs, model.DefaultChangeFeedID(changefeedInfo.ID), schemaStorage, f)
p.(*ddlPullerImpl).ddlJobPuller, _ = newMockDDLJobPuller(t, false)
ddlJobPullerImpl := p.(*ddlPullerImpl).ddlJobPuller.(*ddlJobPullerImpl)
ddlJobPullerImpl.setResolvedTs(startTs)
Expand Down Expand Up @@ -695,7 +695,7 @@ func TestResolvedTsStuck(t *testing.T) {
f,
)
require.Nil(t, err)
p := NewDDLPuller(ctx, up, startTs, model.DefaultChangeFeedID(changefeedInfo.ID), schemaStorage, f)
p := NewDDLPuller(up, startTs, model.DefaultChangeFeedID(changefeedInfo.ID), schemaStorage, f)

p.(*ddlPullerImpl).ddlJobPuller, _ = newMockDDLJobPuller(t, false)
ddlJobPullerImpl := p.(*ddlPullerImpl).ddlJobPuller.(*ddlJobPullerImpl)
Expand Down
Loading