From 32212eabdd004777e094c5163df3c66a49721cce Mon Sep 17 00:00:00 2001
From: Yichao Yang
Date: Fri, 18 Nov 2022 16:33:17 -0800
Subject: [PATCH] Task scheduler rate limiter (#3606)

---
 common/dynamicconfig/constants.go | 10 +
 common/persistence/client/quotas.go | 10 +-
 .../quotas/noop_request_rate_limiter_impl.go | 58 +++++
 common/quotas/noop_reservation_impl.go | 7 +-
 common/quotas/priority_rate_limiter_impl.go | 18 +-
 .../quotas/priority_rate_limiter_impl_test.go | 6 +-
 .../request_rate_limiter_adapter_impl.go | 67 ++++++
 common/quotas/routing_rate_limiter_impl.go | 2 +-
 common/tasks/benchmark_test.go | 19 +-
 .../tasks/interleaved_weighted_round_robin.go | 220 +++++++++++++++---
 .../interleaved_weighted_round_robin_test.go | 72 ++++--
 common/tasks/priority.go | 15 +-
 service/frontend/configs/quotas.go | 12 +-
 service/history/configs/config.go | 8 +
 service/history/configs/quotas.go | 4 +-
 service/history/queueFactoryBase.go | 25 +-
 service/history/queues/priority_assigner.go | 12 +-
 .../history/queues/priority_assigner_test.go | 14 ++
 .../history/queues/queue_scheduled_test.go | 15 +-
 service/history/queues/reader_quotas.go | 4 +-
 service/history/queues/scheduler.go | 81 +++++--
 service/history/queues/scheduler_quotas.go | 112 +++++++++
 service/history/timerQueueActiveProcessor.go | 6 +-
 .../timerQueueActiveTaskExecutor_test.go | 5 +
 service/history/timerQueueFactory.go | 3 +
 service/history/timerQueueProcessor.go | 6 +
 service/history/timerQueueProcessorBase.go | 11 +-
 service/history/timerQueueStandbyProcessor.go | 3 +-
 .../history/transferQueueActiveProcessor.go | 6 +-
 service/history/transferQueueFactory.go | 3 +
 service/history/transferQueueProcessor.go | 42 ++--
 service/history/transferQueueProcessorBase.go | 11 +-
 .../history/transferQueueStandbyProcessor.go | 3 +-
 service/history/visibilityQueueFactory.go | 3 +
 service/history/visibilityQueueProcessor.go | 14 +-
 service/matching/configs/quotas.go | 4 +-
 36 files changed, 747 insertions(+), 164 deletions(-)
 create mode 100644 common/quotas/noop_request_rate_limiter_impl.go
 create mode 100644 common/quotas/request_rate_limiter_adapter_impl.go
 create mode 100644 service/history/queues/scheduler_quotas.go

diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index 8a4b1044a95..01a266d93e9 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -322,6 +322,7 @@ const (
 	// HistoryPersistenceGlobalMaxQPS is the max qps history cluster can query DB
 	HistoryPersistenceGlobalMaxQPS = "history.persistenceGlobalMaxQPS"
 	// HistoryPersistenceNamespaceMaxQPS is the max qps each namespace on history host can query DB
+	// If the value is less than or equal to 0, it falls back to HistoryPersistenceMaxQPS
 	HistoryPersistenceNamespaceMaxQPS = "history.persistenceNamespaceMaxQPS"
 	// HistoryEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in history persistence client
 	HistoryEnablePersistencePriorityRateLimiting = "history.enablePersistencePriorityRateLimiting"
@@ -372,6 +373,15 @@ const (
 	// QueueMaxReaderCount is the max number of readers in one multi-cursor queue
 	QueueMaxReaderCount = "history.queueMaxReaderCount"
 
+	// TaskSchedulerEnableRateLimiter indicates if rate limiter should be enabled in task scheduler
+	TaskSchedulerEnableRateLimiter = "history.taskSchedulerEnableRateLimiter"
+	// TaskSchedulerMaxQPS is the max qps task schedulers on a host can schedule tasks
+	// If the value is less than or equal to 0, it falls back to HistoryPersistenceMaxQPS
+	TaskSchedulerMaxQPS = "history.taskSchedulerMaxQPS"
+	// TaskSchedulerNamespaceMaxQPS is the max qps task schedulers on a host can schedule tasks for a certain namespace
+	// If the value is less than or equal to 0, it falls back to HistoryPersistenceNamespaceMaxQPS
+	TaskSchedulerNamespaceMaxQPS = "history.taskSchedulerNamespaceMaxQPS"
+
 	// TimerTaskBatchSize is batch size for timer processor to process tasks
 	TimerTaskBatchSize = "history.timerTaskBatchSize"
 	// TimerTaskWorkerCount is number of task workers for timer processor
diff --git a/common/persistence/client/quotas.go b/common/persistence/client/quotas.go
index 01b04860666..ef8875c457d 100644
--- a/common/persistence/client/quotas.go
+++ b/common/persistence/client/quotas.go
@@ -112,9 +112,9 @@ func newPriorityRateLimiter(
 	rateFn quotas.RateFn,
 	requestPriorityFn quotas.RequestPriorityFn,
 ) quotas.RequestRateLimiter {
-	rateLimiters := make(map[int]quotas.RateLimiter)
+	rateLimiters := make(map[int]quotas.RequestRateLimiter)
 	for priority := range RequestPrioritiesOrdered {
-		rateLimiters[priority] = quotas.NewDefaultOutgoingRateLimiter(rateFn)
+		rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn))
 	}
 
 	return quotas.NewPriorityRateLimiter(
@@ -130,10 +130,10 @@ func NewNoopPriorityRateLimiter(
 
 	return quotas.NewPriorityRateLimiter(
 		func(_ quotas.Request) int { return priority },
-		map[int]quotas.RateLimiter{
-			priority: quotas.NewDefaultOutgoingRateLimiter(
+		map[int]quotas.RequestRateLimiter{
+			priority: quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(
 				func() float64 { return float64(maxQPS()) },
-			),
+			)),
 		},
 	)
 }
diff --git a/common/quotas/noop_request_rate_limiter_impl.go b/common/quotas/noop_request_rate_limiter_impl.go
new file mode 100644
index 00000000000..93267b7678f
--- /dev/null
+++ b/common/quotas/noop_request_rate_limiter_impl.go
@@ -0,0 +1,58 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+ +package quotas + +import ( + "context" + "time" +) + +type ( + // NoopRequestRateLimiterImpl is a no-op implementation for RequestRateLimiter + NoopRequestRateLimiterImpl struct{} +) + +var NoopRequestRateLimiter RequestRateLimiter = &NoopRequestRateLimiterImpl{} + +func (r *NoopRequestRateLimiterImpl) Allow( + _ time.Time, + _ Request, +) bool { + return true +} + +func (r *NoopRequestRateLimiterImpl) Reserve( + _ time.Time, + _ Request, +) Reservation { + return NoopReservation +} + +func (r *NoopRequestRateLimiterImpl) Wait( + _ context.Context, + _ Request, +) error { + return nil +} diff --git a/common/quotas/noop_reservation_impl.go b/common/quotas/noop_reservation_impl.go index 281ebc26387..6220d2f95e9 100644 --- a/common/quotas/noop_reservation_impl.go +++ b/common/quotas/noop_reservation_impl.go @@ -29,15 +29,12 @@ import ( ) type ( - NoopReservationImpl struct { - } + NoopReservationImpl struct{} ) var _ Reservation = (*NoopReservationImpl)(nil) -func NewNoopReservation() *NoopReservationImpl { - return &NoopReservationImpl{} -} +var NoopReservation Reservation = &NoopReservationImpl{} // OK returns whether the limiter can provide the requested number of tokens func (r *NoopReservationImpl) OK() bool { diff --git a/common/quotas/priority_rate_limiter_impl.go b/common/quotas/priority_rate_limiter_impl.go index 291479b5bc6..4e80cdce459 100644 --- a/common/quotas/priority_rate_limiter_impl.go +++ b/common/quotas/priority_rate_limiter_impl.go @@ -35,12 +35,12 @@ type ( // PriorityRateLimiterImpl is a wrapper around the golang rate limiter PriorityRateLimiterImpl struct { requestPriorityFn RequestPriorityFn - priorityToRateLimiters map[int]RateLimiter + priorityToRateLimiters map[int]RequestRateLimiter // priority value 0 means highest priority // sorted rate limiter from low priority value to high priority value priorityToIndex map[int]int - rateLimiters []RateLimiter + rateLimiters []RequestRateLimiter } ) @@ -50,7 +50,7 @@ var _ RequestRateLimiter = (*PriorityRateLimiterImpl)(nil) // configuration updates func NewPriorityRateLimiter( requestPriorityFn RequestPriorityFn, - priorityToRateLimiters map[int]RateLimiter, + priorityToRateLimiters map[int]RequestRateLimiter, ) *PriorityRateLimiterImpl { priorities := make([]int, 0, len(priorityToRateLimiters)) for priority := range priorityToRateLimiters { @@ -60,7 +60,7 @@ func NewPriorityRateLimiter( return priorities[i] < priorities[j] }) priorityToIndex := make(map[int]int, len(priorityToRateLimiters)) - rateLimiters := make([]RateLimiter, 0, len(priorityToRateLimiters)) + rateLimiters := make([]RequestRateLimiter, 0, len(priorityToRateLimiters)) for index, priority := range priorities { priorityToIndex[priority] = index rateLimiters = append(rateLimiters, priorityToRateLimiters[priority]) @@ -81,13 +81,13 @@ func (p *PriorityRateLimiterImpl) Allow( ) bool { decidingRateLimiter, consumeRateLimiters := p.getRateLimiters(request) - allow := decidingRateLimiter.AllowN(now, request.Token) + allow := decidingRateLimiter.Allow(now, request) if !allow { return false } for _, limiter := range consumeRateLimiters { - _ = limiter.ReserveN(now, request.Token) + _ = limiter.Reserve(now, request) } return allow } @@ -98,14 +98,14 @@ func (p *PriorityRateLimiterImpl) Reserve( ) Reservation { decidingRateLimiter, consumeRateLimiters := p.getRateLimiters(request) - decidingReservation := decidingRateLimiter.ReserveN(now, request.Token) + decidingReservation := decidingRateLimiter.Reserve(now, request) if !decidingReservation.OK() { return 
decidingReservation } otherReservations := make([]Reservation, len(consumeRateLimiters)) for index, limiter := range consumeRateLimiters { - otherReservations[index] = limiter.ReserveN(now, request.Token) + otherReservations[index] = limiter.Reserve(now, request) } return NewPriorityReservation(decidingReservation, otherReservations) } @@ -153,7 +153,7 @@ func (p *PriorityRateLimiterImpl) Wait( func (p *PriorityRateLimiterImpl) getRateLimiters( request Request, -) (RateLimiter, []RateLimiter) { +) (RequestRateLimiter, []RequestRateLimiter) { priority := p.requestPriorityFn(request) if _, ok := p.priorityToRateLimiters[priority]; !ok { panic("Request to priority & priority to rate limiter does not match") diff --git a/common/quotas/priority_rate_limiter_impl_test.go b/common/quotas/priority_rate_limiter_impl_test.go index cb57c3022e7..e10943d057a 100644 --- a/common/quotas/priority_rate_limiter_impl_test.go +++ b/common/quotas/priority_rate_limiter_impl_test.go @@ -79,9 +79,9 @@ func (s *priorityStageRateLimiterSuite) SetupTest() { s.highPriorityAPIName: 0, s.lowPriorityAPIName: 2, } - priorityToRateLimiters := map[int]RateLimiter{ - 0: s.highPriorityRateLimiter, - 2: s.lowPriorityRateLimiter, + priorityToRateLimiters := map[int]RequestRateLimiter{ + 0: NewRequestRateLimiterAdapter(s.highPriorityRateLimiter), + 2: NewRequestRateLimiterAdapter(s.lowPriorityRateLimiter), } s.rateLimiter = NewPriorityRateLimiter(func(req Request) int { return apiToPriority[req.API] diff --git a/common/quotas/request_rate_limiter_adapter_impl.go b/common/quotas/request_rate_limiter_adapter_impl.go new file mode 100644 index 00000000000..0c417d25960 --- /dev/null +++ b/common/quotas/request_rate_limiter_adapter_impl.go @@ -0,0 +1,67 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package quotas + +import ( + "context" + "time" +) + +type ( + RequestRateLimiterAdapterImpl struct { + rateLimiter RateLimiter + } +) + +var _ RequestRateLimiter = (*RequestRateLimiterAdapterImpl)(nil) + +func NewRequestRateLimiterAdapter( + rateLimiter RateLimiter, +) RequestRateLimiter { + return &RequestRateLimiterAdapterImpl{ + rateLimiter: rateLimiter, + } +} + +func (r *RequestRateLimiterAdapterImpl) Allow( + now time.Time, + request Request, +) bool { + return r.rateLimiter.AllowN(now, request.Token) +} + +func (r *RequestRateLimiterAdapterImpl) Reserve( + now time.Time, + request Request, +) Reservation { + return r.rateLimiter.ReserveN(now, request.Token) +} + +func (r *RequestRateLimiterAdapterImpl) Wait( + ctx context.Context, + request Request, +) error { + return r.rateLimiter.WaitN(ctx, request.Token) +} diff --git a/common/quotas/routing_rate_limiter_impl.go b/common/quotas/routing_rate_limiter_impl.go index d254f4f93de..05c7ee27315 100644 --- a/common/quotas/routing_rate_limiter_impl.go +++ b/common/quotas/routing_rate_limiter_impl.go @@ -68,7 +68,7 @@ func (r *RoutingRateLimiterImpl) Reserve( ) Reservation { rateLimiter, ok := r.apiToRateLimiter[request.API] if !ok { - return NewNoopReservation() + return NoopReservation } return rateLimiter.Reserve(now, request) } diff --git a/common/tasks/benchmark_test.go b/common/tasks/benchmark_test.go index fcdbaa23261..ad767ded1f0 100644 --- a/common/tasks/benchmark_test.go +++ b/common/tasks/benchmark_test.go @@ -30,7 +30,10 @@ import ( "testing" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" + "go.temporal.io/server/common/quotas" ) type ( @@ -59,10 +62,14 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Sequential(b *testing.B) { scheduler := NewInterleavedWeightedRoundRobinScheduler( InterleavedWeightedRoundRobinSchedulerOptions[*noopTask, int]{ - TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) }, - ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] }, + TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) }, + ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] }, + ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") }, + EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), }, Scheduler[*noopTask](&noopScheduler{}), + quotas.NoopRequestRateLimiter, + clock.NewRealTimeSource(), logger, ) scheduler.Start() @@ -85,10 +92,14 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Parallel(b *testing.B) { scheduler := NewInterleavedWeightedRoundRobinScheduler( InterleavedWeightedRoundRobinSchedulerOptions[*noopTask, int]{ - TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) }, - ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] }, + TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) }, + ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] }, + ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") }, + EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), }, Scheduler[*noopTask](&noopScheduler{}), + quotas.NoopRequestRateLimiter, + clock.NewRealTimeSource(), logger, ) scheduler.Start() diff --git a/common/tasks/interleaved_weighted_round_robin.go b/common/tasks/interleaved_weighted_round_robin.go index 20c7cef5b01..56eb7254ca8 100644 --- a/common/tasks/interleaved_weighted_round_robin.go +++ 
b/common/tasks/interleaved_weighted_round_robin.go
@@ -25,12 +25,22 @@
 package tasks
 
 import (
+	"math/rand"
 	"sort"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"go.temporal.io/server/common"
+	"go.temporal.io/server/common/clock"
+	"go.temporal.io/server/common/dynamicconfig"
 	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/quotas"
+)
+
+const (
+	iwrrDispatchThrottleDuration    = 1 * time.Second
+	checkRateLimiterEnabledInterval = 1 * time.Minute
 )
 
 var _ Scheduler[Task] = (*InterleavedWeightedRoundRobinScheduler[Task, struct{}])(nil)
@@ -39,9 +49,16 @@ type (
 	// InterleavedWeightedRoundRobinSchedulerOptions is the config for
 	// interleaved weighted round robin scheduler
 	InterleavedWeightedRoundRobinSchedulerOptions[T Task, K comparable] struct {
-		TaskChannelKeyFn TaskChannelKeyFn[T, K]
-		ChannelWeightFn ChannelWeightFn[K]
+		// Required for mapping a task to its corresponding task channel
+		TaskChannelKeyFn TaskChannelKeyFn[T, K]
+		// Required for getting the weight for a task channel
+		ChannelWeightFn ChannelWeightFn[K]
+		// Optional, if specified, re-evaluate task channel weight when channel is not empty
 		ChannelWeightUpdateCh chan struct{}
+		// Required for converting task channel to rate limit request
+		ChannelQuotaRequestFn ChannelQuotaRequestFn[K]
+		// Required for determining if rate limiter should be enabled.
+		EnableRateLimiter dynamicconfig.BoolPropertyFn
 	}
 
 	// TaskChannelKeyFn is the function for mapping a task to its task channel (key)
@@ -50,12 +67,17 @@ type (
 	// ChannelWeightFn is the function for mapping a task channel (key) to its weight
 	ChannelWeightFn[K comparable] func(K) int
 
+	// ChannelQuotaRequestFn is the function for mapping a task channel (key) to its rate limit request
+	ChannelQuotaRequestFn[K comparable] func(K) quotas.Request
+
 	// InterleavedWeightedRoundRobinScheduler is a round robin scheduler implementation
 	// ref: https://en.wikipedia.org/wiki/Weighted_round_robin#Interleaved_WRR
 	InterleavedWeightedRoundRobinScheduler[T Task, K comparable] struct {
 		status        int32
 		fifoScheduler Scheduler[T]
+		rateLimiter   quotas.RequestRateLimiter
+		timeSource    clock.TimeSource
 		logger        log.Logger
 
 		notifyChan   chan struct{}
@@ -68,6 +90,27 @@ type (
 		sync.RWMutex
 		weightedChannels map[K]*WeightedChannel[T]
 
+		dispatchTimerLock  sync.Mutex
+		dispatchTimer      *time.Timer
+		rateLimiterEnabled atomic.Value
+		iwrrChannels       atomic.Value
+	}
+
+	channelWithStatus[T Task, K comparable] struct {
+		*WeightedChannel[T]
+
+		key              K
+		rateLimitRequest quotas.Request
+
+		throttled bool
+		moreTasks bool // this is only a hint since there's no way to peek the channel
+	}
+
+	channelsWithStatus[T Task, K comparable] []*channelWithStatus[T, K]
+
+	iwrrChannels[T Task, K comparable] struct {
+		channels channelsWithStatus[T, K]
+
 		// precalculated / flattened task chan according to weight
 		// e.g. if
 		// ChannelKeyToWeight has the following mapping
 		// 0 -> 5
 		// 1 -> 3
 		// 2 -> 2
 		// 3 -> 1
 		// then iwrrChannels will contain chan [0, 0, 0, 1, 0, 1, 2, 0, 1, 2, 3] (ID-ed by channel key)
-		iwrrChannels atomic.Value // []*WeightedChannel
+		flattenedChannels channelsWithStatus[T, K]
 	}
 )
 
 func NewInterleavedWeightedRoundRobinScheduler[T Task, K comparable](
 	options InterleavedWeightedRoundRobinSchedulerOptions[T, K],
 	fifoScheduler Scheduler[T],
+	rateLimiter quotas.RequestRateLimiter,
+	timeSource clock.TimeSource,
 	logger log.Logger,
 ) *InterleavedWeightedRoundRobinScheduler[T, K] {
-	iwrrChannels := atomic.Value{}
-	iwrrChannels.Store(WeightedChannels[T]{})
+	channels := atomic.Value{}
+	channels.Store(iwrrChannels[T, K]{})
+	enableRateLimiter := atomic.Value{}
+	enableRateLimiter.Store(options.EnableRateLimiter())
 	return &InterleavedWeightedRoundRobinScheduler[T, K]{
 		status:        common.DaemonStatusInitialized,
 		fifoScheduler: fifoScheduler,
+		rateLimiter:   rateLimiter,
+		timeSource:    timeSource,
 		logger:        logger,
 
 		options: options,
@@ -98,9 +147,10 @@ func NewInterleavedWeightedRoundRobinScheduler[T Task, K comparable](
 		notifyChan:   make(chan struct{}, 1),
 		shutdownChan: make(chan struct{}),
 
-		numInflightTask:  0,
-		weightedChannels: make(map[K]*WeightedChannel[T]),
-		iwrrChannels:     iwrrChannels,
+		numInflightTask:    0,
+		weightedChannels:   make(map[K]*WeightedChannel[T]),
+		rateLimiterEnabled: enableRateLimiter,
+		iwrrChannels:       channels,
 	}
 }
 
@@ -142,13 +192,15 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) Submit(
 	task T,
 ) {
 	numTasks := atomic.AddInt64(&s.numInflightTask, 1)
-	if numTasks == 1 {
-		s.doDispatchTaskDirectly(task)
+	channelKey := s.options.TaskChannelKeyFn(task)
+	if numTasks == 1 && s.tryDispatchTaskDirectly(channelKey, task) {
 		return
 	}
 
 	// there are tasks pending dispatching, need to respect round robin weight
-	channel := s.getOrCreateTaskChannel(s.options.TaskChannelKeyFn(task))
+	// or we are currently unable to submit to the fifo scheduler, either because the buffer is full
+	// or the rate limit is exceeded
+	channel := s.getOrCreateTaskChannel(channelKey)
 	channel.Chan() <- task
 	s.notifyDispatcher()
 }
@@ -157,12 +209,13 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) TrySubmit(
 	task T,
 ) bool {
 	numTasks := atomic.AddInt64(&s.numInflightTask, 1)
-	if numTasks == 1 && s.tryDispatchTaskDirectly(task) {
+	channelKey := s.options.TaskChannelKeyFn(task)
+	if numTasks == 1 && s.tryDispatchTaskDirectly(channelKey, task) {
 		return true
 	}
 
 	// there are tasks pending dispatching, need to respect round robin weight
-	channel := s.getOrCreateTaskChannel(s.options.TaskChannelKeyFn(task))
+	channel := s.getOrCreateTaskChannel(channelKey)
 	select {
 	case channel.Chan() <- task:
 		s.notifyDispatcher()
@@ -174,10 +227,15 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) TrySubmit(
 }
 
 func (s *InterleavedWeightedRoundRobinScheduler[T, K]) eventLoop() {
+	checkRateLimiterEnabledTimer := time.NewTicker(checkRateLimiterEnabledInterval)
+	defer checkRateLimiterEnabledTimer.Stop()
+
 	for {
 		select {
 		case <-s.notifyChan:
 			s.dispatchTasksWithWeight()
+		case <-checkRateLimiterEnabledTimer.C:
+			s.rateLimiterEnabled.Store(s.options.EnableRateLimiter())
 		case <-s.shutdownChan:
 			return
 		}
@@ -212,24 +270,52 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) getOrCreateTaskChannel(
 }
 
 func (s *InterleavedWeightedRoundRobinScheduler[T, K]) flattenWeightedChannelsLocked() {
-	weightedChannels := make(WeightedChannels[T], 0, len(s.weightedChannels))
-	for _, weightedChan := range s.weightedChannels {
-		weightedChannels = append(weightedChannels, weightedChan)
+	weightedChannels := make(channelsWithStatus[T, K], 0, len(s.weightedChannels))
+	for channelKey, weightedChan := range s.weightedChannels {
+		weightedChannels = append(weightedChannels, &channelWithStatus[T, K]{
+			WeightedChannel:  weightedChan,
+			key:              channelKey,
+			rateLimitRequest: s.options.ChannelQuotaRequestFn(channelKey),
+			throttled:        false,
+			moreTasks:        false,
+		})
 	}
-	sort.Sort(weightedChannels)
+	sort.Slice(weightedChannels, func(i, j int) bool {
+		return weightedChannels[i].Weight() < weightedChannels[j].Weight()
+	})
 
-	iwrrChannels := make(WeightedChannels[T], 0, len(weightedChannels))
+	flattenedChannels := make(channelsWithStatus[T, K], 0, len(weightedChannels))
 	maxWeight := weightedChannels[len(weightedChannels)-1].Weight()
 	for round := maxWeight - 1; round > -1; round-- {
 		for index := len(weightedChannels) - 1; index > -1 && weightedChannels[index].Weight() > round; index-- {
-			iwrrChannels = append(iwrrChannels, weightedChannels[index])
+			flattenedChannels = append(flattenedChannels, weightedChannels[index])
 		}
 	}
-	s.iwrrChannels.Store(iwrrChannels)
+	s.iwrrChannels.Store(iwrrChannels[T, K]{
+		channels:          weightedChannels,
+		flattenedChannels: flattenedChannels,
+	})
+}
+
+func (s *InterleavedWeightedRoundRobinScheduler[T, K]) channels() iwrrChannels[T, K] {
+	return s.iwrrChannels.Load().(iwrrChannels[T, K])
 }
 
-func (s *InterleavedWeightedRoundRobinScheduler[T, K]) channels() WeightedChannels[T] {
-	return s.iwrrChannels.Load().(WeightedChannels[T])
+func (s *InterleavedWeightedRoundRobinScheduler[T, K]) setupDispatchTimer() {
+	s.dispatchTimerLock.Lock()
+	defer s.dispatchTimerLock.Unlock()
+
+	if s.dispatchTimer != nil {
+		s.dispatchTimer.Stop()
+	}
+
+	s.dispatchTimer = time.AfterFunc(iwrrDispatchThrottleDuration, func() {
+		s.dispatchTimerLock.Lock()
+		defer s.dispatchTimerLock.Unlock()
+
+		s.dispatchTimer = nil
+		s.notifyDispatcher()
+	})
 }
 
 func (s *InterleavedWeightedRoundRobinScheduler[T, K]) notifyDispatcher() {
@@ -271,6 +357,7 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) updateChannelWeightLocked
 }
 
 func (s *InterleavedWeightedRoundRobinScheduler[T, K]) dispatchTasksWithWeight() {
+LoopDispatch:
 	for s.hasRemainingTasks() {
 		if s.receiveWeightUpdateNotification() {
 			s.Lock()
@@ -279,38 +366,101 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) dispatchTasksWithWeight()
 			s.Unlock()
 		}
 
-		weightedChannels := s.channels()
-		s.doDispatchTasksWithWeight(weightedChannels)
+		iwrrChannels := s.channels()
+		enableRateLimiter := s.isRateLimiterEnabled()
+		s.doDispatchTasksWithWeight(iwrrChannels, enableRateLimiter)
+
+		if !enableRateLimiter {
+			continue LoopDispatch
+		}
+
+		// rate limiter enabled
+		// all channels = throttled channels + not throttled but has more tasks + not throttled and no more tasks
+		// - If there's a channel that's not throttled but has more tasks, need to trigger the next round
+		// of dispatch immediately.
+		// - Otherwise all channels = throttled channels + not throttled and no more tasks,
+		// then as long as there's a throttled channel, need to set a timer to try dispatching later
+
+		numThrottled := 0
+		for _, channel := range iwrrChannels.channels {
+			if channel.throttled {
+				numThrottled++
+				continue
+			}
+			if channel.moreTasks {
+				// there's a channel that is not throttled and may have more tasks
+				// start a new round of dispatch immediately
+				continue LoopDispatch
+			}
+		}
+
+		if numThrottled != 0 {
+			s.setupDispatchTimer()
+		}
+
+		return
 	}
 }
 
 func (s *InterleavedWeightedRoundRobinScheduler[T, K]) doDispatchTasksWithWeight(
-	channels WeightedChannels[T],
+	iwrrChannels iwrrChannels[T, K],
+	enableRateLimiter bool,
 ) {
+	rateLimiter := quotas.NoopRequestRateLimiter
+	if enableRateLimiter {
+		rateLimiter = s.rateLimiter
+		for _, channel := range iwrrChannels.channels {
+			channel.throttled = false
+			channel.moreTasks = false
+		}
+	}
+
+	numFlattenedChannels := len(iwrrChannels.flattenedChannels)
+	startIdx := rand.Intn(numFlattenedChannels)
 	numTasks := int64(0)
 LoopDispatch:
-	for _, channel := range channels {
+	for i := 0; i != numFlattenedChannels; i++ {
+		channel := iwrrChannels.flattenedChannels[(startIdx+i)%numFlattenedChannels]
+
+		if channel.throttled {
+			continue LoopDispatch
+		}
+
+		now := s.timeSource.Now()
+		reservation := rateLimiter.Reserve(
+			now,
+			channel.rateLimitRequest,
+		)
+		if reservation.DelayFrom(now) != 0 {
+			reservation.CancelAt(now)
+			channel.throttled = true
+			continue LoopDispatch
+		}
 		select {
 		case task := <-channel.Chan():
 			s.fifoScheduler.Submit(task)
 			numTasks++
+			channel.moreTasks = true
 		default:
+			reservation.CancelAt(now)
+			channel.moreTasks = false
 			continue LoopDispatch
 		}
 	}
 	atomic.AddInt64(&s.numInflightTask, -numTasks)
 }
 
-func (s *InterleavedWeightedRoundRobinScheduler[T, K]) doDispatchTaskDirectly(
-	task T,
-) {
-	s.fifoScheduler.Submit(task)
-	atomic.AddInt64(&s.numInflightTask, -1)
-}
-
 func (s *InterleavedWeightedRoundRobinScheduler[T, K]) tryDispatchTaskDirectly(
+	channelKey K,
 	task T,
 ) bool {
+	if s.isRateLimiterEnabled() && !s.rateLimiter.Allow(
+		s.timeSource.Now(),
+		s.options.ChannelQuotaRequestFn(channelKey),
+	) {
+		return false
+	}
+
 	dispatched := s.fifoScheduler.TrySubmit(task)
 	if dispatched {
 		atomic.AddInt64(&s.numInflightTask, -1)
@@ -343,6 +493,10 @@ DrainLoop:
 	atomic.AddInt64(&s.numInflightTask, -numTasks)
 }
 
+func (s *InterleavedWeightedRoundRobinScheduler[T, K]) isRateLimiterEnabled() bool {
+	return s.rateLimiterEnabled.Load().(bool)
+}
+
 func (s *InterleavedWeightedRoundRobinScheduler[T, K]) isStopped() bool {
 	return atomic.LoadInt32(&s.status) == common.DaemonStatusStopped
 }
diff --git a/common/tasks/interleaved_weighted_round_robin_test.go b/common/tasks/interleaved_weighted_round_robin_test.go
index 6a5ea5afc38..0f4ca7ea626 100644
--- a/common/tasks/interleaved_weighted_round_robin_test.go
+++ b/common/tasks/interleaved_weighted_round_robin_test.go
@@ -29,12 +29,16 @@ import (
 	"sync"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
+	"go.temporal.io/server/common/clock"
+	"go.temporal.io/server/common/dynamicconfig"
 	"go.temporal.io/server/common/log"
+	"go.temporal.io/server/common/quotas"
 )
 
 type (
@@ -89,8 +93,16 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) SetupTest() {
 			TaskChannelKeyFn:      func(task *testTask) int { return task.channelKey },
 			ChannelWeightFn:       func(key int) int { return s.channelKeyToWeight[key] },
 			ChannelWeightUpdateCh: 
s.channelWeightUpdateCh, + ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") }, + EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), }, Scheduler[*testTask](s.mockFIFOScheduler), + quotas.NewRequestRateLimiterAdapter( + quotas.NewDefaultOutgoingRateLimiter( + func() float64 { return 1000 }, + ), + ), + clock.NewRealTimeSource(), logger, ) } @@ -139,6 +151,8 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestTrySubmitSchedule_Fail s.True(s.scheduler.TrySubmit(mockTask)) testWaitGroup.Wait() + // need to wait for the dispatch event loop to update the numInflightTask count + time.Sleep(time.Millisecond * 100) s.Equal(int64(0), atomic.LoadInt64(&s.scheduler.numInflightTask)) } @@ -151,8 +165,9 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestSubmitSchedule_Success testWaitGroup.Add(1) mockTask := newTestTask(s.controller, 0) - s.mockFIFOScheduler.EXPECT().Submit(mockTask).Do(func(task Task) { + s.mockFIFOScheduler.EXPECT().TrySubmit(mockTask).DoAndReturn(func(task Task) bool { testWaitGroup.Done() + return true }) s.scheduler.Submit(mockTask) @@ -176,8 +191,9 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestSubmitSchedule_Fail() testWaitGroup.Done() }).MaxTimes(1) // or process by worker - s.mockFIFOScheduler.EXPECT().Submit(mockTask).Do(func(task Task) { + s.mockFIFOScheduler.EXPECT().TrySubmit(mockTask).DoAndReturn(func(task Task) bool { testWaitGroup.Done() + return true }).MaxTimes(1) s.scheduler.Submit(mockTask) @@ -203,7 +219,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestChannels() { mockTask0 := newTestTask(s.controller, 0) s.scheduler.Submit(mockTask0) numPendingTasks++ - for _, channel := range s.scheduler.channels() { + for _, channel := range s.scheduler.channels().flattenedChannels { channelWeights = append(channelWeights, channel.Weight()) } s.Equal([]int{5, 5, 5, 5, 5}, channelWeights) @@ -212,7 +228,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestChannels() { mockTask1 := newTestTask(s.controller, 1) s.scheduler.Submit(mockTask1) numPendingTasks++ - for _, channel := range s.scheduler.channels() { + for _, channel := range s.scheduler.channels().flattenedChannels { channelWeights = append(channelWeights, channel.Weight()) } s.Equal([]int{5, 5, 5, 3, 5, 3, 5, 3}, channelWeights) @@ -221,7 +237,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestChannels() { mockTask2 := newTestTask(s.controller, 2) s.scheduler.Submit(mockTask2) numPendingTasks++ - for _, channel := range s.scheduler.channels() { + for _, channel := range s.scheduler.channels().flattenedChannels { channelWeights = append(channelWeights, channel.Weight()) } s.Equal([]int{5, 5, 5, 3, 5, 3, 2, 5, 3, 2}, channelWeights) @@ -230,7 +246,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestChannels() { mockTask3 := newTestTask(s.controller, 3) s.scheduler.Submit(mockTask3) numPendingTasks++ - for _, channel := range s.scheduler.channels() { + for _, channel := range s.scheduler.channels().flattenedChannels { channelWeights = append(channelWeights, channel.Weight()) } s.Equal([]int{5, 5, 5, 3, 5, 3, 2, 5, 3, 2, 1}, channelWeights) @@ -241,13 +257,20 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestChannels() { s.scheduler.Submit(mockTask2) s.scheduler.Submit(mockTask3) numPendingTasks += 4 - for _, channel := range s.scheduler.channels() { + for _, channel := range s.scheduler.channels().flattenedChannels { channelWeights = append(channelWeights, channel.Weight()) } 
s.Equal([]int{5, 5, 5, 3, 5, 3, 2, 5, 3, 2, 1}, channelWeights) } func (s *interleavedWeightedRoundRobinSchedulerSuite) TestParallelSubmitSchedule() { + maxQPS := 1000000 + s.scheduler.rateLimiter = quotas.NewRequestRateLimiterAdapter( + quotas.NewDefaultOutgoingRateLimiter( + func() float64 { return float64(maxQPS) }, + ), + ) + s.mockFIFOScheduler.EXPECT().Start() s.scheduler.Start() s.mockFIFOScheduler.EXPECT().Stop() @@ -265,17 +288,25 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestParallelSubmitSchedule var tasksLock sync.Mutex submittedTasks := map[*testTask]struct{}{} + s.mockFIFOScheduler.EXPECT().TrySubmit(gomock.Any()).DoAndReturn(func(task Task) bool { + tasksLock.Lock() + submittedTasks[task.(*testTask)] = struct{}{} + tasksLock.Unlock() + testWaitGroup.Done() + return true + }).AnyTimes() + s.mockFIFOScheduler.EXPECT().Submit(gomock.Any()).Do(func(task Task) { + tasksLock.Lock() + submittedTasks[task.(*testTask)] = struct{}{} + tasksLock.Unlock() + testWaitGroup.Done() + }).AnyTimes() + + startTime := time.Now() for i := 0; i < numSubmitter; i++ { channel := make(chan *testTask, numTasks) for j := 0; j < numTasks; j++ { - mockTask := newTestTask(s.controller, rand.Intn(4)) - s.mockFIFOScheduler.EXPECT().Submit(gomock.Any()).Do(func(task Task) { - tasksLock.Lock() - submittedTasks[task.(*testTask)] = struct{}{} - tasksLock.Unlock() - testWaitGroup.Done() - }).Times(1) - channel <- mockTask + channel <- newTestTask(s.controller, rand.Intn(4)) } close(channel) @@ -294,8 +325,15 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestParallelSubmitSchedule endWaitGroup.Wait() testWaitGroup.Wait() + totalDuration := time.Since(startTime) + + // need to wait for the dispatch event loop to update the numInflightTask count + time.Sleep(time.Millisecond * 100) s.Equal(int64(0), atomic.LoadInt64(&s.scheduler.numInflightTask)) + s.Len(submittedTasks, numSubmitter*numTasks) + minDuration := time.Duration((numSubmitter*numTasks-maxQPS)/maxQPS) * time.Second + s.Greater(totalDuration, minDuration) } func (s *interleavedWeightedRoundRobinSchedulerSuite) TestUpdateWeight() { @@ -324,7 +362,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestUpdateWeight() { s.scheduler.Submit(mockTask3) channelWeights := []int{} - for _, channel := range s.scheduler.channels() { + for _, channel := range s.scheduler.channels().flattenedChannels { channelWeights = append(channelWeights, channel.Weight()) } s.Equal([]int{5, 5, 5, 3, 5, 3, 2, 5, 3, 2, 1}, channelWeights) @@ -343,7 +381,7 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) TestUpdateWeight() { taskWG.Wait() channelWeights = []int{} - for _, channel := range s.scheduler.channels() { + for _, channel := range s.scheduler.channels().flattenedChannels { channelWeights = append(channelWeights, channel.Weight()) } s.Equal([]int{8, 8, 8, 8, 5, 8, 5, 8, 5, 8, 5, 8, 5, 1, 1}, channelWeights) diff --git a/common/tasks/priority.go b/common/tasks/priority.go index 3049a00ad5e..8df512325f4 100644 --- a/common/tasks/priority.go +++ b/common/tasks/priority.go @@ -47,22 +47,19 @@ const ( ) var ( - PriorityHigh = getPriority(highPriorityClass, mediumPrioritySubclass) - PriorityMedium = getPriority(mediumPriorityClass, mediumPrioritySubclass) - PriorityLow = getPriority(lowPriorityClass, mediumPrioritySubclass) + PriorityHigh = getPriority(highPriorityClass, mediumPrioritySubclass) + PriorityLow = getPriority(lowPriorityClass, mediumPrioritySubclass) ) var ( PriorityName = map[Priority]string{ - PriorityHigh: "high", - 
PriorityMedium: "medium", - PriorityLow: "low", + PriorityHigh: "high", + PriorityLow: "low", } PriorityValue = map[string]Priority{ - "high": PriorityHigh, - "medium": PriorityMedium, - "low": PriorityLow, + "high": PriorityHigh, + "low": PriorityLow, } ) diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index be4d0d2650d..1e1415adae9 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -176,9 +176,9 @@ func NewRequestToRateLimiter( func NewExecutionPriorityRateLimiter( rateBurstFn quotas.RateBurst, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RateLimiter) + rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range ExecutionAPIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { if priority, ok := ExecutionAPIToPriority[req.API]; ok { @@ -191,9 +191,9 @@ func NewExecutionPriorityRateLimiter( func NewVisibilityPriorityRateLimiter( rateBurstFn quotas.RateBurst, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RateLimiter) + rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range VisibilityAPIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { if priority, ok := VisibilityAPIToPriority[req.API]; ok { @@ -206,9 +206,9 @@ func NewVisibilityPriorityRateLimiter( func NewOtherAPIPriorityRateLimiter( rateBurstFn quotas.RateBurst, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RateLimiter) + rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range OtherAPIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { if priority, ok := OtherAPIToPriority[req.API]; ok { diff --git a/service/history/configs/config.go b/service/history/configs/config.go index a03e00e4c66..a5a58f52ba1 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -91,6 +91,10 @@ type Config struct { QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn QueueMaxReaderCount dynamicconfig.IntPropertyFn + TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn + TaskSchedulerMaxQPS dynamicconfig.IntPropertyFn + TaskSchedulerNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter + // TimerQueueProcessor settings TimerTaskHighPriorityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter TimerTaskBatchSize dynamicconfig.IntPropertyFn @@ -348,6 +352,10 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis QueuePendingTaskMaxCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskMaxCount, 10000), QueueMaxReaderCount: dc.GetIntProperty(dynamicconfig.QueueMaxReaderCount, 2), + TaskSchedulerEnableRateLimiter: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiter, false), + TaskSchedulerMaxQPS: dc.GetIntProperty(dynamicconfig.TaskSchedulerMaxQPS, 0), + 
TaskSchedulerNamespaceMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.TaskSchedulerNamespaceMaxQPS, 0), + TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100), TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10), TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 20), diff --git a/service/history/configs/quotas.go b/service/history/configs/quotas.go index 381a56bc606..12a68a10977 100644 --- a/service/history/configs/quotas.go +++ b/service/history/configs/quotas.go @@ -83,9 +83,9 @@ var ( func NewPriorityRateLimiter( rateFn quotas.RateFn, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RateLimiter) + rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range APIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewDefaultIncomingRateLimiter(rateFn) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn)) } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { if priority, ok := APIToPriority[req.API]; ok { diff --git a/service/history/queueFactoryBase.go b/service/history/queueFactoryBase.go index 20a0aecd243..cd8363bcd89 100644 --- a/service/history/queueFactoryBase.go +++ b/service/history/queueFactoryBase.go @@ -64,12 +64,13 @@ type ( QueueFactoryBaseParams struct { fx.In - NamespaceRegistry namespace.Registry - ClusterMetadata cluster.Metadata - Config *configs.Config - TimeSource clock.TimeSource - MetricsHandler metrics.MetricsHandler - Logger log.SnTaggedLogger + NamespaceRegistry namespace.Registry + ClusterMetadata cluster.Metadata + Config *configs.Config + TimeSource clock.TimeSource + MetricsHandler metrics.MetricsHandler + Logger log.SnTaggedLogger + SchedulerRateLimiter queues.SchedulerRateLimiter } QueueFactoryBase struct { @@ -90,6 +91,7 @@ type ( ) var QueueModule = fx.Options( + fx.Provide(QueueSchedulerRateLimiterProvider), fx.Provide( fx.Annotated{ Group: QueueFactoryFxGroup, @@ -107,6 +109,17 @@ var QueueModule = fx.Options( fx.Invoke(QueueFactoryLifetimeHooks), ) +func QueueSchedulerRateLimiterProvider( + config *configs.Config, +) queues.SchedulerRateLimiter { + return queues.NewSchedulerRateLimiter( + config.TaskSchedulerNamespaceMaxQPS, + config.TaskSchedulerMaxQPS, + config.PersistenceNamespaceMaxQPS, + config.PersistenceMaxQPS, + ) +} + func QueueFactoryLifetimeHooks( params QueueFactoriesLifetimeHookParams, ) { diff --git a/service/history/queues/priority_assigner.go b/service/history/queues/priority_assigner.go index 437d44c60c0..d6d0b88ed48 100644 --- a/service/history/queues/priority_assigner.go +++ b/service/history/queues/priority_assigner.go @@ -47,15 +47,23 @@ func NewPriorityAssigner() PriorityAssigner { } func (a *priorityAssignerImpl) Assign(executable Executable) tasks.Priority { - switch executable.GetType() { + taskType := executable.GetType() + switch taskType { case enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT, enumsspb.TASK_TYPE_TRANSFER_DELETE_EXECUTION, - enumsspb.TASK_TYPE_VISIBILITY_DELETE_EXECUTION: + enumsspb.TASK_TYPE_VISIBILITY_DELETE_EXECUTION, + enumsspb.TASK_TYPE_ARCHIVAL_ARCHIVE_EXECUTION, + enumsspb.TASK_TYPE_UNSPECIFIED: // add more task types here if we believe it's ok to delay those tasks // and assign them the same priority as throttled tasks return tasks.PriorityLow } + if _, ok := enumsspb.TaskType_name[int32(taskType)]; !ok { + // low priority for unknown task types + return tasks.PriorityLow + } + return tasks.PriorityHigh } diff 
--git a/service/history/queues/priority_assigner_test.go b/service/history/queues/priority_assigner_test.go index f6c233ad190..bef44ecb76b 100644 --- a/service/history/queues/priority_assigner_test.go +++ b/service/history/queues/priority_assigner_test.go @@ -69,3 +69,17 @@ func (s *priorityAssignerSuite) TestAssign_SelectedTaskTypes() { s.Equal(tasks.PriorityLow, s.priorityAssigner.Assign(mockExecutable)) } + +func (s *priorityAssignerSuite) TestAssign_UnknownTaskTypes() { + mockExecutable := NewMockExecutable(s.controller) + mockExecutable.EXPECT().GetType().Return(enumsspb.TaskType(1234)).Times(1) + + s.Equal(tasks.PriorityLow, s.priorityAssigner.Assign(mockExecutable)) +} + +func (s *priorityAssignerSuite) TestAssign_HighPriorityTaskTypes() { + mockExecutable := NewMockExecutable(s.controller) + mockExecutable.EXPECT().GetType().Return(enumsspb.TASK_TYPE_ACTIVITY_RETRY_TIMER).Times(1) + + s.Equal(tasks.PriorityHigh, s.priorityAssigner.Assign(mockExecutable)) +} diff --git a/service/history/queues/queue_scheduled_test.go b/service/history/queues/queue_scheduled_test.go index aa6369e40f4..86d642f78b1 100644 --- a/service/history/queues/queue_scheduled_test.go +++ b/service/history/queues/queue_scheduled_test.go @@ -87,11 +87,18 @@ func (s *scheduledQueueSuite) SetupTest() { s.scheduledQueue = NewScheduledQueue( s.mockShard, tasks.CategoryTimer, - NewFIFOScheduler( - FIFOSchedulerOptions{ - WorkerCount: dynamicconfig.GetIntPropertyFn(10), - QueueSize: 100, + NewPriorityScheduler( + PrioritySchedulerOptions{ + WorkerCount: dynamicconfig.GetIntPropertyFn(10), + EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), }, + NewSchedulerRateLimiter( + s.mockShard.GetConfig().TaskSchedulerNamespaceMaxQPS, + s.mockShard.GetConfig().TaskSchedulerMaxQPS, + s.mockShard.GetConfig().PersistenceNamespaceMaxQPS, + s.mockShard.GetConfig().PersistenceMaxQPS, + ), + s.mockShard.GetTimeSource(), log.NewTestLogger(), ), nil, diff --git a/service/history/queues/reader_quotas.go b/service/history/queues/reader_quotas.go index 0af01d07821..daef28d404d 100644 --- a/service/history/queues/reader_quotas.go +++ b/service/history/queues/reader_quotas.go @@ -39,11 +39,11 @@ func NewReaderPriorityRateLimiter( rateFn quotas.RateFn, maxReaders int, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RateLimiter, maxReaders) + rateLimiters := make(map[int]quotas.RequestRateLimiter, maxReaders) readerCallerToPriority := make(map[string]int, maxReaders) for readerId := DefaultReaderId; readerId != DefaultReaderId+maxReaders; readerId++ { // use readerId as priority - rateLimiters[readerId] = quotas.NewDefaultOutgoingRateLimiter(rateFn) + rateLimiters[readerId] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn)) // reader will use readerId (in string type) as caller when using the rate limiter readerCallerToPriority[newReaderRequest(int32(readerId)).Caller] = readerId } diff --git a/service/history/queues/scheduler.go b/service/history/queues/scheduler.go index 0928952acae..d4722ba3217 100644 --- a/service/history/queues/scheduler.go +++ b/service/history/queues/scheduler.go @@ -33,6 +33,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/tasks" "go.temporal.io/server/service/history/configs" ) @@ -40,8 +41,10 @@ import ( const ( // This is the task channel buffer size between // weighted round robin scheduler and the actual - // 
worker pool (parallel processor).
-	namespacePrioritySchedulerProcessorQueueSize = 10
+	// worker pool (fifo processor).
+	prioritySchedulerProcessorQueueSize = 10
+
+	taskSchedulerToken = 1
 )
 
 type (
@@ -71,11 +74,13 @@ type (
 		WorkerCount             dynamicconfig.IntPropertyFn
 		ActiveNamespaceWeights  dynamicconfig.MapPropertyFnWithNamespaceFilter
 		StandbyNamespaceWeights dynamicconfig.MapPropertyFnWithNamespaceFilter
+		EnableRateLimiter       dynamicconfig.BoolPropertyFn
 	}
 
-	FIFOSchedulerOptions struct {
-		WorkerCount dynamicconfig.IntPropertyFn
-		QueueSize   int
+	PrioritySchedulerOptions struct {
+		WorkerCount       dynamicconfig.IntPropertyFn
+		Weight            dynamicconfig.MapPropertyFn
+		EnableRateLimiter dynamicconfig.BoolPropertyFn
 	}
 
 	schedulerImpl struct {
@@ -92,6 +97,7 @@ func NewNamespacePriorityScheduler(
 	currentClusterName string,
 	options NamespacePrioritySchedulerOptions,
 	namespaceRegistry namespace.Registry,
+	rateLimiter SchedulerRateLimiter,
 	timeSource clock.TimeSource,
 	metricsHandler metrics.MetricsHandler,
 	logger log.Logger,
@@ -116,8 +122,15 @@ func NewNamespacePriorityScheduler(
 		)[key.Priority]
 	}
 	channelWeightUpdateCh := make(chan struct{}, 1)
+	channelQuotaRequestFn := func(key TaskChannelKey) quotas.Request {
+		namespaceName, err := namespaceRegistry.GetNamespaceName(namespace.ID(key.NamespaceID))
+		if err != nil {
+			namespaceName = namespace.EmptyName
+		}
+		return quotas.NewRequest("", taskSchedulerToken, namespaceName.String(), tasks.PriorityName[key.Priority], "")
+	}
 	fifoSchedulerOptions := &tasks.FIFOSchedulerOptions{
-		QueueSize:   namespacePrioritySchedulerProcessorQueueSize,
+		QueueSize:   prioritySchedulerProcessorQueueSize,
 		WorkerCount: options.WorkerCount,
 	}
 
@@ -127,6 +140,8 @@ func NewNamespacePriorityScheduler(
 				TaskChannelKeyFn:      taskChannelKeyFn,
 				ChannelWeightFn:       channelWeightFn,
 				ChannelWeightUpdateCh: channelWeightUpdateCh,
+				ChannelQuotaRequestFn: channelQuotaRequestFn,
+				EnableRateLimiter:     options.EnableRateLimiter,
 			},
 			tasks.Scheduler[Executable](tasks.NewFIFOScheduler[Executable](
 				newSchedulerMonitor(
@@ -139,6 +154,8 @@ func NewNamespacePriorityScheduler(
 				fifoSchedulerOptions,
 				logger,
 			)),
+			rateLimiter,
+			timeSource,
 			logger,
 		),
 		namespaceRegistry: namespaceRegistry,
@@ -148,24 +165,54 @@ func NewNamespacePriorityScheduler(
 	}
 }
 
-// NewFIFOScheduler is used to create shard level task scheduler
-// and always schedule tasks in fifo order regardless
-// which namespace the task belongs to.
-func NewFIFOScheduler(
-	options FIFOSchedulerOptions,
+// NewPriorityScheduler ignores namespace when scheduling tasks.
+// currently only used for shard level task scheduler +func NewPriorityScheduler( + options PrioritySchedulerOptions, + rateLimiter SchedulerRateLimiter, + timeSource clock.TimeSource, logger log.Logger, ) Scheduler { - taskChannelKeyFn := func(_ Executable) TaskChannelKey { return TaskChannelKey{} } - channelWeightFn := func(_ TaskChannelKey) int { return 1 } + taskChannelKeyFn := func(e Executable) TaskChannelKey { + return TaskChannelKey{ + NamespaceID: namespace.EmptyID.String(), + Priority: e.GetPriority(), + } + } + channelWeightFn := func(key TaskChannelKey) int { + weight := configs.DefaultActiveTaskPriorityWeight + if options.Weight != nil { + weight = configs.ConvertDynamicConfigValueToWeights( + options.Weight(), + logger, + ) + } + return weight[key.Priority] + } + channelQuotaRequestFn := func(key TaskChannelKey) quotas.Request { + return quotas.NewRequest("", taskSchedulerToken, "", tasks.PriorityName[key.Priority], "") + } fifoSchedulerOptions := &tasks.FIFOSchedulerOptions{ - QueueSize: options.QueueSize, + QueueSize: prioritySchedulerProcessorQueueSize, WorkerCount: options.WorkerCount, } return &schedulerImpl{ - Scheduler: tasks.NewFIFOScheduler[Executable]( - noopScheduleMonitor, - fifoSchedulerOptions, + Scheduler: tasks.NewInterleavedWeightedRoundRobinScheduler( + tasks.InterleavedWeightedRoundRobinSchedulerOptions[Executable, TaskChannelKey]{ + TaskChannelKeyFn: taskChannelKeyFn, + ChannelWeightFn: channelWeightFn, + ChannelWeightUpdateCh: nil, + ChannelQuotaRequestFn: channelQuotaRequestFn, + EnableRateLimiter: options.EnableRateLimiter, + }, + tasks.Scheduler[Executable](tasks.NewFIFOScheduler[Executable]( + noopScheduleMonitor, + fifoSchedulerOptions, + logger, + )), + rateLimiter, + timeSource, logger, ), taskChannelKeyFn: taskChannelKeyFn, diff --git a/service/history/queues/scheduler_quotas.go b/service/history/queues/scheduler_quotas.go new file mode 100644 index 00000000000..98d3a961aa8 --- /dev/null +++ b/service/history/queues/scheduler_quotas.go @@ -0,0 +1,112 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package queues + +import ( + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/quotas" + "go.temporal.io/server/common/tasks" +) + +type SchedulerRateLimiter quotas.RequestRateLimiter + +func NewSchedulerRateLimiter( + namespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter, + hostMaxQPS dynamicconfig.IntPropertyFn, + persistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter, + persistenceHostMaxQPS dynamicconfig.IntPropertyFn, +) SchedulerRateLimiter { + hostRateFn := func() float64 { + hostMaxQPS := float64(hostMaxQPS()) + if hostMaxQPS > 0 { + return hostMaxQPS + } + return float64(persistenceHostMaxQPS()) + } + + requestPriorityFn := func(req quotas.Request) int { + // NOTE: task scheduler will use the string format for task priority as the caller type. + // see channelQuotaRequestFn in scheduler.go + // TODO: we don't need this hack when requestRateLimiter uses generics + if priority, ok := tasks.PriorityValue[req.CallerType]; ok { + return int(priority) + } + + // default to low priority + return int(tasks.PriorityLow) + } + + priorityToRateLimiters := make(map[int]quotas.RequestRateLimiter, len(tasks.PriorityName)) + for priority := range tasks.PriorityName { + var requestRateLimiter quotas.RequestRateLimiter + if priority == tasks.PriorityHigh { + requestRateLimiter = newHighPriorityTaskRequestRateLimiter( + namespaceMaxQPS, + persistenceNamespaceMaxQPS, + hostRateFn, + ) + } else { + requestRateLimiter = quotas.NewRequestRateLimiterAdapter( + quotas.NewDefaultOutgoingRateLimiter(hostRateFn), + ) + } + priorityToRateLimiters[int(priority)] = requestRateLimiter + } + + return quotas.NewPriorityRateLimiter( + requestPriorityFn, + priorityToRateLimiters, + ) +} + +func newHighPriorityTaskRequestRateLimiter( + namespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter, + persistenceNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter, + hostRateFn quotas.RateFn, +) quotas.RequestRateLimiter { + hostRequestRateLimiter := quotas.NewRequestRateLimiterAdapter( + quotas.NewDefaultOutgoingRateLimiter(hostRateFn), + ) + namespaceRequestRateLimiterFn := func(req quotas.Request) quotas.RequestRateLimiter { + return quotas.NewRequestRateLimiterAdapter( + quotas.NewDefaultOutgoingRateLimiter(func() float64 { + if namespaceQPS := float64(namespaceMaxQPS(req.Caller)); namespaceQPS > 0 { + return namespaceQPS + } + + if persistenceNamespaceQPS := float64(persistenceNamespaceMaxQPS(req.Caller)); persistenceNamespaceQPS > 0 { + return persistenceNamespaceQPS + } + + return hostRateFn() + }), + ) + } + + return quotas.NewMultiRequestRateLimiter( + quotas.NewNamespaceRateLimiter(namespaceRequestRateLimiterFn), + hostRequestRateLimiter, + ) +} diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go index 479cedd46b1..996c04c99ec 100644 --- a/service/history/timerQueueActiveProcessor.go +++ b/service/history/timerQueueActiveProcessor.go @@ -66,6 +66,7 @@ func newTimerQueueActiveProcessor( taskAllocator taskAllocator, clientBean client.Bean, rateLimiter quotas.RateLimiter, + schedulerRateLimiter queues.SchedulerRateLimiter, logger log.Logger, metricProvider metrics.MetricsHandler, singleProcessor bool, @@ -91,7 +92,7 @@ func newTimerQueueActiveProcessor( processor := &timerQueueActiveProcessorImpl{} if scheduler == nil { - scheduler = newTimerTaskShardScheduler(shard, logger) + scheduler = newTimerTaskShardScheduler(shard, schedulerRateLimiter, logger) processor.ownedScheduler = scheduler } 
@@ -215,6 +216,7 @@ func newTimerQueueFailoverProcessor( matchingClient matchingservice.MatchingServiceClient, taskAllocator taskAllocator, rateLimiter quotas.RateLimiter, + schedulerRateLimiter queues.SchedulerRateLimiter, logger log.Logger, metricProvider metrics.MetricsHandler, ) (func(ackLevel tasks.Key) error, *timerQueueActiveProcessorImpl) { @@ -267,7 +269,7 @@ func newTimerQueueFailoverProcessor( ) if scheduler == nil { - scheduler = newTimerTaskShardScheduler(shard, logger) + scheduler = newTimerTaskShardScheduler(shard, schedulerRateLimiter, logger) processor.ownedScheduler = scheduler } diff --git a/service/history/timerQueueActiveTaskExecutor_test.go b/service/history/timerQueueActiveTaskExecutor_test.go index 84237cf9f3f..b1bd435cb2e 100644 --- a/service/history/timerQueueActiveTaskExecutor_test.go +++ b/service/history/timerQueueActiveTaskExecutor_test.go @@ -194,6 +194,11 @@ func (s *timerQueueActiveTaskExecutorSuite) SetupTest() { quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(config.TimerProcessorMaxPollRPS()) }, ), + quotas.NewRequestRateLimiterAdapter( + quotas.NewDefaultOutgoingRateLimiter( + func() float64 { return float64(config.TaskSchedulerMaxQPS()) }, + ), + ), s.logger, metrics.NoopMetricsHandler, false, diff --git a/service/history/timerQueueFactory.go b/service/history/timerQueueFactory.go index 9da60cf255a..c3a46571273 100644 --- a/service/history/timerQueueFactory.go +++ b/service/history/timerQueueFactory.go @@ -75,8 +75,10 @@ func NewTimerQueueFactory( WorkerCount: params.Config.TimerProcessorSchedulerWorkerCount, ActiveNamespaceWeights: params.Config.TimerProcessorSchedulerActiveRoundRobinWeights, StandbyNamespaceWeights: params.Config.TimerProcessorSchedulerStandbyRoundRobinWeights, + EnableRateLimiter: params.Config.TaskSchedulerEnableRateLimiter, }, params.NamespaceRegistry, + params.SchedulerRateLimiter, params.TimeSource, params.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationTimerQueueProcessorScope)), params.Logger, @@ -209,5 +211,6 @@ func (f *timerQueueFactory) CreateQueue( f.MatchingClient, f.MetricsHandler, f.HostRateLimiter, + f.SchedulerRateLimiter, ) } diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index aff87f60fa3..a49a2921ae3 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -70,6 +70,7 @@ type ( workflowDeleteManager workflow.DeleteManager ackLevel tasks.Key hostRateLimiter quotas.RateLimiter + schedulerRateLimiter queues.SchedulerRateLimiter logger log.Logger clientBean client.Bean matchingClient matchingservice.MatchingServiceClient @@ -92,6 +93,7 @@ func newTimerQueueProcessor( matchingClient matchingservice.MatchingServiceClient, metricHandler metrics.MetricsHandler, hostRateLimiter quotas.RateLimiter, + schedulerRateLimiter queues.SchedulerRateLimiter, ) queues.Queue { singleProcessor := !shard.GetClusterMetadata().IsGlobalNamespaceEnabled() || @@ -122,6 +124,7 @@ func newTimerQueueProcessor( workflowDeleteManager: workflowDeleteManager, ackLevel: shard.GetQueueAckLevel(tasks.CategoryTimer), hostRateLimiter: hostRateLimiter, + schedulerRateLimiter: schedulerRateLimiter, logger: logger, clientBean: clientBean, matchingClient: matchingClient, @@ -140,6 +143,7 @@ func newTimerQueueProcessor( hostRateLimiter, config.TimerProcessorMaxPollRPS, ), + schedulerRateLimiter, logger, metricHandler, singleProcessor, @@ -251,6 +255,7 @@ func (t *timerQueueProcessorImpl) FailoverNamespace( t.hostRateLimiter, 
t.config.TimerProcessorFailoverMaxPollRPS, ), + t.schedulerRateLimiter, t.logger, t.metricHandler, ) @@ -413,6 +418,7 @@ func (t *timerQueueProcessorImpl) handleClusterMetadataUpdate( t.hostRateLimiter, t.config.TimerProcessorMaxPollRPS, ), + t.schedulerRateLimiter, t.logger, t.metricHandler, ) diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index 2a65347eae8..17208529fad 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -378,14 +378,17 @@ func (t *timerQueueProcessorBase) submitTask( func newTimerTaskShardScheduler( shard shard.Context, + rateLimiter queues.SchedulerRateLimiter, logger log.Logger, ) queues.Scheduler { config := shard.GetConfig() - return queues.NewFIFOScheduler( - queues.FIFOSchedulerOptions{ - WorkerCount: config.TimerTaskWorkerCount, - QueueSize: config.TimerTaskWorkerCount() * config.TimerTaskBatchSize(), + return queues.NewPriorityScheduler( + queues.PrioritySchedulerOptions{ + WorkerCount: config.TimerTaskWorkerCount, + EnableRateLimiter: config.TaskSchedulerEnableRateLimiter, }, + rateLimiter, + shard.GetTimeSource(), logger, ) } diff --git a/service/history/timerQueueStandbyProcessor.go b/service/history/timerQueueStandbyProcessor.go index 5eeb67b9b9d..9680582517e 100644 --- a/service/history/timerQueueStandbyProcessor.go +++ b/service/history/timerQueueStandbyProcessor.go @@ -67,6 +67,7 @@ func newTimerQueueStandbyProcessor( taskAllocator taskAllocator, clientBean client.Bean, rateLimiter quotas.RateLimiter, + schedulerRateLimiter queues.SchedulerRateLimiter, logger log.Logger, metricProvider metrics.MetricsHandler, ) *timerQueueStandbyProcessorImpl { @@ -125,7 +126,7 @@ func newTimerQueueStandbyProcessor( } if scheduler == nil { - scheduler = newTimerTaskShardScheduler(shard, logger) + scheduler = newTimerTaskShardScheduler(shard, schedulerRateLimiter, logger) processor.ownedScheduler = scheduler } diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index b629df31c48..e7692dd410e 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -70,6 +70,7 @@ func newTransferQueueActiveProcessor( taskAllocator taskAllocator, clientBean client.Bean, rateLimiter quotas.RateLimiter, + schedulerRateLimiter queues.SchedulerRateLimiter, logger log.Logger, metricProvider metrics.MetricsHandler, singleProcessor bool, @@ -119,7 +120,7 @@ func newTransferQueueActiveProcessor( } if scheduler == nil { - scheduler = newTransferTaskShardScheduler(shard, logger) + scheduler = newTransferTaskShardScheduler(shard, schedulerRateLimiter, logger) processor.ownedScheduler = scheduler } @@ -244,6 +245,7 @@ func newTransferQueueFailoverProcessor( maxLevel int64, taskAllocator taskAllocator, rateLimiter quotas.RateLimiter, + schedulerRateLimiter queues.SchedulerRateLimiter, logger log.Logger, metricProvider metrics.MetricsHandler, ) (func(ackLevel int64) error, *transferQueueActiveProcessorImpl) { @@ -314,7 +316,7 @@ func newTransferQueueFailoverProcessor( ) if scheduler == nil { - scheduler = newTransferTaskShardScheduler(shard, logger) + scheduler = newTransferTaskShardScheduler(shard, schedulerRateLimiter, logger) processor.ownedScheduler = scheduler } diff --git a/service/history/transferQueueFactory.go b/service/history/transferQueueFactory.go index 6a92dd220f3..47903fce09e 100644 --- a/service/history/transferQueueFactory.go +++ 
b/service/history/transferQueueFactory.go @@ -78,8 +78,10 @@ func NewTransferQueueFactory( WorkerCount: params.Config.TransferProcessorSchedulerWorkerCount, ActiveNamespaceWeights: params.Config.TransferProcessorSchedulerActiveRoundRobinWeights, StandbyNamespaceWeights: params.Config.TransferProcessorSchedulerStandbyRoundRobinWeights, + EnableRateLimiter: params.Config.TaskSchedulerEnableRateLimiter, }, params.NamespaceRegistry, + params.SchedulerRateLimiter, params.TimeSource, params.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationTransferQueueProcessorScope)), params.Logger, @@ -201,5 +203,6 @@ func (f *transferQueueFactory) CreateQueue( f.HistoryClient, f.MetricsHandler, f.HostRateLimiter, + f.SchedulerRateLimiter, ) } diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index e46a5f2b6dd..08232aac27d 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -72,6 +72,7 @@ type ( historyClient historyservice.HistoryServiceClient ackLevel int64 hostRateLimiter quotas.RateLimiter + schedulerRateLimiter queues.SchedulerRateLimiter logger log.Logger isStarted int32 isStopped int32 @@ -96,6 +97,7 @@ func newTransferQueueProcessor( historyClient historyservice.HistoryServiceClient, metricProvider metrics.MetricsHandler, hostRateLimiter quotas.RateLimiter, + schedulerRateLimiter queues.SchedulerRateLimiter, ) queues.Queue { singleProcessor := !shard.GetClusterMetadata().IsGlobalNamespaceEnabled() || @@ -107,24 +109,25 @@ func newTransferQueueProcessor( taskAllocator := newTaskAllocator(shard) return &transferQueueProcessorImpl{ - singleProcessor: singleProcessor, - currentClusterName: currentClusterName, - shard: shard, - workflowCache: workflowCache, - archivalClient: archivalClient, - sdkClientFactory: sdkClientFactory, - taskAllocator: taskAllocator, - config: config, - metricHandler: metricProvider, - clientBean: clientBean, - matchingClient: matchingClient, - historyClient: historyClient, - ackLevel: shard.GetQueueAckLevel(tasks.CategoryTransfer).TaskID, - hostRateLimiter: hostRateLimiter, - logger: logger, - shutdownChan: make(chan struct{}), - scheduler: scheduler, - priorityAssigner: priorityAssigner, + singleProcessor: singleProcessor, + currentClusterName: currentClusterName, + shard: shard, + workflowCache: workflowCache, + archivalClient: archivalClient, + sdkClientFactory: sdkClientFactory, + taskAllocator: taskAllocator, + config: config, + metricHandler: metricProvider, + clientBean: clientBean, + matchingClient: matchingClient, + historyClient: historyClient, + ackLevel: shard.GetQueueAckLevel(tasks.CategoryTransfer).TaskID, + hostRateLimiter: hostRateLimiter, + schedulerRateLimiter: schedulerRateLimiter, + logger: logger, + shutdownChan: make(chan struct{}), + scheduler: scheduler, + priorityAssigner: priorityAssigner, activeTaskProcessor: newTransferQueueActiveProcessor( shard, workflowCache, @@ -140,6 +143,7 @@ func newTransferQueueProcessor( hostRateLimiter, config.TransferProcessorMaxPollRPS, ), + schedulerRateLimiter, logger, metricProvider, singleProcessor, @@ -247,6 +251,7 @@ func (t *transferQueueProcessorImpl) FailoverNamespace( t.hostRateLimiter, t.config.TransferProcessorFailoverMaxPollRPS, ), + t.schedulerRateLimiter, t.logger, t.metricHandler, ) @@ -406,6 +411,7 @@ func (t *transferQueueProcessorImpl) handleClusterMetadataUpdate( t.hostRateLimiter, t.config.TransferProcessorMaxPollRPS, ), + t.schedulerRateLimiter, t.logger, t.metricHandler, t.matchingClient, 
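Note for reviewers: the transferQueueProcessorBase.go and visibilityQueueProcessor.go hunks that follow repeat the swap already applied to the timer queue: each per-shard queues.NewFIFOScheduler becomes a queues.NewPriorityScheduler that takes the shared SchedulerRateLimiter and the shard's time source, and the fixed QueueSize option is dropped. A minimal sketch of the resulting construction, with stand-in values in place of the shard's config; it assumes clock.NewRealTimeSource, log.NewNoopLogger, and the dynamicconfig Get*PropertyFn helpers are available as elsewhere in the server.

package example

import (
	"go.temporal.io/server/common/clock"
	"go.temporal.io/server/common/dynamicconfig"
	"go.temporal.io/server/common/log"
	"go.temporal.io/server/service/history/queues"
)

// newExampleShardScheduler mirrors newTransferTaskShardScheduler, with
// hypothetical values replacing the shard's config and time source.
func newExampleShardScheduler(rateLimiter queues.SchedulerRateLimiter) queues.Scheduler {
	return queues.NewPriorityScheduler(
		queues.PrioritySchedulerOptions{
			WorkerCount:       dynamicconfig.GetIntPropertyFn(20),    // e.g. config.TransferTaskWorkerCount
			EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true), // config.TaskSchedulerEnableRateLimiter
		},
		rateLimiter,
		clock.NewRealTimeSource(), // production code passes shard.GetTimeSource()
		log.NewNoopLogger(),
	)
}

Because EnableRateLimiter is a dynamic config property rather than a value captured at construction time, rate limiting can plausibly be toggled at runtime without rebuilding the scheduler.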
diff --git a/service/history/transferQueueProcessorBase.go b/service/history/transferQueueProcessorBase.go index b4e12ff4548..14a9df5b4bf 100644 --- a/service/history/transferQueueProcessorBase.go +++ b/service/history/transferQueueProcessorBase.go @@ -107,14 +107,17 @@ func (t *transferQueueProcessorBase) queueShutdown() error { func newTransferTaskShardScheduler( shard shard.Context, + rateLimiter queues.SchedulerRateLimiter, logger log.Logger, ) queues.Scheduler { config := shard.GetConfig() - return queues.NewFIFOScheduler( - queues.FIFOSchedulerOptions{ - WorkerCount: config.TransferTaskWorkerCount, - QueueSize: config.TransferTaskBatchSize(), + return queues.NewPriorityScheduler( + queues.PrioritySchedulerOptions{ + WorkerCount: config.TransferTaskWorkerCount, + EnableRateLimiter: config.TaskSchedulerEnableRateLimiter, }, + rateLimiter, + shard.GetTimeSource(), logger, ) } diff --git a/service/history/transferQueueStandbyProcessor.go b/service/history/transferQueueStandbyProcessor.go index 260d2c9b9b4..0e4417d0f1a 100644 --- a/service/history/transferQueueStandbyProcessor.go +++ b/service/history/transferQueueStandbyProcessor.go @@ -65,6 +65,7 @@ func newTransferQueueStandbyProcessor( taskAllocator taskAllocator, clientBean client.Bean, rateLimiter quotas.RateLimiter, + schedulerRateLimiter queues.SchedulerRateLimiter, logger log.Logger, metricProvider metrics.MetricsHandler, matchingClient matchingservice.MatchingServiceClient, @@ -145,7 +146,7 @@ func newTransferQueueStandbyProcessor( ) if scheduler == nil { - scheduler = newTransferTaskShardScheduler(shard, logger) + scheduler = newTransferTaskShardScheduler(shard, schedulerRateLimiter, logger) processor.ownedScheduler = scheduler } diff --git a/service/history/visibilityQueueFactory.go b/service/history/visibilityQueueFactory.go index 3e0e16ee70f..92dcbd871e2 100644 --- a/service/history/visibilityQueueFactory.go +++ b/service/history/visibilityQueueFactory.go @@ -67,8 +67,10 @@ func NewVisibilityQueueFactory( WorkerCount: params.Config.VisibilityProcessorSchedulerWorkerCount, ActiveNamespaceWeights: params.Config.VisibilityProcessorSchedulerActiveRoundRobinWeights, StandbyNamespaceWeights: params.Config.VisibilityProcessorSchedulerStandbyRoundRobinWeights, + EnableRateLimiter: params.Config.TaskSchedulerEnableRateLimiter, }, params.NamespaceRegistry, + params.SchedulerRateLimiter, params.TimeSource, params.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationVisibilityQueueProcessorScope)), params.Logger, @@ -152,5 +154,6 @@ func (f *visibilityQueueFactory) CreateQueue( f.VisibilityMgr, f.MetricsHandler, f.HostRateLimiter, + f.SchedulerRateLimiter, ) } diff --git a/service/history/visibilityQueueProcessor.go b/service/history/visibilityQueueProcessor.go index ac61c4439f8..cbdeb64dd8e 100644 --- a/service/history/visibilityQueueProcessor.go +++ b/service/history/visibilityQueueProcessor.go @@ -80,6 +80,7 @@ func newVisibilityQueueProcessor( visibilityMgr manager.VisibilityManager, metricProvider metrics.MetricsHandler, hostRateLimiter quotas.RateLimiter, + schedulerRateLimiter queues.SchedulerRateLimiter, ) queues.Queue { config := shard.GetConfig() @@ -141,7 +142,7 @@ func newVisibilityQueueProcessor( ) if scheduler == nil { - scheduler = newVisibilityTaskShardScheduler(shard, logger) + scheduler = newVisibilityTaskShardScheduler(shard, schedulerRateLimiter, logger) retProcessor.ownedScheduler = scheduler } @@ -357,14 +358,17 @@ func (t *visibilityQueueProcessorImpl) queueShutdown() error { func 
newVisibilityTaskShardScheduler( shard shard.Context, + rateLimiter queues.SchedulerRateLimiter, logger log.Logger, ) queues.Scheduler { config := shard.GetConfig() - return queues.NewFIFOScheduler( - queues.FIFOSchedulerOptions{ - WorkerCount: config.VisibilityTaskWorkerCount, - QueueSize: config.VisibilityTaskBatchSize(), + return queues.NewPriorityScheduler( + queues.PrioritySchedulerOptions{ + WorkerCount: config.VisibilityTaskWorkerCount, + EnableRateLimiter: config.TaskSchedulerEnableRateLimiter, }, + rateLimiter, + shard.GetTimeSource(), logger, ) } diff --git a/service/matching/configs/quotas.go b/service/matching/configs/quotas.go index 1741766d813..0bc4d85960e 100644 --- a/service/matching/configs/quotas.go +++ b/service/matching/configs/quotas.go @@ -51,9 +51,9 @@ var ( func NewPriorityRateLimiter( rateFn quotas.RateFn, ) quotas.RequestRateLimiter { - rateLimiters := make(map[int]quotas.RateLimiter) + rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range APIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewDefaultIncomingRateLimiter(rateFn) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn)) } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { if priority, ok := APIToPriority[req.API]; ok {