Skip to content

Commit

Permalink
Add dynamic config for sdk worker options (#3806)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr authored Jan 25, 2023
1 parent a06da89 commit 278f545
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 103 deletions.
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,11 @@ const (
WorkerParentCloseMaxConcurrentWorkflowTaskPollers = "worker.ParentCloseMaxConcurrentWorkflowTaskPollers"
// WorkerPerNamespaceWorkerCount controls number of per-ns (scheduler, batcher, etc.) workers to run per namespace
WorkerPerNamespaceWorkerCount = "worker.perNamespaceWorkerCount"
// WorkerPerNamespaceWorkerOptions are the SDK worker options for per-namespace workers
WorkerPerNamespaceWorkerOptions = "worker.perNamespaceWorkerOptions"
// WorkerEnableScheduler controls whether to start the worker for scheduled workflows
WorkerEnableScheduler = "worker.enableScheduler"
// WorkerStickyCacheSize controls the sticky cache size for SDK workers on worker nodes
// (shared between all workers in the process, cannot be changed after startup)
WorkerStickyCacheSize = "worker.stickyCacheSize"
)
7 changes: 2 additions & 5 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ var DefaultOptions = fx.Options(
fx.Provide(ArchiverProviderProvider),
fx.Provide(ThrottledLoggerProvider),
fx.Provide(SdkClientFactoryProvider),
fx.Provide(SdkWorkerFactoryProvider),
fx.Provide(DCRedirectionPolicyProvider),
)

Expand Down Expand Up @@ -394,6 +393,7 @@ func SdkClientFactoryProvider(
metricsHandler metrics.Handler,
logger log.SnTaggedLogger,
resolver membership.GRPCResolver,
dc *dynamicconfig.Collection,
) (sdk.ClientFactory, error) {
frontendURL, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver)
if err != nil {
Expand All @@ -404,13 +404,10 @@ func SdkClientFactoryProvider(
frontendTLSConfig,
metricsHandler,
logger,
dc.GetIntProperty(dynamicconfig.WorkerStickyCacheSize, 0),
), nil
}

// SdkWorkerFactoryProvider is the fx provider that constructs the SDK
// worker factory shared by the server's internal workers.
func SdkWorkerFactoryProvider() sdk.WorkerFactory {
	factory := sdk.NewWorkerFactory()
	return factory
}

// DCRedirectionPolicyProvider is the fx provider that extracts the DC
// redirection policy from the loaded service configuration.
func DCRedirectionPolicyProvider(cfg *config.Config) config.DCRedirectionPolicy {
	policy := cfg.DCRedirectionPolicy
	return policy
}
Expand Down
33 changes: 16 additions & 17 deletions common/sdk/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
Expand All @@ -48,10 +49,7 @@ type (
// MetricsHandler, or Logger (they will be overwritten)
NewClient(options sdkclient.Options) sdkclient.Client
GetSystemClient() sdkclient.Client
}

WorkerFactory interface {
New(client sdkclient.Client, taskQueue string, options sdkworker.Options) sdkworker.Worker
NewWorker(client sdkclient.Client, taskQueue string, options sdkworker.Options) sdkworker.Worker
}

clientFactory struct {
Expand All @@ -61,29 +59,29 @@ type (
logger log.Logger
sdklogger sdklog.Logger
systemSdkClient sdkclient.Client
stickyCacheSize dynamicconfig.IntPropertyFn
once sync.Once
}

workerFactory struct{}
)

var (
_ ClientFactory = (*clientFactory)(nil)
_ WorkerFactory = (*workerFactory)(nil)
)

// NewClientFactory builds a clientFactory that creates SDK clients pointed at
// the given frontend hostPort with the given TLS config. stickyCacheSize is a
// dynamic-config property read once, when the system client is first created,
// to set the process-wide sticky workflow cache size (it cannot be changed
// after that point).
//
// NOTE(review): the scraped diff contained both the pre- and post-change
// struct fields (duplicate keys, which does not compile); this is the
// reconstructed post-change version that carries stickyCacheSize.
func NewClientFactory(
	hostPort string,
	tlsConfig *tls.Config,
	metricsHandler metrics.Handler,
	logger log.Logger,
	stickyCacheSize dynamicconfig.IntPropertyFn,
) *clientFactory {
	return &clientFactory{
		hostPort:        hostPort,
		tlsConfig:       tlsConfig,
		metricsHandler:  NewMetricsHandler(metricsHandler),
		logger:          logger,
		sdklogger:       log.NewSdkLogger(logger),
		stickyCacheSize: stickyCacheSize,
	}
}

Expand Down Expand Up @@ -122,15 +120,16 @@ func (f *clientFactory) GetSystemClient() sdkclient.Client {
if err != nil {
f.logger.Fatal("error creating sdk client", tag.Error(err))
}

if size := f.stickyCacheSize(); size > 0 {
f.logger.Info("setting sticky workflow cache size", tag.NewInt("size", size))
sdkworker.SetStickyWorkflowCacheSize(size)
}
})
return f.systemSdkClient
}

func NewWorkerFactory() *workerFactory {
return &workerFactory{}
}

func (f *workerFactory) New(
func (f *clientFactory) NewWorker(
client sdkclient.Client,
taskQueue string,
options sdkworker.Options,
Expand Down
35 changes: 6 additions & 29 deletions common/sdk/factory_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion service/worker/archiver/client_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewClientWorker(container *BootstrapContainer) ClientWorker {
BackgroundActivityContext: actCtx,
}
clientWorker := &clientWorker{
worker: worker.New(sdkClient, workflowTaskQueue, wo),
worker: container.SdkClientFactory.NewWorker(sdkClient, workflowTaskQueue, wo),
namespaceRegistry: container.NamespaceCache,
}

Expand Down
2 changes: 1 addition & 1 deletion service/worker/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *Batcher) Start() error {
BackgroundActivityContext: ctx,
}
sdkClient := s.sdkClientFactory.GetSystemClient()
batchWorker := worker.New(sdkClient, taskQueueName, workerOpts)
batchWorker := s.sdkClientFactory.NewWorker(sdkClient, taskQueueName, workerOpts)
batchWorker.RegisterWorkflowWithOptions(BatchWorkflow, workflow.RegisterOptions{Name: BatchWFTypeName})
batchWorker.RegisterActivity(&activities{
activityDeps: activityDeps{
Expand Down
12 changes: 5 additions & 7 deletions service/worker/parentclosepolicy/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@ type (
NumParentClosePolicySystemWorkflows dynamicconfig.IntPropertyFn
}

// BootstrapParams contains the set of params needed to bootstrap
// the sub-system
// BootstrapParams contains the set of params needed to bootstrap the sub-system
BootstrapParams struct {
// SdkSystemClient is an instance of temporal service client
SdkClientFactory sdk.ClientFactory
// MetricsHandler is an instance of metrics object for emitting stats
MetricsHandler metrics.Handler
Expand All @@ -69,7 +67,7 @@ type (

// Processor is the background sub-system that execute workflow for ParentClosePolicy
Processor struct {
svcClientFactory sdk.ClientFactory
sdkClientFactory sdk.ClientFactory
clientBean client.Bean
metricsHandler metrics.Handler
cfg Config
Expand All @@ -81,7 +79,7 @@ type (
// New returns a new instance as daemon
func New(params *BootstrapParams) *Processor {
return &Processor{
svcClientFactory: params.SdkClientFactory,
sdkClientFactory: params.SdkClientFactory,
metricsHandler: params.MetricsHandler.WithTags(metrics.OperationTag(metrics.ParentClosePolicyProcessorScope)),
cfg: params.Config,
logger: log.With(params.Logger, tag.ComponentBatcher),
Expand All @@ -92,8 +90,8 @@ func New(params *BootstrapParams) *Processor {

// Start starts the scanner
func (s *Processor) Start() error {
svcClient := s.svcClientFactory.GetSystemClient()
processorWorker := worker.New(svcClient, processorTaskQueueName, getWorkerOptions(s))
svcClient := s.sdkClientFactory.GetSystemClient()
processorWorker := s.sdkClientFactory.NewWorker(svcClient, processorTaskQueueName, getWorkerOptions(s))
processorWorker.RegisterWorkflowWithOptions(ProcessorWorkflow, workflow.RegisterOptions{Name: processorWFTypeName})
processorWorker.RegisterActivityWithOptions(ProcessorActivity, activity.RegisterOptions{Name: processorActivityName})

Expand Down
70 changes: 57 additions & 13 deletions service/worker/pernamespaceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package worker

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
Expand All @@ -49,8 +50,10 @@ import (
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/util"
workercommon "go.temporal.io/server/service/worker/common"
)

Expand All @@ -63,7 +66,6 @@ type (
fx.In
Logger log.Logger
SdkClientFactory sdk.ClientFactory
SdkWorkerFactory sdk.WorkerFactory
NamespaceRegistry namespace.Registry
HostName resource.HostName
Config *Config
Expand All @@ -77,7 +79,6 @@ type (
// from init params or Start
logger log.Logger
sdkClientFactory sdk.ClientFactory
sdkWorkerFactory sdk.WorkerFactory
namespaceRegistry namespace.Registry
self *membership.HostInfo
hostName resource.HostName
Expand Down Expand Up @@ -105,6 +106,19 @@ type (
client sdkclient.Client
worker sdkworker.Worker
}

// sdkWorkerOptions mirrors the tunable subset of sdkworker.Options so that
// per-namespace worker options can be supplied via dynamic config (decoded
// from a JSON-round-tripped map).
sdkWorkerOptions struct {
// Copy of relevant fields from sdkworker.Options
MaxConcurrentActivityExecutionSize int
WorkerActivitiesPerSecond float64
MaxConcurrentLocalActivityExecutionSize int
WorkerLocalActivitiesPerSecond float64
MaxConcurrentActivityTaskPollers int
MaxConcurrentWorkflowTaskExecutionSize int
MaxConcurrentWorkflowTaskPollers int
// StickyScheduleToStartTimeout is the raw string from dynamic config;
// it is parsed into StickyScheduleToStartTimeoutDuration after decoding.
StickyScheduleToStartTimeout string // parse into time.Duration
StickyScheduleToStartTimeoutDuration time.Duration
}
)

var (
Expand All @@ -115,7 +129,6 @@ func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *p
return &perNamespaceWorkerManager{
logger: log.With(params.Logger, tag.ComponentPerNSWorkerManager),
sdkClientFactory: params.SdkClientFactory,
sdkWorkerFactory: params.SdkWorkerFactory,
namespaceRegistry: params.NamespaceRegistry,
hostName: params.HostName,
config: params.Config,
Expand Down Expand Up @@ -249,6 +262,22 @@ func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespa
return multiplicity, nil
}

// getWorkerOptions translates the dynamic-config map for this namespace into
// a typed sdkWorkerOptions. The map is round-tripped through JSON so dynamic
// config can supply plain maps/primitives; unknown keys and mismatched types
// are deliberately ignored (zero values are used instead). A malformed
// StickyScheduleToStartTimeout is logged and left as the zero duration.
func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions {
	optionsMap := wm.config.PerNamespaceWorkerOptions(ns.Name().String())
	var options sdkWorkerOptions
	b, err := json.Marshal(optionsMap)
	if err != nil {
		// a map coming out of dynamic config should always marshal; log the
		// anomaly rather than silently returning all-zero options
		wm.logger.Warn("failed to marshal per-namespace worker options", tag.Error(err))
		return options
	}
	_ = json.Unmarshal(b, &options) // ignore errors, just use the zero value anyway
	if len(options.StickyScheduleToStartTimeout) > 0 {
		if options.StickyScheduleToStartTimeoutDuration, err = timestamp.ParseDuration(options.StickyScheduleToStartTimeout); err != nil {
			wm.logger.Warn("invalid StickyScheduleToStartTimeout", tag.Error(err))
		}
	}
	return options
}

// called on namespace state change callback
func (w *perNamespaceWorker) refreshWithNewNamespace(ns *namespace.Namespace, deleted bool) {
w.lock.Lock()
Expand Down Expand Up @@ -357,7 +386,11 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
return errNoWorkerNeeded
}
// ensure this changes if multiplicity changes
componentSet += fmt.Sprintf("%d", multiplicity)
componentSet += fmt.Sprintf(",%d", multiplicity)

// get sdk worker options
dcOptions := w.wm.getWorkerOptions(ns)
componentSet += fmt.Sprintf(",%+v", dcOptions)

// we do need a worker, but maybe we have one already
w.lock.Lock()
Expand All @@ -373,7 +406,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
// create new one. note that even before startWorker returns, the worker may have started
// and already called the fatal error handler. we need to set w.client+worker+componentSet
// before releasing the lock to keep our state consistent.
client, worker, err := w.startWorker(ns, enabledComponents, multiplicity)
client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, dcOptions)
if err != nil {
// TODO: add metric also
return err
Expand All @@ -389,6 +422,7 @@ func (w *perNamespaceWorker) startWorker(
ns *namespace.Namespace,
components []workercommon.PerNSWorkerComponent,
multiplicity int,
dcOptions sdkWorkerOptions,
) (sdkclient.Client, sdkworker.Worker, error) {
nsName := ns.Name().String()
// this should not block because it uses an existing grpc connection
Expand All @@ -398,28 +432,38 @@ func (w *perNamespaceWorker) startWorker(
})

var sdkoptions sdkworker.Options

// copy from dynamic config
sdkoptions.MaxConcurrentActivityExecutionSize = dcOptions.MaxConcurrentActivityExecutionSize
sdkoptions.WorkerActivitiesPerSecond = dcOptions.WorkerActivitiesPerSecond
sdkoptions.MaxConcurrentLocalActivityExecutionSize = dcOptions.MaxConcurrentLocalActivityExecutionSize
sdkoptions.WorkerLocalActivitiesPerSecond = dcOptions.WorkerLocalActivitiesPerSecond
sdkoptions.MaxConcurrentActivityTaskPollers = util.Max(2, dcOptions.MaxConcurrentActivityTaskPollers)
sdkoptions.MaxConcurrentWorkflowTaskExecutionSize = dcOptions.MaxConcurrentWorkflowTaskExecutionSize
sdkoptions.MaxConcurrentWorkflowTaskPollers = util.Max(2, dcOptions.MaxConcurrentWorkflowTaskPollers)
sdkoptions.StickyScheduleToStartTimeout = dcOptions.StickyScheduleToStartTimeoutDuration

sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewBackgroundCallerInfo(ns.Name().String()))
sdkoptions.Identity = fmt.Sprintf("server-worker@%d@%s@%s", os.Getpid(), w.wm.hostName, nsName)
// sdk default is 2, we increase it if we're supposed to run with more multiplicity.
// other defaults are already large enough.
sdkoptions.MaxConcurrentWorkflowTaskPollers = 2 * multiplicity
sdkoptions.MaxConcurrentActivityTaskPollers = 2 * multiplicity
// increase these if we're supposed to run with more multiplicity
sdkoptions.MaxConcurrentWorkflowTaskPollers *= multiplicity
sdkoptions.MaxConcurrentActivityTaskPollers *= multiplicity
sdkoptions.OnFatalError = w.onFatalError

// this should not block because the client already has server capabilities
sdkworker := w.wm.sdkWorkerFactory.New(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
for _, cmp := range components {
cmp.Register(sdkworker, ns)
cmp.Register(worker, ns)
}

// this blocks by calling DescribeNamespace a few times (with a 10s timeout)
err := sdkworker.Start()
err := worker.Start()
if err != nil {
client.Close()
return nil, nil, err
}

return client, sdkworker, nil
return client, worker, nil
}

func (w *perNamespaceWorker) onFatalError(err error) {
Expand Down
Loading

0 comments on commit 278f545

Please sign in to comment.