From e1ae9b5605707c5f34c3fcee5999d0c1da024489 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 20 Dec 2022 09:39:39 -0800 Subject: [PATCH 1/4] Exclude http status code 500 from Elasticsearch bulk process retryable codes --- .../store/elasticsearch/client/client_v7.go | 1 + .../store/elasticsearch/client/errors.go | 36 +++++++++++-------- .../store/elasticsearch/processor.go | 2 +- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/common/persistence/visibility/store/elasticsearch/client/client_v7.go b/common/persistence/visibility/store/elasticsearch/client/client_v7.go index 6e6ca2272a1..70cf0dd50cd 100644 --- a/common/persistence/visibility/store/elasticsearch/client/client_v7.go +++ b/common/persistence/visibility/store/elasticsearch/client/client_v7.go @@ -228,6 +228,7 @@ func (c *clientImpl) RunBulkProcessor(ctx context.Context, p *BulkProcessorParam Backoff(p.Backoff). Before(p.BeforeFunc). After(p.AfterFunc). + RetryItemStatusCodes(retryItemStatusCodes...). Do(ctx) return newBulkProcessor(esBulkProcessor), err diff --git a/common/persistence/visibility/store/elasticsearch/client/errors.go b/common/persistence/visibility/store/elasticsearch/client/errors.go index ccb4b0e4df2..9dec3984b2c 100644 --- a/common/persistence/visibility/store/elasticsearch/client/errors.go +++ b/common/persistence/visibility/store/elasticsearch/client/errors.go @@ -28,6 +28,27 @@ import ( "github.com/olivere/elastic/v7" ) +var ( + // retryItemStatusCodes is an array of status codes that indicate that a bulk + // response line item should be retried. Should match default value from + // elastic.defaultRetryItemStatusCodes. + // 408 - Request Timeout + // 429 - Too Many Requests + // 503 - Service Unavailable + // 507 - Insufficient Storage + retryItemStatusCodes = []int{408, 429, 503, 507} +) + +// IsRetryableStatus check if httpsStatus is retryable. +func IsRetryableStatus(httpStatus int) bool { + for _, code := range retryItemStatusCodes { + if httpStatus == code { + return true + } + } + return false +} + func HttpStatus(err error) int { switch e := err.(type) { case *elastic.Error: @@ -37,21 +58,6 @@ func HttpStatus(err error) int { } } -// IsRetryableStatus is complaint with elastic.BulkProcessorService.RetryItemStatusCodes -// responses with these status will be kept in queue and retried until success -// 408 - Request Timeout -// 429 - Too Many Requests -// 500 - Node not connected -// 503 - Service Unavailable -// 507 - Insufficient Storage -func IsRetryableStatus(httpStatus int) bool { - switch httpStatus { - case 408, 429, 500, 503, 507: - return true - } - return false -} - func IsRetryableError(err error) bool { return IsRetryableStatus(HttpStatus(err)) } diff --git a/common/persistence/visibility/store/elasticsearch/processor.go b/common/persistence/visibility/store/elasticsearch/processor.go index 0ccec8c235c..6d3ce83b87e 100644 --- a/common/persistence/visibility/store/elasticsearch/processor.go +++ b/common/persistence/visibility/store/elasticsearch/processor.go @@ -275,7 +275,7 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status)) p.notifyResult(visibilityTaskKey, false) default: // bulk processor will retry - p.logger.Warn("ES request retried.", + p.logger.Warn("ES request will be retried by bulk processor.", tag.ESResponseStatus(responseItem.Status), tag.ESResponseError(extractErrorReason(responseItem)), tag.Key(visibilityTaskKey), From b0f19792b3b427f0802b7849e914619cb7b0b216 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 20 Dec 2022 11:12:00 -0800 Subject: [PATCH 2/4] Remove internal bulk processor retries --- common/metrics/metric_defs.go | 1 - .../elasticsearch/client/bulk_processor.go | 1 - .../store/elasticsearch/client/client_test.go | 8 --- .../store/elasticsearch/client/client_v7.go | 4 +- .../store/elasticsearch/client/errors.go | 63 ------------------- .../store/elasticsearch/processor.go | 42 +++++-------- .../store/elasticsearch/visibility_store.go | 9 +-- .../worker/addsearchattributes/workflow.go | 19 +++++- 8 files changed, 40 insertions(+), 107 deletions(-) delete mode 100644 common/persistence/visibility/store/elasticsearch/client/errors.go diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 8557a05815a..7b3615c3686 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1682,7 +1682,6 @@ var ( BatcherOperationFailures = NewCounterDef("batcher_operation_errors") ElasticsearchBulkProcessorRequests = NewCounterDef("elasticsearch_bulk_processor_requests") ElasticsearchBulkProcessorQueuedRequests = NewDimensionlessHistogramDef("elasticsearch_bulk_processor_queued_requests") - ElasticsearchBulkProcessorRetries = NewCounterDef("elasticsearch_bulk_processor_retries") ElasticsearchBulkProcessorFailures = NewCounterDef("elasticsearch_bulk_processor_errors") ElasticsearchBulkProcessorCorruptedData = NewCounterDef("elasticsearch_bulk_processor_corrupted_data") ElasticsearchBulkProcessorDuplicateRequest = NewCounterDef("elasticsearch_bulk_processor_duplicate_request") diff --git a/common/persistence/visibility/store/elasticsearch/client/bulk_processor.go b/common/persistence/visibility/store/elasticsearch/client/bulk_processor.go index 9e639191e09..d424104180b 100644 --- a/common/persistence/visibility/store/elasticsearch/client/bulk_processor.go +++ b/common/persistence/visibility/store/elasticsearch/client/bulk_processor.go @@ -52,7 +52,6 @@ type ( BulkActions int BulkSize int FlushInterval time.Duration - Backoff elastic.Backoff BeforeFunc elastic.BulkBeforeFunc AfterFunc elastic.BulkAfterFunc } diff --git a/common/persistence/visibility/store/elasticsearch/client/client_test.go b/common/persistence/visibility/store/elasticsearch/client/client_test.go index b35cd9a29ba..ca1dd6eb40d 100644 --- a/common/persistence/visibility/store/elasticsearch/client/client_test.go +++ b/common/persistence/visibility/store/elasticsearch/client/client_test.go @@ -29,7 +29,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" enumspb "go.temporal.io/api/enums/v1" ) @@ -74,10 +73,3 @@ func Test_BuildPutMappingBody(t *testing.T) { assert.Equal(test.expected, fmt.Sprintf("%v", buildMappingBody(test.input))) } } - -func TestIsResponseRetryable(t *testing.T) { - status := []int{408, 429, 500, 503, 507} - for _, code := range status { - require.True(t, IsRetryableStatus(code)) - } -} diff --git a/common/persistence/visibility/store/elasticsearch/client/client_v7.go b/common/persistence/visibility/store/elasticsearch/client/client_v7.go index 70cf0dd50cd..168619b6240 100644 --- a/common/persistence/visibility/store/elasticsearch/client/client_v7.go +++ b/common/persistence/visibility/store/elasticsearch/client/client_v7.go @@ -225,10 +225,10 @@ func (c *clientImpl) RunBulkProcessor(ctx context.Context, p *BulkProcessorParam BulkActions(p.BulkActions). BulkSize(p.BulkSize). FlushInterval(p.FlushInterval). - Backoff(p.Backoff). Before(p.BeforeFunc). After(p.AfterFunc). - RetryItemStatusCodes(retryItemStatusCodes...). + // Disable built-in retry logic because visibility task processor has its own. + RetryItemStatusCodes(). Do(ctx) return newBulkProcessor(esBulkProcessor), err diff --git a/common/persistence/visibility/store/elasticsearch/client/errors.go b/common/persistence/visibility/store/elasticsearch/client/errors.go deleted file mode 100644 index 9dec3984b2c..00000000000 --- a/common/persistence/visibility/store/elasticsearch/client/errors.go +++ /dev/null @@ -1,63 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package client - -import ( - "github.com/olivere/elastic/v7" -) - -var ( - // retryItemStatusCodes is an array of status codes that indicate that a bulk - // response line item should be retried. Should match default value from - // elastic.defaultRetryItemStatusCodes. - // 408 - Request Timeout - // 429 - Too Many Requests - // 503 - Service Unavailable - // 507 - Insufficient Storage - retryItemStatusCodes = []int{408, 429, 503, 507} -) - -// IsRetryableStatus check if httpsStatus is retryable. -func IsRetryableStatus(httpStatus int) bool { - for _, code := range retryItemStatusCodes { - if httpStatus == code { - return true - } - } - return false -} - -func HttpStatus(err error) int { - switch e := err.(type) { - case *elastic.Error: - return e.Status - default: - return 0 - } -} - -func IsRetryableError(err error) bool { - return IsRetryableStatus(HttpStatus(err)) -} diff --git a/common/persistence/visibility/store/elasticsearch/processor.go b/common/persistence/visibility/store/elasticsearch/processor.go index 6d3ce83b87e..202d22c165d 100644 --- a/common/persistence/visibility/store/elasticsearch/processor.go +++ b/common/persistence/visibility/store/elasticsearch/processor.go @@ -93,10 +93,7 @@ type ( var _ Processor = (*processorImpl)(nil) const ( - // retry configs for es bulk processor - esProcessorInitialRetryInterval = 200 * time.Millisecond - esProcessorMaxRetryInterval = 20 * time.Second - visibilityProcessorName = "visibility-processor" + visibilityProcessorName = "visibility-processor" ) // NewProcessor create new processorImpl @@ -119,7 +116,6 @@ func NewProcessor( BulkActions: cfg.ESProcessorBulkActions(), BulkSize: cfg.ESProcessorBulkSize(), FlushInterval: cfg.ESProcessorFlushInterval(), - Backoff: elastic.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval), }, } p.bulkProcessorParameters.AfterFunc = p.bulkAfterAction @@ -220,8 +216,11 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { if err != nil { const logFirstNRequests = 5 - httpStatus := client.HttpStatus(err) - isRetryable := client.IsRetryableStatus(httpStatus) + var httpStatus int + if err, isElasticErr := err.(*elastic.Error); isElasticErr { + httpStatus = err.Status + } + var logRequests strings.Builder for i, request := range requests { if i < logFirstNRequests { @@ -229,16 +228,13 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ logRequests.WriteRune('\n') } p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Record(1, metrics.HttpStatusTag(httpStatus)) - - if !isRetryable { - visibilityTaskKey := p.extractVisibilityTaskKey(request) - if visibilityTaskKey == "" { - continue - } - p.notifyResult(visibilityTaskKey, false) + visibilityTaskKey := p.extractVisibilityTaskKey(request) + if visibilityTaskKey == "" { + continue } + p.notifyResult(visibilityTaskKey, false) } - p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.IsRetryable(isRetryable), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String())) + p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String())) return } @@ -262,10 +258,7 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ continue } - switch { - case isSuccess(responseItem): - p.notifyResult(visibilityTaskKey, true) - case !client.IsRetryableStatus(responseItem.Status): + if !isSuccess(responseItem) { p.logger.Error("ES request failed.", tag.ESResponseStatus(responseItem.Status), tag.ESResponseError(extractErrorReason(responseItem)), @@ -274,15 +267,10 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ tag.ESRequest(request.String())) p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status)) p.notifyResult(visibilityTaskKey, false) - default: // bulk processor will retry - p.logger.Warn("ES request will be retried by bulk processor.", - tag.ESResponseStatus(responseItem.Status), - tag.ESResponseError(extractErrorReason(responseItem)), - tag.Key(visibilityTaskKey), - tag.ESDocID(docID), - tag.ESRequest(request.String())) - p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorRetries.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status)) + continue } + + p.notifyResult(visibilityTaskKey, true) } // Record how many documents are waiting to be flushed to Elasticsearch after this bulk is committed. diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store.go b/common/persistence/visibility/store/elasticsearch/visibility_store.go index 91d36c6be39..3efa3c5a83c 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store.go @@ -254,14 +254,15 @@ func (s *visibilityStore) addBulkRequestAndWait( return &persistence.TimeoutError{Msg: fmt.Sprintf("visibility task %s timed out waiting for ACK after %v", visibilityTaskKey, s.processorAckTimeout())} } // Returns non-retryable Internal error here because these errors are unexpected. - // Visibility task processor retries all errors though, therefore new request will be generated for the same task. + // Visibility task processor retries all errors though, therefore new request will be generated for the same visibility task. return serviceerror.NewInternal(fmt.Sprintf("visibility task %s received error %v", visibilityTaskKey, err)) } if !ack { - // Returns non-retryable Internal error here because NACK from bulk processor means that this request can't be processed. - // Visibility task processor retries all errors though, therefore new request will be generated for the same task. - return serviceerror.NewInternal(fmt.Sprintf("visibility task %s received NACK", visibilityTaskKey)) + // Returns retryable Unavailable error here because NACK from bulk processor + // means that this request wasn't processed successfully and needs to be retried. + // Visibility task processor retries all errors anyway, therefore new request will be generated for the same visibility task. + return serviceerror.NewUnavailable(fmt.Sprintf("visibility task %s received NACK", visibilityTaskKey)) } return nil } diff --git a/service/worker/addsearchattributes/workflow.go b/service/worker/addsearchattributes/workflow.go index 684e5435075..9146afc7b3e 100644 --- a/service/worker/addsearchattributes/workflow.go +++ b/service/worker/addsearchattributes/workflow.go @@ -28,8 +28,10 @@ import ( "context" "errors" "fmt" + "net/http" "time" + "github.com/olivere/elastic/v7" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" @@ -140,7 +142,8 @@ func (a *activities) AddESMappingFieldActivity(ctx context.Context, params Workf _, err := a.esClient.PutMapping(ctx, params.IndexName, params.CustomAttributesToAdd) if err != nil { a.metricsHandler.Counter(metrics.AddSearchAttributesFailuresCount.GetMetricName()).Record(1) - if esclient.IsRetryableError(err) { + + if a.isRetryableError(err) { a.logger.Error("Unable to update Elasticsearch mapping (retryable error).", tag.ESIndex(params.IndexName), tag.Error(err)) return fmt.Errorf("%w: %v", ErrUnableToUpdateESMapping, err) } @@ -152,6 +155,20 @@ func (a *activities) AddESMappingFieldActivity(ctx context.Context, params Workf return nil } +func (a *activities) isRetryableError(err error) bool { + var httpStatusCode int + if err, isElasticErr := err.(*elastic.Error); isElasticErr { + httpStatusCode = err.Status + } + + switch httpStatusCode { + case http.StatusBadRequest, http.StatusUnauthorized, http.StatusForbidden, http.StatusNotFound, http.StatusConflict: + return false + default: + return true + } +} + func (a *activities) WaitForYellowStatusActivity(ctx context.Context, indexName string) error { if a.esClient == nil { a.logger.Info("Elasticsearch client is not configured. Skipping Elasticsearch status check.") From 68d3676adb518fca913abf0e6276365f087c3634 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 20 Dec 2022 12:11:50 -0800 Subject: [PATCH 3/4] Fix unit tests --- .../persistence/visibility/store/elasticsearch/processor_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/common/persistence/visibility/store/elasticsearch/processor_test.go b/common/persistence/visibility/store/elasticsearch/processor_test.go index f19d226f71a..e1de4666024 100644 --- a/common/persistence/visibility/store/elasticsearch/processor_test.go +++ b/common/persistence/visibility/store/elasticsearch/processor_test.go @@ -113,7 +113,6 @@ func (s *processorSuite) TestNewESProcessorAndStartStop() { s.Equal(config.ESProcessorBulkActions(), input.BulkActions) s.Equal(config.ESProcessorBulkSize(), input.BulkSize) s.Equal(config.ESProcessorFlushInterval(), input.FlushInterval) - s.NotNil(input.Backoff) s.NotNil(input.AfterFunc) bulkProcessor := client.NewMockBulkProcessor(s.controller) From 81067a44e1ed175ee51a7aeecf40fb2f0a702a07 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 20 Dec 2022 13:53:36 -0800 Subject: [PATCH 4/4] Address feedback --- .../visibility/store/elasticsearch/processor.go | 6 ++++-- service/worker/addsearchattributes/workflow.go | 8 ++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/common/persistence/visibility/store/elasticsearch/processor.go b/common/persistence/visibility/store/elasticsearch/processor.go index 202d22c165d..14c36007179 100644 --- a/common/persistence/visibility/store/elasticsearch/processor.go +++ b/common/persistence/visibility/store/elasticsearch/processor.go @@ -30,6 +30,7 @@ package elasticsearch import ( "context" "encoding/json" + "errors" "fmt" "strings" "sync/atomic" @@ -217,8 +218,9 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ if err != nil { const logFirstNRequests = 5 var httpStatus int - if err, isElasticErr := err.(*elastic.Error); isElasticErr { - httpStatus = err.Status + var esErr *elastic.Error + if errors.As(err, &esErr) { + httpStatus = esErr.Status } var logRequests strings.Builder diff --git a/service/worker/addsearchattributes/workflow.go b/service/worker/addsearchattributes/workflow.go index 9146afc7b3e..c1107aa10c7 100644 --- a/service/worker/addsearchattributes/workflow.go +++ b/service/worker/addsearchattributes/workflow.go @@ -156,12 +156,12 @@ func (a *activities) AddESMappingFieldActivity(ctx context.Context, params Workf } func (a *activities) isRetryableError(err error) bool { - var httpStatusCode int - if err, isElasticErr := err.(*elastic.Error); isElasticErr { - httpStatusCode = err.Status + var esErr *elastic.Error + if !errors.As(err, &esErr) { + return true } - switch httpStatusCode { + switch esErr.Status { case http.StatusBadRequest, http.StatusUnauthorized, http.StatusForbidden, http.StatusNotFound, http.StatusConflict: return false default: