diff --git a/common/metrics/metricstest/metricstest.go b/common/metrics/metricstest/metricstest.go index e8843a0c9a8..777d3a8f0f7 100644 --- a/common/metrics/metricstest/metricstest.go +++ b/common/metrics/metricstest/metricstest.go @@ -26,17 +26,17 @@ package metricstest import ( "fmt" + "net/http" "net/http/httptest" "strings" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" - "go.opentelemetry.io/otel/exporters/prometheus" + exporters "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/metric" - controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" - "go.opentelemetry.io/otel/sdk/metric/export/aggregation" - processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" - "go.opentelemetry.io/otel/sdk/resource" + sdkmetrics "go.opentelemetry.io/otel/sdk/metric" "golang.org/x/exp/maps" "go.temporal.io/server/common/log" @@ -46,7 +46,7 @@ import ( type ( Handler struct { metrics.Handler - exporter *prometheus.Exporter + reg *prometheus.Registry } sample struct { @@ -69,32 +69,19 @@ func MustNewHandler(logger log.Logger) *Handler { } func NewHandler(logger log.Logger) (*Handler, error) { - ctrl := controller.New( - processor.NewFactory( - metrics.NewOtelAggregatorSelector(nil), - aggregation.CumulativeTemporalitySelector(), - processor.WithMemory(true), - ), - controller.WithResource(resource.Empty()), - // Set collect period to 0 otherwise Snapshot() will potentially - // return an old view of metrics. - controller.WithCollectPeriod(0), - ) - - exporter, err := prometheus.New(prometheus.Config{}, ctrl) + registry := prometheus.NewRegistry() + exporter, err := exporters.New(exporters.WithRegisterer(registry)) if err != nil { return nil, err } - provider := &otelProvider{ - meter: ctrl.Meter("temporal"), - } + provider := sdkmetrics.NewMeterProvider(sdkmetrics.WithReader(exporter)) + meter := provider.Meter("temporal") clientConfig := metrics.ClientConfig{} - otelHandler := metrics.NewOtelMetricsHandler(logger, provider, clientConfig) - + otelHandler := metrics.NewOtelMetricsHandler(logger, &otelProvider{meter: meter}, clientConfig) metricsHandler := &Handler{ - Handler: otelHandler, - exporter: exporter, + Handler: otelHandler, + reg: registry, } return metricsHandler, nil @@ -105,7 +92,9 @@ func (*Handler) Stop(log.Logger) {} func (h *Handler) Snapshot() (Snapshot, error) { rec := httptest.NewRecorder() req := httptest.NewRequest("GET", "/metrics", nil) - h.exporter.ServeHTTP(rec, req) + handler := http.NewServeMux() + handler.HandleFunc("/metrics", promhttp.HandlerFor(h.reg, promhttp.HandlerOpts{Registry: h.reg}).ServeHTTP) + handler.ServeHTTP(rec, req) var tp expfmt.TextParser families, err := tp.TextToMetricFamilies(rec.Body) diff --git a/common/metrics/metricstest/metricstest_test.go b/common/metrics/metricstest/metricstest_test.go index 2a23af1086b..880986b007a 100644 --- a/common/metrics/metricstest/metricstest_test.go +++ b/common/metrics/metricstest/metricstest_test.go @@ -42,23 +42,29 @@ func TestBasic(t *testing.T) { metrics.StringTag("l2", "v2"), metrics.StringTag("l1", "v1"), } + expectedSystemTags := []metrics.Tag{ + metrics.StringTag("otel_scope_name", "temporal"), + metrics.StringTag("otel_scope_version", ""), + } + expectedCounterTags := append(expectedSystemTags, counterTags...) counter := handler.WithTags(counterTags...).Counter(counterName) counter.Record(1) counter.Record(1) s1 := handler.MustSnapshot() - require.Equal(t, float64(2), s1.MustCounter(counterName, counterTags...)) + require.Equal(t, float64(2), s1.MustCounter(counterName+"_total", expectedCounterTags...)) gaugeName := "gauge1" gaugeTags := []metrics.Tag{ metrics.StringTag("l3", "v3"), metrics.StringTag("l4", "v4"), } + expectedGaugeTags := append(expectedSystemTags, gaugeTags...) gauge := handler.WithTags(gaugeTags...).Gauge(gaugeName) gauge.Record(-2) gauge.Record(10) s2 := handler.MustSnapshot() - require.Equal(t, float64(2), s2.MustCounter(counterName, counterTags...)) - require.Equal(t, float64(10), s2.MustGauge(gaugeName, gaugeTags...)) + require.Equal(t, float64(2), s2.MustCounter(counterName+"_total", expectedCounterTags...)) + require.Equal(t, float64(10), s2.MustGauge(gaugeName, expectedGaugeTags...)) } diff --git a/common/metrics/opentelemetry_aggregator_selector.go b/common/metrics/opentelemetry_aggregator_selector.go deleted file mode 100644 index 7dbbcc0a2ce..00000000000 --- a/common/metrics/opentelemetry_aggregator_selector.go +++ /dev/null @@ -1,88 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package metrics - -import ( - "go.opentelemetry.io/otel/sdk/metric/aggregator" - "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" - "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" - "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" - emetric "go.opentelemetry.io/otel/sdk/metric/export" - "go.opentelemetry.io/otel/sdk/metric/sdkapi" -) - -type ( - // OtelAggregatorSelector handles utilizing correct histogram bucket list for distinct metric unit types. - OtelAggregatorSelector struct { - buckets map[MetricUnit][]histogram.Option - } -) - -var _ emetric.AggregatorSelector = &OtelAggregatorSelector{} - -// Creates new instance of aggregator selector. -func NewOtelAggregatorSelector( - perUnitBoundaries map[string][]float64, -) *OtelAggregatorSelector { - perUnitBuckets := make(map[MetricUnit][]histogram.Option, len(perUnitBoundaries)) - for unit, buckets := range perUnitBoundaries { - perUnitBuckets[MetricUnit(unit)] = []histogram.Option{histogram.WithExplicitBoundaries(buckets)} - } - return &OtelAggregatorSelector{ - buckets: perUnitBuckets, - } -} - -func (s OtelAggregatorSelector) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs ...*aggregator.Aggregator) { - switch descriptor.InstrumentKind() { - case sdkapi.GaugeObserverInstrumentKind: - lastValueAggs(aggPtrs) - case sdkapi.HistogramInstrumentKind: - var options []histogram.Option - if opts, ok := s.buckets[MetricUnit(descriptor.Unit())]; ok { - options = opts - } - aggs := histogram.New(len(aggPtrs), descriptor, options...) - for i := range aggPtrs { - *aggPtrs[i] = &aggs[i] - } - default: - sumAggs(aggPtrs) - } -} - -func sumAggs(aggPtrs []*aggregator.Aggregator) { - aggs := sum.New(len(aggPtrs)) - for i := range aggPtrs { - *aggPtrs[i] = &aggs[i] - } -} - -func lastValueAggs(aggPtrs []*aggregator.Aggregator) { - aggs := lastvalue.New(len(aggPtrs)) - for i := range aggPtrs { - *aggPtrs[i] = &aggs[i] - } -} diff --git a/common/metrics/opentelemetry_provider.go b/common/metrics/opentelemetry_provider.go index 531dd2d51fc..88ce99cfa5c 100644 --- a/common/metrics/opentelemetry_provider.go +++ b/common/metrics/opentelemetry_provider.go @@ -29,12 +29,13 @@ import ( "net/http" "time" - "go.opentelemetry.io/otel/exporters/prometheus" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + exporters "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/metric" - controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" - "go.opentelemetry.io/otel/sdk/metric/export/aggregation" - processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" - "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/metric/unit" + sdkmetrics "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -49,7 +50,7 @@ type ( } openTelemetryProviderImpl struct { - exporter *prometheus.Exporter + exporter *exporters.Exporter meter metric.Meter config *PrometheusConfig server *http.Server @@ -61,27 +62,33 @@ func NewOpenTelemetryProvider( prometheusConfig *PrometheusConfig, clientConfig *ClientConfig, ) (*openTelemetryProviderImpl, error) { - - c := controller.New( - processor.NewFactory( - NewOtelAggregatorSelector( - clientConfig.PerUnitHistogramBoundaries, - ), - aggregation.CumulativeTemporalitySelector(), - processor.WithMemory(true), - ), - controller.WithResource(resource.Empty()), - ) - exporter, err := prometheus.New(prometheus.Config{}, c) - + reg := prometheus.NewRegistry() + exporter, err := exporters.New(exporters.WithRegisterer(reg)) if err != nil { logger.Error("Failed to initialize prometheus exporter.", tag.Error(err)) return nil, err } - metricServer := initPrometheusListener(prometheusConfig, logger, exporter) - - meter := c.Meter("temporal") + var views []sdkmetrics.View + for _, u := range []string{Dimensionless, Bytes, Milliseconds} { + views = append(views, sdkmetrics.NewView( + sdkmetrics.Instrument{ + Kind: sdkmetrics.InstrumentKindSyncHistogram, + Unit: unit.Unit(u), + }, + sdkmetrics.Stream{ + Aggregation: aggregation.ExplicitBucketHistogram{ + Boundaries: clientConfig.PerUnitHistogramBoundaries[u], + }, + }, + )) + } + provider := sdkmetrics.NewMeterProvider( + sdkmetrics.WithReader(exporter), + sdkmetrics.WithView(views...), + ) + metricServer := initPrometheusListener(prometheusConfig, reg, logger) + meter := provider.Meter("temporal") reporter := &openTelemetryProviderImpl{ exporter: exporter, meter: meter, @@ -92,14 +99,14 @@ func NewOpenTelemetryProvider( return reporter, nil } -func initPrometheusListener(config *PrometheusConfig, logger log.Logger, exporter *prometheus.Exporter) *http.Server { +func initPrometheusListener(config *PrometheusConfig, reg *prometheus.Registry, logger log.Logger) *http.Server { handlerPath := config.HandlerPath if handlerPath == "" { handlerPath = "/metrics" } handler := http.NewServeMux() - handler.HandleFunc(handlerPath, exporter.ServeHTTP) + handler.HandleFunc(handlerPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}).ServeHTTP) if config.ListenAddress == "" { logger.Fatal("Listen address must be specified.", tag.Address(config.ListenAddress)) diff --git a/common/metrics/otel_metrics_handler.go b/common/metrics/otel_metrics_handler.go index 49a3c9a07fc..842ca37dca5 100644 --- a/common/metrics/otel_metrics_handler.go +++ b/common/metrics/otel_metrics_handler.go @@ -85,7 +85,12 @@ func (omp *otelMetricsHandler) Gauge(gauge string) GaugeIface { } return GaugeFunc(func(i float64, t ...Tag) { - c.Observe(context.Background(), i, tagsToAttributes(omp.tags, t, omp.excludeTags)...) + err = omp.provider.GetMeter().RegisterCallback([]instrument.Asynchronous{c}, func(ctx context.Context) { + c.Observe(ctx, i, tagsToAttributes(omp.tags, t, omp.excludeTags)...) + }) + if err != nil { + omp.l.Fatal("error setting callback metric update", tag.NewStringTag("MetricName", gauge), tag.Error(err)) + } }) } diff --git a/common/metrics/otel_metrics_handler_test.go b/common/metrics/otel_metrics_handler_test.go index 51f39394e3e..9ab7dd44ea7 100644 --- a/common/metrics/otel_metrics_handler_test.go +++ b/common/metrics/otel_metrics_handler_test.go @@ -34,13 +34,20 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/sdk/metric/export/aggregation" - "go.opentelemetry.io/otel/sdk/metric/metrictest" - "go.opentelemetry.io/otel/sdk/metric/number" + "go.opentelemetry.io/otel/metric/unit" + sdkmetrics "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.temporal.io/server/common/log" ) +var ( + minLatency = float64(1248) + maxLatency = float64(5255) + testBytes = float64(1234567) +) + type testProvider struct { meter metric.Meter } @@ -53,92 +60,142 @@ func (t *testProvider) Stop(log.Logger) {} func TestMeter(t *testing.T) { ctx := context.Background() - mp, exp := metrictest.NewTestMeterProvider() - p := NewOtelMetricsHandler(log.NewTestLogger(), &testProvider{meter: mp.Meter("test")}, defaultConfig) + rdr := sdkmetrics.NewManualReader() + provider := sdkmetrics.NewMeterProvider( + sdkmetrics.WithReader(rdr), + sdkmetrics.WithView( + sdkmetrics.NewView( + sdkmetrics.Instrument{ + Kind: sdkmetrics.InstrumentKindSyncHistogram, + Unit: unit.Bytes, + }, + sdkmetrics.Stream{ + Aggregation: aggregation.ExplicitBucketHistogram{ + Boundaries: defaultConfig.PerUnitHistogramBoundaries[string(unit.Bytes)], + }, + }, + ), + sdkmetrics.NewView( + sdkmetrics.Instrument{ + Kind: sdkmetrics.InstrumentKindSyncHistogram, + Unit: unit.Dimensionless, + }, + sdkmetrics.Stream{ + Aggregation: aggregation.ExplicitBucketHistogram{ + Boundaries: defaultConfig.PerUnitHistogramBoundaries[string(unit.Dimensionless)], + }, + }, + ), + sdkmetrics.NewView( + sdkmetrics.Instrument{ + Kind: sdkmetrics.InstrumentKindSyncHistogram, + Unit: unit.Milliseconds, + }, + sdkmetrics.Stream{ + Aggregation: aggregation.ExplicitBucketHistogram{ + Boundaries: defaultConfig.PerUnitHistogramBoundaries[string(unit.Milliseconds)], + }, + }, + ), + ), + ) + p := NewOtelMetricsHandler(log.NewTestLogger(), &testProvider{meter: provider.Meter("test")}, defaultConfig) recordMetrics(p) - err := exp.Collect(ctx) + got, err := rdr.Collect(ctx) assert.Nil(t, err) - //lint:ignore SA1019 TODO: fix later - lib := metrictest.Library{InstrumentationName: "test"} - got := exp.Records - - want := []metrictest.ExportRecord{ + want := []metricdata.Metrics{ { - InstrumentName: "hits", - Sum: number.NewInt64Number(8), - Attributes: nil, - InstrumentationLibrary: lib, - AggregationKind: aggregation.SumKind, - NumberKind: number.Int64Kind, + Name: "hits", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 8, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, }, { - InstrumentName: "hits-tagged", - Sum: number.NewInt64Number(11), - Attributes: []attribute.KeyValue{ - { - Key: attribute.Key("taskqueue"), - Value: attribute.StringValue("__sticky__"), + Name: "hits-tagged", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + //Attributes: attribute.NewSet(attribute.String("taskqueue", "__sticky__")), + Value: 11, + }, }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, }, - InstrumentationLibrary: lib, - AggregationKind: aggregation.SumKind, - NumberKind: number.Int64Kind, }, { - InstrumentName: "hits-tagged-excluded", - Sum: number.NewInt64Number(14), - Attributes: []attribute.KeyValue{ - { - Key: attribute.Key("taskqueue"), - Value: attribute.StringValue(tagExcludedValue), + Name: "hits-tagged-excluded", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 14, + }, }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, }, - InstrumentationLibrary: lib, - AggregationKind: aggregation.SumKind, - NumberKind: number.Int64Kind, }, { - InstrumentName: "latency", - Sum: number.NewInt64Number(6503), - Count: 2, - Attributes: nil, - InstrumentationLibrary: lib, - AggregationKind: aggregation.HistogramKind, - NumberKind: number.Int64Kind, - Histogram: aggregation.Buckets{ - Counts: []uint64{1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Name: "latency", + Data: metricdata.Histogram{ + DataPoints: []metricdata.HistogramDataPoint{ + { + Count: 2, + BucketCounts: []uint64{0, 0, 0, 1, 1, 0}, + Min: &minLatency, + Max: &maxLatency, + Sum: 6503, + }, + }, + Temporality: metricdata.CumulativeTemporality, }, + Unit: unit.Milliseconds, }, { - InstrumentName: "temp", - LastValue: number.NewFloat64Number(100), - Attributes: []attribute.KeyValue{ - { - Key: attribute.Key("location"), - Value: attribute.StringValue("Mare Imbrium"), + Name: "temp", + Data: metricdata.Gauge[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + //Attributes: attribute.NewSet(attribute.String("location", "Mare Imbrium")), + Value: 100, + }, }, }, - InstrumentationLibrary: lib, - AggregationKind: aggregation.LastValueKind, - NumberKind: number.Float64Kind, }, { - InstrumentName: "transmission", - InstrumentationLibrary: lib, - Sum: number.NewInt64Number(1234567), - Count: 1, - AggregationKind: aggregation.HistogramKind, - Histogram: aggregation.Buckets{ - Counts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, + Name: "transmission", + Data: metricdata.Histogram{ + DataPoints: []metricdata.HistogramDataPoint{ + { + Count: 1, + BucketCounts: []uint64{0, 0, 1}, + Min: &testBytes, + Max: &testBytes, + Sum: testBytes, + }, + }, + Temporality: metricdata.CumulativeTemporality, }, + Unit: unit.Bytes, }, } - - if diff := cmp.Diff(want, got, cmp.Comparer(valuesEqual), cmpopts.SortSlices(func(x, y metrictest.ExportRecord) bool { - return x.InstrumentName < y.InstrumentName - }), cmpopts.IgnoreFields(aggregation.Buckets{}, "Boundaries")); diff != "" { + if diff := cmp.Diff(want, got.ScopeMetrics[0].Metrics, cmp.Comparer(valuesEqual), + cmpopts.SortSlices(func(x, y metricdata.Metrics) bool { + return x.Name < y.Name + }), + // TODO: No good way to verify metrics tag in attributes as a private field in the attribute.Set. + cmpopts.IgnoreFields(metricdata.DataPoint[int64]{}, "Attributes", "StartTime", "Time"), + cmpopts.IgnoreFields(metricdata.DataPoint[float64]{}, "Attributes", "StartTime", "Time"), + cmpopts.IgnoreFields(metricdata.HistogramDataPoint{}, "Attributes", "StartTime", "Time", "Bounds"), + ); diff != "" { t.Errorf("mismatch (-want, got):\n%s", diff) } } @@ -147,20 +204,20 @@ func valuesEqual(v1, v2 attribute.Value) bool { return v1.AsInterface() == v2.AsInterface() } -func recordMetrics(h Handler) { - hitsCounter := h.Counter("hits") - gauge := h.Gauge("temp") +func recordMetrics(mp Handler) { + hitsCounter := mp.Counter("hits") + gauge := mp.Gauge("temp") - timer := h.Timer("latency") - histogram := h.Histogram("transmission", Bytes) - hitsTaggedCounter := h.Counter("hits-tagged") - hitsTaggedExcludedCounter := h.Counter("hits-tagged-excluded") + timer := mp.Timer("latency") + histogram := mp.Histogram("transmission", Bytes) + hitsTaggedCounter := mp.Counter("hits-tagged") + hitsTaggedExcludedCounter := mp.Counter("hits-tagged-excluded") hitsCounter.Record(8) gauge.Record(100, StringTag("location", "Mare Imbrium")) - timer.Record(1248 * time.Millisecond) - timer.Record(5255 * time.Millisecond) - histogram.Record(1234567) + timer.Record(time.Duration(minLatency) * time.Millisecond) + timer.Record(time.Duration(maxLatency) * time.Millisecond) + histogram.Record(int64(testBytes)) hitsTaggedCounter.Record(11, TaskQueueTag("__sticky__")) hitsTaggedExcludedCounter.Record(14, TaskQueueTag("filtered")) } diff --git a/common/persistence/visibility/store/elasticsearch/processor.go b/common/persistence/visibility/store/elasticsearch/processor.go index 14c36007179..6d04b41213f 100644 --- a/common/persistence/visibility/store/elasticsearch/processor.go +++ b/common/persistence/visibility/store/elasticsearch/processor.go @@ -33,6 +33,7 @@ import ( "errors" "fmt" "strings" + "sync" "sync/atomic" "time" @@ -69,6 +70,7 @@ type ( logger log.Logger metricsHandler metrics.Handler indexerConcurrency uint32 + shutdownLock sync.RWMutex } // ProcessorConfig contains all configs for processor @@ -97,6 +99,10 @@ const ( visibilityProcessorName = "visibility-processor" ) +var ( + errVisibilityShutdown = errors.New("visiblity processor was shut down") +) + // NewProcessor create new processorImpl func NewProcessor( cfg *ProcessorConfig, @@ -150,14 +156,15 @@ func (p *processorImpl) Stop() { return } + p.shutdownLock.Lock() + defer p.shutdownLock.Unlock() + err := p.bulkProcessor.Stop() if err != nil { // This could happen if ES is down when we're trying to shut down the server. p.logger.Error("Unable to stop Elasticsearch processor.", tag.LifeCycleStopFailed, tag.Error(err)) return } - p.mapToAckFuture = nil - p.bulkProcessor = nil } func (p *processorImpl) hashFn(key interface{}) uint32 { @@ -172,7 +179,17 @@ func (p *processorImpl) hashFn(key interface{}) uint32 { // Add request to the bulk and return a future object which will receive ack signal when request is processed. func (p *processorImpl) Add(request *client.BulkableRequest, visibilityTaskKey string) *future.FutureImpl[bool] { - newFuture := newAckFuture() + newFuture := newAckFuture() // Create future first to measure impact of following RWLock on latency + + p.shutdownLock.RLock() + defer p.shutdownLock.RUnlock() + + if atomic.LoadInt32(&p.status) == common.DaemonStatusStopped { + p.logger.Warn("Rejecting ES request for visibility task key because processor has been shut down.", tag.Key(visibilityTaskKey), tag.ESDocID(request.ID), tag.Value(request.Doc)) + newFuture.future.Set(false, errVisibilityShutdown) + return newFuture.future + } + _, isDup, _ := p.mapToAckFuture.PutOrDo(visibilityTaskKey, newFuture, func(key interface{}, value interface{}) error { existingFuture, ok := value.(*ackFuture) if !ok { diff --git a/common/persistence/visibility/store/elasticsearch/processor_test.go b/common/persistence/visibility/store/elasticsearch/processor_test.go index e1de4666024..611bd78fbdc 100644 --- a/common/persistence/visibility/store/elasticsearch/processor_test.go +++ b/common/persistence/visibility/store/elasticsearch/processor_test.go @@ -36,6 +36,7 @@ import ( "github.com/olivere/elastic/v7" "github.com/stretchr/testify/suite" + "go.temporal.io/server/common" "go.temporal.io/server/common/collection" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/future" @@ -89,6 +90,7 @@ func (s *processorSuite) SetupTest() { // esProcessor.Start mock s.esProcessor.mapToAckFuture = collection.NewShardedConcurrentTxMap(1024, s.esProcessor.hashFn) s.esProcessor.bulkProcessor = s.mockBulkProcessor + s.esProcessor.status = common.DaemonStatusStarted } func (s *processorSuite) TearDownTest() { @@ -126,8 +128,8 @@ func (s *processorSuite) TestNewESProcessorAndStartStop() { s.NotNil(p.bulkProcessor) p.Stop() - s.Nil(p.mapToAckFuture) - s.Nil(p.bulkProcessor) + s.NotNil(p.mapToAckFuture) + s.NotNil(p.bulkProcessor) } func (s *processorSuite) TestAdd() { @@ -219,6 +221,43 @@ func (s *processorSuite) TestAdd_ConcurrentAdd_Duplicates() { s.Equal(1, s.esProcessor.mapToAckFuture.Len(), "only one request should be in the bulk") } +func (s *processorSuite) TestAdd_ConcurrentAdd_Shutdown() { + request := &client.BulkableRequest{} + docsCount := 1000 + parallelFactor := 10 + futures := make([]future.Future[bool], docsCount) + + s.mockBulkProcessor.EXPECT().Add(request).MaxTimes(docsCount + 2) // +2 for explicit adds before and after shutdown + s.mockBulkProcessor.EXPECT().Stop().Return(nil).Times(1) + s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitAddLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc).MaxTimes(docsCount + 2) + + addBefore := s.esProcessor.Add(request, "test-key-before") + + wg := sync.WaitGroup{} + wg.Add(parallelFactor + 1) // +1 for separate shutdown goroutine + for i := 0; i < parallelFactor; i++ { + go func(i int) { + for j := 0; j < docsCount/parallelFactor; j++ { + futures[i*docsCount/parallelFactor+j] = s.esProcessor.Add(request, fmt.Sprintf("test-key-%d-%d", i, j)) + } + wg.Done() + }(i) + } + go func() { + time.Sleep(1 * time.Millisecond) // slight delay so at least a few docs get added + s.esProcessor.Stop() + wg.Done() + }() + + wg.Wait() + addAfter := s.esProcessor.Add(request, "test-key-after") + + s.False(addBefore.Ready()) // first request should be in bulk + s.True(addAfter.Ready()) // final request should be only error + _, err := addAfter.Get(context.Background()) + s.ErrorIs(err, errVisibilityShutdown) +} + func (s *processorSuite) TestBulkAfterAction_Ack() { version := int64(3) testKey := "testKey" diff --git a/common/telemetry/config.go b/common/telemetry/config.go index 36bee9ad1ea..b09a9b44996 100644 --- a/common/telemetry/config.go +++ b/common/telemetry/config.go @@ -33,7 +33,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - otelsdkmetricexp "go.opentelemetry.io/otel/sdk/metric/export" + "go.opentelemetry.io/otel/sdk/metric" otelsdktrace "go.opentelemetry.io/otel/sdk/trace" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -149,7 +149,7 @@ type ( Dial(context.Context) (*grpc.ClientConn, error) } startOnce sync.Once - otelsdkmetricexp.Exporter + metric.Exporter } // ExportConfig represents YAML structured configuration for a set of OTEL @@ -168,7 +168,7 @@ func (ec *ExportConfig) SpanExporters() ([]otelsdktrace.SpanExporter, error) { return ec.inner.SpanExporters() } -func (ec *ExportConfig) MetricExporters() ([]otelsdkmetricexp.Exporter, error) { +func (ec *ExportConfig) MetricExporters() ([]metric.Exporter, error) { return ec.inner.MetricExporters() } @@ -232,8 +232,8 @@ func (ec *exportConfig) SpanExporters() ([]otelsdktrace.SpanExporter, error) { return out, nil } -func (ec *exportConfig) MetricExporters() ([]otelsdkmetricexp.Exporter, error) { - out := make([]otelsdkmetricexp.Exporter, 0, len(ec.Exporters)) +func (ec *exportConfig) MetricExporters() ([]metric.Exporter, error) { + out := make([]metric.Exporter, 0, len(ec.Exporters)) for _, expcfg := range ec.Exporters { if !strings.HasPrefix(expcfg.Kind.Signal, "metric") { continue @@ -255,7 +255,7 @@ func (ec *exportConfig) MetricExporters() ([]otelsdkmetricexp.Exporter, error) { func (ec *exportConfig) buildOtlpGrpcMetricExporter( cfg *otlpGrpcMetricExporter, -) (otelsdkmetricexp.Exporter, error) { +) (metric.Exporter, error) { dopts := cfg.Connection.dialOpts() opts := []otlpmetricgrpc.Option{ otlpmetricgrpc.WithEndpoint(cfg.Connection.Endpoint), @@ -276,7 +276,7 @@ func (ec *exportConfig) buildOtlpGrpcMetricExporter( } if cfg.ConnectionName == "" { - return otlpmetricgrpc.NewUnstarted(opts...), nil + return otlpmetricgrpc.New(context.Background(), opts...) } conncfg, ok := ec.findNamedGrpcConnCfg(cfg.ConnectionName) diff --git a/go.mod b/go.mod index fc08fa118d8..f03a2c6edd9 100644 --- a/go.mod +++ b/go.mod @@ -37,13 +37,13 @@ require ( github.com/urfave/cli/v2 v2.4.0 github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.4 - go.opentelemetry.io/otel v1.11.1 - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.31.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1 - go.opentelemetry.io/otel/exporters/prometheus v0.31.0 - go.opentelemetry.io/otel/metric v0.33.0 - go.opentelemetry.io/otel/sdk v1.11.1 - go.opentelemetry.io/otel/sdk/metric v0.31.0 + go.opentelemetry.io/otel v1.11.2 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.34.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2 + go.opentelemetry.io/otel/exporters/prometheus v0.34.0 + go.opentelemetry.io/otel/metric v0.34.0 + go.opentelemetry.io/otel/sdk v1.11.2 + go.opentelemetry.io/otel/sdk/metric v0.34.0 go.temporal.io/api v1.15.1-0.20230130221739-35f91d43296f go.temporal.io/sdk v1.20.1-0.20230125015921-1fe6824cedfe go.temporal.io/version v0.3.0 @@ -113,10 +113,10 @@ require ( github.com/uber-common/bark v1.3.0 // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.31.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1 // indirect - go.opentelemetry.io/otel/trace v1.11.1 + go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect + go.opentelemetry.io/otel/trace v1.11.2 go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/dig v1.15.0 // indirect golang.org/x/crypto v0.3.0 // indirect diff --git a/go.sum b/go.sum index c2c70724941..441c2672b97 100644 --- a/go.sum +++ b/go.sum @@ -815,28 +815,28 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.4 h1:PRXhsszxTt5bbPriTjmaweWUsAnJYeWBhUMLRetUgBU= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.36.4/go.mod h1:05eWWy6ZWzmpeImD3UowLTB3VjDMU1yxQ+ENuVWDM3c= -go.opentelemetry.io/otel v1.11.1 h1:4WLLAmcfkmDk2ukNXJyq3/kiz/3UzCaYq6PskJsaou4= -go.opentelemetry.io/otel v1.11.1/go.mod h1:1nNhXBbWSD0nsL38H6btgnFN2k4i0sNLHNNMZMSbUGE= -go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1 h1:X2GndnMCsUPh6CiY2a+frAbNsXaPLbB0soHRYhAZ5Ig= -go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1/go.mod h1:i8vjiSzbiUC7wOQplijSXMYUpNM93DtlS5CbUT+C6oQ= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.31.0 h1:H0+xwv4shKw0gfj/ZqR13qO2N/dBQogB1OcRjJjV39Y= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.31.0/go.mod h1:nkenGD8vcvs0uN6WhR90ZVHQlgDsRmXicnNadMnk+XQ= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.31.0 h1:BaQ2xM5cPmldVCMvbLoy5tcLUhXCtIhItDYBNw83B7Y= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.31.0/go.mod h1:VRr8tlXQEsTdesDCh0qBe2iKDWhpi3ZqDYw6VlZ8MhI= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1 h1:MEQNafcNCB0uQIti/oHgU7CZpUMYQ7qigBwMVKycHvc= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1/go.mod h1:19O5I2U5iys38SsmT2uDJja/300woyzE1KPIQxEUBUc= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1 h1:LYyG/f1W/jzAix16jbksJfMQFpOH/Ma6T639pVPMgfI= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1/go.mod h1:QrRRQiY3kzAoYPNLP0W/Ikg0gR6V3LMc+ODSxr7yyvg= -go.opentelemetry.io/otel/exporters/prometheus v0.31.0 h1:jwtnOGBM8dIty5AVZ+9ZCzZexCea3aVKmUfZAQcHqxs= -go.opentelemetry.io/otel/exporters/prometheus v0.31.0/go.mod h1:QarXIB8L79IwIPoNgG3A6zNvBgVmcppeFogV1d8612s= -go.opentelemetry.io/otel/metric v0.33.0 h1:xQAyl7uGEYvrLAiV/09iTJlp1pZnQ9Wl793qbVvED1E= -go.opentelemetry.io/otel/metric v0.33.0/go.mod h1:QlTYc+EnYNq/M2mNk1qDDMRLpqCOj2f/r5c7Fd5FYaI= -go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs= -go.opentelemetry.io/otel/sdk v1.11.1/go.mod h1:/l3FE4SupHJ12TduVjUkZtlfFqDCQJlOlithYrdktys= -go.opentelemetry.io/otel/sdk/metric v0.31.0 h1:2sZx4R43ZMhJdteKAlKoHvRgrMp53V1aRxvEf5lCq8Q= -go.opentelemetry.io/otel/sdk/metric v0.31.0/go.mod h1:fl0SmNnX9mN9xgU6OLYLMBMrNAsaZQi7qBwprwO3abk= -go.opentelemetry.io/otel/trace v1.11.1 h1:ofxdnzsNrGBYXbP7t7zpUK281+go5rF7dvdIZXF8gdQ= -go.opentelemetry.io/otel/trace v1.11.1/go.mod h1:f/Q9G7vzk5u91PhbmKbg1Qn0rzH1LJ4vbPHFGkTPtOk= +go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0= +go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 h1:htgM8vZIF8oPSCxa341e3IZ4yr/sKxgu8KZYllByiVY= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2/go.mod h1:rqbht/LlhVBgn5+k3M5QK96K5Xb0DvXpMJ5SFQpY6uw= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 h1:kpskzLZ60cJ48SJ4uxWa6waBL+4kSV6nVK8rP+QM8Wg= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0/go.mod h1:4+x3i62TEegDHuzNva0bMcAN8oUi5w4liGb1d/VgPYo= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.34.0 h1:e7kFb4pJLbhJgAwUdoVTHzB9pGujs5O8/7gFyZL88fg= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.34.0/go.mod h1:3x00m9exjIbhK+zTO4MsCSlfbVmgvLP0wjDgDKa/8bw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 h1:fqR1kli93643au1RKo0Uma3d2aPQKT+WBKfTSBaKbOc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2/go.mod h1:5Qn6qvgkMsLDX+sYK64rHb1FPhpn0UtxF+ouX1uhyJE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2 h1:ERwKPn9Aer7Gxsc0+ZlutlH1bEEAUXAUhqm3Y45ABbk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2/go.mod h1:jWZUM2MWhWCJ9J9xVbRx7tzK1mXKpAlze4CeulycwVY= +go.opentelemetry.io/otel/exporters/prometheus v0.34.0 h1:L5D+HxdaC/ORB47ribbTBbkXRZs9JzPjq0EoIOMWncM= +go.opentelemetry.io/otel/exporters/prometheus v0.34.0/go.mod h1:6gUoJyfhoWqF0tOLaY0ZmKgkQRcvEQx6p5rVlKHp3s4= +go.opentelemetry.io/otel/metric v0.34.0 h1:MCPoQxcg/26EuuJwpYN1mZTeCYAUGx8ABxfW07YkjP8= +go.opentelemetry.io/otel/metric v0.34.0/go.mod h1:ZFuI4yQGNCupurTXCwkeD/zHBt+C2bR7bw5JqUm/AP8= +go.opentelemetry.io/otel/sdk v1.11.2 h1:GF4JoaEx7iihdMFu30sOyRx52HDHOkl9xQ8SMqNXUiU= +go.opentelemetry.io/otel/sdk v1.11.2/go.mod h1:wZ1WxImwpq+lVRo4vsmSOxdd+xwoUJ6rqyLc3SyX9aU= +go.opentelemetry.io/otel/sdk/metric v0.34.0 h1:7ElxfQpXCFZlRTvVRTkcUvK8Gt5DC8QzmzsLsO2gdzo= +go.opentelemetry.io/otel/sdk/metric v0.34.0/go.mod h1:l4r16BIqiqPy5rd14kkxllPy/fOI4tWo1jkpD9Z3ffQ= +go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0= +go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=