Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecated code from queue implementation #3770

Merged
merged 2 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,6 @@ const (
TimerProcessorMaxPollInterval = "history.timerProcessorMaxPollInterval"
// TimerProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
TimerProcessorMaxPollIntervalJitterCoefficient = "history.timerProcessorMaxPollIntervalJitterCoefficient"
// TimerProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for timer processor
TimerProcessorMaxReschedulerSize = "history.timerProcessorMaxReschedulerSize"
// TimerProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for timer processor
TimerProcessorPollBackoffInterval = "history.timerProcessorPollBackoffInterval"
// TimerProcessorMaxTimeShift is the max shift timer processor can have
Expand Down Expand Up @@ -452,8 +450,6 @@ const (
TransferProcessorUpdateAckIntervalJitterCoefficient = "history.transferProcessorUpdateAckIntervalJitterCoefficient"
// TransferProcessorCompleteTransferInterval is complete timer interval for transferQueueProcessor
TransferProcessorCompleteTransferInterval = "history.transferProcessorCompleteTransferInterval"
// TransferProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for transferQueueProcessor
TransferProcessorMaxReschedulerSize = "history.transferProcessorMaxReschedulerSize"
// TransferProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for transferQueueProcessor
TransferProcessorPollBackoffInterval = "history.transferProcessorPollBackoffInterval"
// TransferProcessorVisibilityArchivalTimeLimit is the upper time limit for archiving visibility records
Expand Down Expand Up @@ -485,8 +481,6 @@ const (
VisibilityProcessorUpdateAckIntervalJitterCoefficient = "history.visibilityProcessorUpdateAckIntervalJitterCoefficient"
// VisibilityProcessorCompleteTaskInterval is complete timer interval for visibilityQueueProcessor
VisibilityProcessorCompleteTaskInterval = "history.visibilityProcessorCompleteTaskInterval"
// VisibilityProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for visibilityQueueProcessor
VisibilityProcessorMaxReschedulerSize = "history.visibilityProcessorMaxReschedulerSize"
// VisibilityProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for visibilityQueueProcessor
VisibilityProcessorPollBackoffInterval = "history.visibilityProcessorPollBackoffInterval"
// VisibilityProcessorVisibilityArchivalTimeLimit is the upper time limit for archiving visibility records
Expand Down Expand Up @@ -545,8 +539,6 @@ const (
ReplicatorProcessorUpdateAckInterval = "history.replicatorProcessorUpdateAckInterval"
// ReplicatorProcessorUpdateAckIntervalJitterCoefficient is the update interval jitter coefficient
ReplicatorProcessorUpdateAckIntervalJitterCoefficient = "history.replicatorProcessorUpdateAckIntervalJitterCoefficient"
// ReplicatorProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for ReplicatorProcessor
ReplicatorProcessorMaxReschedulerSize = "history.replicatorProcessorMaxReschedulerSize"
// ReplicatorProcessorEnablePriorityTaskProcessor indicates whether priority task processor should be used for ReplicatorProcessor
ReplicatorProcessorEnablePriorityTaskProcessor = "history.replicatorProcessorEnablePriorityTaskProcessor"
// MaximumBufferedEventsBatch is max number of buffer event in mutable state
Expand Down
17 changes: 11 additions & 6 deletions service/history/archival_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,6 @@ func newQueueFactoryBase(params ArchivalQueueFactoryParams, hostScheduler queues
return QueueFactoryBase{
HostScheduler: hostScheduler,
HostPriorityAssigner: queues.NewPriorityAssigner(),
HostRateLimiter: NewQueueHostRateLimiter(
params.Config.ArchivalProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
archivalQueuePersistenceMaxRPSRatio,
),
HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter(
NewHostRateLimiterRateFn(
params.Config.ArchivalProcessorMaxPollHostRPS,
Expand Down Expand Up @@ -152,10 +147,20 @@ func (f *archivalQueueFactory) newArchivalTaskExecutor(shard shard.Context, work
// newScheduledQueue creates a new scheduled queue for the given shard with archival-specific configurations.
func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor queues.Executor) queues.Queue {
logger := log.With(shard.GetLogger(), tag.ComponentArchivalQueue)
metricsHandler := f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope))

rescheduler := queues.NewRescheduler(
f.HostScheduler,
shard.GetTimeSource(),
logger,
metricsHandler,
)

return queues.NewScheduledQueue(
shard,
tasks.CategoryArchival,
f.HostScheduler,
rescheduler,
f.HostPriorityAssigner,
executor,
&queues.Options{
Expand All @@ -179,6 +184,6 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q
},
f.HostReaderRateLimiter,
logger,
f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope)),
metricsHandler,
)
}
8 changes: 0 additions & 8 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type Config struct {
TimerProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn
TimerProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
TimerProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
TimerProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
TimerProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
TimerProcessorMaxTimeShift dynamicconfig.DurationPropertyFn
TimerProcessorHistoryArchivalSizeLimit dynamicconfig.IntPropertyFn
Expand All @@ -132,7 +131,6 @@ type Config struct {
TransferProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
TransferProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
TransferProcessorCompleteTransferInterval dynamicconfig.DurationPropertyFn
TransferProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
TransferProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
TransferProcessorVisibilityArchivalTimeLimit dynamicconfig.DurationPropertyFn
TransferProcessorEnsureCloseBeforeDelete dynamicconfig.BoolPropertyFn
Expand All @@ -147,7 +145,6 @@ type Config struct {
ReplicatorProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ReplicatorProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
ReplicatorProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ReplicatorProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
ReplicatorProcessorEnablePriorityTaskProcessor dynamicconfig.BoolPropertyFn
ReplicatorProcessorFetchTasksBatchSize dynamicconfig.IntPropertyFn
ReplicatorProcessorMaxSkipTaskCount dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -263,7 +260,6 @@ type Config struct {
VisibilityProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
VisibilityProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
VisibilityProcessorCompleteTaskInterval dynamicconfig.DurationPropertyFn
VisibilityProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
VisibilityProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
VisibilityProcessorVisibilityArchivalTimeLimit dynamicconfig.DurationPropertyFn
VisibilityProcessorEnsureCloseBeforeDelete dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -366,7 +362,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TimerProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxPollHostRPS, 0),
TimerProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxPollInterval, 5*time.Minute),
TimerProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorMaxPollIntervalJitterCoefficient, 0.15),
TimerProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxReschedulerSize, 10000),
TimerProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorPollBackoffInterval, 5*time.Second),
TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxTimeShift, 1*time.Second),
TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicconfig.TimerProcessorHistoryArchivalSizeLimit, 500*1024),
Expand All @@ -386,7 +381,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TransferProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorUpdateAckInterval, 30*time.Second),
TransferProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorUpdateAckIntervalJitterCoefficient, 0.15),
TransferProcessorCompleteTransferInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorCompleteTransferInterval, 60*time.Second),
TransferProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxReschedulerSize, 10000),
TransferProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorPollBackoffInterval, 5*time.Second),
TransferProcessorVisibilityArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.TransferProcessorVisibilityArchivalTimeLimit, 200*time.Millisecond),
TransferProcessorEnsureCloseBeforeDelete: dc.GetBoolProperty(dynamicconfig.TransferProcessorEnsureCloseBeforeDelete, true),
Expand All @@ -399,7 +393,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ReplicatorProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient, 0.15),
ReplicatorProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorUpdateAckInterval, 5*time.Second),
ReplicatorProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorUpdateAckIntervalJitterCoefficient, 0.15),
ReplicatorProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxReschedulerSize, 10000),
ReplicatorProcessorEnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.ReplicatorProcessorEnablePriorityTaskProcessor, false),
ReplicatorProcessorFetchTasksBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 25),
ReplicatorProcessorMaxSkipTaskCount: dc.GetIntProperty(dynamicconfig.ReplicatorMaxSkipTaskCount, 250),
Expand Down Expand Up @@ -485,7 +478,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
VisibilityProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorUpdateAckInterval, 30*time.Second),
VisibilityProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.VisibilityProcessorUpdateAckIntervalJitterCoefficient, 0.15),
VisibilityProcessorCompleteTaskInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorCompleteTaskInterval, 60*time.Second),
VisibilityProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.VisibilityProcessorMaxReschedulerSize, 10000),
VisibilityProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorPollBackoffInterval, 5*time.Second),
VisibilityProcessorVisibilityArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorVisibilityArchivalTimeLimit, 200*time.Millisecond),
VisibilityProcessorEnsureCloseBeforeDelete: dc.GetBoolProperty(dynamicconfig.VisibilityProcessorEnsureCloseBeforeDelete, false),
Expand Down
12 changes: 1 addition & 11 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,18 +353,8 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() {
e.shard.GetNamespaceRegistry().RegisterNamespaceChangeCallback(
e,
0, /* always want callback so UpdateHandoverNamespaces() can be called after shard reload */
func() {
for _, queueProcessor := range e.queueProcessors {
queueProcessor.LockTaskProcessing()
}
},
func() {},
func(prevNamespaces []*namespace.Namespace, nextNamespaces []*namespace.Namespace) {
defer func() {
for _, queueProcessor := range e.queueProcessors {
queueProcessor.UnlockTaskProcessing()
}
}()

if len(nextNamespaces) == 0 {
return
}
Expand Down
7 changes: 2 additions & 5 deletions service/history/queueFactoryBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,8 @@ type (
}

QueueFactoryBase struct {
HostScheduler queues.Scheduler
HostPriorityAssigner queues.PriorityAssigner
HostRateLimiter quotas.RateLimiter

// used by multi-cursor queue reader
HostScheduler queues.Scheduler
HostPriorityAssigner queues.PriorityAssigner
HostReaderRateLimiter quotas.RequestRateLimiter
}

Expand Down
2 changes: 0 additions & 2 deletions service/history/queues/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,5 @@ type (
Category() tasks.Category
NotifyNewTasks(tasks []tasks.Task)
FailoverNamespace(namespaceIDs map[string]struct{})
LockTaskProcessing()
UnlockTaskProcessing()
}
)
15 changes: 1 addition & 14 deletions service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func newQueueBase(
category tasks.Category,
paginationFnProvider PaginationFnProvider,
scheduler Scheduler,
rescheduler Rescheduler,
priorityAssigner PriorityAssigner,
executor Executor,
options *Options,
Expand All @@ -150,12 +151,6 @@ func newQueueBase(
}

timeSource := shard.GetTimeSource()
rescheduler := NewRescheduler(
scheduler,
timeSource,
logger,
metricsHandler,
)

monitor := newMonitor(category.Type(), &options.MonitorOptions)
mitigator := newMitigator(monitor, logger, metricsHandler, options.MaxReaderCount)
Expand Down Expand Up @@ -286,14 +281,6 @@ func (p *queueBase) FailoverNamespace(
p.rescheduler.Reschedule(namespaceIDs)
}

func (p *queueBase) LockTaskProcessing() {
// no-op
}

func (p *queueBase) UnlockTaskProcessing() {
// no-op
}

func (p *queueBase) processNewRange() error {
var newMaxKey tasks.Key
switch categoryType := p.category.Type(); categoryType {
Expand Down
8 changes: 7 additions & 1 deletion service/history/queues/queue_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (s *queueBaseSuite) TestNewProcessBase_NoPreviousState() {
tasks.CategoryTransfer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down Expand Up @@ -224,6 +225,7 @@ func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() {
tasks.CategoryTransfer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down Expand Up @@ -284,14 +286,14 @@ func (s *queueBaseSuite) TestStartStop() {
tasks.CategoryTransfer,
paginationFnProvider,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
s.rateLimiter,
s.logger,
s.metricsHandler,
)
base.rescheduler = s.mockRescheduler // replace with mock to verify Start/Stop

s.mockRescheduler.EXPECT().Start().Times(1)
base.Start()
Expand Down Expand Up @@ -335,6 +337,7 @@ func (s *queueBaseSuite) TestProcessNewRange() {
tasks.CategoryTimer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down Expand Up @@ -392,6 +395,7 @@ func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() {
tasks.CategoryTimer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down Expand Up @@ -465,6 +469,7 @@ func (s *queueBaseSuite) TestCheckPoint_NoPendingTasks() {
tasks.CategoryTimer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down Expand Up @@ -553,6 +558,7 @@ func (s *queueBaseSuite) TestCheckPoint_MoveSlices() {
tasks.CategoryTimer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down
2 changes: 2 additions & 0 deletions service/history/queues/queue_immediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewImmediateQueue(
shard shard.Context,
category tasks.Category,
scheduler Scheduler,
rescheduler Rescheduler,
priorityAssigner PriorityAssigner,
executor Executor,
options *Options,
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewImmediateQueue(
category,
paginationFnProvider,
scheduler,
rescheduler,
priorityAssigner,
executor,
options,
Expand Down
24 changes: 0 additions & 24 deletions service/history/queues/queue_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions service/history/queues/queue_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewScheduledQueue(
shard shard.Context,
category tasks.Category,
scheduler Scheduler,
rescheduler Rescheduler,
priorityAssigner PriorityAssigner,
executor Executor,
options *Options,
Expand Down Expand Up @@ -111,6 +112,7 @@ func NewScheduledQueue(
category,
paginationFnProvider,
scheduler,
rescheduler,
priorityAssigner,
executor,
options,
Expand Down
37 changes: 23 additions & 14 deletions service/history/queues/queue_scheduled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,32 @@ func (s *scheduledQueueSuite) SetupTest() {
s.mockExecutionManager = s.mockShard.Resource.ExecutionMgr
s.mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

scheduler := NewPriorityScheduler(
PrioritySchedulerOptions{
WorkerCount: dynamicconfig.GetIntPropertyFn(10),
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
},
NewSchedulerRateLimiter(
s.mockShard.GetConfig().TaskSchedulerNamespaceMaxQPS,
s.mockShard.GetConfig().TaskSchedulerMaxQPS,
s.mockShard.GetConfig().PersistenceNamespaceMaxQPS,
s.mockShard.GetConfig().PersistenceMaxQPS,
),
s.mockShard.GetTimeSource(),
log.NewTestLogger(),
)
rescheduler := NewRescheduler(
scheduler,
s.mockShard.GetTimeSource(),
log.NewTestLogger(),
metrics.NoopMetricsHandler,
)

s.scheduledQueue = NewScheduledQueue(
s.mockShard,
tasks.CategoryTimer,
NewPriorityScheduler(
PrioritySchedulerOptions{
WorkerCount: dynamicconfig.GetIntPropertyFn(10),
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
},
NewSchedulerRateLimiter(
s.mockShard.GetConfig().TaskSchedulerNamespaceMaxQPS,
s.mockShard.GetConfig().TaskSchedulerMaxQPS,
s.mockShard.GetConfig().PersistenceNamespaceMaxQPS,
s.mockShard.GetConfig().PersistenceMaxQPS,
),
s.mockShard.GetTimeSource(),
log.NewTestLogger(),
),
scheduler,
rescheduler,
nil,
nil,
testQueueOptions,
Expand Down
Loading