Add dynamic config for sdk worker options #3806

Merged: 14 commits, Jan 25, 2023
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
@@ -715,6 +715,11 @@ const (
WorkerParentCloseMaxConcurrentWorkflowTaskPollers = "worker.ParentCloseMaxConcurrentWorkflowTaskPollers"
// WorkerPerNamespaceWorkerCount controls number of per-ns (scheduler, batcher, etc.) workers to run per namespace
WorkerPerNamespaceWorkerCount = "worker.perNamespaceWorkerCount"
// WorkerPerNamespaceWorkerOptions are SDK worker options for per-namespace workers
WorkerPerNamespaceWorkerOptions = "worker.perNamespaceWorkerOptions"
// WorkerEnableScheduler controls whether to start the worker for scheduled workflows
WorkerEnableScheduler = "worker.enableScheduler"
// WorkerStickyCacheSize controls the sticky cache size for SDK workers on worker nodes
// (shared between all workers in the process, cannot be changed after startup)
WorkerStickyCacheSize = "worker.stickyCacheSize"
)
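
For orientation: a minimal sketch of reading the new int-valued key through a *dynamicconfig.Collection, mirroring the provider change in common/resource/fx.go below. The YAML fragment in the comment is the usual dynamic config file layout and is shown as an assumption for illustration, not as part of this diff.

// Illustrative only; assumes a *dynamicconfig.Collection wired as in
// common/resource/fx.go below. The corresponding dynamic config YAML
// might look like (assumed layout):
//
//   worker.stickyCacheSize:
//     - value: 4096
//
func exampleStickyCacheSize(dc *dynamicconfig.Collection) int {
	// GetIntProperty returns an IntPropertyFn; invoking it reads the
	// current value, falling back to the default (0) when unset.
	return dc.GetIntProperty(dynamicconfig.WorkerStickyCacheSize, 0)()
}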
7 changes: 2 additions & 5 deletions common/resource/fx.go
@@ -132,7 +132,6 @@ var DefaultOptions = fx.Options(
fx.Provide(ArchiverProviderProvider),
fx.Provide(ThrottledLoggerProvider),
fx.Provide(SdkClientFactoryProvider),
fx.Provide(SdkWorkerFactoryProvider),
fx.Provide(DCRedirectionPolicyProvider),
)

@@ -394,6 +393,7 @@ func SdkClientFactoryProvider(
metricsHandler metrics.Handler,
logger log.SnTaggedLogger,
resolver membership.GRPCResolver,
dc *dynamicconfig.Collection,
) (sdk.ClientFactory, error) {
frontendURL, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver)
if err != nil {
@@ -404,13 +404,10 @@ func SdkClientFactoryProvider(
frontendTLSConfig,
metricsHandler,
logger,
dc.GetIntProperty(dynamicconfig.WorkerStickyCacheSize, 0),
), nil
}

func SdkWorkerFactoryProvider() sdk.WorkerFactory {
return sdk.NewWorkerFactory()
}

func DCRedirectionPolicyProvider(cfg *config.Config) config.DCRedirectionPolicy {
return cfg.DCRedirectionPolicy
}
33 changes: 16 additions & 17 deletions common/sdk/factory.go
@@ -36,6 +36,7 @@ import (

"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
@@ -48,10 +49,7 @@ type (
// MetricsHandler, or Logger (they will be overwritten)
NewClient(options sdkclient.Options) sdkclient.Client
GetSystemClient() sdkclient.Client
}

WorkerFactory interface {
New(client sdkclient.Client, taskQueue string, options sdkworker.Options) sdkworker.Worker
NewWorker(client sdkclient.Client, taskQueue string, options sdkworker.Options) sdkworker.Worker
}

clientFactory struct {
@@ -61,29 +59,29 @@ type (
logger log.Logger
sdklogger sdklog.Logger
systemSdkClient sdkclient.Client
stickyCacheSize dynamicconfig.IntPropertyFn
once sync.Once
}

workerFactory struct{}
)

var (
_ ClientFactory = (*clientFactory)(nil)
_ WorkerFactory = (*workerFactory)(nil)
)

func NewClientFactory(
hostPort string,
tlsConfig *tls.Config,
metricsHandler metrics.Handler,
logger log.Logger,
stickyCacheSize dynamicconfig.IntPropertyFn,
) *clientFactory {
return &clientFactory{
hostPort: hostPort,
tlsConfig: tlsConfig,
metricsHandler: NewMetricsHandler(metricsHandler),
logger: logger,
sdklogger: log.NewSdkLogger(logger),
hostPort: hostPort,
tlsConfig: tlsConfig,
metricsHandler: NewMetricsHandler(metricsHandler),
logger: logger,
sdklogger: log.NewSdkLogger(logger),
stickyCacheSize: stickyCacheSize,
}
}

@@ -122,15 +120,16 @@ func (f *clientFactory) GetSystemClient() sdkclient.Client {
if err != nil {
f.logger.Fatal("error creating sdk client", tag.Error(err))
}

if size := f.stickyCacheSize(); size > 0 {
f.logger.Info("setting sticky workflow cache size", tag.NewInt("size", size))
sdkworker.SetStickyWorkflowCacheSize(size)
}
})
return f.systemSdkClient
}
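
A note on the once-guard: sdkworker.SetStickyWorkflowCacheSize changes a process-global SDK setting shared by every worker created from this process, which is why the constants.go comment above says it cannot be changed after startup. A hedged sketch of the resulting behavior (the factory variable is illustrative):

c1 := factory.GetSystemClient() // first call: reads stickyCacheSize() and applies it
c2 := factory.GetSystemClient() // sync.Once already fired: cached client, size untouched
_, _ = c1, c2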

func NewWorkerFactory() *workerFactory {
return &workerFactory{}
}

func (f *workerFactory) New(
func (f *clientFactory) NewWorker(
client sdkclient.Client,
taskQueue string,
options sdkworker.Options,
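Taken together, worker construction now lives on the client factory instead of a separate WorkerFactory. A hedged usage sketch assembled from the pieces in this diff (the task queue name and local variables are illustrative):

stickyFn := dc.GetIntProperty(dynamicconfig.WorkerStickyCacheSize, 0)
factory := sdk.NewClientFactory(frontendURL, frontendTLSConfig, metricsHandler, logger, stickyFn)
client := factory.GetSystemClient()
w := factory.NewWorker(client, "example-task-queue", sdkworker.Options{})
// register workflows/activities on w and call w.Start() as with any SDK worker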
35 changes: 6 additions & 29 deletions common/sdk/factory_mock.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion service/worker/archiver/client_worker.go
@@ -120,7 +120,7 @@ func NewClientWorker(container *BootstrapContainer) ClientWorker {
BackgroundActivityContext: actCtx,
}
clientWorker := &clientWorker{
worker: worker.New(sdkClient, workflowTaskQueue, wo),
worker: container.SdkClientFactory.NewWorker(sdkClient, workflowTaskQueue, wo),
namespaceRegistry: container.NamespaceCache,
}

2 changes: 1 addition & 1 deletion service/worker/batcher/batcher.go
@@ -81,7 +81,7 @@ func (s *Batcher) Start() error {
BackgroundActivityContext: ctx,
}
sdkClient := s.sdkClientFactory.GetSystemClient()
batchWorker := worker.New(sdkClient, taskQueueName, workerOpts)
batchWorker := s.sdkClientFactory.NewWorker(sdkClient, taskQueueName, workerOpts)
batchWorker.RegisterWorkflowWithOptions(BatchWorkflow, workflow.RegisterOptions{Name: BatchWFTypeName})
batchWorker.RegisterActivity(&activities{
activityDeps: activityDeps{
12 changes: 5 additions & 7 deletions service/worker/parentclosepolicy/processor.go
@@ -50,10 +50,8 @@ type (
NumParentClosePolicySystemWorkflows dynamicconfig.IntPropertyFn
}

// BootstrapParams contains the set of params needed to bootstrap
// the sub-system
// BootstrapParams contains the set of params needed to bootstrap the sub-system
BootstrapParams struct {
// SdkSystemClient is an instance of temporal service client
SdkClientFactory sdk.ClientFactory
// MetricsHandler is an instance of metrics object for emitting stats
MetricsHandler metrics.Handler
@@ -69,7 +67,7 @@ type (

// Processor is the background sub-system that executes workflows for ParentClosePolicy
Processor struct {
svcClientFactory sdk.ClientFactory
sdkClientFactory sdk.ClientFactory
clientBean client.Bean
metricsHandler metrics.Handler
cfg Config
@@ -81,7 +79,7 @@ type (
// New returns a new instance as daemon
func New(params *BootstrapParams) *Processor {
return &Processor{
svcClientFactory: params.SdkClientFactory,
sdkClientFactory: params.SdkClientFactory,
metricsHandler: params.MetricsHandler.WithTags(metrics.OperationTag(metrics.ParentClosePolicyProcessorScope)),
cfg: params.Config,
logger: log.With(params.Logger, tag.ComponentBatcher),
@@ -92,8 +90,8 @@ func New(params *BootstrapParams) *Processor {

// Start starts the scanner
func (s *Processor) Start() error {
svcClient := s.svcClientFactory.GetSystemClient()
processorWorker := worker.New(svcClient, processorTaskQueueName, getWorkerOptions(s))
svcClient := s.sdkClientFactory.GetSystemClient()
processorWorker := s.sdkClientFactory.NewWorker(svcClient, processorTaskQueueName, getWorkerOptions(s))
processorWorker.RegisterWorkflowWithOptions(ProcessorWorkflow, workflow.RegisterOptions{Name: processorWFTypeName})
processorWorker.RegisterActivityWithOptions(ProcessorActivity, activity.RegisterOptions{Name: processorActivityName})

70 changes: 57 additions & 13 deletions service/worker/pernamespaceworker.go
@@ -26,6 +26,7 @@ package worker

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
@@ -49,8 +50,10 @@ import (
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/util"
workercommon "go.temporal.io/server/service/worker/common"
)

@@ -63,7 +66,6 @@ type (
fx.In
Logger log.Logger
SdkClientFactory sdk.ClientFactory
SdkWorkerFactory sdk.WorkerFactory
NamespaceRegistry namespace.Registry
HostName resource.HostName
Config *Config
@@ -77,7 +79,6 @@ type (
// from init params or Start
logger log.Logger
sdkClientFactory sdk.ClientFactory
sdkWorkerFactory sdk.WorkerFactory
namespaceRegistry namespace.Registry
self *membership.HostInfo
hostName resource.HostName
@@ -105,6 +106,19 @@ type (
client sdkclient.Client
worker sdkworker.Worker
}

sdkWorkerOptions struct {
// Copy of relevant fields from sdkworker.Options
MaxConcurrentActivityExecutionSize int
WorkerActivitiesPerSecond float64
MaxConcurrentLocalActivityExecutionSize int
WorkerLocalActivitiesPerSecond float64
MaxConcurrentActivityTaskPollers int
MaxConcurrentWorkflowTaskExecutionSize int
MaxConcurrentWorkflowTaskPollers int
StickyScheduleToStartTimeout string // parse into time.Duration
StickyScheduleToStartTimeoutDuration time.Duration
}
)
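
For reference, a hedged example of the value this struct is meant to decode, as it might appear under worker.perNamespaceWorkerOptions in the dynamic config file. The field names must match the struct above; the YAML layout and the namespace constraint are assumptions for illustration:

// worker.perNamespaceWorkerOptions:
//   - value:
//       MaxConcurrentWorkflowTaskPollers: 10
//       WorkerActivitiesPerSecond: 1000
//       StickyScheduleToStartTimeout: "5s"
//     constraints:
//       namespace: "my-namespace"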

var (
@@ -115,7 +129,6 @@ func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *p
return &perNamespaceWorkerManager{
logger: log.With(params.Logger, tag.ComponentPerNSWorkerManager),
sdkClientFactory: params.SdkClientFactory,
sdkWorkerFactory: params.SdkWorkerFactory,
namespaceRegistry: params.NamespaceRegistry,
hostName: params.HostName,
config: params.Config,
@@ -249,6 +262,22 @@ func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespa
return multiplicity, nil
}

func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions {
optionsMap := wm.config.PerNamespaceWorkerOptions(ns.Name().String())
var options sdkWorkerOptions
b, err := json.Marshal(optionsMap)
if err != nil {
return options
}
_ = json.Unmarshal(b, &options) // ignore errors, just use the zero value anyway
if len(options.StickyScheduleToStartTimeout) > 0 {
if options.StickyScheduleToStartTimeoutDuration, err = timestamp.ParseDuration(options.StickyScheduleToStartTimeout); err != nil {
wm.logger.Warn("invalid StickyScheduleToStartTimeout", tag.Error(err))
}
}
return options
}
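
The marshal/unmarshal round-trip above is a compact way to coerce the loosely typed map[string]interface{} held in dynamic config into a typed struct, silently dropping unknown keys. A self-contained sketch of the same technique; time.ParseDuration stands in here for the repo's timestamp.ParseDuration:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type workerOptions struct {
	MaxConcurrentWorkflowTaskPollers     int
	StickyScheduleToStartTimeout         string
	StickyScheduleToStartTimeoutDuration time.Duration
}

func main() {
	// shape of the value dynamic config hands back for a namespace
	optionsMap := map[string]interface{}{
		"MaxConcurrentWorkflowTaskPollers": 10,
		"StickyScheduleToStartTimeout":     "5s",
		"UnknownKey":                       true, // silently ignored by Unmarshal
	}
	var options workerOptions
	b, _ := json.Marshal(optionsMap)
	_ = json.Unmarshal(b, &options) // on error, fields keep their zero values
	if options.StickyScheduleToStartTimeout != "" {
		if d, err := time.ParseDuration(options.StickyScheduleToStartTimeout); err == nil {
			options.StickyScheduleToStartTimeoutDuration = d
		}
	}
	fmt.Printf("%+v\n", options)
	// {MaxConcurrentWorkflowTaskPollers:10 StickyScheduleToStartTimeout:5s StickyScheduleToStartTimeoutDuration:5s}
}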

// called on namespace state change callback
func (w *perNamespaceWorker) refreshWithNewNamespace(ns *namespace.Namespace, deleted bool) {
w.lock.Lock()
@@ -357,7 +386,11 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
return errNoWorkerNeeded
}
// ensure this changes if multiplicity changes
componentSet += fmt.Sprintf("%d", multiplicity)
componentSet += fmt.Sprintf(",%d", multiplicity)

// get sdk worker options
dcOptions := w.wm.getWorkerOptions(ns)
componentSet += fmt.Sprintf(",%+v", dcOptions)

// we do need a worker, but maybe we have one already
w.lock.Lock()
@@ -373,7 +406,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
// create new one. note that even before startWorker returns, the worker may have started
// and already called the fatal error handler. we need to set w.client+worker+componentSet
// before releasing the lock to keep our state consistent.
client, worker, err := w.startWorker(ns, enabledComponents, multiplicity)
client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, dcOptions)
if err != nil {
// TODO: add metric also
return err
@@ -389,6 +422,7 @@ func (w *perNamespaceWorker) startWorker(
ns *namespace.Namespace,
components []workercommon.PerNSWorkerComponent,
multiplicity int,
dcOptions sdkWorkerOptions,
) (sdkclient.Client, sdkworker.Worker, error) {
nsName := ns.Name().String()
// this should not block because it uses an existing grpc connection
@@ -398,28 +432,38 @@
})

var sdkoptions sdkworker.Options

// copy from dynamic config
sdkoptions.MaxConcurrentActivityExecutionSize = dcOptions.MaxConcurrentActivityExecutionSize
sdkoptions.WorkerActivitiesPerSecond = dcOptions.WorkerActivitiesPerSecond
sdkoptions.MaxConcurrentLocalActivityExecutionSize = dcOptions.MaxConcurrentLocalActivityExecutionSize
sdkoptions.WorkerLocalActivitiesPerSecond = dcOptions.WorkerLocalActivitiesPerSecond
sdkoptions.MaxConcurrentActivityTaskPollers = util.Max(2, dcOptions.MaxConcurrentActivityTaskPollers)
sdkoptions.MaxConcurrentWorkflowTaskExecutionSize = dcOptions.MaxConcurrentWorkflowTaskExecutionSize
sdkoptions.MaxConcurrentWorkflowTaskPollers = util.Max(2, dcOptions.MaxConcurrentWorkflowTaskPollers)
sdkoptions.StickyScheduleToStartTimeout = dcOptions.StickyScheduleToStartTimeoutDuration

sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewBackgroundCallerInfo(ns.Name().String()))
sdkoptions.Identity = fmt.Sprintf("server-worker@%d@%s@%s", os.Getpid(), w.wm.hostName, nsName)
// sdk default is 2, we increase it if we're supposed to run with more multiplicity.
// other defaults are already large enough.
sdkoptions.MaxConcurrentWorkflowTaskPollers = 2 * multiplicity
sdkoptions.MaxConcurrentActivityTaskPollers = 2 * multiplicity
// increase these if we're supposed to run with more multiplicity
sdkoptions.MaxConcurrentWorkflowTaskPollers *= multiplicity
sdkoptions.MaxConcurrentActivityTaskPollers *= multiplicity
sdkoptions.OnFatalError = w.onFatalError

// this should not block because the client already has server capabilities
sdkworker := w.wm.sdkWorkerFactory.New(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
for _, cmp := range components {
cmp.Register(sdkworker, ns)
cmp.Register(worker, ns)
}

// this blocks by calling DescribeNamespace a few times (with a 10s timeout)
err := sdkworker.Start()
err := worker.Start()
if err != nil {
client.Close()
return nil, nil, err
}

return client, sdkworker, nil
return client, worker, nil
}
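
A worked sketch of the resulting poller arithmetic: each dynamic config value is floored at the SDK default of 2, then scaled by multiplicity (the number of per-namespace worker slots assigned to this node, per getWorkerMultiplicity above). The helper below is illustrative, assuming util.Max is a plain two-int max:

func effectivePollerCount(dcValue, multiplicity int) int {
	if dcValue < 2 {
		dcValue = 2 // util.Max(2, dcValue): never below the SDK default
	}
	return dcValue * multiplicity
}

// effectivePollerCount(0, 1) == 2  (no override, namespace owned once)
// effectivePollerCount(0, 3) == 6  (no override, owned three times)
// effectivePollerCount(8, 3) == 24 (override of 8 from dynamic config)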

func (w *perNamespaceWorker) onFatalError(err error) {