Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into eager-workflow-disp…
Browse files Browse the repository at this point in the history
…atch
  • Loading branch information
bergundy committed Feb 1, 2023
2 parents efece49 + 2bc48d2 commit f23a706
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 262 deletions.
43 changes: 16 additions & 27 deletions common/metrics/metricstest/metricstest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ package metricstest

import (
"fmt"
"net/http"
"net/http/httptest"
"strings"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.opentelemetry.io/otel/exporters/prometheus"
exporters "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/metric"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/resource"
sdkmetrics "go.opentelemetry.io/otel/sdk/metric"
"golang.org/x/exp/maps"

"go.temporal.io/server/common/log"
Expand All @@ -46,7 +46,7 @@ import (
type (
Handler struct {
metrics.Handler
exporter *prometheus.Exporter
reg *prometheus.Registry
}

sample struct {
Expand All @@ -69,32 +69,19 @@ func MustNewHandler(logger log.Logger) *Handler {
}

func NewHandler(logger log.Logger) (*Handler, error) {
ctrl := controller.New(
processor.NewFactory(
metrics.NewOtelAggregatorSelector(nil),
aggregation.CumulativeTemporalitySelector(),
processor.WithMemory(true),
),
controller.WithResource(resource.Empty()),
// Set collect period to 0 otherwise Snapshot() will potentially
// return an old view of metrics.
controller.WithCollectPeriod(0),
)

exporter, err := prometheus.New(prometheus.Config{}, ctrl)
registry := prometheus.NewRegistry()
exporter, err := exporters.New(exporters.WithRegisterer(registry))
if err != nil {
return nil, err
}

provider := &otelProvider{
meter: ctrl.Meter("temporal"),
}
provider := sdkmetrics.NewMeterProvider(sdkmetrics.WithReader(exporter))
meter := provider.Meter("temporal")
clientConfig := metrics.ClientConfig{}
otelHandler := metrics.NewOtelMetricsHandler(logger, provider, clientConfig)

otelHandler := metrics.NewOtelMetricsHandler(logger, &otelProvider{meter: meter}, clientConfig)
metricsHandler := &Handler{
Handler: otelHandler,
exporter: exporter,
Handler: otelHandler,
reg: registry,
}

return metricsHandler, nil
Expand All @@ -105,7 +92,9 @@ func (*Handler) Stop(log.Logger) {}
func (h *Handler) Snapshot() (Snapshot, error) {
rec := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/metrics", nil)
h.exporter.ServeHTTP(rec, req)
handler := http.NewServeMux()
handler.HandleFunc("/metrics", promhttp.HandlerFor(h.reg, promhttp.HandlerOpts{Registry: h.reg}).ServeHTTP)
handler.ServeHTTP(rec, req)

var tp expfmt.TextParser
families, err := tp.TextToMetricFamilies(rec.Body)
Expand Down
12 changes: 9 additions & 3 deletions common/metrics/metricstest/metricstest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,29 @@ func TestBasic(t *testing.T) {
metrics.StringTag("l2", "v2"),
metrics.StringTag("l1", "v1"),
}
expectedSystemTags := []metrics.Tag{
metrics.StringTag("otel_scope_name", "temporal"),
metrics.StringTag("otel_scope_version", ""),
}
expectedCounterTags := append(expectedSystemTags, counterTags...)
counter := handler.WithTags(counterTags...).Counter(counterName)
counter.Record(1)
counter.Record(1)

s1 := handler.MustSnapshot()
require.Equal(t, float64(2), s1.MustCounter(counterName, counterTags...))
require.Equal(t, float64(2), s1.MustCounter(counterName+"_total", expectedCounterTags...))

gaugeName := "gauge1"
gaugeTags := []metrics.Tag{
metrics.StringTag("l3", "v3"),
metrics.StringTag("l4", "v4"),
}
expectedGaugeTags := append(expectedSystemTags, gaugeTags...)
gauge := handler.WithTags(gaugeTags...).Gauge(gaugeName)
gauge.Record(-2)
gauge.Record(10)

s2 := handler.MustSnapshot()
require.Equal(t, float64(2), s2.MustCounter(counterName, counterTags...))
require.Equal(t, float64(10), s2.MustGauge(gaugeName, gaugeTags...))
require.Equal(t, float64(2), s2.MustCounter(counterName+"_total", expectedCounterTags...))
require.Equal(t, float64(10), s2.MustGauge(gaugeName, expectedGaugeTags...))
}
88 changes: 0 additions & 88 deletions common/metrics/opentelemetry_aggregator_selector.go

This file was deleted.

55 changes: 31 additions & 24 deletions common/metrics/opentelemetry_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ import (
"net/http"
"time"

"go.opentelemetry.io/otel/exporters/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
exporters "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/metric"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/metric/unit"
sdkmetrics "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand All @@ -49,7 +50,7 @@ type (
}

openTelemetryProviderImpl struct {
exporter *prometheus.Exporter
exporter *exporters.Exporter
meter metric.Meter
config *PrometheusConfig
server *http.Server
Expand All @@ -61,27 +62,33 @@ func NewOpenTelemetryProvider(
prometheusConfig *PrometheusConfig,
clientConfig *ClientConfig,
) (*openTelemetryProviderImpl, error) {

c := controller.New(
processor.NewFactory(
NewOtelAggregatorSelector(
clientConfig.PerUnitHistogramBoundaries,
),
aggregation.CumulativeTemporalitySelector(),
processor.WithMemory(true),
),
controller.WithResource(resource.Empty()),
)
exporter, err := prometheus.New(prometheus.Config{}, c)

reg := prometheus.NewRegistry()
exporter, err := exporters.New(exporters.WithRegisterer(reg))
if err != nil {
logger.Error("Failed to initialize prometheus exporter.", tag.Error(err))
return nil, err
}

metricServer := initPrometheusListener(prometheusConfig, logger, exporter)

meter := c.Meter("temporal")
var views []sdkmetrics.View
for _, u := range []string{Dimensionless, Bytes, Milliseconds} {
views = append(views, sdkmetrics.NewView(
sdkmetrics.Instrument{
Kind: sdkmetrics.InstrumentKindSyncHistogram,
Unit: unit.Unit(u),
},
sdkmetrics.Stream{
Aggregation: aggregation.ExplicitBucketHistogram{
Boundaries: clientConfig.PerUnitHistogramBoundaries[u],
},
},
))
}
provider := sdkmetrics.NewMeterProvider(
sdkmetrics.WithReader(exporter),
sdkmetrics.WithView(views...),
)
metricServer := initPrometheusListener(prometheusConfig, reg, logger)
meter := provider.Meter("temporal")
reporter := &openTelemetryProviderImpl{
exporter: exporter,
meter: meter,
Expand All @@ -92,14 +99,14 @@ func NewOpenTelemetryProvider(
return reporter, nil
}

func initPrometheusListener(config *PrometheusConfig, logger log.Logger, exporter *prometheus.Exporter) *http.Server {
func initPrometheusListener(config *PrometheusConfig, reg *prometheus.Registry, logger log.Logger) *http.Server {
handlerPath := config.HandlerPath
if handlerPath == "" {
handlerPath = "/metrics"
}

handler := http.NewServeMux()
handler.HandleFunc(handlerPath, exporter.ServeHTTP)
handler.HandleFunc(handlerPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}).ServeHTTP)

if config.ListenAddress == "" {
logger.Fatal("Listen address must be specified.", tag.Address(config.ListenAddress))
Expand Down
7 changes: 6 additions & 1 deletion common/metrics/otel_metrics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ func (omp *otelMetricsHandler) Gauge(gauge string) GaugeIface {
}

return GaugeFunc(func(i float64, t ...Tag) {
c.Observe(context.Background(), i, tagsToAttributes(omp.tags, t, omp.excludeTags)...)
err = omp.provider.GetMeter().RegisterCallback([]instrument.Asynchronous{c}, func(ctx context.Context) {
c.Observe(ctx, i, tagsToAttributes(omp.tags, t, omp.excludeTags)...)
})
if err != nil {
omp.l.Fatal("error setting callback metric update", tag.NewStringTag("MetricName", gauge), tag.Error(err))
}
})
}

Expand Down
Loading

0 comments on commit f23a706

Please sign in to comment.