Skip to content

Commit

Permalink
Remove deprecated code from queue implementation (#3770)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jan 3, 2023
1 parent adf7c54 commit a22db81
Show file tree
Hide file tree
Showing 15 changed files with 79 additions and 111 deletions.
8 changes: 0 additions & 8 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,6 @@ const (
TimerProcessorMaxPollInterval = "history.timerProcessorMaxPollInterval"
// TimerProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
TimerProcessorMaxPollIntervalJitterCoefficient = "history.timerProcessorMaxPollIntervalJitterCoefficient"
// TimerProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for timer processor
TimerProcessorMaxReschedulerSize = "history.timerProcessorMaxReschedulerSize"
// TimerProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for timer processor
TimerProcessorPollBackoffInterval = "history.timerProcessorPollBackoffInterval"
// TimerProcessorMaxTimeShift is the max shift timer processor can have
Expand Down Expand Up @@ -452,8 +450,6 @@ const (
TransferProcessorUpdateAckIntervalJitterCoefficient = "history.transferProcessorUpdateAckIntervalJitterCoefficient"
// TransferProcessorCompleteTransferInterval is complete timer interval for transferQueueProcessor
TransferProcessorCompleteTransferInterval = "history.transferProcessorCompleteTransferInterval"
// TransferProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for transferQueueProcessor
TransferProcessorMaxReschedulerSize = "history.transferProcessorMaxReschedulerSize"
// TransferProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for transferQueueProcessor
TransferProcessorPollBackoffInterval = "history.transferProcessorPollBackoffInterval"
// TransferProcessorVisibilityArchivalTimeLimit is the upper time limit for archiving visibility records
Expand Down Expand Up @@ -485,8 +481,6 @@ const (
VisibilityProcessorUpdateAckIntervalJitterCoefficient = "history.visibilityProcessorUpdateAckIntervalJitterCoefficient"
// VisibilityProcessorCompleteTaskInterval is complete timer interval for visibilityQueueProcessor
VisibilityProcessorCompleteTaskInterval = "history.visibilityProcessorCompleteTaskInterval"
// VisibilityProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for visibilityQueueProcessor
VisibilityProcessorMaxReschedulerSize = "history.visibilityProcessorMaxReschedulerSize"
// VisibilityProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for visibilityQueueProcessor
VisibilityProcessorPollBackoffInterval = "history.visibilityProcessorPollBackoffInterval"
// VisibilityProcessorVisibilityArchivalTimeLimit is the upper time limit for archiving visibility records
Expand Down Expand Up @@ -545,8 +539,6 @@ const (
ReplicatorProcessorUpdateAckInterval = "history.replicatorProcessorUpdateAckInterval"
// ReplicatorProcessorUpdateAckIntervalJitterCoefficient is the update interval jitter coefficient
ReplicatorProcessorUpdateAckIntervalJitterCoefficient = "history.replicatorProcessorUpdateAckIntervalJitterCoefficient"
// ReplicatorProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for ReplicatorProcessor
ReplicatorProcessorMaxReschedulerSize = "history.replicatorProcessorMaxReschedulerSize"
// ReplicatorProcessorEnablePriorityTaskProcessor indicates whether priority task processor should be used for ReplicatorProcessor
ReplicatorProcessorEnablePriorityTaskProcessor = "history.replicatorProcessorEnablePriorityTaskProcessor"
// MaximumBufferedEventsBatch is max number of buffer event in mutable state
Expand Down
17 changes: 11 additions & 6 deletions service/history/archival_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,6 @@ func newQueueFactoryBase(params ArchivalQueueFactoryParams, hostScheduler queues
return QueueFactoryBase{
HostScheduler: hostScheduler,
HostPriorityAssigner: queues.NewPriorityAssigner(),
HostRateLimiter: NewQueueHostRateLimiter(
params.Config.ArchivalProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
archivalQueuePersistenceMaxRPSRatio,
),
HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter(
NewHostRateLimiterRateFn(
params.Config.ArchivalProcessorMaxPollHostRPS,
Expand Down Expand Up @@ -152,10 +147,20 @@ func (f *archivalQueueFactory) newArchivalTaskExecutor(shard shard.Context, work
// newScheduledQueue creates a new scheduled queue for the given shard with archival-specific configurations.
func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor queues.Executor) queues.Queue {
logger := log.With(shard.GetLogger(), tag.ComponentArchivalQueue)
metricsHandler := f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope))

rescheduler := queues.NewRescheduler(
f.HostScheduler,
shard.GetTimeSource(),
logger,
metricsHandler,
)

return queues.NewScheduledQueue(
shard,
tasks.CategoryArchival,
f.HostScheduler,
rescheduler,
f.HostPriorityAssigner,
executor,
&queues.Options{
Expand All @@ -179,6 +184,6 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q
},
f.HostReaderRateLimiter,
logger,
f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope)),
metricsHandler,
)
}
8 changes: 0 additions & 8 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type Config struct {
TimerProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn
TimerProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
TimerProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
TimerProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
TimerProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
TimerProcessorMaxTimeShift dynamicconfig.DurationPropertyFn
TimerProcessorHistoryArchivalSizeLimit dynamicconfig.IntPropertyFn
Expand All @@ -132,7 +131,6 @@ type Config struct {
TransferProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
TransferProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
TransferProcessorCompleteTransferInterval dynamicconfig.DurationPropertyFn
TransferProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
TransferProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
TransferProcessorVisibilityArchivalTimeLimit dynamicconfig.DurationPropertyFn
TransferProcessorEnsureCloseBeforeDelete dynamicconfig.BoolPropertyFn
Expand All @@ -147,7 +145,6 @@ type Config struct {
ReplicatorProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ReplicatorProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
ReplicatorProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ReplicatorProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
ReplicatorProcessorEnablePriorityTaskProcessor dynamicconfig.BoolPropertyFn
ReplicatorProcessorFetchTasksBatchSize dynamicconfig.IntPropertyFn
ReplicatorProcessorMaxSkipTaskCount dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -263,7 +260,6 @@ type Config struct {
VisibilityProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
VisibilityProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
VisibilityProcessorCompleteTaskInterval dynamicconfig.DurationPropertyFn
VisibilityProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
VisibilityProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
VisibilityProcessorVisibilityArchivalTimeLimit dynamicconfig.DurationPropertyFn
VisibilityProcessorEnsureCloseBeforeDelete dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -366,7 +362,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TimerProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxPollHostRPS, 0),
TimerProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxPollInterval, 5*time.Minute),
TimerProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorMaxPollIntervalJitterCoefficient, 0.15),
TimerProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxReschedulerSize, 10000),
TimerProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorPollBackoffInterval, 5*time.Second),
TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxTimeShift, 1*time.Second),
TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicconfig.TimerProcessorHistoryArchivalSizeLimit, 500*1024),
Expand All @@ -386,7 +381,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TransferProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorUpdateAckInterval, 30*time.Second),
TransferProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorUpdateAckIntervalJitterCoefficient, 0.15),
TransferProcessorCompleteTransferInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorCompleteTransferInterval, 60*time.Second),
TransferProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxReschedulerSize, 10000),
TransferProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorPollBackoffInterval, 5*time.Second),
TransferProcessorVisibilityArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.TransferProcessorVisibilityArchivalTimeLimit, 200*time.Millisecond),
TransferProcessorEnsureCloseBeforeDelete: dc.GetBoolProperty(dynamicconfig.TransferProcessorEnsureCloseBeforeDelete, true),
Expand All @@ -399,7 +393,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ReplicatorProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient, 0.15),
ReplicatorProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorUpdateAckInterval, 5*time.Second),
ReplicatorProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorUpdateAckIntervalJitterCoefficient, 0.15),
ReplicatorProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxReschedulerSize, 10000),
ReplicatorProcessorEnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.ReplicatorProcessorEnablePriorityTaskProcessor, false),
ReplicatorProcessorFetchTasksBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 25),
ReplicatorProcessorMaxSkipTaskCount: dc.GetIntProperty(dynamicconfig.ReplicatorMaxSkipTaskCount, 250),
Expand Down Expand Up @@ -485,7 +478,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
VisibilityProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorUpdateAckInterval, 30*time.Second),
VisibilityProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.VisibilityProcessorUpdateAckIntervalJitterCoefficient, 0.15),
VisibilityProcessorCompleteTaskInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorCompleteTaskInterval, 60*time.Second),
VisibilityProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.VisibilityProcessorMaxReschedulerSize, 10000),
VisibilityProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorPollBackoffInterval, 5*time.Second),
VisibilityProcessorVisibilityArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorVisibilityArchivalTimeLimit, 200*time.Millisecond),
VisibilityProcessorEnsureCloseBeforeDelete: dc.GetBoolProperty(dynamicconfig.VisibilityProcessorEnsureCloseBeforeDelete, false),
Expand Down
12 changes: 1 addition & 11 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,18 +353,8 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() {
e.shard.GetNamespaceRegistry().RegisterNamespaceChangeCallback(
e,
0, /* always want callback so UpdateHandoverNamespaces() can be called after shard reload */
func() {
for _, queueProcessor := range e.queueProcessors {
queueProcessor.LockTaskProcessing()
}
},
func() {},
func(prevNamespaces []*namespace.Namespace, nextNamespaces []*namespace.Namespace) {
defer func() {
for _, queueProcessor := range e.queueProcessors {
queueProcessor.UnlockTaskProcessing()
}
}()

if len(nextNamespaces) == 0 {
return
}
Expand Down
7 changes: 2 additions & 5 deletions service/history/queueFactoryBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,8 @@ type (
}

QueueFactoryBase struct {
HostScheduler queues.Scheduler
HostPriorityAssigner queues.PriorityAssigner
HostRateLimiter quotas.RateLimiter

// used by multi-cursor queue reader
HostScheduler queues.Scheduler
HostPriorityAssigner queues.PriorityAssigner
HostReaderRateLimiter quotas.RequestRateLimiter
}

Expand Down
2 changes: 0 additions & 2 deletions service/history/queues/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,5 @@ type (
Category() tasks.Category
NotifyNewTasks(tasks []tasks.Task)
FailoverNamespace(namespaceIDs map[string]struct{})
LockTaskProcessing()
UnlockTaskProcessing()
}
)
15 changes: 1 addition & 14 deletions service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func newQueueBase(
category tasks.Category,
paginationFnProvider PaginationFnProvider,
scheduler Scheduler,
rescheduler Rescheduler,
priorityAssigner PriorityAssigner,
executor Executor,
options *Options,
Expand All @@ -150,12 +151,6 @@ func newQueueBase(
}

timeSource := shard.GetTimeSource()
rescheduler := NewRescheduler(
scheduler,
timeSource,
logger,
metricsHandler,
)

monitor := newMonitor(category.Type(), &options.MonitorOptions)
mitigator := newMitigator(monitor, logger, metricsHandler, options.MaxReaderCount)
Expand Down Expand Up @@ -286,14 +281,6 @@ func (p *queueBase) FailoverNamespace(
p.rescheduler.Reschedule(namespaceIDs)
}

func (p *queueBase) LockTaskProcessing() {
// no-op
}

func (p *queueBase) UnlockTaskProcessing() {
// no-op
}

func (p *queueBase) processNewRange() error {
var newMaxKey tasks.Key
switch categoryType := p.category.Type(); categoryType {
Expand Down
8 changes: 7 additions & 1 deletion service/history/queues/queue_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (s *queueBaseSuite) TestNewProcessBase_NoPreviousState() {
tasks.CategoryTransfer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down Expand Up @@ -224,6 +225,7 @@ func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() {
tasks.CategoryTransfer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down Expand Up @@ -284,14 +286,14 @@ func (s *queueBaseSuite) TestStartStop() {
tasks.CategoryTransfer,
paginationFnProvider,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
s.rateLimiter,
s.logger,
s.metricsHandler,
)
base.rescheduler = s.mockRescheduler // replace with mock to verify Start/Stop

s.mockRescheduler.EXPECT().Start().Times(1)
base.Start()
Expand Down Expand Up @@ -335,6 +337,7 @@ func (s *queueBaseSuite) TestProcessNewRange() {
tasks.CategoryTimer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down Expand Up @@ -392,6 +395,7 @@ func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() {
tasks.CategoryTimer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down Expand Up @@ -465,6 +469,7 @@ func (s *queueBaseSuite) TestCheckPoint_NoPendingTasks() {
tasks.CategoryTimer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down Expand Up @@ -553,6 +558,7 @@ func (s *queueBaseSuite) TestCheckPoint_MoveSlices() {
tasks.CategoryTimer,
nil,
s.mockScheduler,
s.mockRescheduler,
NewNoopPriorityAssigner(),
nil,
s.options,
Expand Down
2 changes: 2 additions & 0 deletions service/history/queues/queue_immediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewImmediateQueue(
shard shard.Context,
category tasks.Category,
scheduler Scheduler,
rescheduler Rescheduler,
priorityAssigner PriorityAssigner,
executor Executor,
options *Options,
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewImmediateQueue(
category,
paginationFnProvider,
scheduler,
rescheduler,
priorityAssigner,
executor,
options,
Expand Down
24 changes: 0 additions & 24 deletions service/history/queues/queue_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions service/history/queues/queue_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewScheduledQueue(
shard shard.Context,
category tasks.Category,
scheduler Scheduler,
rescheduler Rescheduler,
priorityAssigner PriorityAssigner,
executor Executor,
options *Options,
Expand Down Expand Up @@ -111,6 +112,7 @@ func NewScheduledQueue(
category,
paginationFnProvider,
scheduler,
rescheduler,
priorityAssigner,
executor,
options,
Expand Down
37 changes: 23 additions & 14 deletions service/history/queues/queue_scheduled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,32 @@ func (s *scheduledQueueSuite) SetupTest() {
s.mockExecutionManager = s.mockShard.Resource.ExecutionMgr
s.mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

scheduler := NewPriorityScheduler(
PrioritySchedulerOptions{
WorkerCount: dynamicconfig.GetIntPropertyFn(10),
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
},
NewSchedulerRateLimiter(
s.mockShard.GetConfig().TaskSchedulerNamespaceMaxQPS,
s.mockShard.GetConfig().TaskSchedulerMaxQPS,
s.mockShard.GetConfig().PersistenceNamespaceMaxQPS,
s.mockShard.GetConfig().PersistenceMaxQPS,
),
s.mockShard.GetTimeSource(),
log.NewTestLogger(),
)
rescheduler := NewRescheduler(
scheduler,
s.mockShard.GetTimeSource(),
log.NewTestLogger(),
metrics.NoopMetricsHandler,
)

s.scheduledQueue = NewScheduledQueue(
s.mockShard,
tasks.CategoryTimer,
NewPriorityScheduler(
PrioritySchedulerOptions{
WorkerCount: dynamicconfig.GetIntPropertyFn(10),
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
},
NewSchedulerRateLimiter(
s.mockShard.GetConfig().TaskSchedulerNamespaceMaxQPS,
s.mockShard.GetConfig().TaskSchedulerMaxQPS,
s.mockShard.GetConfig().PersistenceNamespaceMaxQPS,
s.mockShard.GetConfig().PersistenceMaxQPS,
),
s.mockShard.GetTimeSource(),
log.NewTestLogger(),
),
scheduler,
rescheduler,
nil,
nil,
testQueueOptions,
Expand Down
Loading

0 comments on commit a22db81

Please sign in to comment.