Skip to content

Commit

Permalink
OTLP: Convert start timestamps to Mimir created timestamps
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Aug 29, 2024
1 parent 4658643 commit afc98c1
Show file tree
Hide file tree
Showing 20 changed files with 163 additions and 68 deletions.
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -4699,6 +4699,17 @@
"fieldType": "boolean",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "otel_created_timestamp_zero_ingestion_enabled",
"required": false,
"desc": "Whether to enable translation of OTel start timestamps to Prometheus zero samples in the OTLP endpoint.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "distributor.otel-created-timestamp-zero-ingestion-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "ingest_storage_read_consistency",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Max size of the pooled buffers used for marshaling write requests. If 0, no max size is enforced.
-distributor.metric-relabeling-enabled
[experimental] Enable metric relabeling for the tenant. This configuration option can be used to forcefully disable metric relabeling on a per-tenant basis. (default true)
-distributor.otel-created-timestamp-zero-ingestion-enabled
[experimental] Whether to enable translation of OTel start timestamps to Prometheus zero samples in the OTLP endpoint.
-distributor.otel-metric-suffixes-enabled
Whether to enable automatic suffixes to names of metrics ingested through OTLP.
-distributor.remote-timeout duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3707,6 +3707,11 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -distributor.otel-metric-suffixes-enabled
[otel_metric_suffixes_enabled: <boolean> | default = false]
# (experimental) Whether to enable translation of OTel start timestamps to
# Prometheus zero samples in the OTLP endpoint.
# CLI flag: -distributor.otel-created-timestamp-zero-ingestion-enabled
[otel_created_timestamp_zero_ingestion_enabled: <boolean> | default = false]
# (experimental) The default consistency level to enforce for queries when using
# the ingest storage. Supports values: strong, eventual.
# CLI flag: -ingest-storage.read-consistency
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ require (
)

// Using a fork of Prometheus with Mimir-specific changes.
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240826084742-bf5bf35a1b4d
replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240829153637-b4b389431da8

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,8 @@ github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wp
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20240826084742-bf5bf35a1b4d h1:wnVWb4mZzwcBKdzo2Tdazhq+ZNx5MZ93lcoraH1fLms=
github.com/grafana/mimir-prometheus v0.0.0-20240826084742-bf5bf35a1b4d/go.mod h1:cNDAD0ooSyLfNtakmnGbChNg7JPYmKsRn7CQ01Rpu2E=
github.com/grafana/mimir-prometheus v0.0.0-20240829153637-b4b389431da8 h1:dTCK21S9xnvQs8pOL9F29dh4nONuG0DL7qQh0MXwWZA=
github.com/grafana/mimir-prometheus v0.0.0-20240829153637-b4b389431da8/go.mod h1:cNDAD0ooSyLfNtakmnGbChNg7JPYmKsRn7CQ01Rpu2E=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45 h1:AJKOtDKAOg8XNFnIZSmqqqutoTSxVlRs6vekL2p2KEY=
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ func NewQuerierHandler(
remoteWriteEnabled,
nil,
oltpEnabled,
// TODO: Have per-tenant config for toggling this.
true,
)

api.InstallCodec(protobufCodec{})
Expand Down
16 changes: 10 additions & 6 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (

type OTLPHandlerLimits interface {
OTelMetricSuffixesEnabled(id string) bool
OTelCreatedTimestampZeroIngestionEnabled(id string) bool
}

// OTLPHandler is an http.Handler accepting OTLP write requests.
Expand Down Expand Up @@ -162,18 +163,19 @@ func OTLPHandler(
return err
}
addSuffixes := limits.OTelMetricSuffixesEnabled(tenantID)
enableCTZeroIngestion := limits.OTelCreatedTimestampZeroIngestionEnabled(tenantID)

pushMetrics.IncOTLPRequest(tenantID)
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))

var metrics []mimirpb.PreallocTimeseries
if directTranslation {
metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, enableCTZeroIngestion, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
}
} else {
metrics, err = otelMetricsToTimeseriesOld(ctx, tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
metrics, err = otelMetricsToTimeseriesOld(ctx, tenantID, addSuffixes, enableCTZeroIngestion, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
}
Expand Down Expand Up @@ -401,10 +403,11 @@ func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.Metr
return metadata
}

func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes, enableCTZeroIngestion bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := otlp.NewMimirConverter()
errs := converter.FromMetrics(ctx, md, otlp.Settings{
AddMetricSuffixes: addSuffixes,
AddMetricSuffixes: addSuffixes,
EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion,
})
mimirTS := converter.TimeSeries()
if errs != nil {
Expand All @@ -427,10 +430,11 @@ func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes b
}

// Old, less efficient, version of otelMetricsToTimeseries.
func otelMetricsToTimeseriesOld(ctx context.Context, tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
func otelMetricsToTimeseriesOld(ctx context.Context, tenantID string, addSuffixes, enableCTZeroIngestion bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := prometheusremotewrite.NewPrometheusConverter()
errs := converter.FromMetrics(ctx, md, prometheusremotewrite.Settings{
AddMetricSuffixes: addSuffixes,
AddMetricSuffixes: addSuffixes,
EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion,
})
promTS := converter.TimeSeries()
if errs != nil {
Expand Down
44 changes: 34 additions & 10 deletions pkg/distributor/otlp/helper_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/distributor/otlp/histograms_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions pkg/distributor/otlp/metrics_to_prw_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions pkg/distributor/otlp/number_data_points_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,3 +1248,7 @@ type otlpLimitsMock struct{}
func (o otlpLimitsMock) OTelMetricSuffixesEnabled(_ string) bool {
return false
}

func (o otlpLimitsMock) OTelCreatedTimestampZeroIngestionEnabled(_ string) bool {
return false
}
1 change: 1 addition & 0 deletions pkg/querier/error_translate_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
false,
nil,
false,
false,
)

promRouter := route.New().WithPrefix("/api/v1")
Expand Down
8 changes: 7 additions & 1 deletion pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ type Limits struct {
AlertmanagerMaxAlertsSizeBytes int `yaml:"alertmanager_max_alerts_size_bytes" json:"alertmanager_max_alerts_size_bytes"`

// OpenTelemetry
OTelMetricSuffixesEnabled bool `yaml:"otel_metric_suffixes_enabled" json:"otel_metric_suffixes_enabled" category:"advanced"`
OTelMetricSuffixesEnabled bool `yaml:"otel_metric_suffixes_enabled" json:"otel_metric_suffixes_enabled" category:"advanced"`
OTelCreatedTimestampZeroIngestionEnabled bool `yaml:"otel_created_timestamp_zero_ingestion_enabled" json:"otel_created_timestamp_zero_ingestion_enabled" category:"experimental"`

// Ingest storage.
IngestStorageReadConsistency string `yaml:"ingest_storage_read_consistency" json:"ingest_storage_read_consistency" category:"experimental"`
Expand Down Expand Up @@ -264,6 +265,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&l.MetricRelabelingEnabled, "distributor.metric-relabeling-enabled", true, "Enable metric relabeling for the tenant. This configuration option can be used to forcefully disable metric relabeling on a per-tenant basis.")
f.BoolVar(&l.ServiceOverloadStatusCodeOnRateLimitEnabled, "distributor.service-overload-status-code-on-rate-limit-enabled", false, "If enabled, rate limit errors will be reported to the client with HTTP status code 529 (Service is overloaded). If disabled, status code 429 (Too Many Requests) is used. Enabling -distributor.retry-after-header.enabled before utilizing this option is strongly recommended as it helps prevent premature request retries by the client.")
f.BoolVar(&l.OTelMetricSuffixesEnabled, "distributor.otel-metric-suffixes-enabled", false, "Whether to enable automatic suffixes to names of metrics ingested through OTLP.")
f.BoolVar(&l.OTelCreatedTimestampZeroIngestionEnabled, "distributor.otel-created-timestamp-zero-ingestion-enabled", false, "Whether to enable translation of OTel start timestamps to Prometheus zero samples in the OTLP endpoint.")

f.IntVar(&l.MaxGlobalSeriesPerUser, MaxSeriesPerUserFlag, 150000, "The maximum number of in-memory series per tenant, across the cluster before replication. 0 to disable.")
f.IntVar(&l.MaxGlobalSeriesPerMetric, MaxSeriesPerMetricFlag, 0, "The maximum number of in-memory series per metric name, across the cluster before replication. 0 to disable.")
Expand Down Expand Up @@ -1047,6 +1049,10 @@ func (o *Overrides) OTelMetricSuffixesEnabled(tenantID string) bool {
return o.getOverridesForUser(tenantID).OTelMetricSuffixesEnabled
}

func (o *Overrides) OTelCreatedTimestampZeroIngestionEnabled(tenantID string) bool {
return o.getOverridesForUser(tenantID).OTelCreatedTimestampZeroIngestionEnabled
}

func (o *Overrides) AlignQueriesWithStep(userID string) bool {
return o.getOverridesForUser(userID).AlignQueriesWithStep
}
Expand Down
Loading

0 comments on commit afc98c1

Please sign in to comment.