Skip to content

Commit

Permalink
Task scheduler rate limiter (#3606)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Nov 19, 2022
1 parent df160db commit 32212ea
Show file tree
Hide file tree
Showing 36 changed files with 747 additions and 164 deletions.
10 changes: 10 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ const (
// HistoryPersistenceGlobalMaxQPS is the max qps history cluster can query DB
HistoryPersistenceGlobalMaxQPS = "history.persistenceGlobalMaxQPS"
// HistoryPersistenceNamespaceMaxQPS is the max qps each namespace on history host can query DB
// If value less or equal to 0, will fall back to HistoryPersistenceMaxQPS
HistoryPersistenceNamespaceMaxQPS = "history.persistenceNamespaceMaxQPS"
// HistoryEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in history persistence client
HistoryEnablePersistencePriorityRateLimiting = "history.enablePersistencePriorityRateLimiting"
Expand Down Expand Up @@ -372,6 +373,15 @@ const (
// QueueMaxReaderCount is the max number of readers in one multi-cursor queue
QueueMaxReaderCount = "history.queueMaxReaderCount"

// TaskSchedulerEnableRateLimiter indicates if rate limiter should be enabled in task scheduler
TaskSchedulerEnableRateLimiter = "history.taskSchedulerEnableRateLimiter"
// TaskSchedulerMaxQPS is the max qps task schedulers on a host can schedule tasks
// If value less or equal to 0, will fall back to HistoryPersistenceMaxQPS
TaskSchedulerMaxQPS = "history.taskSchedulerMaxQPS"
// TaskSchedulerNamespaceMaxQPS is the max qps task schedulers on a host can schedule tasks for a certain namespace
// If value less or equal to 0, will fall back to HistoryPersistenceNamespaceMaxQPS
TaskSchedulerNamespaceMaxQPS = "history.taskSchedulerNamespaceMaxQPS"

// TimerTaskBatchSize is batch size for timer processor to process tasks
TimerTaskBatchSize = "history.timerTaskBatchSize"
// TimerTaskWorkerCount is number of task workers for timer processor
Expand Down
10 changes: 5 additions & 5 deletions common/persistence/client/quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func newPriorityRateLimiter(
rateFn quotas.RateFn,
requestPriorityFn quotas.RequestPriorityFn,
) quotas.RequestRateLimiter {
rateLimiters := make(map[int]quotas.RateLimiter)
rateLimiters := make(map[int]quotas.RequestRateLimiter)
for priority := range RequestPrioritiesOrdered {
rateLimiters[priority] = quotas.NewDefaultOutgoingRateLimiter(rateFn)
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn))
}

return quotas.NewPriorityRateLimiter(
Expand All @@ -130,10 +130,10 @@ func NewNoopPriorityRateLimiter(

return quotas.NewPriorityRateLimiter(
func(_ quotas.Request) int { return priority },
map[int]quotas.RateLimiter{
priority: quotas.NewDefaultOutgoingRateLimiter(
map[int]quotas.RequestRateLimiter{
priority: quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return float64(maxQPS()) },
),
)),
},
)
}
Expand Down
58 changes: 58 additions & 0 deletions common/quotas/noop_request_rate_limiter_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas

import (
"context"
"time"
)

type (
// NoopRequestRateLimiterImpl is a no-op implementation for RequestRateLimiter
NoopRequestRateLimiterImpl struct{}
)

var NoopRequestRateLimiter RequestRateLimiter = &NoopRequestRateLimiterImpl{}

func (r *NoopRequestRateLimiterImpl) Allow(
_ time.Time,
_ Request,
) bool {
return true
}

func (r *NoopRequestRateLimiterImpl) Reserve(
_ time.Time,
_ Request,
) Reservation {
return NoopReservation
}

func (r *NoopRequestRateLimiterImpl) Wait(
_ context.Context,
_ Request,
) error {
return nil
}
7 changes: 2 additions & 5 deletions common/quotas/noop_reservation_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,12 @@ import (
)

type (
NoopReservationImpl struct {
}
NoopReservationImpl struct{}
)

var _ Reservation = (*NoopReservationImpl)(nil)

func NewNoopReservation() *NoopReservationImpl {
return &NoopReservationImpl{}
}
var NoopReservation Reservation = &NoopReservationImpl{}

// OK returns whether the limiter can provide the requested number of tokens
func (r *NoopReservationImpl) OK() bool {
Expand Down
18 changes: 9 additions & 9 deletions common/quotas/priority_rate_limiter_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ type (
// PriorityRateLimiterImpl is a wrapper around the golang rate limiter
PriorityRateLimiterImpl struct {
requestPriorityFn RequestPriorityFn
priorityToRateLimiters map[int]RateLimiter
priorityToRateLimiters map[int]RequestRateLimiter

// priority value 0 means highest priority
// sorted rate limiter from low priority value to high priority value
priorityToIndex map[int]int
rateLimiters []RateLimiter
rateLimiters []RequestRateLimiter
}
)

Expand All @@ -50,7 +50,7 @@ var _ RequestRateLimiter = (*PriorityRateLimiterImpl)(nil)
// configuration updates
func NewPriorityRateLimiter(
requestPriorityFn RequestPriorityFn,
priorityToRateLimiters map[int]RateLimiter,
priorityToRateLimiters map[int]RequestRateLimiter,
) *PriorityRateLimiterImpl {
priorities := make([]int, 0, len(priorityToRateLimiters))
for priority := range priorityToRateLimiters {
Expand All @@ -60,7 +60,7 @@ func NewPriorityRateLimiter(
return priorities[i] < priorities[j]
})
priorityToIndex := make(map[int]int, len(priorityToRateLimiters))
rateLimiters := make([]RateLimiter, 0, len(priorityToRateLimiters))
rateLimiters := make([]RequestRateLimiter, 0, len(priorityToRateLimiters))
for index, priority := range priorities {
priorityToIndex[priority] = index
rateLimiters = append(rateLimiters, priorityToRateLimiters[priority])
Expand All @@ -81,13 +81,13 @@ func (p *PriorityRateLimiterImpl) Allow(
) bool {
decidingRateLimiter, consumeRateLimiters := p.getRateLimiters(request)

allow := decidingRateLimiter.AllowN(now, request.Token)
allow := decidingRateLimiter.Allow(now, request)
if !allow {
return false
}

for _, limiter := range consumeRateLimiters {
_ = limiter.ReserveN(now, request.Token)
_ = limiter.Reserve(now, request)
}
return allow
}
Expand All @@ -98,14 +98,14 @@ func (p *PriorityRateLimiterImpl) Reserve(
) Reservation {
decidingRateLimiter, consumeRateLimiters := p.getRateLimiters(request)

decidingReservation := decidingRateLimiter.ReserveN(now, request.Token)
decidingReservation := decidingRateLimiter.Reserve(now, request)
if !decidingReservation.OK() {
return decidingReservation
}

otherReservations := make([]Reservation, len(consumeRateLimiters))
for index, limiter := range consumeRateLimiters {
otherReservations[index] = limiter.ReserveN(now, request.Token)
otherReservations[index] = limiter.Reserve(now, request)
}
return NewPriorityReservation(decidingReservation, otherReservations)
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (p *PriorityRateLimiterImpl) Wait(

func (p *PriorityRateLimiterImpl) getRateLimiters(
request Request,
) (RateLimiter, []RateLimiter) {
) (RequestRateLimiter, []RequestRateLimiter) {
priority := p.requestPriorityFn(request)
if _, ok := p.priorityToRateLimiters[priority]; !ok {
panic("Request to priority & priority to rate limiter does not match")
Expand Down
6 changes: 3 additions & 3 deletions common/quotas/priority_rate_limiter_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func (s *priorityStageRateLimiterSuite) SetupTest() {
s.highPriorityAPIName: 0,
s.lowPriorityAPIName: 2,
}
priorityToRateLimiters := map[int]RateLimiter{
0: s.highPriorityRateLimiter,
2: s.lowPriorityRateLimiter,
priorityToRateLimiters := map[int]RequestRateLimiter{
0: NewRequestRateLimiterAdapter(s.highPriorityRateLimiter),
2: NewRequestRateLimiterAdapter(s.lowPriorityRateLimiter),
}
s.rateLimiter = NewPriorityRateLimiter(func(req Request) int {
return apiToPriority[req.API]
Expand Down
67 changes: 67 additions & 0 deletions common/quotas/request_rate_limiter_adapter_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package quotas

import (
"context"
"time"
)

type (
RequestRateLimiterAdapterImpl struct {
rateLimiter RateLimiter
}
)

var _ RequestRateLimiter = (*RequestRateLimiterAdapterImpl)(nil)

func NewRequestRateLimiterAdapter(
rateLimiter RateLimiter,
) RequestRateLimiter {
return &RequestRateLimiterAdapterImpl{
rateLimiter: rateLimiter,
}
}

func (r *RequestRateLimiterAdapterImpl) Allow(
now time.Time,
request Request,
) bool {
return r.rateLimiter.AllowN(now, request.Token)
}

func (r *RequestRateLimiterAdapterImpl) Reserve(
now time.Time,
request Request,
) Reservation {
return r.rateLimiter.ReserveN(now, request.Token)
}

func (r *RequestRateLimiterAdapterImpl) Wait(
ctx context.Context,
request Request,
) error {
return r.rateLimiter.WaitN(ctx, request.Token)
}
2 changes: 1 addition & 1 deletion common/quotas/routing_rate_limiter_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *RoutingRateLimiterImpl) Reserve(
) Reservation {
rateLimiter, ok := r.apiToRateLimiter[request.API]
if !ok {
return NewNoopReservation()
return NoopReservation
}
return rateLimiter.Reserve(now, request)
}
Expand Down
19 changes: 15 additions & 4 deletions common/tasks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
"testing"

"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/quotas"
)

type (
Expand Down Expand Up @@ -59,10 +62,14 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Sequential(b *testing.B) {

scheduler := NewInterleavedWeightedRoundRobinScheduler(
InterleavedWeightedRoundRobinSchedulerOptions[*noopTask, int]{
TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) },
ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] },
TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) },
ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] },
ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") },
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
},
Scheduler[*noopTask](&noopScheduler{}),
quotas.NoopRequestRateLimiter,
clock.NewRealTimeSource(),
logger,
)
scheduler.Start()
Expand All @@ -85,10 +92,14 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Parallel(b *testing.B) {

scheduler := NewInterleavedWeightedRoundRobinScheduler(
InterleavedWeightedRoundRobinSchedulerOptions[*noopTask, int]{
TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) },
ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] },
TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) },
ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] },
ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") },
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
},
Scheduler[*noopTask](&noopScheduler{}),
quotas.NoopRequestRateLimiter,
clock.NewRealTimeSource(),
logger,
)
scheduler.Start()
Expand Down
Loading

0 comments on commit 32212ea

Please sign in to comment.