Skip to content

Commit

Permalink
Fix a rare deadlock in scanner.Stop (#3818)
Browse files Browse the repository at this point in the history
* Fix a rare deadlock in scanner.Stop

* Update service/worker/scanner/scanner.go

Co-authored-by: David Reiss <david@temporal.io>

Co-authored-by: David Reiss <david@temporal.io>
  • Loading branch information
MichaelSnowden and dnr authored Jan 20, 2023
1 parent eb4b574 commit e0081fd
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 37 deletions.
37 changes: 22 additions & 15 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -102,8 +103,9 @@ type (
// of database tables to clean up resources, monitor anomalies
// and emit stats for analytics
Scanner struct {
context scannerContext
wg sync.WaitGroup
context scannerContext
wg sync.WaitGroup
lifecycleCancel context.CancelFunc
}
)

Expand Down Expand Up @@ -144,6 +146,7 @@ func New(
func (s *Scanner) Start() error {
ctx := context.WithValue(context.Background(), scannerContextKey, s.context)
ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo)
ctx, s.lifecycleCancel = context.WithCancel(ctx)

workerOpts := worker.Options{
MaxConcurrentActivityExecutionSize: s.context.cfg.MaxConcurrentActivityExecutionSize(),
Expand All @@ -157,19 +160,19 @@ func (s *Scanner) Start() error {
var workerTaskQueueNames []string
if s.context.cfg.ExecutionsScannerEnabled() {
s.wg.Add(1)
go s.startWorkflowWithRetry(executionsScannerWFStartOptions, executionsScannerWFTypeName)
go s.startWorkflowWithRetry(ctx, executionsScannerWFStartOptions, executionsScannerWFTypeName)
workerTaskQueueNames = append(workerTaskQueueNames, executionsScannerTaskQueueName)
}

if s.context.cfg.Persistence.DefaultStoreType() == config.StoreTypeSQL && s.context.cfg.TaskQueueScannerEnabled() {
s.wg.Add(1)
go s.startWorkflowWithRetry(tlScannerWFStartOptions, tqScannerWFTypeName)
go s.startWorkflowWithRetry(ctx, tlScannerWFStartOptions, tqScannerWFTypeName)
workerTaskQueueNames = append(workerTaskQueueNames, tqScannerTaskQueueName)
}

if s.context.cfg.HistoryScannerEnabled() {
s.wg.Add(1)
go s.startWorkflowWithRetry(historyScannerWFStartOptions, historyScannerWFTypeName)
go s.startWorkflowWithRetry(ctx, historyScannerWFStartOptions, historyScannerWFTypeName)
workerTaskQueueNames = append(workerTaskQueueNames, historyScannerTaskQueueName)
}

Expand All @@ -192,37 +195,41 @@ func (s *Scanner) Start() error {
}

// Stop cancels the lifecycle context created in Start (the context handed to
// the startWorkflowWithRetry goroutines) and then waits for the goroutines
// tracked by s.wg to finish.
//
// lifecycleCancel is only assigned in Start, so it is still nil on a Scanner
// that was never started; calling a nil func would panic, so the cancel step is
// skipped in that case and Stop simply waits on the (empty) WaitGroup.
func (s *Scanner) Stop() {
	if s.lifecycleCancel != nil {
		s.lifecycleCancel()
	}
	s.wg.Wait()
}

func (s *Scanner) startWorkflowWithRetry(
options sdkclient.StartWorkflowOptions,
workflowType string,
workflowArgs ...interface{},
) {
func (s *Scanner) startWorkflowWithRetry(ctx context.Context, options sdkclient.StartWorkflowOptions, workflowType string, workflowArgs ...interface{}) {
defer s.wg.Done()

policy := backoff.NewExponentialRetryPolicy(time.Second).
WithMaximumInterval(time.Minute).
WithExpirationInterval(backoff.NoInterval)
err := backoff.ThrottleRetry(func() error {
return s.startWorkflow(s.context.sdkClientFactory.GetSystemClient(), options, workflowType, workflowArgs...)
err := backoff.ThrottleRetryContext(ctx, func(ctx context.Context) error {
return s.startWorkflow(
ctx,
s.context.sdkClientFactory.GetSystemClient(),
options,
workflowType,
workflowArgs...,
)
}, policy, func(err error) bool {
return true
})
if err != nil {
// if the scanner shuts down before the workflow is started, then the error will be context canceled
if err != nil && !common.IsContextCanceledErr(err) {
s.context.logger.Fatal("unable to start scanner", tag.WorkflowType(workflowType), tag.Error(err))
}
}

func (s *Scanner) startWorkflow(
ctx context.Context,
client sdkclient.Client,
options sdkclient.StartWorkflowOptions,
workflowType string,
workflowArgs ...interface{},
) error {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
_, err := client.ExecuteWorkflow(ctx, options, workflowType, workflowArgs...)
cancel()
if err != nil {
Expand Down
112 changes: 90 additions & 22 deletions service/worker/scanner/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@
package scanner

import (
"context"
"sync"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"
"go.temporal.io/sdk/client"

"go.temporal.io/server/api/adminservicemock/v1"
"go.temporal.io/server/api/historyservicemock/v1"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -161,27 +165,13 @@ func (s *scannerTestSuite) TestScannerEnabled() {
scanner := New(
log.NewNoopLogger(),
&Config{
MaxConcurrentActivityExecutionSize: func() int {
return 1
},
MaxConcurrentWorkflowTaskExecutionSize: func() int {
return 1
},
MaxConcurrentActivityTaskPollers: func() int {
return 1
},
MaxConcurrentWorkflowTaskPollers: func() int {
return 1
},
ExecutionsScannerEnabled: func() bool {
return c.ExecutionsScannerEnabled
},
HistoryScannerEnabled: func() bool {
return c.HistoryScannerEnabled
},
TaskQueueScannerEnabled: func() bool {
return c.TaskQueueScannerEnabled
},
MaxConcurrentActivityExecutionSize: dynamicconfig.GetIntPropertyFn(1),
MaxConcurrentWorkflowTaskExecutionSize: dynamicconfig.GetIntPropertyFn(1),
MaxConcurrentActivityTaskPollers: dynamicconfig.GetIntPropertyFn(1),
MaxConcurrentWorkflowTaskPollers: dynamicconfig.GetIntPropertyFn(1),
HistoryScannerEnabled: dynamicconfig.GetBoolPropertyFn(c.HistoryScannerEnabled),
ExecutionsScannerEnabled: dynamicconfig.GetBoolPropertyFn(c.ExecutionsScannerEnabled),
TaskQueueScannerEnabled: dynamicconfig.GetBoolPropertyFn(c.TaskQueueScannerEnabled),
Persistence: &config.Persistence{
DefaultStore: c.DefaultStore,
DataStores: map[string]config.DataStore{
Expand All @@ -201,18 +191,96 @@ func (s *scannerTestSuite) TestScannerEnabled() {
mockNamespaceRegistry,
mockWorkerFactory,
)
var wg sync.WaitGroup
for _, sc := range c.ExpectedScanners {
wg.Add(1)
worker := mocksdk.NewMockWorker(ctrl)
worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
worker.EXPECT().Start()
mockWorkerFactory.EXPECT().New(gomock.Any(), sc.TaskQueueName, gomock.Any()).Return(worker)
mockSdkClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes()
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), sc.WFTypeName, gomock.Any())
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), sc.WFTypeName,
gomock.Any()).Do(func(
_ context.Context,
_ client.StartWorkflowOptions,
_ string,
_ ...interface{},
) {
wg.Done()
})
}
err := scanner.Start()
s.NoError(err)
wg.Wait()
scanner.Stop()
})
}
}

// TestScannerShutdown tests that the scanner can be shut down even when it hasn't finished starting.
// This fixes a rare issue that can occur when Stop() is called quickly after Start(). When Start() is called, the
// scanner starts a new goroutine for each scanner type. In that goroutine, an sdk client is created which dials the
// frontend service. If the test driver calls Stop() on the server, then the server stops the frontend service and the
// history service. In some cases, the frontend service stops before the sdk client has finished connecting to it.
// This causes the startWorkflow() call to fail with an error. However, startWorkflowWithRetry retries the call for
// a whole minute, which causes the test to take a long time to fail. So, instead we immediately cancel all async
// requests when Stop() is called.
func (s *scannerTestSuite) TestScannerShutdown() {
ctrl := gomock.NewController(s.T())

logger := log.NewTestLogger()
mockSdkClientFactory := sdk.NewMockClientFactory(ctrl)
mockSdkClient := mocksdk.NewMockClient(ctrl)
mockNamespaceRegistry := namespace.NewMockRegistry(ctrl)
mockAdminClient := adminservicemock.NewMockAdminServiceClient(ctrl)
mockWorkerFactory := sdk.NewMockWorkerFactory(ctrl)
worker := mocksdk.NewMockWorker(ctrl)
// Only the history scanner is enabled; the executions and task-queue scanners are switched off.
scanner := New(
logger,
&Config{
MaxConcurrentActivityExecutionSize: dynamicconfig.GetIntPropertyFn(1),
MaxConcurrentWorkflowTaskExecutionSize: dynamicconfig.GetIntPropertyFn(1),
MaxConcurrentActivityTaskPollers: dynamicconfig.GetIntPropertyFn(1),
MaxConcurrentWorkflowTaskPollers: dynamicconfig.GetIntPropertyFn(1),
HistoryScannerEnabled: dynamicconfig.GetBoolPropertyFn(true),
ExecutionsScannerEnabled: dynamicconfig.GetBoolPropertyFn(false),
TaskQueueScannerEnabled: dynamicconfig.GetBoolPropertyFn(false),
Persistence: &config.Persistence{
DefaultStore: config.StoreTypeNoSQL,
DataStores: map[string]config.DataStore{
config.StoreTypeNoSQL: {},
},
},
},
mockSdkClientFactory,
metrics.NoopMetricsHandler,
p.NewMockExecutionManager(ctrl),
p.NewMockTaskManager(ctrl),
historyservicemock.NewMockHistoryServiceClient(ctrl),
mockAdminClient,
mockNamespaceRegistry,
mockWorkerFactory,
)
mockSdkClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes()
worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
worker.EXPECT().Start()
mockWorkerFactory.EXPECT().New(gomock.Any(), gomock.Any(), gomock.Any()).Return(worker)
// wg is released once the mocked ExecuteWorkflow has been entered, so that Stop() below is called while that
// call is still in flight.
var wg sync.WaitGroup
wg.Add(1)
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(
ctx context.Context,
_ client.StartWorkflowOptions,
_ string,
_ ...interface{},
) (client.WorkflowRun, error) {
wg.Done()
// Block until the context passed to ExecuteWorkflow is done, then report that context's error.
<-ctx.Done()
return nil, ctx.Err()
})
err := scanner.Start()
s.NoError(err)
wg.Wait()
// The mocked ExecuteWorkflow only returns once its context is done, so this call returning shows that Stop()
// does not hang on the in-flight workflow start.
scanner.Stop()
}

0 comments on commit e0081fd

Please sign in to comment.