Add explicit per-namespace rate limit to schedules (#3838)
dnr authored Jan 26, 2023
1 parent 5c23836 commit 7e89af4
Showing 12 changed files with 316 additions and 123 deletions.
198 changes: 123 additions & 75 deletions api/schedule/v1/message.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
@@ -720,4 +720,6 @@ const (
// WorkerStickyCacheSize controls the sticky cache size for SDK workers on worker nodes
// (shared between all workers in the process, cannot be changed after startup)
WorkerStickyCacheSize = "worker.stickyCacheSize"
// SchedulerNamespaceStartWorkflowRPS is the per-namespace limit for starting workflows by schedules
SchedulerNamespaceStartWorkflowRPS = "worker.schedulerNamespaceStartWorkflowRPS"
)
proto/internal/temporal/server/api/schedule/v1/message.proto
@@ -104,6 +104,7 @@ message StartWorkflowRequest {
reserved 3;
temporal.api.common.v1.Payloads last_completion_result = 4;
temporal.api.failure.v1.Failure continued_failure = 5;
bool completed_rate_limit_sleep = 6;
}

message StartWorkflowResponse {
2 changes: 1 addition & 1 deletion service/worker/batcher/fx.go
@@ -94,7 +94,7 @@ func (s *workerComponent) DedicatedWorkerOptions(ns *namespace.Namespace) *worke
}
}

func (s *workerComponent) Register(worker sdkworker.Worker, ns *namespace.Namespace) {
func (s *workerComponent) Register(worker sdkworker.Worker, ns *namespace.Namespace, _ workercommon.RegistrationDetails) {
worker.RegisterWorkflowWithOptions(BatchWorkflow, workflow.RegisterOptions{Name: BatchWFTypeName})
worker.RegisterActivity(s.activities(ns.Name(), ns.ID()))
}
12 changes: 11 additions & 1 deletion service/worker/common/interface.go
@@ -54,7 +54,7 @@ type (
PerNSWorkerComponent interface {
// Register registers Workflow and Activity types provided by this worker component.
// The namespace that this worker is running in is also provided.
Register(sdkworker.Worker, *namespace.Namespace)
Register(sdkworker.Worker, *namespace.Namespace, RegistrationDetails)
// DedicatedWorkerOptions returns a PerNSDedicatedWorkerOptions for this worker component.
DedicatedWorkerOptions(*namespace.Namespace) *PerNSDedicatedWorkerOptions
}
@@ -63,4 +63,14 @@ type (
// Set this to false to disable this worker for this namespace
Enabled bool
}

RegistrationDetails struct {
// TotalWorkers is the number of requested per-namespace workers for this namespace.
TotalWorkers int
// Multiplicity is the number of those workers that this particular sdkworker.Worker
// represents. It may be more than one if the requested number is more than the total
// number of worker nodes or if consistent hashing decided to place more than one on
// the same node.
Multiplicity int
}
)
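Taken together, the two fields let a component prorate a namespace-wide budget across nodes: each worker takes Multiplicity/TotalWorkers of the global value. A self-contained sketch of that arithmetic, mirroring the localRPS computation in service/worker/scheduler/fx.go further down:

package main

import "fmt"

// RegistrationDetails mirrors the struct added above.
type RegistrationDetails struct {
	TotalWorkers int
	Multiplicity int
}

// share gives this node its fraction of a namespace-wide RPS budget.
func share(globalRPS float64, d RegistrationDetails) float64 {
	return float64(d.Multiplicity) * globalRPS / float64(d.TotalWorkers)
}

func main() {
	// 30 RPS for the namespace, 3 requested workers, 2 hashed to this node:
	fmt.Println(share(30, RegistrationDetails{TotalWorkers: 3, Multiplicity: 2})) // 20
	// The node holding the remaining worker gets 10, so the per-node limits
	// sum back to the namespace-wide 30 whenever all workers are placed.
}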
8 changes: 4 additions & 4 deletions service/worker/common/interface_mock.go

Some generated files are not rendered by default.

23 changes: 14 additions & 9 deletions service/worker/pernamespaceworker.go
@@ -246,24 +246,24 @@ func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) {
delete(wm.workers, ns.ID())
}

func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, error) {
workerCount := wm.config.PerNamespaceWorkerCount(ns.Name().String())
// This can result in fewer than the intended number of workers if numWorkers > 1, because
func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, int, error) {
totalWorkers := wm.config.PerNamespaceWorkerCount(ns.Name().String())
// This can result in fewer than the intended number of workers if totalWorkers > 1, because
// multiple lookups might land on the same node. To compensate, we increase the number of
// pollers in that case, but it would be better to try to spread them across our nodes.
// TODO: implement this properly using LookupN in serviceResolver
multiplicity := 0
for i := 0; i < workerCount; i++ {
for i := 0; i < totalWorkers; i++ {
key := fmt.Sprintf("%s/%d", ns.ID().String(), i)
target, err := wm.serviceResolver.Lookup(key)
if err != nil {
return 0, err
return 0, 0, err
}
if target.Identity() == wm.self.Identity() {
multiplicity++
}
}
return multiplicity, nil
return multiplicity, totalWorkers, nil
}

func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions {
@@ -379,7 +379,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
}

// check if we are responsible for this namespace at all
multiplicity, err := w.wm.getWorkerMultiplicity(ns)
multiplicity, totalWorkers, err := w.wm.getWorkerMultiplicity(ns)
if err != nil {
w.logger.Error("Failed to look up hosts", tag.Error(err))
// TODO: add metric also
Expand Down Expand Up @@ -410,7 +410,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
// create new one. note that even before startWorker returns, the worker may have started
// and already called the fatal error handler. we need to set w.client+worker+componentSet
// before releasing the lock to keep our state consistent.
client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, dcOptions)
client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, totalWorkers, dcOptions)
if err != nil {
// TODO: add metric also
return err
@@ -426,6 +426,7 @@ func (w *perNamespaceWorker) startWorker(
ns *namespace.Namespace,
components []workercommon.PerNSWorkerComponent,
multiplicity int,
totalWorkers int,
dcOptions sdkWorkerOptions,
) (sdkclient.Client, sdkworker.Worker, error) {
nsName := ns.Name().String()
@@ -456,8 +457,12 @@

// this should not block because the client already has server capabilities
worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
details := workercommon.RegistrationDetails{
TotalWorkers: totalWorkers,
Multiplicity: multiplicity,
}
for _, cmp := range components {
cmp.Register(worker, ns)
cmp.Register(worker, ns, details)
}

// this blocks by calling DescribeNamespace a few times (with a 10s timeout)
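The TODO above is also why Multiplicity exists: each of the totalWorkers keys is looked up independently, so several keys can hash to the same node. A toy illustration, with a deterministic stand-in for serviceResolver.Lookup (the real resolver is the membership ring, not this hash):

package main

import (
	"fmt"
	"hash/fnv"
)

// fakeLookup stands in for serviceResolver.Lookup, mapping a key to one
// of two hosts. Hypothetical; for illustration only.
func fakeLookup(key string) string {
	h := fnv.New32a()
	h.Write([]byte(key))
	return []string{"host-a", "host-b"}[h.Sum32()%2]
}

func main() {
	// Three per-namespace workers requested, two hosts: by pigeonhole at
	// least one host gets two keys, and runs that worker with multiplicity
	// 2 (and, after this commit, 2/3 of the start-workflow RPS budget).
	counts := map[string]int{}
	for i := 0; i < 3; i++ {
		counts[fakeLookup(fmt.Sprintf("ns-id/%d", i))]++
	}
	fmt.Println(counts) // e.g. map[host-a:2 host-b:1]
}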
24 changes: 12 additions & 12 deletions service/worker/pernamespaceworker_test.go
@@ -182,7 +182,7 @@ func (s *perNsWorkerManagerSuite) TestEnabled() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr1.EXPECT().Start()

s.manager.namespaceCallback(ns, false)
@@ -213,7 +213,7 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() {
s.Equal(4, options.MaxConcurrentWorkflowTaskPollers)
s.Equal(4, options.MaxConcurrentActivityTaskPollers)
}).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 3, Multiplicity: 2})
wkr1.EXPECT().Start()

s.manager.namespaceCallback(ns, false)
@@ -258,7 +258,7 @@ func (s *perNsWorkerManagerSuite) TestOptions() {
s.Equal(0.0, options.WorkerLocalActivitiesPerSecond)
s.Equal(0*time.Millisecond, options.StickyScheduleToStartTimeout)
}).Return(wkr)
s.cmp1.EXPECT().Register(wkr, gomock.Any()).AnyTimes()
s.cmp1.EXPECT().Register(wkr, gomock.Any(), gomock.Any()).AnyTimes()
wkr.EXPECT().Start().AnyTimes()

s.manager.namespaceCallback(ns1, false)
@@ -302,9 +302,9 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() {
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2)

s.cmp1.EXPECT().Register(wkr1, ns1)
s.cmp1.EXPECT().Register(wkr2, ns2)
s.cmp2.EXPECT().Register(wkr1, ns1)
s.cmp1.EXPECT().Register(wkr1, ns1, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
s.cmp1.EXPECT().Register(wkr2, ns2, workercommon.RegistrationDetails{TotalWorkers: 2, Multiplicity: 2})
s.cmp2.EXPECT().Register(wkr1, ns1, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})

wkr1.EXPECT().Start()
wkr2.EXPECT().Start()
@@ -334,7 +334,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr1.EXPECT().Start()

s.manager.namespaceCallback(ns, false)
@@ -354,7 +354,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2)
wkr2 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2)
s.cmp1.EXPECT().Register(wkr2, ns)
s.cmp1.EXPECT().Register(wkr2, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr2.EXPECT().Start()

s.manager.namespaceCallback(nsRestored, false)
@@ -389,7 +389,7 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr1.EXPECT().Start()

s.manager.membershipChangedCh <- nil
@@ -422,7 +422,7 @@ func (s *perNsWorkerManagerSuite) TestServiceResolverError() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr1.EXPECT().Start()

s.manager.namespaceCallback(ns, false)
@@ -449,7 +449,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})

// first try fails to start
wkr1.EXPECT().Start().Return(errors.New("start worker error"))
Expand All @@ -460,7 +460,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2)
wkr2 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2)
s.cmp1.EXPECT().Register(wkr2, ns)
s.cmp1.EXPECT().Register(wkr2, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr2.EXPECT().Start()

s.manager.namespaceCallback(ns, false)
24 changes: 24 additions & 0 deletions service/worker/scheduler/activities.go
@@ -44,28 +44,52 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
)

type (
activities struct {
activityDeps
namespace namespace.Name
namespaceID namespace.ID
// Rate limiter for start workflow requests. Note that the scope is all schedules in
// this namespace on this worker.
startWorkflowRateLimiter quotas.RateLimiter
}

errFollow string

rateLimitedDetails struct {
Delay time.Duration
}
)

var (
errTryAgain = errors.New("try again")
errWrongChain = errors.New("found running workflow with wrong FirstExecutionRunId")
errNoEvents = errors.New("GetEvents didn't return any events")
errNoAttrs = errors.New("last event did not have correct attrs")
errBlocked = errors.New("rate limiter doesn't allow any progress")
)

func (e errFollow) Error() string { return string(e) }

func (a *activities) StartWorkflow(ctx context.Context, req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
if !req.CompletedRateLimitSleep {
reservation := a.startWorkflowRateLimiter.Reserve()
if !reservation.OK() {
return nil, translateError(errBlocked, "StartWorkflowExecution")
}
delay := reservation.Delay()
if delay > 1*time.Second {
// for a long sleep, ask the workflow to do it in workflow logic
return nil, temporal.NewNonRetryableApplicationError(
rateLimitedErrorType, rateLimitedErrorType, nil, rateLimitedDetails{Delay: delay})
}
// short sleep can be done in-line
time.Sleep(delay)
}

req.Request.Namespace = a.namespace.String()

request := common.CreateHistoryStartWorkflowRequest(
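The workflow half of this handshake is not among the files rendered on this page. As a hedged sketch of how it could look inside package scheduler (reusing rateLimitedErrorType and rateLimitedDetails from above; the handler name and retry plumbing are hypothetical), the workflow catches the non-retryable error, performs the long sleep durably in workflow code, and retries the activity with CompletedRateLimitSleep set so the activity skips its own reservation:

package scheduler

import (
	"errors"

	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"

	schedspb "go.temporal.io/server/api/schedule/v1"
)

// handleRateLimited is a hypothetical workflow-side counterpart to the
// activity logic above. It reports whether the caller should re-run the
// StartWorkflow activity after a durable, workflow-timer-based sleep.
func handleRateLimited(ctx workflow.Context, err error, req *schedspb.StartWorkflowRequest) (bool, error) {
	var appErr *temporal.ApplicationError
	if !errors.As(err, &appErr) || appErr.Type() != rateLimitedErrorType {
		return false, err // some other failure; surface it unchanged
	}
	var details rateLimitedDetails
	if err := appErr.Details(&details); err != nil {
		return false, err
	}
	// The long sleep happens in workflow logic, where it survives worker
	// restarts, instead of blocking an activity slot.
	if err := workflow.Sleep(ctx, details.Delay); err != nil {
		return false, err
	}
	req.CompletedRateLimitSleep = true // tell the activity the sleep already happened
	return true, nil
}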
24 changes: 16 additions & 8 deletions service/worker/scheduler/fx.go
@@ -36,6 +36,7 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/quotas"
workercommon "go.temporal.io/server/service/worker/common"
)

@@ -46,8 +47,9 @@

type (
workerComponent struct {
activityDeps activityDeps
enabledForNs dynamicconfig.BoolPropertyFnWithNamespaceFilter
activityDeps activityDeps
enabledForNs dynamicconfig.BoolPropertyFnWithNamespaceFilter
globalNSStartWorkflowRPS dynamicconfig.FloatPropertyFnWithNamespaceFilter
}

activityDeps struct {
@@ -77,6 +79,8 @@ func NewResult(
activityDeps: params,
enabledForNs: dcCollection.GetBoolPropertyFnWithNamespaceFilter(
dynamicconfig.WorkerEnableScheduler, true),
globalNSStartWorkflowRPS: dcCollection.GetFloatPropertyFilteredByNamespace(
dynamicconfig.SchedulerNamespaceStartWorkflowRPS, 30.0),
},
}
}
@@ -87,15 +91,19 @@ func (s *workerComponent) DedicatedWorkerOptions(ns *namespace.Namespace) *worke
}
}

func (s *workerComponent) Register(worker sdkworker.Worker, ns *namespace.Namespace) {
func (s *workerComponent) Register(worker sdkworker.Worker, ns *namespace.Namespace, details workercommon.RegistrationDetails) {
worker.RegisterWorkflowWithOptions(SchedulerWorkflow, workflow.RegisterOptions{Name: WorkflowType})
worker.RegisterActivity(s.activities(ns.Name(), ns.ID()))
worker.RegisterActivity(s.activities(ns.Name(), ns.ID(), details))
}

func (s *workerComponent) activities(name namespace.Name, id namespace.ID) *activities {
func (s *workerComponent) activities(name namespace.Name, id namespace.ID, details workercommon.RegistrationDetails) *activities {
localRPS := func() float64 {
return float64(details.Multiplicity) * s.globalNSStartWorkflowRPS(name.String()) / float64(details.TotalWorkers)
}
return &activities{
activityDeps: s.activityDeps,
namespace: name,
namespaceID: id,
activityDeps: s.activityDeps,
namespace: name,
namespaceID: id,
startWorkflowRateLimiter: quotas.NewDefaultOutgoingRateLimiter(localRPS),
}
}
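One detail worth calling out: localRPS is a closure over the dynamic config getter and the placement details, and quotas.NewDefaultOutgoingRateLimiter takes a rate function rather than a fixed number, so a dynamic config change to worker.schedulerNamespaceStartWorkflowRPS can reach a running worker without a restart. A minimal wiring sketch with hypothetical numbers, using only the quotas API already seen above:

package main

import (
	"fmt"

	"go.temporal.io/server/common/quotas"
)

func main() {
	// Stand-ins for RegistrationDetails and the dynamic config getter.
	multiplicity, totalWorkers := 2, 3
	globalRPS := func(ns string) float64 { return 30.0 }

	// Same shape as the closure in fx.go: the limiter consults the
	// function, so a new config value changes the effective local rate.
	localRPS := func() float64 {
		return float64(multiplicity) * globalRPS("my-namespace") / float64(totalWorkers)
	}

	limiter := quotas.NewDefaultOutgoingRateLimiter(localRPS)
	fmt.Println(localRPS(), limiter.Allow()) // 20 true (first request fits the budget)
}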