Skip to content

Commit

Permalink
use two ints instead of float
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Jan 26, 2023
1 parent dbc50cf commit 3e11552
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 21 deletions.
12 changes: 7 additions & 5 deletions service/worker/common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ type (
}

RegistrationDetails struct {
// WorkerFactor is the proportion of workers for this namespace that this worker
// represents. E.g. if it's the only worker for this namespace, it'll be 1; if it's
// one of two, it'll be 1/2; if three workers were requested and two landed on this
// node, it'll be 2/3.
WorkerFactor float64
// TotalWorkers is the number of requested per-namespace workers for this namespace.
TotalWorkers int
// Multiplicity is the number of those workers that this particular sdkworker.Worker
// represents. It may be more than one if the requested number is more than the total
// number of worker nodes or if consistent hashing decided to place more than one on
// the same node.
Multiplicity int
}
)
3 changes: 2 additions & 1 deletion service/worker/pernamespaceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ func (w *perNamespaceWorker) startWorker(
// this should not block because the client already has server capabilities
worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
details := workercommon.RegistrationDetails{
WorkerFactor: float64(multiplicity) / float64(totalWorkers),
TotalWorkers: totalWorkers,
Multiplicity: multiplicity,
}
for _, cmp := range components {
cmp.Register(worker, ns, details)
Expand Down
24 changes: 12 additions & 12 deletions service/worker/pernamespaceworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (s *perNsWorkerManagerSuite) TestEnabled() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{WorkerFactor: 1.0})
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr1.EXPECT().Start()

s.manager.namespaceCallback(ns, false)
Expand Down Expand Up @@ -213,7 +213,7 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() {
s.Equal(4, options.MaxConcurrentWorkflowTaskPollers)
s.Equal(4, options.MaxConcurrentActivityTaskPollers)
}).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{WorkerFactor: 0.6666666666666666})
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 3, Multiplicity: 2})
wkr1.EXPECT().Start()

s.manager.namespaceCallback(ns, false)
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *perNsWorkerManagerSuite) TestOptions() {
s.Equal(0.0, options.WorkerLocalActivitiesPerSecond)
s.Equal(0*time.Millisecond, options.StickyScheduleToStartTimeout)
}).Return(wkr)
s.cmp1.EXPECT().Register(wkr, gomock.Any()).AnyTimes()
s.cmp1.EXPECT().Register(wkr, gomock.Any(), gomock.Any()).AnyTimes()
wkr.EXPECT().Start().AnyTimes()

s.manager.namespaceCallback(ns1, false)
Expand Down Expand Up @@ -302,9 +302,9 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() {
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2)

s.cmp1.EXPECT().Register(wkr1, ns1, workercommon.RegistrationDetails{WorkerFactor: 1.0})
s.cmp1.EXPECT().Register(wkr2, ns2, workercommon.RegistrationDetails{WorkerFactor: 1.0})
s.cmp2.EXPECT().Register(wkr1, ns1, workercommon.RegistrationDetails{WorkerFactor: 1.0})
s.cmp1.EXPECT().Register(wkr1, ns1, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
s.cmp1.EXPECT().Register(wkr2, ns2, workercommon.RegistrationDetails{TotalWorkers: 2, Multiplicity: 2})
s.cmp2.EXPECT().Register(wkr1, ns1, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})

wkr1.EXPECT().Start()
wkr2.EXPECT().Start()
Expand Down Expand Up @@ -334,7 +334,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{WorkerFactor: 1.0})
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr1.EXPECT().Start()

s.manager.namespaceCallback(ns, false)
Expand All @@ -354,7 +354,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2)
wkr2 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2)
s.cmp1.EXPECT().Register(wkr2, ns, workercommon.RegistrationDetails{WorkerFactor: 1.0})
s.cmp1.EXPECT().Register(wkr2, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr2.EXPECT().Start()

s.manager.namespaceCallback(nsRestored, false)
Expand Down Expand Up @@ -389,7 +389,7 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{WorkerFactor: 1.0})
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr1.EXPECT().Start()

s.manager.membershipChangedCh <- nil
Expand Down Expand Up @@ -422,7 +422,7 @@ func (s *perNsWorkerManagerSuite) TestServiceResolverError() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{WorkerFactor: 1.0})
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr1.EXPECT().Start()

s.manager.namespaceCallback(ns, false)
Expand All @@ -449,7 +449,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{WorkerFactor: 1.0})
s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})

// first try fails to start
wkr1.EXPECT().Start().Return(errors.New("start worker error"))
Expand All @@ -460,7 +460,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() {
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2)
wkr2 := mocksdk.NewMockWorker(s.controller)
s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2)
s.cmp1.EXPECT().Register(wkr2, ns, workercommon.RegistrationDetails{WorkerFactor: 1.0})
s.cmp1.EXPECT().Register(wkr2, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
wkr2.EXPECT().Start()

s.manager.namespaceCallback(ns, false)
Expand Down
6 changes: 3 additions & 3 deletions service/worker/scheduler/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ func (s *workerComponent) DedicatedWorkerOptions(ns *namespace.Namespace) *worke

func (s *workerComponent) Register(worker sdkworker.Worker, ns *namespace.Namespace, details workercommon.RegistrationDetails) {
worker.RegisterWorkflowWithOptions(SchedulerWorkflow, workflow.RegisterOptions{Name: WorkflowType})
worker.RegisterActivity(s.activities(ns.Name(), ns.ID(), details.WorkerFactor))
worker.RegisterActivity(s.activities(ns.Name(), ns.ID(), details))
}

func (s *workerComponent) activities(name namespace.Name, id namespace.ID, workerFactor float64) *activities {
func (s *workerComponent) activities(name namespace.Name, id namespace.ID, details workercommon.RegistrationDetails) *activities {
localRPS := func() float64 {
return s.globalNSStartWorkflowRPS(name.String()) * workerFactor
return float64(details.Multiplicity) * s.globalNSStartWorkflowRPS(name.String()) / float64(details.TotalWorkers)
}
return &activities{
activityDeps: s.activityDeps,
Expand Down

0 comments on commit 3e11552

Please sign in to comment.