From a22db81db62edcb1ad9925354e07dea47bff20fa Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 3 Jan 2023 11:32:36 -0800 Subject: [PATCH] Remove deprecated code from queue implementation (#3770) --- common/dynamicconfig/constants.go | 8 ---- service/history/archival_queue_factory.go | 17 ++++++--- service/history/configs/config.go | 8 ---- service/history/historyEngine.go | 12 +----- service/history/queueFactoryBase.go | 7 +--- service/history/queues/queue.go | 2 - service/history/queues/queue_base.go | 15 +------- service/history/queues/queue_base_test.go | 8 +++- service/history/queues/queue_immediate.go | 2 + service/history/queues/queue_mock.go | 24 ------------ service/history/queues/queue_scheduled.go | 2 + .../history/queues/queue_scheduled_test.go | 37 ++++++++++++------- service/history/timerQueueFactory.go | 16 +++++--- service/history/transferQueueFactory.go | 16 +++++--- service/history/visibilityQueueFactory.go | 16 +++++--- 15 files changed, 79 insertions(+), 111 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index a29e6a369dc..05a07009be3 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -411,8 +411,6 @@ const ( TimerProcessorMaxPollInterval = "history.timerProcessorMaxPollInterval" // TimerProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient TimerProcessorMaxPollIntervalJitterCoefficient = "history.timerProcessorMaxPollIntervalJitterCoefficient" - // TimerProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for timer processor - TimerProcessorMaxReschedulerSize = "history.timerProcessorMaxReschedulerSize" // TimerProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for timer processor TimerProcessorPollBackoffInterval = "history.timerProcessorPollBackoffInterval" // TimerProcessorMaxTimeShift is the max shift timer processor can have @@ -452,8 +450,6 @@ const ( TransferProcessorUpdateAckIntervalJitterCoefficient = "history.transferProcessorUpdateAckIntervalJitterCoefficient" // TransferProcessorCompleteTransferInterval is complete timer interval for transferQueueProcessor TransferProcessorCompleteTransferInterval = "history.transferProcessorCompleteTransferInterval" - // TransferProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for transferQueueProcessor - TransferProcessorMaxReschedulerSize = "history.transferProcessorMaxReschedulerSize" // TransferProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for transferQueueProcessor TransferProcessorPollBackoffInterval = "history.transferProcessorPollBackoffInterval" // TransferProcessorVisibilityArchivalTimeLimit is the upper time limit for archiving visibility records @@ -485,8 +481,6 @@ const ( VisibilityProcessorUpdateAckIntervalJitterCoefficient = "history.visibilityProcessorUpdateAckIntervalJitterCoefficient" // VisibilityProcessorCompleteTaskInterval is complete timer interval for visibilityQueueProcessor VisibilityProcessorCompleteTaskInterval = "history.visibilityProcessorCompleteTaskInterval" - // VisibilityProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for visibilityQueueProcessor - VisibilityProcessorMaxReschedulerSize = "history.visibilityProcessorMaxReschedulerSize" // VisibilityProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for visibilityQueueProcessor VisibilityProcessorPollBackoffInterval = "history.visibilityProcessorPollBackoffInterval" // VisibilityProcessorVisibilityArchivalTimeLimit is the upper time limit for archiving visibility records @@ -545,8 +539,6 @@ const ( ReplicatorProcessorUpdateAckInterval = "history.replicatorProcessorUpdateAckInterval" // ReplicatorProcessorUpdateAckIntervalJitterCoefficient is the update interval jitter coefficient ReplicatorProcessorUpdateAckIntervalJitterCoefficient = "history.replicatorProcessorUpdateAckIntervalJitterCoefficient" - // ReplicatorProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for ReplicatorProcessor - ReplicatorProcessorMaxReschedulerSize = "history.replicatorProcessorMaxReschedulerSize" // ReplicatorProcessorEnablePriorityTaskProcessor indicates whether priority task processor should be used for ReplicatorProcessor ReplicatorProcessorEnablePriorityTaskProcessor = "history.replicatorProcessorEnablePriorityTaskProcessor" // MaximumBufferedEventsBatch is max number of buffer event in mutable state diff --git a/service/history/archival_queue_factory.go b/service/history/archival_queue_factory.go index 194cc526313..e2174003698 100644 --- a/service/history/archival_queue_factory.go +++ b/service/history/archival_queue_factory.go @@ -112,11 +112,6 @@ func newQueueFactoryBase(params ArchivalQueueFactoryParams, hostScheduler queues return QueueFactoryBase{ HostScheduler: hostScheduler, HostPriorityAssigner: queues.NewPriorityAssigner(), - HostRateLimiter: NewQueueHostRateLimiter( - params.Config.ArchivalProcessorMaxPollHostRPS, - params.Config.PersistenceMaxQPS, - archivalQueuePersistenceMaxRPSRatio, - ), HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter( NewHostRateLimiterRateFn( params.Config.ArchivalProcessorMaxPollHostRPS, @@ -152,10 +147,20 @@ func (f *archivalQueueFactory) newArchivalTaskExecutor(shard shard.Context, work // newScheduledQueue creates a new scheduled queue for the given shard with archival-specific configurations. func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor queues.Executor) queues.Queue { logger := log.With(shard.GetLogger(), tag.ComponentArchivalQueue) + metricsHandler := f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope)) + + rescheduler := queues.NewRescheduler( + f.HostScheduler, + shard.GetTimeSource(), + logger, + metricsHandler, + ) + return queues.NewScheduledQueue( shard, tasks.CategoryArchival, f.HostScheduler, + rescheduler, f.HostPriorityAssigner, executor, &queues.Options{ @@ -179,6 +184,6 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q }, f.HostReaderRateLimiter, logger, - f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope)), + metricsHandler, ) } diff --git a/service/history/configs/config.go b/service/history/configs/config.go index bd04e16da97..d48eb21ae8a 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -110,7 +110,6 @@ type Config struct { TimerProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn TimerProcessorMaxPollInterval dynamicconfig.DurationPropertyFn TimerProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn - TimerProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn TimerProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn TimerProcessorMaxTimeShift dynamicconfig.DurationPropertyFn TimerProcessorHistoryArchivalSizeLimit dynamicconfig.IntPropertyFn @@ -132,7 +131,6 @@ type Config struct { TransferProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn TransferProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn TransferProcessorCompleteTransferInterval dynamicconfig.DurationPropertyFn - TransferProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn TransferProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn TransferProcessorVisibilityArchivalTimeLimit dynamicconfig.DurationPropertyFn TransferProcessorEnsureCloseBeforeDelete dynamicconfig.BoolPropertyFn @@ -147,7 +145,6 @@ type Config struct { ReplicatorProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn ReplicatorProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn ReplicatorProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn - ReplicatorProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn ReplicatorProcessorEnablePriorityTaskProcessor dynamicconfig.BoolPropertyFn ReplicatorProcessorFetchTasksBatchSize dynamicconfig.IntPropertyFn ReplicatorProcessorMaxSkipTaskCount dynamicconfig.IntPropertyFn @@ -263,7 +260,6 @@ type Config struct { VisibilityProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn VisibilityProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn VisibilityProcessorCompleteTaskInterval dynamicconfig.DurationPropertyFn - VisibilityProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn VisibilityProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn VisibilityProcessorVisibilityArchivalTimeLimit dynamicconfig.DurationPropertyFn VisibilityProcessorEnsureCloseBeforeDelete dynamicconfig.BoolPropertyFn @@ -366,7 +362,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis TimerProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxPollHostRPS, 0), TimerProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxPollInterval, 5*time.Minute), TimerProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorMaxPollIntervalJitterCoefficient, 0.15), - TimerProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxReschedulerSize, 10000), TimerProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorPollBackoffInterval, 5*time.Second), TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxTimeShift, 1*time.Second), TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicconfig.TimerProcessorHistoryArchivalSizeLimit, 500*1024), @@ -386,7 +381,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis TransferProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorUpdateAckInterval, 30*time.Second), TransferProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorUpdateAckIntervalJitterCoefficient, 0.15), TransferProcessorCompleteTransferInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorCompleteTransferInterval, 60*time.Second), - TransferProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxReschedulerSize, 10000), TransferProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorPollBackoffInterval, 5*time.Second), TransferProcessorVisibilityArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.TransferProcessorVisibilityArchivalTimeLimit, 200*time.Millisecond), TransferProcessorEnsureCloseBeforeDelete: dc.GetBoolProperty(dynamicconfig.TransferProcessorEnsureCloseBeforeDelete, true), @@ -399,7 +393,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis ReplicatorProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient, 0.15), ReplicatorProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorUpdateAckInterval, 5*time.Second), ReplicatorProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorUpdateAckIntervalJitterCoefficient, 0.15), - ReplicatorProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxReschedulerSize, 10000), ReplicatorProcessorEnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.ReplicatorProcessorEnablePriorityTaskProcessor, false), ReplicatorProcessorFetchTasksBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 25), ReplicatorProcessorMaxSkipTaskCount: dc.GetIntProperty(dynamicconfig.ReplicatorMaxSkipTaskCount, 250), @@ -485,7 +478,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis VisibilityProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorUpdateAckInterval, 30*time.Second), VisibilityProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.VisibilityProcessorUpdateAckIntervalJitterCoefficient, 0.15), VisibilityProcessorCompleteTaskInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorCompleteTaskInterval, 60*time.Second), - VisibilityProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.VisibilityProcessorMaxReschedulerSize, 10000), VisibilityProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorPollBackoffInterval, 5*time.Second), VisibilityProcessorVisibilityArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorVisibilityArchivalTimeLimit, 200*time.Millisecond), VisibilityProcessorEnsureCloseBeforeDelete: dc.GetBoolProperty(dynamicconfig.VisibilityProcessorEnsureCloseBeforeDelete, false), diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 82f72ea6019..8269f0a3e57 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -353,18 +353,8 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() { e.shard.GetNamespaceRegistry().RegisterNamespaceChangeCallback( e, 0, /* always want callback so UpdateHandoverNamespaces() can be called after shard reload */ - func() { - for _, queueProcessor := range e.queueProcessors { - queueProcessor.LockTaskProcessing() - } - }, + func() {}, func(prevNamespaces []*namespace.Namespace, nextNamespaces []*namespace.Namespace) { - defer func() { - for _, queueProcessor := range e.queueProcessors { - queueProcessor.UnlockTaskProcessing() - } - }() - if len(nextNamespaces) == 0 { return } diff --git a/service/history/queueFactoryBase.go b/service/history/queueFactoryBase.go index 4c11d5f0509..07b51852e53 100644 --- a/service/history/queueFactoryBase.go +++ b/service/history/queueFactoryBase.go @@ -78,11 +78,8 @@ type ( } QueueFactoryBase struct { - HostScheduler queues.Scheduler - HostPriorityAssigner queues.PriorityAssigner - HostRateLimiter quotas.RateLimiter - - // used by multi-cursor queue reader + HostScheduler queues.Scheduler + HostPriorityAssigner queues.PriorityAssigner HostReaderRateLimiter quotas.RequestRateLimiter } diff --git a/service/history/queues/queue.go b/service/history/queues/queue.go index 7f09c26e499..f2a7643378a 100644 --- a/service/history/queues/queue.go +++ b/service/history/queues/queue.go @@ -37,7 +37,5 @@ type ( Category() tasks.Category NotifyNewTasks(tasks []tasks.Task) FailoverNamespace(namespaceIDs map[string]struct{}) - LockTaskProcessing() - UnlockTaskProcessing() } ) diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index 0446eb96efe..33c9a3d6734 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -125,6 +125,7 @@ func newQueueBase( category tasks.Category, paginationFnProvider PaginationFnProvider, scheduler Scheduler, + rescheduler Rescheduler, priorityAssigner PriorityAssigner, executor Executor, options *Options, @@ -150,12 +151,6 @@ func newQueueBase( } timeSource := shard.GetTimeSource() - rescheduler := NewRescheduler( - scheduler, - timeSource, - logger, - metricsHandler, - ) monitor := newMonitor(category.Type(), &options.MonitorOptions) mitigator := newMitigator(monitor, logger, metricsHandler, options.MaxReaderCount) @@ -286,14 +281,6 @@ func (p *queueBase) FailoverNamespace( p.rescheduler.Reschedule(namespaceIDs) } -func (p *queueBase) LockTaskProcessing() { - // no-op -} - -func (p *queueBase) UnlockTaskProcessing() { - // no-op -} - func (p *queueBase) processNewRange() error { var newMaxKey tasks.Key switch categoryType := p.category.Type(); categoryType { diff --git a/service/history/queues/queue_base_test.go b/service/history/queues/queue_base_test.go index 186b5967e46..b79fce644c4 100644 --- a/service/history/queues/queue_base_test.go +++ b/service/history/queues/queue_base_test.go @@ -140,6 +140,7 @@ func (s *queueBaseSuite) TestNewProcessBase_NoPreviousState() { tasks.CategoryTransfer, nil, s.mockScheduler, + s.mockRescheduler, NewNoopPriorityAssigner(), nil, s.options, @@ -224,6 +225,7 @@ func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() { tasks.CategoryTransfer, nil, s.mockScheduler, + s.mockRescheduler, NewNoopPriorityAssigner(), nil, s.options, @@ -284,6 +286,7 @@ func (s *queueBaseSuite) TestStartStop() { tasks.CategoryTransfer, paginationFnProvider, s.mockScheduler, + s.mockRescheduler, NewNoopPriorityAssigner(), nil, s.options, @@ -291,7 +294,6 @@ func (s *queueBaseSuite) TestStartStop() { s.logger, s.metricsHandler, ) - base.rescheduler = s.mockRescheduler // replace with mock to verify Start/Stop s.mockRescheduler.EXPECT().Start().Times(1) base.Start() @@ -335,6 +337,7 @@ func (s *queueBaseSuite) TestProcessNewRange() { tasks.CategoryTimer, nil, s.mockScheduler, + s.mockRescheduler, NewNoopPriorityAssigner(), nil, s.options, @@ -392,6 +395,7 @@ func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() { tasks.CategoryTimer, nil, s.mockScheduler, + s.mockRescheduler, NewNoopPriorityAssigner(), nil, s.options, @@ -465,6 +469,7 @@ func (s *queueBaseSuite) TestCheckPoint_NoPendingTasks() { tasks.CategoryTimer, nil, s.mockScheduler, + s.mockRescheduler, NewNoopPriorityAssigner(), nil, s.options, @@ -553,6 +558,7 @@ func (s *queueBaseSuite) TestCheckPoint_MoveSlices() { tasks.CategoryTimer, nil, s.mockScheduler, + s.mockRescheduler, NewNoopPriorityAssigner(), nil, s.options, diff --git a/service/history/queues/queue_immediate.go b/service/history/queues/queue_immediate.go index cb4fe8bfdf6..1f34a1190e5 100644 --- a/service/history/queues/queue_immediate.go +++ b/service/history/queues/queue_immediate.go @@ -54,6 +54,7 @@ func NewImmediateQueue( shard shard.Context, category tasks.Category, scheduler Scheduler, + rescheduler Rescheduler, priorityAssigner PriorityAssigner, executor Executor, options *Options, @@ -90,6 +91,7 @@ func NewImmediateQueue( category, paginationFnProvider, scheduler, + rescheduler, priorityAssigner, executor, options, diff --git a/service/history/queues/queue_mock.go b/service/history/queues/queue_mock.go index 7f14b311d09..54415e828fc 100644 --- a/service/history/queues/queue_mock.go +++ b/service/history/queues/queue_mock.go @@ -84,18 +84,6 @@ func (mr *MockQueueMockRecorder) FailoverNamespace(namespaceIDs interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FailoverNamespace", reflect.TypeOf((*MockQueue)(nil).FailoverNamespace), namespaceIDs) } -// LockTaskProcessing mocks base method. -func (m *MockQueue) LockTaskProcessing() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "LockTaskProcessing") -} - -// LockTaskProcessing indicates an expected call of LockTaskProcessing. -func (mr *MockQueueMockRecorder) LockTaskProcessing() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LockTaskProcessing", reflect.TypeOf((*MockQueue)(nil).LockTaskProcessing)) -} - // NotifyNewTasks mocks base method. func (m *MockQueue) NotifyNewTasks(tasks []tasks.Task) { m.ctrl.T.Helper() @@ -131,15 +119,3 @@ func (mr *MockQueueMockRecorder) Stop() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockQueue)(nil).Stop)) } - -// UnlockTaskProcessing mocks base method. -func (m *MockQueue) UnlockTaskProcessing() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "UnlockTaskProcessing") -} - -// UnlockTaskProcessing indicates an expected call of UnlockTaskProcessing. -func (mr *MockQueueMockRecorder) UnlockTaskProcessing() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnlockTaskProcessing", reflect.TypeOf((*MockQueue)(nil).UnlockTaskProcessing)) -} diff --git a/service/history/queues/queue_scheduled.go b/service/history/queues/queue_scheduled.go index 226b744d2c0..7737e21f18e 100644 --- a/service/history/queues/queue_scheduled.go +++ b/service/history/queues/queue_scheduled.go @@ -66,6 +66,7 @@ func NewScheduledQueue( shard shard.Context, category tasks.Category, scheduler Scheduler, + rescheduler Rescheduler, priorityAssigner PriorityAssigner, executor Executor, options *Options, @@ -111,6 +112,7 @@ func NewScheduledQueue( category, paginationFnProvider, scheduler, + rescheduler, priorityAssigner, executor, options, diff --git a/service/history/queues/queue_scheduled_test.go b/service/history/queues/queue_scheduled_test.go index 86d642f78b1..b3192a73917 100644 --- a/service/history/queues/queue_scheduled_test.go +++ b/service/history/queues/queue_scheduled_test.go @@ -84,23 +84,32 @@ func (s *scheduledQueueSuite) SetupTest() { s.mockExecutionManager = s.mockShard.Resource.ExecutionMgr s.mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + scheduler := NewPriorityScheduler( + PrioritySchedulerOptions{ + WorkerCount: dynamicconfig.GetIntPropertyFn(10), + EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), + }, + NewSchedulerRateLimiter( + s.mockShard.GetConfig().TaskSchedulerNamespaceMaxQPS, + s.mockShard.GetConfig().TaskSchedulerMaxQPS, + s.mockShard.GetConfig().PersistenceNamespaceMaxQPS, + s.mockShard.GetConfig().PersistenceMaxQPS, + ), + s.mockShard.GetTimeSource(), + log.NewTestLogger(), + ) + rescheduler := NewRescheduler( + scheduler, + s.mockShard.GetTimeSource(), + log.NewTestLogger(), + metrics.NoopMetricsHandler, + ) + s.scheduledQueue = NewScheduledQueue( s.mockShard, tasks.CategoryTimer, - NewPriorityScheduler( - PrioritySchedulerOptions{ - WorkerCount: dynamicconfig.GetIntPropertyFn(10), - EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), - }, - NewSchedulerRateLimiter( - s.mockShard.GetConfig().TaskSchedulerNamespaceMaxQPS, - s.mockShard.GetConfig().TaskSchedulerMaxQPS, - s.mockShard.GetConfig().PersistenceNamespaceMaxQPS, - s.mockShard.GetConfig().PersistenceMaxQPS, - ), - s.mockShard.GetTimeSource(), - log.NewTestLogger(), - ), + scheduler, + rescheduler, nil, nil, testQueueOptions, diff --git a/service/history/timerQueueFactory.go b/service/history/timerQueueFactory.go index 7e6616d7193..75dae6c240c 100644 --- a/service/history/timerQueueFactory.go +++ b/service/history/timerQueueFactory.go @@ -87,11 +87,6 @@ func NewTimerQueueFactory( params.Logger, ), HostPriorityAssigner: queues.NewPriorityAssigner(), - HostRateLimiter: NewQueueHostRateLimiter( - params.Config.TimerProcessorMaxPollHostRPS, - params.Config.PersistenceMaxQPS, - timerQueuePersistenceMaxRPSRatio, - ), HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter( NewHostRateLimiterRateFn( params.Config.TimerProcessorMaxPollHostRPS, @@ -109,6 +104,7 @@ func (f *timerQueueFactory) CreateQueue( workflowCache wcache.Cache, ) queues.Queue { logger := log.With(shard.GetLogger(), tag.ComponentTimerQueue) + metricsHandler := f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationTimerQueueProcessorScope)) currentClusterName := f.ClusterMetadata.GetCurrentClusterName() workflowDeleteManager := deletemanager.NewDeleteManager( @@ -119,6 +115,13 @@ func (f *timerQueueFactory) CreateQueue( shard.GetTimeSource(), ) + rescheduler := queues.NewRescheduler( + f.HostScheduler, + shard.GetTimeSource(), + logger, + metricsHandler, + ) + activeExecutor := newTimerQueueActiveTaskExecutor( shard, workflowCache, @@ -170,6 +173,7 @@ func (f *timerQueueFactory) CreateQueue( shard, tasks.CategoryTimer, f.HostScheduler, + rescheduler, f.HostPriorityAssigner, executor, &queues.Options{ @@ -193,6 +197,6 @@ func (f *timerQueueFactory) CreateQueue( }, f.HostReaderRateLimiter, logger, - f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationTimerQueueProcessorScope)), + metricsHandler, ) } diff --git a/service/history/transferQueueFactory.go b/service/history/transferQueueFactory.go index 58dc23d9731..9c25c5dd693 100644 --- a/service/history/transferQueueFactory.go +++ b/service/history/transferQueueFactory.go @@ -89,11 +89,6 @@ func NewTransferQueueFactory( params.Logger, ), HostPriorityAssigner: queues.NewPriorityAssigner(), - HostRateLimiter: NewQueueHostRateLimiter( - params.Config.TransferProcessorMaxPollHostRPS, - params.Config.PersistenceMaxQPS, - transferQueuePersistenceMaxRPSRatio, - ), HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter( NewHostRateLimiterRateFn( params.Config.TransferProcessorMaxPollHostRPS, @@ -111,6 +106,14 @@ func (f *transferQueueFactory) CreateQueue( workflowCache wcache.Cache, ) queues.Queue { logger := log.With(shard.GetLogger(), tag.ComponentTransferQueue) + metricsHandler := f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationTransferQueueProcessorScope)) + + rescheduler := queues.NewRescheduler( + f.HostScheduler, + shard.GetTimeSource(), + logger, + metricsHandler, + ) currentClusterName := f.ClusterMetadata.GetCurrentClusterName() activeExecutor := newTransferQueueActiveTaskExecutor( @@ -160,6 +163,7 @@ func (f *transferQueueFactory) CreateQueue( shard, tasks.CategoryTransfer, f.HostScheduler, + rescheduler, f.HostPriorityAssigner, executor, &queues.Options{ @@ -183,6 +187,6 @@ func (f *transferQueueFactory) CreateQueue( }, f.HostReaderRateLimiter, logger, - f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationTransferQueueProcessorScope)), + metricsHandler, ) } diff --git a/service/history/visibilityQueueFactory.go b/service/history/visibilityQueueFactory.go index 20488b5a207..ba3d7e07bc7 100644 --- a/service/history/visibilityQueueFactory.go +++ b/service/history/visibilityQueueFactory.go @@ -78,11 +78,6 @@ func NewVisibilityQueueFactory( params.Logger, ), HostPriorityAssigner: queues.NewPriorityAssigner(), - HostRateLimiter: NewQueueHostRateLimiter( - params.Config.VisibilityProcessorMaxPollHostRPS, - params.Config.PersistenceMaxQPS, - visibilityQueuePersistenceMaxRPSRatio, - ), HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter( NewHostRateLimiterRateFn( params.Config.VisibilityProcessorMaxPollHostRPS, @@ -100,6 +95,14 @@ func (f *visibilityQueueFactory) CreateQueue( workflowCache wcache.Cache, ) queues.Queue { logger := log.With(shard.GetLogger(), tag.ComponentVisibilityQueue) + metricsHandler := f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationVisibilityQueueProcessorScope)) + + rescheduler := queues.NewRescheduler( + f.HostScheduler, + shard.GetTimeSource(), + logger, + metricsHandler, + ) executor := newVisibilityQueueTaskExecutor( shard, @@ -115,6 +118,7 @@ func (f *visibilityQueueFactory) CreateQueue( shard, tasks.CategoryVisibility, f.HostScheduler, + rescheduler, f.HostPriorityAssigner, executor, &queues.Options{ @@ -138,6 +142,6 @@ func (f *visibilityQueueFactory) CreateQueue( }, f.HostReaderRateLimiter, logger, - f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationVisibilityQueueProcessorScope)), + metricsHandler, ) }