From 278f545c2906f69aef59584893a672cff7c3de12 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 24 Jan 2023 20:17:11 -0800 Subject: [PATCH] Add dynamic config for sdk worker options (#3806) --- common/dynamicconfig/constants.go | 5 + common/resource/fx.go | 7 +- common/sdk/factory.go | 33 ++++--- common/sdk/factory_mock.go | 35 ++----- service/worker/archiver/client_worker.go | 2 +- service/worker/batcher/batcher.go | 2 +- service/worker/parentclosepolicy/processor.go | 12 +-- service/worker/pernamespaceworker.go | 70 +++++++++++--- service/worker/pernamespaceworker_test.go | 93 ++++++++++++++++--- service/worker/scanner/scanner.go | 5 +- service/worker/scanner/scanner_test.go | 8 +- service/worker/service.go | 9 +- service/worker/worker.go | 4 +- tests/onebox.go | 3 +- 14 files changed, 185 insertions(+), 103 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index dc4af16cbd1..c054919a2c7 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -713,6 +713,11 @@ const ( WorkerParentCloseMaxConcurrentWorkflowTaskPollers = "worker.ParentCloseMaxConcurrentWorkflowTaskPollers" // WorkerPerNamespaceWorkerCount controls number of per-ns (scheduler, batcher, etc.) workers to run per namespace WorkerPerNamespaceWorkerCount = "worker.perNamespaceWorkerCount" + // WorkerPerNamespaceWorkerOptions are SDK worker options for per-namespace worker + WorkerPerNamespaceWorkerOptions = "worker.perNamespaceWorkerOptions" // WorkerEnableScheduler controls whether to start the worker for scheduled workflows WorkerEnableScheduler = "worker.enableScheduler" + // WorkerStickyCacheSize controls the sticky cache size for SDK workers on worker nodes + // (shared between all workers in the process, cannot be changed after startup) + WorkerStickyCacheSize = "worker.stickyCacheSize" ) diff --git a/common/resource/fx.go b/common/resource/fx.go index f2b88928a4e..ea4672495d7 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -132,7 +132,6 @@ var DefaultOptions = fx.Options( fx.Provide(ArchiverProviderProvider), fx.Provide(ThrottledLoggerProvider), fx.Provide(SdkClientFactoryProvider), - fx.Provide(SdkWorkerFactoryProvider), fx.Provide(DCRedirectionPolicyProvider), ) @@ -394,6 +393,7 @@ func SdkClientFactoryProvider( metricsHandler metrics.Handler, logger log.SnTaggedLogger, resolver membership.GRPCResolver, + dc *dynamicconfig.Collection, ) (sdk.ClientFactory, error) { frontendURL, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver) if err != nil { @@ -404,13 +404,10 @@ func SdkClientFactoryProvider( frontendTLSConfig, metricsHandler, logger, + dc.GetIntProperty(dynamicconfig.WorkerStickyCacheSize, 0), ), nil } -func SdkWorkerFactoryProvider() sdk.WorkerFactory { - return sdk.NewWorkerFactory() -} - func DCRedirectionPolicyProvider(cfg *config.Config) config.DCRedirectionPolicy { return cfg.DCRedirectionPolicy } diff --git a/common/sdk/factory.go b/common/sdk/factory.go index 05cb13d2d5d..a4b41f5acc0 100644 --- a/common/sdk/factory.go +++ b/common/sdk/factory.go @@ -36,6 +36,7 @@ import ( "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -48,10 +49,7 @@ type ( // MetricsHandler, or Logger (they will be overwritten) NewClient(options sdkclient.Options) sdkclient.Client GetSystemClient() sdkclient.Client - } - - WorkerFactory interface { - New(client sdkclient.Client, taskQueue string, options sdkworker.Options) sdkworker.Worker + NewWorker(client sdkclient.Client, taskQueue string, options sdkworker.Options) sdkworker.Worker } clientFactory struct { @@ -61,15 +59,13 @@ type ( logger log.Logger sdklogger sdklog.Logger systemSdkClient sdkclient.Client + stickyCacheSize dynamicconfig.IntPropertyFn once sync.Once } - - workerFactory struct{} ) var ( _ ClientFactory = (*clientFactory)(nil) - _ WorkerFactory = (*workerFactory)(nil) ) func NewClientFactory( @@ -77,13 +73,15 @@ func NewClientFactory( tlsConfig *tls.Config, metricsHandler metrics.Handler, logger log.Logger, + stickyCacheSize dynamicconfig.IntPropertyFn, ) *clientFactory { return &clientFactory{ - hostPort: hostPort, - tlsConfig: tlsConfig, - metricsHandler: NewMetricsHandler(metricsHandler), - logger: logger, - sdklogger: log.NewSdkLogger(logger), + hostPort: hostPort, + tlsConfig: tlsConfig, + metricsHandler: NewMetricsHandler(metricsHandler), + logger: logger, + sdklogger: log.NewSdkLogger(logger), + stickyCacheSize: stickyCacheSize, } } @@ -122,15 +120,16 @@ func (f *clientFactory) GetSystemClient() sdkclient.Client { if err != nil { f.logger.Fatal("error creating sdk client", tag.Error(err)) } + + if size := f.stickyCacheSize(); size > 0 { + f.logger.Info("setting sticky workflow cache size", tag.NewInt("size", size)) + sdkworker.SetStickyWorkflowCacheSize(size) + } }) return f.systemSdkClient } -func NewWorkerFactory() *workerFactory { - return &workerFactory{} -} - -func (f *workerFactory) New( +func (f *clientFactory) NewWorker( client sdkclient.Client, taskQueue string, options sdkworker.Options, diff --git a/common/sdk/factory_mock.go b/common/sdk/factory_mock.go index 271f74bf905..d33677e2f7a 100644 --- a/common/sdk/factory_mock.go +++ b/common/sdk/factory_mock.go @@ -87,39 +87,16 @@ func (mr *MockClientFactoryMockRecorder) NewClient(options interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewClient", reflect.TypeOf((*MockClientFactory)(nil).NewClient), options) } -// MockWorkerFactory is a mock of WorkerFactory interface. -type MockWorkerFactory struct { - ctrl *gomock.Controller - recorder *MockWorkerFactoryMockRecorder -} - -// MockWorkerFactoryMockRecorder is the mock recorder for MockWorkerFactory. -type MockWorkerFactoryMockRecorder struct { - mock *MockWorkerFactory -} - -// NewMockWorkerFactory creates a new mock instance. -func NewMockWorkerFactory(ctrl *gomock.Controller) *MockWorkerFactory { - mock := &MockWorkerFactory{ctrl: ctrl} - mock.recorder = &MockWorkerFactoryMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockWorkerFactory) EXPECT() *MockWorkerFactoryMockRecorder { - return m.recorder -} - -// New mocks base method. -func (m *MockWorkerFactory) New(client client.Client, taskQueue string, options worker.Options) worker.Worker { +// NewWorker mocks base method. +func (m *MockClientFactory) NewWorker(client client.Client, taskQueue string, options worker.Options) worker.Worker { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "New", client, taskQueue, options) + ret := m.ctrl.Call(m, "NewWorker", client, taskQueue, options) ret0, _ := ret[0].(worker.Worker) return ret0 } -// New indicates an expected call of New. -func (mr *MockWorkerFactoryMockRecorder) New(client, taskQueue, options interface{}) *gomock.Call { +// NewWorker indicates an expected call of NewWorker. +func (mr *MockClientFactoryMockRecorder) NewWorker(client, taskQueue, options interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "New", reflect.TypeOf((*MockWorkerFactory)(nil).New), client, taskQueue, options) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewWorker", reflect.TypeOf((*MockClientFactory)(nil).NewWorker), client, taskQueue, options) } diff --git a/service/worker/archiver/client_worker.go b/service/worker/archiver/client_worker.go index ad78c3ee580..5ac020496f8 100644 --- a/service/worker/archiver/client_worker.go +++ b/service/worker/archiver/client_worker.go @@ -120,7 +120,7 @@ func NewClientWorker(container *BootstrapContainer) ClientWorker { BackgroundActivityContext: actCtx, } clientWorker := &clientWorker{ - worker: worker.New(sdkClient, workflowTaskQueue, wo), + worker: container.SdkClientFactory.NewWorker(sdkClient, workflowTaskQueue, wo), namespaceRegistry: container.NamespaceCache, } diff --git a/service/worker/batcher/batcher.go b/service/worker/batcher/batcher.go index eb0185ea0d7..cb28273e2ee 100644 --- a/service/worker/batcher/batcher.go +++ b/service/worker/batcher/batcher.go @@ -81,7 +81,7 @@ func (s *Batcher) Start() error { BackgroundActivityContext: ctx, } sdkClient := s.sdkClientFactory.GetSystemClient() - batchWorker := worker.New(sdkClient, taskQueueName, workerOpts) + batchWorker := s.sdkClientFactory.NewWorker(sdkClient, taskQueueName, workerOpts) batchWorker.RegisterWorkflowWithOptions(BatchWorkflow, workflow.RegisterOptions{Name: BatchWFTypeName}) batchWorker.RegisterActivity(&activities{ activityDeps: activityDeps{ diff --git a/service/worker/parentclosepolicy/processor.go b/service/worker/parentclosepolicy/processor.go index 13819681009..7af33c54290 100644 --- a/service/worker/parentclosepolicy/processor.go +++ b/service/worker/parentclosepolicy/processor.go @@ -50,10 +50,8 @@ type ( NumParentClosePolicySystemWorkflows dynamicconfig.IntPropertyFn } - // BootstrapParams contains the set of params needed to bootstrap - // the sub-system + // BootstrapParams contains the set of params needed to bootstrap the sub-system BootstrapParams struct { - // SdkSystemClient is an instance of temporal service client SdkClientFactory sdk.ClientFactory // MetricsHandler is an instance of metrics object for emitting stats MetricsHandler metrics.Handler @@ -69,7 +67,7 @@ type ( // Processor is the background sub-system that execute workflow for ParentClosePolicy Processor struct { - svcClientFactory sdk.ClientFactory + sdkClientFactory sdk.ClientFactory clientBean client.Bean metricsHandler metrics.Handler cfg Config @@ -81,7 +79,7 @@ type ( // New returns a new instance as daemon func New(params *BootstrapParams) *Processor { return &Processor{ - svcClientFactory: params.SdkClientFactory, + sdkClientFactory: params.SdkClientFactory, metricsHandler: params.MetricsHandler.WithTags(metrics.OperationTag(metrics.ParentClosePolicyProcessorScope)), cfg: params.Config, logger: log.With(params.Logger, tag.ComponentBatcher), @@ -92,8 +90,8 @@ func New(params *BootstrapParams) *Processor { // Start starts the scanner func (s *Processor) Start() error { - svcClient := s.svcClientFactory.GetSystemClient() - processorWorker := worker.New(svcClient, processorTaskQueueName, getWorkerOptions(s)) + svcClient := s.sdkClientFactory.GetSystemClient() + processorWorker := s.sdkClientFactory.NewWorker(svcClient, processorTaskQueueName, getWorkerOptions(s)) processorWorker.RegisterWorkflowWithOptions(ProcessorWorkflow, workflow.RegisterOptions{Name: processorWFTypeName}) processorWorker.RegisterActivityWithOptions(ProcessorActivity, activity.RegisterOptions{Name: processorActivityName}) diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go index a8a9611cb5d..c6efd8b5c0e 100644 --- a/service/worker/pernamespaceworker.go +++ b/service/worker/pernamespaceworker.go @@ -26,6 +26,7 @@ package worker import ( "context" + "encoding/json" "errors" "fmt" "os" @@ -49,8 +50,10 @@ import ( "go.temporal.io/server/common/membership" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives" + "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/sdk" + "go.temporal.io/server/common/util" workercommon "go.temporal.io/server/service/worker/common" ) @@ -63,7 +66,6 @@ type ( fx.In Logger log.Logger SdkClientFactory sdk.ClientFactory - SdkWorkerFactory sdk.WorkerFactory NamespaceRegistry namespace.Registry HostName resource.HostName Config *Config @@ -77,7 +79,6 @@ type ( // from init params or Start logger log.Logger sdkClientFactory sdk.ClientFactory - sdkWorkerFactory sdk.WorkerFactory namespaceRegistry namespace.Registry self *membership.HostInfo hostName resource.HostName @@ -105,6 +106,19 @@ type ( client sdkclient.Client worker sdkworker.Worker } + + sdkWorkerOptions struct { + // Copy of relevant fields from sdkworker.Options + MaxConcurrentActivityExecutionSize int + WorkerActivitiesPerSecond float64 + MaxConcurrentLocalActivityExecutionSize int + WorkerLocalActivitiesPerSecond float64 + MaxConcurrentActivityTaskPollers int + MaxConcurrentWorkflowTaskExecutionSize int + MaxConcurrentWorkflowTaskPollers int + StickyScheduleToStartTimeout string // parse into time.Duration + StickyScheduleToStartTimeoutDuration time.Duration + } ) var ( @@ -115,7 +129,6 @@ func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *p return &perNamespaceWorkerManager{ logger: log.With(params.Logger, tag.ComponentPerNSWorkerManager), sdkClientFactory: params.SdkClientFactory, - sdkWorkerFactory: params.SdkWorkerFactory, namespaceRegistry: params.NamespaceRegistry, hostName: params.HostName, config: params.Config, @@ -249,6 +262,22 @@ func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespa return multiplicity, nil } +func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions { + optionsMap := wm.config.PerNamespaceWorkerOptions(ns.Name().String()) + var options sdkWorkerOptions + b, err := json.Marshal(optionsMap) + if err != nil { + return options + } + _ = json.Unmarshal(b, &options) // ignore errors, just use the zero value anyway + if len(options.StickyScheduleToStartTimeout) > 0 { + if options.StickyScheduleToStartTimeoutDuration, err = timestamp.ParseDuration(options.StickyScheduleToStartTimeout); err != nil { + wm.logger.Warn("invalid StickyScheduleToStartTimeout", tag.Error(err)) + } + } + return options +} + // called on namespace state change callback func (w *perNamespaceWorker) refreshWithNewNamespace(ns *namespace.Namespace, deleted bool) { w.lock.Lock() @@ -357,7 +386,11 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error { return errNoWorkerNeeded } // ensure this changes if multiplicity changes - componentSet += fmt.Sprintf("%d", multiplicity) + componentSet += fmt.Sprintf(",%d", multiplicity) + + // get sdk worker options + dcOptions := w.wm.getWorkerOptions(ns) + componentSet += fmt.Sprintf(",%+v", dcOptions) // we do need a worker, but maybe we have one already w.lock.Lock() @@ -373,7 +406,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error { // create new one. note that even before startWorker returns, the worker may have started // and already called the fatal error handler. we need to set w.client+worker+componentSet // before releasing the lock to keep our state consistent. - client, worker, err := w.startWorker(ns, enabledComponents, multiplicity) + client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, dcOptions) if err != nil { // TODO: add metric also return err @@ -389,6 +422,7 @@ func (w *perNamespaceWorker) startWorker( ns *namespace.Namespace, components []workercommon.PerNSWorkerComponent, multiplicity int, + dcOptions sdkWorkerOptions, ) (sdkclient.Client, sdkworker.Worker, error) { nsName := ns.Name().String() // this should not block because it uses an existing grpc connection @@ -398,28 +432,38 @@ func (w *perNamespaceWorker) startWorker( }) var sdkoptions sdkworker.Options + + // copy from dynamic config + sdkoptions.MaxConcurrentActivityExecutionSize = dcOptions.MaxConcurrentActivityExecutionSize + sdkoptions.WorkerActivitiesPerSecond = dcOptions.WorkerActivitiesPerSecond + sdkoptions.MaxConcurrentLocalActivityExecutionSize = dcOptions.MaxConcurrentLocalActivityExecutionSize + sdkoptions.WorkerLocalActivitiesPerSecond = dcOptions.WorkerLocalActivitiesPerSecond + sdkoptions.MaxConcurrentActivityTaskPollers = util.Max(2, dcOptions.MaxConcurrentActivityTaskPollers) + sdkoptions.MaxConcurrentWorkflowTaskExecutionSize = dcOptions.MaxConcurrentWorkflowTaskExecutionSize + sdkoptions.MaxConcurrentWorkflowTaskPollers = util.Max(2, dcOptions.MaxConcurrentWorkflowTaskPollers) + sdkoptions.StickyScheduleToStartTimeout = dcOptions.StickyScheduleToStartTimeoutDuration + sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewBackgroundCallerInfo(ns.Name().String())) sdkoptions.Identity = fmt.Sprintf("server-worker@%d@%s@%s", os.Getpid(), w.wm.hostName, nsName) - // sdk default is 2, we increase it if we're supposed to run with more multiplicity. - // other defaults are already large enough. - sdkoptions.MaxConcurrentWorkflowTaskPollers = 2 * multiplicity - sdkoptions.MaxConcurrentActivityTaskPollers = 2 * multiplicity + // increase these if we're supposed to run with more multiplicity + sdkoptions.MaxConcurrentWorkflowTaskPollers *= multiplicity + sdkoptions.MaxConcurrentActivityTaskPollers *= multiplicity sdkoptions.OnFatalError = w.onFatalError // this should not block because the client already has server capabilities - sdkworker := w.wm.sdkWorkerFactory.New(client, primitives.PerNSWorkerTaskQueue, sdkoptions) + worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions) for _, cmp := range components { - cmp.Register(sdkworker, ns) + cmp.Register(worker, ns) } // this blocks by calling DescribeNamespace a few times (with a 10s timeout) - err := sdkworker.Start() + err := worker.Start() if err != nil { client.Close() return nil, nil, err } - return client, sdkworker, nil + return client, worker, nil } func (w *perNamespaceWorker) onFatalError(err error) { diff --git a/service/worker/pernamespaceworker_test.go b/service/worker/pernamespaceworker_test.go index 20be02c9a13..31b104fd758 100644 --- a/service/worker/pernamespaceworker_test.go +++ b/service/worker/pernamespaceworker_test.go @@ -54,7 +54,6 @@ type perNsWorkerManagerSuite struct { controller *gomock.Controller logger log.Logger cfactory *sdk.MockClientFactory - wfactory *sdk.MockWorkerFactory registry *namespace.MockRegistry hostInfo *membership.HostInfo serviceResolver *membership.MockServiceResolver @@ -74,7 +73,6 @@ func (s *perNsWorkerManagerSuite) SetupTest() { s.logger = log.NewTestLogger() s.cfactory = sdk.NewMockClientFactory(s.controller) - s.wfactory = sdk.NewMockWorkerFactory(s.controller) s.registry = namespace.NewMockRegistry(s.controller) s.hostInfo = membership.NewHostInfo("self", nil) s.serviceResolver = membership.NewMockServiceResolver(s.controller) @@ -84,13 +82,27 @@ func (s *perNsWorkerManagerSuite) SetupTest() { s.manager = NewPerNamespaceWorkerManager(perNamespaceWorkerManagerInitParams{ Logger: s.logger, SdkClientFactory: s.cfactory, - SdkWorkerFactory: s.wfactory, NamespaceRegistry: s.registry, HostName: "self", Config: &Config{ PerNamespaceWorkerCount: func(ns string) int { return util.Max(1, map[string]int{"ns1": 1, "ns2": 2, "ns3": 3}[ns]) }, + PerNamespaceWorkerOptions: func(ns string) map[string]any { + switch ns { + case "ns1": + return map[string]any{ + "MaxConcurrentWorkflowTaskPollers": 100, + } + case "ns2": + return map[string]any{ + "WorkerLocalActivitiesPerSecond": 200.0, + "StickyScheduleToStartTimeout": "7.5s", + } + default: + return map[string]any{} + } + }, }, Components: []workercommon.PerNSWorkerComponent{s.cmp1, s.cmp2}, ClusterMetadata: cluster.NewMetadataForTest(cluster.NewTestClusterMetadataConfig(false, true)), @@ -169,7 +181,7 @@ func (s *perNsWorkerManagerSuite) TestEnabled() { cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) + s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) wkr1.EXPECT().Start() @@ -197,7 +209,7 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() { cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns3")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, primitives.PerNSWorkerTaskQueue, gomock.Any()).Do(func(_, _ any, options sdkworker.Options) { + s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Do(func(_, _ any, options sdkworker.Options) { s.Equal(4, options.MaxConcurrentWorkflowTaskPollers) s.Equal(4, options.MaxConcurrentActivityTaskPollers) }).Return(wkr1) @@ -211,6 +223,55 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() { cli1.EXPECT().Close() } +func (s *perNsWorkerManagerSuite) TestOptions() { + ns1 := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) + ns2 := testns("ns2", enumspb.NAMESPACE_STATE_REGISTERED) + ns3 := testns("ns3", enumspb.NAMESPACE_STATE_REGISTERED) + + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: true, + }).AnyTimes() + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: false, + }).AnyTimes() + + s.serviceResolver.EXPECT().Lookup(gomock.Any()).Return(membership.NewHostInfo("self", nil), nil).AnyTimes() + cli1 := mocksdk.NewMockClient(s.controller) + s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) + cli2 := mocksdk.NewMockClient(s.controller) + s.cfactory.EXPECT().NewClient(matchOptions("ns2")).Return(cli2) + cli3 := mocksdk.NewMockClient(s.controller) + s.cfactory.EXPECT().NewClient(matchOptions("ns3")).Return(cli3) + wkr := mocksdk.NewMockWorker(s.controller) + s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Do(func(_, _ any, options sdkworker.Options) { + s.Equal(100, options.MaxConcurrentWorkflowTaskPollers) + s.Equal(2, options.MaxConcurrentActivityTaskPollers) + s.Equal(0.0, options.WorkerLocalActivitiesPerSecond) + }).Return(wkr) + s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Do(func(_, _ any, options sdkworker.Options) { + s.Equal(4, options.MaxConcurrentWorkflowTaskPollers) + s.Equal(200.0, options.WorkerLocalActivitiesPerSecond) + s.Equal(7500*time.Millisecond, options.StickyScheduleToStartTimeout) + }).Return(wkr) + s.cfactory.EXPECT().NewWorker(matchStrict{cli3}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Do(func(_, _ any, options sdkworker.Options) { + s.Equal(6, options.MaxConcurrentWorkflowTaskPollers) + s.Equal(0.0, options.WorkerLocalActivitiesPerSecond) + s.Equal(0*time.Millisecond, options.StickyScheduleToStartTimeout) + }).Return(wkr) + s.cmp1.EXPECT().Register(wkr, gomock.Any()).AnyTimes() + wkr.EXPECT().Start().AnyTimes() + + s.manager.namespaceCallback(ns1, false) + s.manager.namespaceCallback(ns2, false) + s.manager.namespaceCallback(ns3, false) + time.Sleep(50 * time.Millisecond) + + wkr.EXPECT().Stop().AnyTimes() + cli1.EXPECT().Close() + cli2.EXPECT().Close() + cli3.EXPECT().Close() +} + func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() { ns1 := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) ns2 := testns("ns2", enumspb.NAMESPACE_STATE_REGISTERED) @@ -238,8 +299,8 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() { s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) s.cfactory.EXPECT().NewClient(matchOptions("ns2")).Return(cli2) - s.wfactory.EXPECT().New(cli1, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) - s.wfactory.EXPECT().New(cli2, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2) + s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) + s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2) s.cmp1.EXPECT().Register(wkr1, ns1) s.cmp1.EXPECT().Register(wkr2, ns2) @@ -272,7 +333,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() { cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) + s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) wkr1.EXPECT().Start() @@ -292,7 +353,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() { cli2 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2) wkr2 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2) + s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2) s.cmp1.EXPECT().Register(wkr2, ns) wkr2.EXPECT().Start() @@ -327,7 +388,7 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() { cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) + s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) wkr1.EXPECT().Start() @@ -360,7 +421,7 @@ func (s *perNsWorkerManagerSuite) TestServiceResolverError() { cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) + s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) wkr1.EXPECT().Start() @@ -387,7 +448,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() { cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) + s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) // first try fails to start @@ -398,7 +459,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() { cli2 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2) wkr2 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2) + s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2) s.cmp1.EXPECT().Register(wkr2, ns) wkr2.EXPECT().Start() @@ -449,3 +510,9 @@ func (ns matchOptions) Matches(v any) bool { func (ns matchOptions) String() string { return fmt.Sprintf("namespace=%q", string(ns)) } + +// matchStrict implements shallow matching for gomock (default uses reflect.DeepEqual) +type matchStrict struct{ v any } + +func (m matchStrict) Matches(v any) bool { return v == m.v } +func (m matchStrict) String() string { return fmt.Sprintf("%#v", m) } diff --git a/service/worker/scanner/scanner.go b/service/worker/scanner/scanner.go index 6472e117b31..f93a9605a57 100644 --- a/service/worker/scanner/scanner.go +++ b/service/worker/scanner/scanner.go @@ -95,7 +95,6 @@ type ( taskManager persistence.TaskManager historyClient historyservice.HistoryServiceClient adminClient adminservice.AdminServiceClient - workerFactory sdk.WorkerFactory namespaceRegistry namespace.Registry } @@ -124,7 +123,6 @@ func New( historyClient historyservice.HistoryServiceClient, adminClient adminservice.AdminServiceClient, registry namespace.Registry, - workerFactory sdk.WorkerFactory, ) *Scanner { return &Scanner{ context: scannerContext{ @@ -136,7 +134,6 @@ func New( taskManager: taskManager, historyClient: historyClient, adminClient: adminClient, - workerFactory: workerFactory, namespaceRegistry: registry, }, } @@ -177,7 +174,7 @@ func (s *Scanner) Start() error { } for _, tl := range workerTaskQueueNames { - work := s.context.workerFactory.New(s.context.sdkClientFactory.GetSystemClient(), tl, workerOpts) + work := s.context.sdkClientFactory.NewWorker(s.context.sdkClientFactory.GetSystemClient(), tl, workerOpts) work.RegisterWorkflowWithOptions(TaskQueueScannerWorkflow, workflow.RegisterOptions{Name: tqScannerWFTypeName}) work.RegisterWorkflowWithOptions(HistoryScannerWorkflow, workflow.RegisterOptions{Name: historyScannerWFTypeName}) diff --git a/service/worker/scanner/scanner_test.go b/service/worker/scanner/scanner_test.go index ae0b62645a9..9c346e8657d 100644 --- a/service/worker/scanner/scanner_test.go +++ b/service/worker/scanner/scanner_test.go @@ -157,7 +157,6 @@ func (s *scannerTestSuite) TestScannerEnabled() { } { s.Run(c.Name, func() { ctrl := gomock.NewController(s.T()) - mockWorkerFactory := sdk.NewMockWorkerFactory(ctrl) mockSdkClientFactory := sdk.NewMockClientFactory(ctrl) mockSdkClient := mocksdk.NewMockClient(ctrl) mockNamespaceRegistry := namespace.NewMockRegistry(ctrl) @@ -189,7 +188,6 @@ func (s *scannerTestSuite) TestScannerEnabled() { historyservicemock.NewMockHistoryServiceClient(ctrl), mockAdminClient, mockNamespaceRegistry, - mockWorkerFactory, ) var wg sync.WaitGroup for _, sc := range c.ExpectedScanners { @@ -198,7 +196,7 @@ func (s *scannerTestSuite) TestScannerEnabled() { worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes() worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes() worker.EXPECT().Start() - mockWorkerFactory.EXPECT().New(gomock.Any(), sc.TaskQueueName, gomock.Any()).Return(worker) + mockSdkClientFactory.EXPECT().NewWorker(gomock.Any(), sc.TaskQueueName, gomock.Any()).Return(worker) mockSdkClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes() mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), sc.WFTypeName, gomock.Any()).Do(func( @@ -234,7 +232,6 @@ func (s *scannerTestSuite) TestScannerShutdown() { mockSdkClient := mocksdk.NewMockClient(ctrl) mockNamespaceRegistry := namespace.NewMockRegistry(ctrl) mockAdminClient := adminservicemock.NewMockAdminServiceClient(ctrl) - mockWorkerFactory := sdk.NewMockWorkerFactory(ctrl) worker := mocksdk.NewMockWorker(ctrl) scanner := New( logger, @@ -260,13 +257,12 @@ func (s *scannerTestSuite) TestScannerShutdown() { historyservicemock.NewMockHistoryServiceClient(ctrl), mockAdminClient, mockNamespaceRegistry, - mockWorkerFactory, ) mockSdkClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes() worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes() worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes() worker.EXPECT().Start() - mockWorkerFactory.EXPECT().New(gomock.Any(), gomock.Any(), gomock.Any()).Return(worker) + mockSdkClientFactory.EXPECT().NewWorker(gomock.Any(), gomock.Any(), gomock.Any()).Return(worker) var wg sync.WaitGroup wg.Add(1) mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func( diff --git a/service/worker/service.go b/service/worker/service.go index a246d8d1f5a..4afe7ac4477 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -96,7 +96,6 @@ type ( workerManager *workerManager perNamespaceWorkerManager *perNamespaceWorkerManager scanner *scanner.Scanner - workerFactory sdk.WorkerFactory } // Config contains all the service config for worker @@ -114,6 +113,7 @@ type ( BatcherConcurrency dynamicconfig.IntPropertyFnWithNamespaceFilter EnableParentClosePolicyWorker dynamicconfig.BoolPropertyFn PerNamespaceWorkerCount dynamicconfig.IntPropertyFnWithNamespaceFilter + PerNamespaceWorkerOptions dynamicconfig.MapPropertyFnWithNamespaceFilter StandardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn StandardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn @@ -147,7 +147,6 @@ func NewService( workerManager *workerManager, perNamespaceWorkerManager *perNamespaceWorkerManager, visibilityManager manager.VisibilityManager, - workerFactory sdk.WorkerFactory, ) (*Service, error) { workerServiceResolver, err := membershipMonitor.GetResolver(primitives.WorkerService) if err != nil { @@ -180,7 +179,6 @@ func NewService( workerManager: workerManager, perNamespaceWorkerManager: perNamespaceWorkerManager, - workerFactory: workerFactory, } if err := s.initScanner(); err != nil { return nil, err @@ -316,6 +314,10 @@ func NewConfig(dc *dynamicconfig.Collection, persistenceConfig *config.Persisten dynamicconfig.WorkerPerNamespaceWorkerCount, 1, ), + PerNamespaceWorkerOptions: dc.GetMapPropertyFnWithNamespaceFilter( + dynamicconfig.WorkerPerNamespaceWorkerOptions, + map[string]any{}, + ), ThrottledLogRPS: dc.GetIntProperty( dynamicconfig.WorkerThrottledLogRPS, 20, @@ -490,7 +492,6 @@ func (s *Service) initScanner() error { s.historyClient, adminClient, s.namespaceRegistry, - s.workerFactory, ) return nil } diff --git a/service/worker/worker.go b/service/worker/worker.go index 0ad6854c353..cd277eae1f9 100644 --- a/service/worker/worker.go +++ b/service/worker/worker.go @@ -81,7 +81,7 @@ func (wm *workerManager) Start() { BackgroundActivityContext: headers.SetCallerType(context.Background(), headers.CallerTypeBackground), } sdkClient := wm.sdkClientFactory.GetSystemClient() - defaultWorker := sdkworker.New(sdkClient, DefaultWorkerTaskQueue, defaultWorkerOptions) + defaultWorker := wm.sdkClientFactory.NewWorker(sdkClient, DefaultWorkerTaskQueue, defaultWorkerOptions) wm.workers = []sdkworker.Worker{defaultWorker} for _, wc := range wm.workerComponents { @@ -91,7 +91,7 @@ func (wm *workerManager) Start() { wc.Register(defaultWorker) } else { // this worker component requires a dedicated worker - dedicatedWorker := sdkworker.New(sdkClient, workerOptions.TaskQueue, workerOptions.Options) + dedicatedWorker := wm.sdkClientFactory.NewWorker(sdkClient, workerOptions.TaskQueue, workerOptions.Options) wc.Register(dedicatedWorker) wm.workers = append(wm.workers, dedicatedWorker) } diff --git a/tests/onebox.go b/tests/onebox.go index 7ed55b7a34b..d7be3d7501b 100644 --- a/tests/onebox.go +++ b/tests/onebox.go @@ -642,7 +642,6 @@ func (c *temporalImpl) startWorker(hosts map[primitives.ServiceName][]string, st fx.Provide(func() carchiver.ArchivalMetadata { return c.archiverMetadata }), fx.Provide(func() provider.ArchiverProvider { return c.archiverProvider }), fx.Provide(sdkClientFactoryProvider), - fx.Provide(func() sdk.WorkerFactory { return sdk.NewWorkerFactory() }), fx.Provide(func() client.FactoryProvider { return client.NewFactoryProvider() }), fx.Provide(func() searchattribute.Mapper { return nil }), fx.Provide(func() resolver.ServiceResolver { return resolver.NewNoopResolver() }), @@ -743,12 +742,14 @@ func sdkClientFactoryProvider( resolver membership.GRPCResolver, metricsHandler metrics.Handler, logger log.Logger, + dc *dynamicconfig.Collection, ) sdk.ClientFactory { return sdk.NewClientFactory( resolver.MakeURL(primitives.FrontendService), nil, metricsHandler, logger, + dc.GetIntProperty(dynamicconfig.WorkerStickyCacheSize, 0), ) }