From e0081fde1eaa95785d691230070bc94f3dbf8a8b Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Thu, 19 Jan 2023 21:40:26 -0800 Subject: [PATCH] Fix a rare deadlock in scanner.Stop (#3818) * Fix a rare deadlock in scanner.Stop * Update service/worker/scanner/scanner.go Co-authored-by: David Reiss Co-authored-by: David Reiss --- service/worker/scanner/scanner.go | 37 ++++---- service/worker/scanner/scanner_test.go | 112 ++++++++++++++++++++----- 2 files changed, 112 insertions(+), 37 deletions(-) diff --git a/service/worker/scanner/scanner.go b/service/worker/scanner/scanner.go index 37a56ce7f14..6472e117b31 100644 --- a/service/worker/scanner/scanner.go +++ b/service/worker/scanner/scanner.go @@ -37,6 +37,7 @@ import ( "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/config" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" @@ -102,8 +103,9 @@ type ( // of database tables to cleanup resources, monitor anamolies // and emit stats for analytics Scanner struct { - context scannerContext - wg sync.WaitGroup + context scannerContext + wg sync.WaitGroup + lifecycleCancel context.CancelFunc } ) @@ -144,6 +146,7 @@ func New( func (s *Scanner) Start() error { ctx := context.WithValue(context.Background(), scannerContextKey, s.context) ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo) + ctx, s.lifecycleCancel = context.WithCancel(ctx) workerOpts := worker.Options{ MaxConcurrentActivityExecutionSize: s.context.cfg.MaxConcurrentActivityExecutionSize(), @@ -157,19 +160,19 @@ func (s *Scanner) Start() error { var workerTaskQueueNames []string if s.context.cfg.ExecutionsScannerEnabled() { s.wg.Add(1) - go s.startWorkflowWithRetry(executionsScannerWFStartOptions, executionsScannerWFTypeName) + go s.startWorkflowWithRetry(ctx, executionsScannerWFStartOptions, executionsScannerWFTypeName) workerTaskQueueNames = 
append(workerTaskQueueNames, executionsScannerTaskQueueName) } if s.context.cfg.Persistence.DefaultStoreType() == config.StoreTypeSQL && s.context.cfg.TaskQueueScannerEnabled() { s.wg.Add(1) - go s.startWorkflowWithRetry(tlScannerWFStartOptions, tqScannerWFTypeName) + go s.startWorkflowWithRetry(ctx, tlScannerWFStartOptions, tqScannerWFTypeName) workerTaskQueueNames = append(workerTaskQueueNames, tqScannerTaskQueueName) } if s.context.cfg.HistoryScannerEnabled() { s.wg.Add(1) - go s.startWorkflowWithRetry(historyScannerWFStartOptions, historyScannerWFTypeName) + go s.startWorkflowWithRetry(ctx, historyScannerWFStartOptions, historyScannerWFTypeName) workerTaskQueueNames = append(workerTaskQueueNames, historyScannerTaskQueueName) } @@ -192,37 +195,41 @@ func (s *Scanner) Start() error { } func (s *Scanner) Stop() { + s.lifecycleCancel() s.wg.Wait() } -func (s *Scanner) startWorkflowWithRetry( - options sdkclient.StartWorkflowOptions, - workflowType string, - workflowArgs ...interface{}, -) { +func (s *Scanner) startWorkflowWithRetry(ctx context.Context, options sdkclient.StartWorkflowOptions, workflowType string, workflowArgs ...interface{}) { defer s.wg.Done() policy := backoff.NewExponentialRetryPolicy(time.Second). WithMaximumInterval(time.Minute). WithExpirationInterval(backoff.NoInterval) - err := backoff.ThrottleRetry(func() error { - return s.startWorkflow(s.context.sdkClientFactory.GetSystemClient(), options, workflowType, workflowArgs...) 
+ err := backoff.ThrottleRetryContext(ctx, func(ctx context.Context) error { + return s.startWorkflow( + ctx, + s.context.sdkClientFactory.GetSystemClient(), + options, + workflowType, + workflowArgs..., + ) }, policy, func(err error) bool { return true }) - if err != nil { + // if the scanner shuts down before the workflow is started, then the error will be context canceled + if err != nil && !common.IsContextCanceledErr(err) { s.context.logger.Fatal("unable to start scanner", tag.WorkflowType(workflowType), tag.Error(err)) } } func (s *Scanner) startWorkflow( + ctx context.Context, client sdkclient.Client, options sdkclient.StartWorkflowOptions, workflowType string, workflowArgs ...interface{}, ) error { - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) _, err := client.ExecuteWorkflow(ctx, options, workflowType, workflowArgs...) cancel() if err != nil { diff --git a/service/worker/scanner/scanner_test.go b/service/worker/scanner/scanner_test.go index f6f65b47a82..ae0b62645a9 100644 --- a/service/worker/scanner/scanner_test.go +++ b/service/worker/scanner/scanner_test.go @@ -25,14 +25,18 @@ package scanner import ( + "context" + "sync" "testing" "github.com/golang/mock/gomock" "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/client" "go.temporal.io/server/api/adminservicemock/v1" "go.temporal.io/server/api/historyservicemock/v1" "go.temporal.io/server/common/config" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" @@ -161,27 +165,13 @@ func (s *scannerTestSuite) TestScannerEnabled() { scanner := New( log.NewNoopLogger(), &Config{ - MaxConcurrentActivityExecutionSize: func() int { - return 1 - }, - MaxConcurrentWorkflowTaskExecutionSize: func() int { - return 1 - }, - MaxConcurrentActivityTaskPollers: func() int { - return 1 - }, - 
MaxConcurrentWorkflowTaskPollers: func() int { - return 1 - }, - ExecutionsScannerEnabled: func() bool { - return c.ExecutionsScannerEnabled - }, - HistoryScannerEnabled: func() bool { - return c.HistoryScannerEnabled - }, - TaskQueueScannerEnabled: func() bool { - return c.TaskQueueScannerEnabled - }, + MaxConcurrentActivityExecutionSize: dynamicconfig.GetIntPropertyFn(1), + MaxConcurrentWorkflowTaskExecutionSize: dynamicconfig.GetIntPropertyFn(1), + MaxConcurrentActivityTaskPollers: dynamicconfig.GetIntPropertyFn(1), + MaxConcurrentWorkflowTaskPollers: dynamicconfig.GetIntPropertyFn(1), + HistoryScannerEnabled: dynamicconfig.GetBoolPropertyFn(c.HistoryScannerEnabled), + ExecutionsScannerEnabled: dynamicconfig.GetBoolPropertyFn(c.ExecutionsScannerEnabled), + TaskQueueScannerEnabled: dynamicconfig.GetBoolPropertyFn(c.TaskQueueScannerEnabled), Persistence: &config.Persistence{ DefaultStore: c.DefaultStore, DataStores: map[string]config.DataStore{ @@ -201,18 +191,96 @@ func (s *scannerTestSuite) TestScannerEnabled() { mockNamespaceRegistry, mockWorkerFactory, ) + var wg sync.WaitGroup for _, sc := range c.ExpectedScanners { + wg.Add(1) worker := mocksdk.NewMockWorker(ctrl) worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes() worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes() worker.EXPECT().Start() mockWorkerFactory.EXPECT().New(gomock.Any(), sc.TaskQueueName, gomock.Any()).Return(worker) mockSdkClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes() - mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), sc.WFTypeName, gomock.Any()) + mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), sc.WFTypeName, + gomock.Any()).Do(func( + _ context.Context, + _ client.StartWorkflowOptions, + _ string, + _ ...interface{}, + ) { + wg.Done() + }) } err := scanner.Start() s.NoError(err) + wg.Wait() scanner.Stop() }) } } + +// TestScannerShutdown tests that the scanner can be 
shut down even when it hasn't finished starting. +// This fixes a rare issue that can occur when Stop() is called quickly after Start(). When Start() is called, the +// scanner starts a new goroutine for each scanner type. In that goroutine, an sdk client is created which dials the +// frontend service. If the test driver calls Stop() on the server, then the server stops the frontend service and the +// history service. In some cases, the frontend service stops before the sdk client has finished connecting to it. +// This causes the startWorkflow() call to fail with an error. However, startWorkflowWithRetry retries the call for +// a whole minute, which causes the test to take a long time to fail. So, instead we immediately cancel all async +// requests when Stop() is called. +func (s *scannerTestSuite) TestScannerShutdown() { + ctrl := gomock.NewController(s.T()) + + logger := log.NewTestLogger() + mockSdkClientFactory := sdk.NewMockClientFactory(ctrl) + mockSdkClient := mocksdk.NewMockClient(ctrl) + mockNamespaceRegistry := namespace.NewMockRegistry(ctrl) + mockAdminClient := adminservicemock.NewMockAdminServiceClient(ctrl) + mockWorkerFactory := sdk.NewMockWorkerFactory(ctrl) + worker := mocksdk.NewMockWorker(ctrl) + scanner := New( + logger, + &Config{ + MaxConcurrentActivityExecutionSize: dynamicconfig.GetIntPropertyFn(1), + MaxConcurrentWorkflowTaskExecutionSize: dynamicconfig.GetIntPropertyFn(1), + MaxConcurrentActivityTaskPollers: dynamicconfig.GetIntPropertyFn(1), + MaxConcurrentWorkflowTaskPollers: dynamicconfig.GetIntPropertyFn(1), + HistoryScannerEnabled: dynamicconfig.GetBoolPropertyFn(true), + ExecutionsScannerEnabled: dynamicconfig.GetBoolPropertyFn(false), + TaskQueueScannerEnabled: dynamicconfig.GetBoolPropertyFn(false), + Persistence: &config.Persistence{ + DefaultStore: config.StoreTypeNoSQL, + DataStores: map[string]config.DataStore{ + config.StoreTypeNoSQL: {}, + }, + }, + }, + mockSdkClientFactory, + metrics.NoopMetricsHandler, + 
p.NewMockExecutionManager(ctrl), + p.NewMockTaskManager(ctrl), + historyservicemock.NewMockHistoryServiceClient(ctrl), + mockAdminClient, + mockNamespaceRegistry, + mockWorkerFactory, + ) + mockSdkClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes() + worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes() + worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes() + worker.EXPECT().Start() + mockWorkerFactory.EXPECT().New(gomock.Any(), gomock.Any(), gomock.Any()).Return(worker) + var wg sync.WaitGroup + wg.Add(1) + mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func( + ctx context.Context, + _ client.StartWorkflowOptions, + _ string, + _ ...interface{}, + ) (client.WorkflowRun, error) { + wg.Done() + <-ctx.Done() + return nil, ctx.Err() + }) + err := scanner.Start() + s.NoError(err) + wg.Wait() + scanner.Stop() +}