Remove internal bulk processor retries #3739

Merged (4 commits, Dec 21, 2022)
1 change: 0 additions & 1 deletion common/metrics/metric_defs.go
@@ -1682,7 +1682,6 @@ var (
 	BatcherOperationFailures = NewCounterDef("batcher_operation_errors")
 	ElasticsearchBulkProcessorRequests = NewCounterDef("elasticsearch_bulk_processor_requests")
 	ElasticsearchBulkProcessorQueuedRequests = NewDimensionlessHistogramDef("elasticsearch_bulk_processor_queued_requests")
-	ElasticsearchBulkProcessorRetries = NewCounterDef("elasticsearch_bulk_processor_retries")
 	ElasticsearchBulkProcessorFailures = NewCounterDef("elasticsearch_bulk_processor_errors")
 	ElasticsearchBulkProcessorCorruptedData = NewCounterDef("elasticsearch_bulk_processor_corrupted_data")
 	ElasticsearchBulkProcessorDuplicateRequest = NewCounterDef("elasticsearch_bulk_processor_duplicate_request")
@@ -52,7 +52,6 @@ type (
 		BulkActions   int
 		BulkSize      int
 		FlushInterval time.Duration
-		Backoff       elastic.Backoff
 		BeforeFunc    elastic.BulkBeforeFunc
 		AfterFunc     elastic.BulkAfterFunc
 	}
@@ -29,7 +29,6 @@ import (
 	"testing"

 	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
 	enumspb "go.temporal.io/api/enums/v1"
 )

@@ -74,10 +73,3 @@ func Test_BuildPutMappingBody(t *testing.T) {
 		assert.Equal(test.expected, fmt.Sprintf("%v", buildMappingBody(test.input)))
 	}
 }
-
-func TestIsResponseRetryable(t *testing.T) {
-	status := []int{408, 429, 500, 503, 507}
-	for _, code := range status {
-		require.True(t, IsRetryableStatus(code))
-	}
-}
@@ -225,9 +225,10 @@ func (c *clientImpl) RunBulkProcessor(ctx context.Context, p *BulkProcessorParam
 		BulkActions(p.BulkActions).
 		BulkSize(p.BulkSize).
 		FlushInterval(p.FlushInterval).
-		Backoff(p.Backoff).
 		Before(p.BeforeFunc).
 		After(p.AfterFunc).
+		// Disable built-in retry logic because visibility task processor has its own.
+		RetryItemStatusCodes().
 		Do(ctx)

 	return newBulkProcessor(esBulkProcessor), err
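For context: in olivere/elastic v7, calling RetryItemStatusCodes with no arguments replaces the library's default retryable status set (408, 429, 503, 507) with an empty one, so failed items are never re-enqueued internally and every outcome reaches the After callback exactly once. A minimal sketch of that behavior (the client wiring and tuning values here are illustrative, not from this PR):

```go
package main

import (
	"context"
	"time"

	"github.com/olivere/elastic/v7"
)

func startBulkProcessor(esClient *elastic.Client) (*elastic.BulkProcessor, error) {
	return esClient.BulkProcessor().
		Name("visibility-processor").
		BulkActions(500).
		FlushInterval(time.Second).
		// Empty list: no internal per-item retries; failures surface once in After.
		RetryItemStatusCodes().
		After(func(_ int64, reqs []elastic.BulkableRequest, resp *elastic.BulkResponse, err error) {
			// Both commit-level errors (err != nil) and per-item failures
			// (inside resp) arrive here without being retried first.
		}).
		Do(context.Background())
}
```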
57 changes: 0 additions & 57 deletions common/persistence/visibility/store/elasticsearch/client/errors.go

This file was deleted.

44 changes: 17 additions & 27 deletions common/persistence/visibility/store/elasticsearch/processor.go
@@ -30,6 +30,7 @@ package elasticsearch
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"strings"
 	"sync/atomic"
@@ -93,10 +94,7 @@ type (
 var _ Processor = (*processorImpl)(nil)

 const (
-	// retry configs for es bulk processor
-	esProcessorInitialRetryInterval = 200 * time.Millisecond
-	esProcessorMaxRetryInterval     = 20 * time.Second
-	visibilityProcessorName         = "visibility-processor"
+	visibilityProcessorName = "visibility-processor"
 )

// NewProcessor create new processorImpl
@@ -119,7 +117,6 @@ func NewProcessor(
 		BulkActions:   cfg.ESProcessorBulkActions(),
 		BulkSize:      cfg.ESProcessorBulkSize(),
 		FlushInterval: cfg.ESProcessorFlushInterval(),
-		Backoff:       elastic.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval),
 	},
 }
p.bulkProcessorParameters.AfterFunc = p.bulkAfterAction
@@ -220,25 +217,26 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq
 func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
 	if err != nil {
 		const logFirstNRequests = 5
-		httpStatus := client.HttpStatus(err)
-		isRetryable := client.IsRetryableStatus(httpStatus)
+		var httpStatus int
+		var esErr *elastic.Error
+		if errors.As(err, &esErr) {
+			httpStatus = esErr.Status
+		}

 		var logRequests strings.Builder
 		for i, request := range requests {
 			if i < logFirstNRequests {
 				logRequests.WriteString(request.String())
 				logRequests.WriteRune('\n')
 			}
 			p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Record(1, metrics.HttpStatusTag(httpStatus))

-			if !isRetryable {
-				visibilityTaskKey := p.extractVisibilityTaskKey(request)
-				if visibilityTaskKey == "" {
-					continue
-				}
-				p.notifyResult(visibilityTaskKey, false)
+			visibilityTaskKey := p.extractVisibilityTaskKey(request)
+			if visibilityTaskKey == "" {
+				continue
 			}
+			p.notifyResult(visibilityTaskKey, false)
 		}
-		p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.IsRetryable(isRetryable), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String()))
+		p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String()))
 		return
 	}

@@ -262,10 +260,7 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ
 			continue
 		}

-		switch {
-		case isSuccess(responseItem):
-			p.notifyResult(visibilityTaskKey, true)
-		case !client.IsRetryableStatus(responseItem.Status):
+		if !isSuccess(responseItem) {
 			p.logger.Error("ES request failed.",
 				tag.ESResponseStatus(responseItem.Status),
 				tag.ESResponseError(extractErrorReason(responseItem)),
@@ -274,15 +269,10 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ
 				tag.Key(visibilityTaskKey),
 				tag.ESDocID(docID),
 				tag.ESRequest(request.String()))
 			p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status))
 			p.notifyResult(visibilityTaskKey, false)
-		default: // bulk processor will retry
-			p.logger.Warn("ES request retried.",
-				tag.ESResponseStatus(responseItem.Status),
-				tag.ESResponseError(extractErrorReason(responseItem)),
-				tag.Key(visibilityTaskKey),
-				tag.ESDocID(docID),
-				tag.ESRequest(request.String()))
-			p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorRetries.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status))
 			continue
 		}
+
+		p.notifyResult(visibilityTaskKey, true)
 	}

 	// Record how many documents are waiting to be flushed to Elasticsearch after this bulk is committed.
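The errors.As change above replaces helpers from the deleted errors.go. Roughly, it behaves like the following sketch (httpStatus is a hypothetical stand-in, not a helper from this PR): errors.As walks the wrap chain, so even a wrapped *elastic.Error yields its HTTP status, while non-ES errors such as network timeouts leave the status at zero:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/olivere/elastic/v7"
)

// httpStatus is a hypothetical helper mirroring the pattern above.
func httpStatus(err error) int {
	var esErr *elastic.Error
	if errors.As(err, &esErr) {
		return esErr.Status
	}
	return 0 // not an Elasticsearch response error
}

func main() {
	wrapped := fmt.Errorf("bulk commit failed: %w", &elastic.Error{Status: 429})
	fmt.Println(httpStatus(wrapped))                    // 429
	fmt.Println(httpStatus(errors.New("i/o timeout"))) // 0
}
```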
@@ -113,7 +113,6 @@ func (s *processorSuite) TestNewESProcessorAndStartStop() {
 	s.Equal(config.ESProcessorBulkActions(), input.BulkActions)
 	s.Equal(config.ESProcessorBulkSize(), input.BulkSize)
 	s.Equal(config.ESProcessorFlushInterval(), input.FlushInterval)
-	s.NotNil(input.Backoff)
 	s.NotNil(input.AfterFunc)

 	bulkProcessor := client.NewMockBulkProcessor(s.controller)
@@ -254,14 +254,15 @@ func (s *visibilityStore) addBulkRequestAndWait(
 		return &persistence.TimeoutError{Msg: fmt.Sprintf("visibility task %s timed out waiting for ACK after %v", visibilityTaskKey, s.processorAckTimeout())}
 	}
 	// Returns non-retryable Internal error here because these errors are unexpected.
-	// Visibility task processor retries all errors though, therefore new request will be generated for the same task.
+	// Visibility task processor retries all errors though, therefore new request will be generated for the same visibility task.
 	return serviceerror.NewInternal(fmt.Sprintf("visibility task %s received error %v", visibilityTaskKey, err))
 }

 if !ack {
-	// Returns non-retryable Internal error here because NACK from bulk processor means that this request can't be processed.
-	// Visibility task processor retries all errors though, therefore new request will be generated for the same task.
-	return serviceerror.NewInternal(fmt.Sprintf("visibility task %s received NACK", visibilityTaskKey))
+	// Returns retryable Unavailable error here because NACK from bulk processor
+	// means that this request wasn't processed successfully and needs to be retried.
+	// Visibility task processor retries all errors anyway, therefore new request will be generated for the same visibility task.
+	return serviceerror.NewUnavailable(fmt.Sprintf("visibility task %s received NACK", visibilityTaskKey))
 }
 return nil
}
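A small sketch of the convention this change leans on (shouldRetry is a hypothetical caller-side check, not code from this PR): Temporal's serviceerror package encodes retryability in the error type, with Unavailable conventionally retryable and Internal not, even though the visibility task processor itself retries both:

```go
package main

import (
	"errors"
	"fmt"

	"go.temporal.io/api/serviceerror"
)

// shouldRetry is a hypothetical caller-side policy: retry on Unavailable only.
func shouldRetry(err error) bool {
	var unavailable *serviceerror.Unavailable
	return errors.As(err, &unavailable)
}

func main() {
	nack := serviceerror.NewUnavailable("visibility task received NACK")
	unexpected := serviceerror.NewInternal("visibility task received unexpected error")

	fmt.Println(shouldRetry(nack))       // true: retryable by convention
	fmt.Println(shouldRetry(unexpected)) // false: unexpected, treated as terminal
}
```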
19 changes: 18 additions & 1 deletion service/worker/addsearchattributes/workflow.go
@@ -28,8 +28,10 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net/http"
 	"time"

+	"github.com/olivere/elastic/v7"
 	enumspb "go.temporal.io/api/enums/v1"
 	"go.temporal.io/sdk/temporal"
 	"go.temporal.io/sdk/workflow"
@@ -140,7 +142,8 @@ func (a *activities) AddESMappingFieldActivity(ctx context.Context, params Workf
 	_, err := a.esClient.PutMapping(ctx, params.IndexName, params.CustomAttributesToAdd)
 	if err != nil {
 		a.metricsHandler.Counter(metrics.AddSearchAttributesFailuresCount.GetMetricName()).Record(1)
-		if esclient.IsRetryableError(err) {
+
+		if a.isRetryableError(err) {
 			a.logger.Error("Unable to update Elasticsearch mapping (retryable error).", tag.ESIndex(params.IndexName), tag.Error(err))
 			return fmt.Errorf("%w: %v", ErrUnableToUpdateESMapping, err)
 		}
@@ -152,6 +155,20 @@ func (a *activities) AddESMappingFieldActivity(ctx context.Context, params Workf
 	return nil
 }

+func (a *activities) isRetryableError(err error) bool {
+	var esErr *elastic.Error
+	if !errors.As(err, &esErr) {
+		return true
+	}
+
+	switch esErr.Status {
+	case http.StatusBadRequest, http.StatusUnauthorized, http.StatusForbidden, http.StatusNotFound, http.StatusConflict:
+		return false
+	default:
+		return true
+	}
+}
+
 func (a *activities) WaitForYellowStatusActivity(ctx context.Context, indexName string) error {
 	if a.esClient == nil {
 		a.logger.Info("Elasticsearch client is not configured. Skipping Elasticsearch status check.")

Review thread on isRetryableError:

@MichaelSnowden (Contributor, Dec 20, 2022): This means we retry all non-ES errors. Is that what we want?

PR author (Member): What do you mean "non-ES"? This error came from Elasticsearch, and Elasticsearch uses HTTP status codes to indicate errors.

PR author (Member): I think I understand what you meant. Yes, non-ES errors are most likely network failures and should be retryable. They might also indicate a code bug (like a malformed URL or a missing required parameter). In that case it would probably be better not to retry, but I don't know how to differentiate them. Generally, the idea is not to retry anything we know for sure is non-retryable, and to retry all the rest.
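To pin down the behavior discussed in the thread, a hypothetical test sketch (not a test from this PR) of the resulting classification: the listed 4xx statuses are terminal, while other ES statuses and all non-ES errors are retried:

```go
package addsearchattributes

import (
	"errors"
	"net/http"
	"testing"

	"github.com/olivere/elastic/v7"
)

func TestIsRetryableErrorSketch(t *testing.T) {
	a := &activities{} // isRetryableError touches no fields, so the zero value suffices
	cases := []struct {
		err  error
		want bool
	}{
		{&elastic.Error{Status: http.StatusBadRequest}, false},     // 400: terminal
		{&elastic.Error{Status: http.StatusNotFound}, false},       // 404: terminal
		{&elastic.Error{Status: http.StatusTooManyRequests}, true}, // 429: retried
		{errors.New("dial tcp: i/o timeout"), true},                // non-ES error: retried
	}
	for _, c := range cases {
		if got := a.isRetryableError(c.err); got != c.want {
			t.Errorf("isRetryableError(%v) = %v, want %v", c.err, got, c.want)
		}
	}
}
```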