diff --git a/common/persistence/visibility/store/elasticsearch/processor.go b/common/persistence/visibility/store/elasticsearch/processor.go
index 14c36007179..6d04b41213f 100644
--- a/common/persistence/visibility/store/elasticsearch/processor.go
+++ b/common/persistence/visibility/store/elasticsearch/processor.go
@@ -33,6 +33,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -69,6 +70,7 @@ type (
 		logger             log.Logger
 		metricsHandler     metrics.Handler
 		indexerConcurrency uint32
+		shutdownLock       sync.RWMutex
 	}
 
 	// ProcessorConfig contains all configs for processor
@@ -97,6 +99,10 @@ const (
 	visibilityProcessorName = "visibility-processor"
 )
 
+var (
+	errVisibilityShutdown = errors.New("visibility processor was shut down")
+)
+
 // NewProcessor create new processorImpl
 func NewProcessor(
 	cfg *ProcessorConfig,
@@ -150,14 +156,15 @@ func (p *processorImpl) Stop() {
 		return
 	}
 
+	p.shutdownLock.Lock()
+	defer p.shutdownLock.Unlock()
+
 	err := p.bulkProcessor.Stop()
 	if err != nil {
 		// This could happen if ES is down when we're trying to shut down the server.
 		p.logger.Error("Unable to stop Elasticsearch processor.", tag.LifeCycleStopFailed, tag.Error(err))
 		return
 	}
-	p.mapToAckFuture = nil
-	p.bulkProcessor = nil
 }
 
 func (p *processorImpl) hashFn(key interface{}) uint32 {
@@ -172,7 +179,17 @@ func (p *processorImpl) hashFn(key interface{}) uint32 {
 
 // Add request to the bulk and return a future object which will receive ack signal when request is processed.
 func (p *processorImpl) Add(request *client.BulkableRequest, visibilityTaskKey string) *future.FutureImpl[bool] {
-	newFuture := newAckFuture()
+	newFuture := newAckFuture() // Create the future first to measure the latency impact of the following RWLock
+
+	p.shutdownLock.RLock()
+	defer p.shutdownLock.RUnlock()
+
+	if atomic.LoadInt32(&p.status) == common.DaemonStatusStopped {
+		p.logger.Warn("Rejecting ES request for visibility task key because processor has been shut down.", tag.Key(visibilityTaskKey), tag.ESDocID(request.ID), tag.Value(request.Doc))
+		newFuture.future.Set(false, errVisibilityShutdown)
+		return newFuture.future
+	}
+
 	_, isDup, _ := p.mapToAckFuture.PutOrDo(visibilityTaskKey, newFuture, func(key interface{}, value interface{}) error {
 		existingFuture, ok := value.(*ackFuture)
 		if !ok {
diff --git a/common/persistence/visibility/store/elasticsearch/processor_test.go b/common/persistence/visibility/store/elasticsearch/processor_test.go
index e1de4666024..611bd78fbdc 100644
--- a/common/persistence/visibility/store/elasticsearch/processor_test.go
+++ b/common/persistence/visibility/store/elasticsearch/processor_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/olivere/elastic/v7"
 	"github.com/stretchr/testify/suite"
 
+	"go.temporal.io/server/common"
 	"go.temporal.io/server/common/collection"
 	"go.temporal.io/server/common/dynamicconfig"
 	"go.temporal.io/server/common/future"
@@ -89,6 +90,7 @@ func (s *processorSuite) SetupTest() {
 	// esProcessor.Start mock
 	s.esProcessor.mapToAckFuture = collection.NewShardedConcurrentTxMap(1024, s.esProcessor.hashFn)
 	s.esProcessor.bulkProcessor = s.mockBulkProcessor
+	s.esProcessor.status = common.DaemonStatusStarted
 }
 
 func (s *processorSuite) TearDownTest() {
@@ -126,8 +128,8 @@ func (s *processorSuite) TestNewESProcessorAndStartStop() {
 	s.NotNil(p.bulkProcessor)
 
 	p.Stop()
-	s.Nil(p.mapToAckFuture)
-	s.Nil(p.bulkProcessor)
+	s.NotNil(p.mapToAckFuture)
+	s.NotNil(p.bulkProcessor)
 }
 
 func (s *processorSuite) TestAdd() {
@@ -219,6 +221,43 @@ func (s *processorSuite) TestAdd_ConcurrentAdd_Duplicates() {
 	s.Equal(1, s.esProcessor.mapToAckFuture.Len(), "only one request should be in the bulk")
 }
 
+func (s *processorSuite) TestAdd_ConcurrentAdd_Shutdown() {
+	request := &client.BulkableRequest{}
+	docsCount := 1000
+	parallelFactor := 10
+	futures := make([]future.Future[bool], docsCount)
+
+	s.mockBulkProcessor.EXPECT().Add(request).MaxTimes(docsCount + 2) // +2 for the explicit adds before and after shutdown
+	s.mockBulkProcessor.EXPECT().Stop().Return(nil).Times(1)
+	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitAddLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc).MaxTimes(docsCount + 2)
+
+	addBefore := s.esProcessor.Add(request, "test-key-before")
+
+	wg := sync.WaitGroup{}
+	wg.Add(parallelFactor + 1) // +1 for the separate shutdown goroutine
+	for i := 0; i < parallelFactor; i++ {
+		go func(i int) {
+			for j := 0; j < docsCount/parallelFactor; j++ {
+				futures[i*docsCount/parallelFactor+j] = s.esProcessor.Add(request, fmt.Sprintf("test-key-%d-%d", i, j))
+			}
+			wg.Done()
+		}(i)
+	}
+	go func() {
+		time.Sleep(1 * time.Millisecond) // slight delay so at least a few docs get added
+		s.esProcessor.Stop()
+		wg.Done()
+	}()
+
+	wg.Wait()
+	addAfter := s.esProcessor.Add(request, "test-key-after")
+
+	s.False(addBefore.Ready()) // first request should be in the bulk
+	s.True(addAfter.Ready())   // final request should only get an error
+	_, err := addAfter.Get(context.Background())
+	s.ErrorIs(err, errVisibilityShutdown)
+}
+
 func (s *processorSuite) TestBulkAfterAction_Ack() {
 	version := int64(3)
 	testKey := "testKey"
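Note on the pattern this diff introduces: instead of Stop() nil-ing out mapToAckFuture and bulkProcessor (which a concurrent Add could still be using), Add now takes a read lock on the hot path and checks the atomic status, while Stop flips the status and takes the write lock so teardown only happens after in-flight Adds have drained; later Adds fail fast with errVisibilityShutdown. The sketch below shows that shutdown-guard pattern in isolation. It is a standalone illustrative program, not the Temporal code: the names processor, statusStarted, statusStopped, errShutdown, and the sync.Map standing in for the sharded concurrent map of ack futures are all simplified assumptions.

// Minimal sketch of the shutdown-guard pattern (assumed, simplified names).
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	statusStarted int32 = 1
	statusStopped int32 = 2
)

var errShutdown = errors.New("processor was shut down")

type processor struct {
	status       int32
	shutdownLock sync.RWMutex
	pending      sync.Map // stands in for the sharded concurrent map of ack futures
}

// Add accepts new work only while the processor is running. The read lock is
// cheap for concurrent callers but forces Stop to wait for in-flight Adds.
func (p *processor) Add(key string) error {
	p.shutdownLock.RLock()
	defer p.shutdownLock.RUnlock()

	if atomic.LoadInt32(&p.status) == statusStopped {
		return errShutdown // reject instead of touching state that Stop may have released
	}
	p.pending.Store(key, struct{}{})
	return nil
}

// Stop flips the status first, then takes the write lock. Acquiring the write
// lock blocks until every Add that already holds the read lock has returned,
// so teardown never races with an in-flight Add.
func (p *processor) Stop() {
	if !atomic.CompareAndSwapInt32(&p.status, statusStarted, statusStopped) {
		return // already stopped
	}
	p.shutdownLock.Lock()
	defer p.shutdownLock.Unlock()
	// flush and release resources here (the real Stop stops the bulk processor)
}

func main() {
	p := &processor{status: statusStarted}
	fmt.Println(p.Add("task-1")) // <nil>
	p.Stop()
	fmt.Println(p.Add("task-2")) // processor was shut down
}

The ordering is the important part: Stop changes the status before taking the write lock, and Add checks the status only while holding the read lock, so every request either completes before teardown or is rejected with the shutdown error. That is the invariant TestAdd_ConcurrentAdd_Shutdown above exercises under concurrent Adds and a racing Stop.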