Fix visibility processor panic on add after stop (#3830)
* add guard clause

* cleanup

* wait for in progress adds before shutdown

* use RWLock

* concurrency test

* comments

* test

* define error

* remove nil assignment

* err name

* test

* tests

* reuse future
pdoerner authored Feb 1, 2023
1 parent c257d84 commit de0b049
Showing 2 changed files with 61 additions and 5 deletions.
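
Before the diff itself, here is a minimal, self-contained sketch of the shutdown-guard pattern the commit message describes (guard clause, RWLock, waiting for in-progress adds before shutdown). Everything in it — the processor type, Add, Stop, errShutdown, and the accepted counter — is illustrative and not code from the patch; the real processor guards an Elasticsearch bulk processor and an ack-future map rather than a counter.

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errShutdown is an illustrative stand-in for the patch's errVisibilityShutdown.
var errShutdown = errors.New("processor was shut down")

type processor struct {
	stopped      int32        // flipped to 1 exactly once by Stop
	shutdownLock sync.RWMutex // Stop's write lock waits out in-flight Add calls
	accepted     int64        // stand-in for the underlying bulk processor; must itself be concurrency-safe
}

// Add holds the read lock so any number of callers can enqueue concurrently,
// but rejects work with a sentinel error once the processor is stopped,
// instead of touching torn-down state (the panic this commit fixes).
func (p *processor) Add(doc string) error {
	p.shutdownLock.RLock()
	defer p.shutdownLock.RUnlock()

	if atomic.LoadInt32(&p.stopped) == 1 {
		return errShutdown
	}
	atomic.AddInt64(&p.accepted, 1) // stand-in for bulkProcessor.Add(doc)
	return nil
}

// Stop flips the status exactly once, then takes the write lock so it blocks
// until every in-flight Add has released its read lock before tearing down.
func (p *processor) Stop() {
	if !atomic.CompareAndSwapInt32(&p.stopped, 0, 1) {
		return // already stopped
	}
	p.shutdownLock.Lock()
	defer p.shutdownLock.Unlock()
	// teardown of the underlying bulk processor would happen here
}

func main() {
	p := &processor{}
	fmt.Println(p.Add("doc-1")) // <nil>
	p.Stop()
	fmt.Println(errors.Is(p.Add("doc-2"), errShutdown)) // true
}

The read/write split is the key design choice: Add holds only the read lock, so concurrent adds never serialize against each other, while Stop's write lock both waits for in-flight adds to drain and guarantees that any later Add observes the stopped status and fails fast with the sentinel error.
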
23 changes: 20 additions & 3 deletions common/persistence/visibility/store/elasticsearch/processor.go
@@ -33,6 +33,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -69,6 +70,7 @@ type (
 		logger             log.Logger
 		metricsHandler     metrics.Handler
 		indexerConcurrency uint32
+		shutdownLock       sync.RWMutex
 	}

 	// ProcessorConfig contains all configs for processor
@@ -97,6 +99,10 @@ const (
 	visibilityProcessorName = "visibility-processor"
 )

+var (
+	errVisibilityShutdown = errors.New("visibility processor was shut down")
+)
+
 // NewProcessor create new processorImpl
 func NewProcessor(
 	cfg *ProcessorConfig,
@@ -150,14 +156,15 @@ func (p *processorImpl) Stop() {
 		return
 	}

+	p.shutdownLock.Lock()
+	defer p.shutdownLock.Unlock()
+
 	err := p.bulkProcessor.Stop()
 	if err != nil {
 		// This could happen if ES is down when we're trying to shut down the server.
 		p.logger.Error("Unable to stop Elasticsearch processor.", tag.LifeCycleStopFailed, tag.Error(err))
 		return
 	}
-	p.mapToAckFuture = nil
-	p.bulkProcessor = nil
 }

 func (p *processorImpl) hashFn(key interface{}) uint32 {
@@ -172,7 +179,17 @@ func (p *processorImpl) hashFn(key interface{}) uint32 {

 // Add request to the bulk and return a future object which will receive ack signal when request is processed.
 func (p *processorImpl) Add(request *client.BulkableRequest, visibilityTaskKey string) *future.FutureImpl[bool] {
-	newFuture := newAckFuture()
+	newFuture := newAckFuture() // Create future first to measure impact of following RWLock on latency
+
+	p.shutdownLock.RLock()
+	defer p.shutdownLock.RUnlock()
+
+	if atomic.LoadInt32(&p.status) == common.DaemonStatusStopped {
+		p.logger.Warn("Rejecting ES request for visibility task key because processor has been shut down.", tag.Key(visibilityTaskKey), tag.ESDocID(request.ID), tag.Value(request.Doc))
+		newFuture.future.Set(false, errVisibilityShutdown)
+		return newFuture.future
+	}
+
 	_, isDup, _ := p.mapToAckFuture.PutOrDo(visibilityTaskKey, newFuture, func(key interface{}, value interface{}) error {
 		existingFuture, ok := value.(*ackFuture)
 		if !ok {
43 changes: 41 additions & 2 deletions common/persistence/visibility/store/elasticsearch/processor_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/olivere/elastic/v7"
 	"github.com/stretchr/testify/suite"

+	"go.temporal.io/server/common"
 	"go.temporal.io/server/common/collection"
 	"go.temporal.io/server/common/dynamicconfig"
 	"go.temporal.io/server/common/future"
@@ -89,6 +90,7 @@ func (s *processorSuite) SetupTest() {
 	// esProcessor.Start mock
 	s.esProcessor.mapToAckFuture = collection.NewShardedConcurrentTxMap(1024, s.esProcessor.hashFn)
 	s.esProcessor.bulkProcessor = s.mockBulkProcessor
+	s.esProcessor.status = common.DaemonStatusStarted
 }

 func (s *processorSuite) TearDownTest() {
@@ -126,8 +128,8 @@ func (s *processorSuite) TestNewESProcessorAndStartStop() {
 	s.NotNil(p.bulkProcessor)

 	p.Stop()
-	s.Nil(p.mapToAckFuture)
-	s.Nil(p.bulkProcessor)
+	s.NotNil(p.mapToAckFuture)
+	s.NotNil(p.bulkProcessor)
 }

 func (s *processorSuite) TestAdd() {
@@ -219,6 +221,43 @@ func (s *processorSuite) TestAdd_ConcurrentAdd_Duplicates() {
 	s.Equal(1, s.esProcessor.mapToAckFuture.Len(), "only one request should be in the bulk")
 }

+func (s *processorSuite) TestAdd_ConcurrentAdd_Shutdown() {
+	request := &client.BulkableRequest{}
+	docsCount := 1000
+	parallelFactor := 10
+	futures := make([]future.Future[bool], docsCount)
+
+	s.mockBulkProcessor.EXPECT().Add(request).MaxTimes(docsCount + 2) // +2 for explicit adds before and after shutdown
+	s.mockBulkProcessor.EXPECT().Stop().Return(nil).Times(1)
+	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitAddLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc).MaxTimes(docsCount + 2)
+
+	addBefore := s.esProcessor.Add(request, "test-key-before")
+
+	wg := sync.WaitGroup{}
+	wg.Add(parallelFactor + 1) // +1 for separate shutdown goroutine
+	for i := 0; i < parallelFactor; i++ {
+		go func(i int) {
+			for j := 0; j < docsCount/parallelFactor; j++ {
+				futures[i*docsCount/parallelFactor+j] = s.esProcessor.Add(request, fmt.Sprintf("test-key-%d-%d", i, j))
+			}
+			wg.Done()
+		}(i)
+	}
+	go func() {
+		time.Sleep(1 * time.Millisecond) // slight delay so at least a few docs get added
+		s.esProcessor.Stop()
+		wg.Done()
+	}()
+
+	wg.Wait()
+	addAfter := s.esProcessor.Add(request, "test-key-after")
+
+	s.False(addBefore.Ready()) // first request should be in bulk
+	s.True(addAfter.Ready())   // final request should be only error
+	_, err := addAfter.Get(context.Background())
+	s.ErrorIs(err, errVisibilityShutdown)
+}
+
 func (s *processorSuite) TestBulkAfterAction_Ack() {
 	version := int64(3)
 	testKey := "testKey"
