diff --git a/CHANGELOG.md b/CHANGELOG.md index c3defcfad1f..68658dcd95d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,7 @@ * [ENHANCEMENT] Update runtime configuration to read gzip-compressed files with `.gz` extension. #9074 * [ENHANCEMENT] Ingester: add `cortex_lifecycler_read_only` metric which is set to 1 when ingester's lifecycler is set to read-only mode. #9095 * [ENHANCEMENT] Add a new field, `encode_time_seconds` to query stats log messages, to record the amount of time it takes the query-frontend to encode a response. This does not include any serialization time for downstream components. #9062 +* [ENHANCEMENT] OTLP: If the flag `-distributor.otel-created-timestamp-zero-ingestion-enabled` is true, OTel start timestamps are converted to Prometheus zero samples to mark series start. #9131 * [BUGFIX] Ruler: add support for draining any outstanding alert notifications before shutting down. This can be enabled with the `-ruler.drain-notification-queue-on-shutdown=true` CLI flag. #8346 * [BUGFIX] Query-frontend: fix `-querier.max-query-lookback` enforcement when `-compactor.blocks-retention-period` is not set, and viceversa. #8388 * [BUGFIX] Ingester: fix sporadic `not found` error causing an internal server error if label names are queried with matchers during head compaction. 
#8391 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 14cc5fe9463..003c5d58097 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -4699,6 +4699,17 @@ "fieldType": "boolean", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "otel_created_timestamp_zero_ingestion_enabled", + "required": false, + "desc": "Whether to enable translation of OTel start timestamps to Prometheus zero samples in the OTLP endpoint.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "distributor.otel-created-timestamp-zero-ingestion-enabled", + "fieldType": "boolean", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "ingest_storage_read_consistency", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 31075017563..f21efd1b522 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1215,6 +1215,8 @@ Usage of ./cmd/mimir/mimir: [experimental] Max size of the pooled buffers used for marshaling write requests. If 0, no max size is enforced. -distributor.metric-relabeling-enabled [experimental] Enable metric relabeling for the tenant. This configuration option can be used to forcefully disable metric relabeling on a per-tenant basis. (default true) + -distributor.otel-created-timestamp-zero-ingestion-enabled + [experimental] Whether to enable translation of OTel start timestamps to Prometheus zero samples in the OTLP endpoint. -distributor.otel-metric-suffixes-enabled Whether to enable automatic suffixes to names of metrics ingested through OTLP. 
-distributor.remote-timeout duration diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index ba33fb89ba5..26fdf7cd704 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -81,6 +81,8 @@ The following features are currently experimental: - `-distributor.max-request-pool-buffer-size` - Enable direct translation from OTLP write requests to Mimir equivalents - `-distributor.direct-otlp-translation-enabled` + - Enable conversion of OTel start timestamps to Prometheus zero samples to mark series start + - `-distributor.otel-created-timestamp-zero-ingestion-enabled` - Hash ring - Disabling ring heartbeat timeouts - `-distributor.ring.heartbeat-timeout=0` diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 7a57d69a4dc..08c956263bd 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3707,6 +3707,11 @@ The `limits` block configures default and per-tenant limits imposed by component # CLI flag: -distributor.otel-metric-suffixes-enabled [otel_metric_suffixes_enabled: | default = false] +# (experimental) Whether to enable translation of OTel start timestamps to +# Prometheus zero samples in the OTLP endpoint. +# CLI flag: -distributor.otel-created-timestamp-zero-ingestion-enabled +[otel_created_timestamp_zero_ingestion_enabled: | default = false] + # (experimental) The default consistency level to enforce for queries when using # the ingest storage. Supports values: strong, eventual. # CLI flag: -ingest-storage.read-consistency diff --git a/go.mod b/go.mod index 1a3f38b110d..2f7df839d4c 100644 --- a/go.mod +++ b/go.mod @@ -278,7 +278,7 @@ require ( ) // Using a fork of Prometheus with Mimir-specific changes. 
-replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240830123921-fdf902dd68d9 +replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240830150301-6b342fac9c48 // client_golang v1.20.0 has some bugs https://github.com/prometheus/client_golang/issues/1605, https://github.com/prometheus/client_golang/issues/1607 // Stick to v1.19.1 until they are fixed. diff --git a/go.sum b/go.sum index a1fcc9f8e5f..c8ec107ba32 100644 --- a/go.sum +++ b/go.sum @@ -1118,8 +1118,8 @@ github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wp github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/grafana/mimir-prometheus v0.0.0-20240830123921-fdf902dd68d9 h1:B09XYT+dKsdwye3e52achSnT9WXs0vEcOoHtT770sJg= -github.com/grafana/mimir-prometheus v0.0.0-20240830123921-fdf902dd68d9/go.mod h1:Sp9UNArUoyscK0pnnjTmmE5HfhEifkoY8hi3tzxZFZo= +github.com/grafana/mimir-prometheus v0.0.0-20240830150301-6b342fac9c48 h1:SwY0fuJgoUGguKLOwY/1cUm2DAc0U+dk4UZBoTGd71c= +github.com/grafana/mimir-prometheus v0.0.0-20240830150301-6b342fac9c48/go.mod h1:Sp9UNArUoyscK0pnnjTmmE5HfhEifkoY8hi3tzxZFZo= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0= github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45 h1:AJKOtDKAOg8XNFnIZSmqqqutoTSxVlRs6vekL2p2KEY= diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index ad0722052b7..2f79278624d 100644 --- a/pkg/api/handlers.go +++ 
b/pkg/api/handlers.go @@ -250,7 +250,7 @@ func NewQuerierHandler( const ( remoteWriteEnabled = false - oltpEnabled = false + otlpEnabled = false ) api := v1.NewAPI( @@ -281,7 +281,8 @@ func NewQuerierHandler( nil, remoteWriteEnabled, nil, - oltpEnabled, + otlpEnabled, + true, ) api.InstallCodec(protobufCodec{}) diff --git a/pkg/distributor/otel.go b/pkg/distributor/otel.go index f55d02d0529..e0038065b45 100644 --- a/pkg/distributor/otel.go +++ b/pkg/distributor/otel.go @@ -49,6 +49,7 @@ const ( type OTLPHandlerLimits interface { OTelMetricSuffixesEnabled(id string) bool + OTelCreatedTimestampZeroIngestionEnabled(id string) bool } // OTLPHandler is an http.Handler accepting OTLP write requests. @@ -162,18 +163,19 @@ func OTLPHandler( return err } addSuffixes := limits.OTelMetricSuffixesEnabled(tenantID) + enableCTZeroIngestion := limits.OTelCreatedTimestampZeroIngestionEnabled(tenantID) pushMetrics.IncOTLPRequest(tenantID) pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize)) var metrics []mimirpb.PreallocTimeseries if directTranslation { - metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics()) + metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, enableCTZeroIngestion, discardedDueToOtelParseError, logger, otlpReq.Metrics()) if err != nil { return err } } else { - metrics, err = otelMetricsToTimeseriesOld(ctx, tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics()) + metrics, err = otelMetricsToTimeseriesOld(ctx, tenantID, addSuffixes, enableCTZeroIngestion, discardedDueToOtelParseError, logger, otlpReq.Metrics()) if err != nil { return err } @@ -401,10 +403,11 @@ func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.Metr return metadata } -func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md 
pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) { +func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes, enableCTZeroIngestion bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) { converter := otlp.NewMimirConverter() _, errs := converter.FromMetrics(ctx, md, otlp.Settings{ - AddMetricSuffixes: addSuffixes, + AddMetricSuffixes: addSuffixes, + EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion, }) mimirTS := converter.TimeSeries() if errs != nil { @@ -427,10 +430,11 @@ func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes b } // Old, less efficient, version of otelMetricsToTimeseries. -func otelMetricsToTimeseriesOld(ctx context.Context, tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) { +func otelMetricsToTimeseriesOld(ctx context.Context, tenantID string, addSuffixes, enableCTZeroIngestion bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) { converter := prometheusremotewrite.NewPrometheusConverter() - _, errs := converter.FromMetrics(ctx, md, prometheusremotewrite.Settings{ - AddMetricSuffixes: addSuffixes, + annots, errs := converter.FromMetrics(ctx, md, prometheusremotewrite.Settings{ + AddMetricSuffixes: addSuffixes, + EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion, }) promTS := converter.TimeSeries() if errs != nil { @@ -448,6 +452,10 @@ func otelMetricsToTimeseriesOld(ctx context.Context, tenantID string, addSuffixe level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs) } + ws, _ := annots.AsStrings("", 0, 0) + if len(ws) > 0 { + level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws) + } mimirTS := 
mimirpb.PreallocTimeseriesSliceFromPool() for _, ts := range promTS { diff --git a/pkg/distributor/otlp/helper_generated.go b/pkg/distributor/otlp/helper_generated.go index 918add0cb68..cba7b890539 100644 --- a/pkg/distributor/otlp/helper_generated.go +++ b/pkg/distributor/otlp/helper_generated.go @@ -253,12 +253,15 @@ func (c *MimirConverter) addHistogramDataPoints(ctx context.Context, dataPoints pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) + startTimestampNs := pt.StartTimestamp() + startTimestampMs := convertTimeStamp(startTimestampNs) baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false) // If the sum is unset, it indicates the _sum metric point should be // omitted if pt.HasSum() { // treat sum as a sample in an individual TimeSeries + sumlabels := createLabels(baseName+sumStr, baseLabels) sum := &mimirpb.Sample{ Value: pt.Sum(), TimestampMs: timestamp, @@ -267,7 +270,7 @@ func (c *MimirConverter) addHistogramDataPoints(ctx context.Context, dataPoints sum.Value = math.Float64frombits(value.StaleNaN) } - sumlabels := createLabels(baseName+sumStr, baseLabels) + c.handleStartTime(startTimestampMs, timestamp, sum.Value, sumlabels, settings) c.addSample(sum, sumlabels) } @@ -282,6 +285,7 @@ func (c *MimirConverter) addHistogramDataPoints(ctx context.Context, dataPoints } countlabels := createLabels(baseName+countStr, baseLabels) + c.handleStartTime(startTimestampMs, timestamp, count.Value, countlabels, settings) c.addSample(count, countlabels) // cumulative count for conversion to cumulative histogram @@ -306,6 +310,7 @@ func (c *MimirConverter) addHistogramDataPoints(ctx context.Context, dataPoints } boundStr := strconv.FormatFloat(bound, 'f', -1, 64) labels := createLabels(baseName+bucketStr, baseLabels, leStr, boundStr) + c.handleStartTime(startTimestampMs, timestamp, bucket.Value, labels, settings) ts := c.addSample(bucket, labels) bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: bound}) @@ 
-320,6 +325,7 @@ func (c *MimirConverter) addHistogramDataPoints(ctx context.Context, dataPoints infBucket.Value = float64(pt.Count()) } infLabels := createLabels(baseName+bucketStr, baseLabels, leStr, pInfStr) + c.handleStartTime(startTimestampMs, timestamp, infBucket.Value, infLabels, settings) ts := c.addSample(infBucket, infLabels) bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: math.Inf(1)}) @@ -327,10 +333,9 @@ func (c *MimirConverter) addHistogramDataPoints(ctx context.Context, dataPoints return err } - startTimestamp := pt.StartTimestamp() - if settings.ExportCreatedMetric && startTimestamp != 0 { + if settings.ExportCreatedMetric && startTimestampNs != 0 { labels := createLabels(baseName+createdSuffix, baseLabels) - c.addTimeSeriesIfNeeded(labels, startTimestamp, pt.Timestamp()) + c.addTimeSeriesIfNeeded(labels, startTimestampNs, pt.Timestamp()) } } @@ -445,6 +450,8 @@ func (c *MimirConverter) addSummaryDataPoints(ctx context.Context, dataPoints pm pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) + startTimestampNs := pt.StartTimestamp() + startTimestampMs := convertTimeStamp(startTimestampNs) baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false) // treat sum as a sample in an individual TimeSeries @@ -457,6 +464,7 @@ func (c *MimirConverter) addSummaryDataPoints(ctx context.Context, dataPoints pm } // sum and count of the summary should append suffix to baseName sumlabels := createLabels(baseName+sumStr, baseLabels) + c.handleStartTime(startTimestampMs, timestamp, sum.Value, sumlabels, settings) c.addSample(sum, sumlabels) // treat count as a sample in an individual TimeSeries @@ -468,6 +476,7 @@ func (c *MimirConverter) addSummaryDataPoints(ctx context.Context, dataPoints pm count.Value = math.Float64frombits(value.StaleNaN) } countlabels := createLabels(baseName+countStr, baseLabels) + c.handleStartTime(startTimestampMs, timestamp, count.Value, countlabels, settings) c.addSample(count, 
countlabels) // process each percentile/quantile @@ -482,13 +491,13 @@ func (c *MimirConverter) addSummaryDataPoints(ctx context.Context, dataPoints pm } percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) qtlabels := createLabels(baseName, baseLabels, quantileStr, percentileStr) + c.handleStartTime(startTimestampMs, timestamp, quantile.Value, qtlabels, settings) c.addSample(quantile, qtlabels) } - startTimestamp := pt.StartTimestamp() - if settings.ExportCreatedMetric && startTimestamp != 0 { + if settings.ExportCreatedMetric && startTimestampNs != 0 { createdLabels := createLabels(baseName+createdSuffix, baseLabels) - c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) + c.addTimeSeriesIfNeeded(createdLabels, startTimestampNs, pt.Timestamp()) } } @@ -564,6 +573,19 @@ func (c *MimirConverter) addTimeSeriesIfNeeded(lbls []mimirpb.LabelAdapter, star } } +// handleStartTime adds a zero sample 1 millisecond before ts iff startTs == ts. +// The reason for doing this is that PRW v1 doesn't support Created Timestamps. After switching to PRW v2's direct CT support, +// make use of its direct support for Created Timestamps instead. +func (c *MimirConverter) handleStartTime(startTs, ts int64, value float64, labels []mimirpb.LabelAdapter, settings Settings) { + if !settings.EnableCreatedTimestampZeroIngestion { + return + } + if startTs > 0 && startTs == ts { + // See https://github.com/prometheus/prometheus/issues/14600 for context. + c.addSample(&mimirpb.Sample{TimestampMs: ts - 1}, labels) + } +} + // addResourceTargetInfo converts the resource to the target info metric.
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *MimirConverter) { if settings.DisableTargetInfo || timestamp == 0 { @@ -608,10 +630,10 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta return } + ts := convertTimeStamp(timestamp) sample := &mimirpb.Sample{ - Value: float64(1), - // convert ns to ms - TimestampMs: convertTimeStamp(timestamp), + Value: float64(1), + TimestampMs: ts, } converter.addSample(sample, labels) } diff --git a/pkg/distributor/otlp/metrics_to_prw_generated.go b/pkg/distributor/otlp/metrics_to_prw_generated.go index a15d3e4a5ea..af697afb67e 100644 --- a/pkg/distributor/otlp/metrics_to_prw_generated.go +++ b/pkg/distributor/otlp/metrics_to_prw_generated.go @@ -35,13 +35,14 @@ import ( ) type Settings struct { - Namespace string - ExternalLabels map[string]string - DisableTargetInfo bool - ExportCreatedMetric bool - AddMetricSuffixes bool - SendMetadata bool - PromoteResourceAttributes []string + Namespace string + ExternalLabels map[string]string + DisableTargetInfo bool + ExportCreatedMetric bool + AddMetricSuffixes bool + SendMetadata bool + PromoteResourceAttributes []string + EnableCreatedTimestampZeroIngestion bool } // MimirConverter converts from OTel write format to Mimir remote write format. 
diff --git a/pkg/distributor/otlp/number_data_points_generated.go b/pkg/distributor/otlp/number_data_points_generated.go index 264722f8b64..eabbc29669c 100644 --- a/pkg/distributor/otlp/number_data_points_generated.go +++ b/pkg/distributor/otlp/number_data_points_generated.go @@ -48,9 +48,9 @@ func (c *MimirConverter) addGaugeNumberDataPoints(ctx context.Context, dataPoint model.MetricNameLabel, name, ) + timestamp := convertTimeStamp(pt.Timestamp()) sample := &mimirpb.Sample{ - // convert ns to ms - TimestampMs: convertTimeStamp(pt.Timestamp()), + TimestampMs: timestamp, } switch pt.ValueType() { case pmetric.NumberDataPointValueTypeInt: @@ -75,6 +75,7 @@ func (c *MimirConverter) addSumNumberDataPoints(ctx context.Context, dataPoints } pt := dataPoints.At(x) + startTimestampNs := pt.StartTimestamp() lbls := createAttributes( resource, pt.Attributes(), @@ -84,9 +85,9 @@ func (c *MimirConverter) addSumNumberDataPoints(ctx context.Context, dataPoints model.MetricNameLabel, name, ) + timestamp := convertTimeStamp(pt.Timestamp()) sample := &mimirpb.Sample{ - // convert ns to ms - TimestampMs: convertTimeStamp(pt.Timestamp()), + TimestampMs: timestamp, } switch pt.ValueType() { case pmetric.NumberDataPointValueTypeInt: @@ -97,6 +98,10 @@ func (c *MimirConverter) addSumNumberDataPoints(ctx context.Context, dataPoints if pt.Flags().NoRecordedValue() { sample.Value = math.Float64frombits(value.StaleNaN) } + isMonotonic := metric.Sum().IsMonotonic() + if isMonotonic { + c.handleStartTime(convertTimeStamp(startTimestampNs), timestamp, sample.Value, lbls, settings) + } ts := c.addSample(sample, lbls) if ts != nil { exemplars, err := getPromExemplars[pmetric.NumberDataPoint](ctx, &c.everyN, pt) @@ -107,9 +112,8 @@ func (c *MimirConverter) addSumNumberDataPoints(ctx context.Context, dataPoints } // add created time series if needed - if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() { - startTimestamp := pt.StartTimestamp() - if startTimestamp == 0 { + if 
settings.ExportCreatedMetric && isMonotonic { + if startTimestampNs == 0 { return nil } @@ -121,7 +125,7 @@ func (c *MimirConverter) addSumNumberDataPoints(ctx context.Context, dataPoints break } } - c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) + c.addTimeSeriesIfNeeded(createdLabels, startTimestampNs, pt.Timestamp()) } } diff --git a/pkg/distributor/push_test.go b/pkg/distributor/push_test.go index 87ce6249ec6..099008b6a2a 100644 --- a/pkg/distributor/push_test.go +++ b/pkg/distributor/push_test.go @@ -1247,3 +1247,7 @@ type otlpLimitsMock struct{} func (o otlpLimitsMock) OTelMetricSuffixesEnabled(_ string) bool { return false } + +func (o otlpLimitsMock) OTelCreatedTimestampZeroIngestionEnabled(_ string) bool { + return false +} diff --git a/pkg/querier/error_translate_queryable_test.go b/pkg/querier/error_translate_queryable_test.go index bf0fcf0ef88..6a89fa40640 100644 --- a/pkg/querier/error_translate_queryable_test.go +++ b/pkg/querier/error_translate_queryable_test.go @@ -170,6 +170,7 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router { false, nil, false, + false, ) promRouter := route.New().WithPrefix("/api/v1") diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index c529c38c9e0..7e648407823 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -228,7 +228,8 @@ type Limits struct { AlertmanagerMaxAlertsSizeBytes int `yaml:"alertmanager_max_alerts_size_bytes" json:"alertmanager_max_alerts_size_bytes"` // OpenTelemetry - OTelMetricSuffixesEnabled bool `yaml:"otel_metric_suffixes_enabled" json:"otel_metric_suffixes_enabled" category:"advanced"` + OTelMetricSuffixesEnabled bool `yaml:"otel_metric_suffixes_enabled" json:"otel_metric_suffixes_enabled" category:"advanced"` + OTelCreatedTimestampZeroIngestionEnabled bool `yaml:"otel_created_timestamp_zero_ingestion_enabled" json:"otel_created_timestamp_zero_ingestion_enabled" category:"experimental"` // Ingest 
storage. IngestStorageReadConsistency string `yaml:"ingest_storage_read_consistency" json:"ingest_storage_read_consistency" category:"experimental"` @@ -264,6 +265,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.MetricRelabelingEnabled, "distributor.metric-relabeling-enabled", true, "Enable metric relabeling for the tenant. This configuration option can be used to forcefully disable metric relabeling on a per-tenant basis.") f.BoolVar(&l.ServiceOverloadStatusCodeOnRateLimitEnabled, "distributor.service-overload-status-code-on-rate-limit-enabled", false, "If enabled, rate limit errors will be reported to the client with HTTP status code 529 (Service is overloaded). If disabled, status code 429 (Too Many Requests) is used. Enabling -distributor.retry-after-header.enabled before utilizing this option is strongly recommended as it helps prevent premature request retries by the client.") f.BoolVar(&l.OTelMetricSuffixesEnabled, "distributor.otel-metric-suffixes-enabled", false, "Whether to enable automatic suffixes to names of metrics ingested through OTLP.") + f.BoolVar(&l.OTelCreatedTimestampZeroIngestionEnabled, "distributor.otel-created-timestamp-zero-ingestion-enabled", false, "Whether to enable translation of OTel start timestamps to Prometheus zero samples in the OTLP endpoint.") f.IntVar(&l.MaxGlobalSeriesPerUser, MaxSeriesPerUserFlag, 150000, "The maximum number of in-memory series per tenant, across the cluster before replication. 0 to disable.") f.IntVar(&l.MaxGlobalSeriesPerMetric, MaxSeriesPerMetricFlag, 0, "The maximum number of in-memory series per metric name, across the cluster before replication. 
0 to disable.") @@ -1047,6 +1049,10 @@ func (o *Overrides) OTelMetricSuffixesEnabled(tenantID string) bool { return o.getOverridesForUser(tenantID).OTelMetricSuffixesEnabled } +func (o *Overrides) OTelCreatedTimestampZeroIngestionEnabled(tenantID string) bool { + return o.getOverridesForUser(tenantID).OTelCreatedTimestampZeroIngestionEnabled +} + func (o *Overrides) AlignQueriesWithStep(userID string) bool { return o.getOverridesForUser(userID).AlignQueriesWithStep } diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/helper.go index fd7f58f0738..782e1a8764c 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -251,12 +251,15 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) + startTimestampNs := pt.StartTimestamp() + startTimestampMs := convertTimeStamp(startTimestampNs) baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false) // If the sum is unset, it indicates the _sum metric point should be // omitted if pt.HasSum() { // treat sum as a sample in an individual TimeSeries + sumlabels := createLabels(baseName+sumStr, baseLabels) sum := &prompb.Sample{ Value: pt.Sum(), Timestamp: timestamp, @@ -265,7 +268,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo sum.Value = math.Float64frombits(value.StaleNaN) } - sumlabels := createLabels(baseName+sumStr, baseLabels) + c.handleStartTime(startTimestampMs, timestamp, sum.Value, sumlabels, settings) c.addSample(sum, sumlabels) } @@ -280,6 +283,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo } countlabels 
:= createLabels(baseName+countStr, baseLabels) + c.handleStartTime(startTimestampMs, timestamp, count.Value, countlabels, settings) c.addSample(count, countlabels) // cumulative count for conversion to cumulative histogram @@ -304,6 +308,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo } boundStr := strconv.FormatFloat(bound, 'f', -1, 64) labels := createLabels(baseName+bucketStr, baseLabels, leStr, boundStr) + c.handleStartTime(startTimestampMs, timestamp, bucket.Value, labels, settings) ts := c.addSample(bucket, labels) bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: bound}) @@ -318,6 +323,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo infBucket.Value = float64(pt.Count()) } infLabels := createLabels(baseName+bucketStr, baseLabels, leStr, pInfStr) + c.handleStartTime(startTimestampMs, timestamp, infBucket.Value, infLabels, settings) ts := c.addSample(infBucket, infLabels) bucketBounds = append(bucketBounds, bucketBoundsData{ts: ts, bound: math.Inf(1)}) @@ -325,10 +331,9 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo return err } - startTimestamp := pt.StartTimestamp() - if settings.ExportCreatedMetric && startTimestamp != 0 { + if settings.ExportCreatedMetric && startTimestampNs != 0 { labels := createLabels(baseName+createdSuffix, baseLabels) - c.addTimeSeriesIfNeeded(labels, startTimestamp, pt.Timestamp()) + c.addTimeSeriesIfNeeded(labels, startTimestampNs, pt.Timestamp()) } } @@ -443,6 +448,8 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) + startTimestampNs := pt.StartTimestamp() + startTimestampMs := convertTimeStamp(startTimestampNs) baseLabels := createAttributes(resource, pt.Attributes(), settings, nil, false) // treat sum as a sample in an individual TimeSeries @@ -455,6 +462,7 @@ func (c *PrometheusConverter) 
addSummaryDataPoints(ctx context.Context, dataPoin } // sum and count of the summary should append suffix to baseName sumlabels := createLabels(baseName+sumStr, baseLabels) + c.handleStartTime(startTimestampMs, timestamp, sum.Value, sumlabels, settings) c.addSample(sum, sumlabels) // treat count as a sample in an individual TimeSeries @@ -466,6 +474,7 @@ func (c *PrometheusConverter) addSummaryDataPoin count.Value = math.Float64frombits(value.StaleNaN) } countlabels := createLabels(baseName+countStr, baseLabels) + c.handleStartTime(startTimestampMs, timestamp, count.Value, countlabels, settings) c.addSample(count, countlabels) // process each percentile/quantile @@ -480,13 +489,13 @@ func (c *PrometheusConverter) addSummaryDataPoin } percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) qtlabels := createLabels(baseName, baseLabels, quantileStr, percentileStr) + c.handleStartTime(startTimestampMs, timestamp, quantile.Value, qtlabels, settings) c.addSample(quantile, qtlabels) } - startTimestamp := pt.StartTimestamp() - if settings.ExportCreatedMetric && startTimestamp != 0 { + if settings.ExportCreatedMetric && startTimestampNs != 0 { createdLabels := createLabels(baseName+createdSuffix, baseLabels) - c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) + c.addTimeSeriesIfNeeded(createdLabels, startTimestampNs, pt.Timestamp()) } } @@ -562,6 +571,19 @@ func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTi } } +// handleStartTime adds a zero sample 1 millisecond before ts iff startTs == ts. +// The reason for doing this is that PRW v1 doesn't support Created Timestamps. After switching to PRW v2's direct CT support, +// make use of its direct support for Created Timestamps instead. 
+func (c *PrometheusConverter) handleStartTime(startTs, ts int64, value float64, labels []prompb.Label, settings Settings) { + if !settings.EnableCreatedTimestampZeroIngestion { + return + } + if startTs > 0 && startTs == ts { + // See https://github.com/prometheus/prometheus/issues/14600 for context. + c.addSample(&prompb.Sample{Timestamp: ts - 1}, labels) + } +} + // addResourceTargetInfo converts the resource to the target info metric. func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *PrometheusConverter) { if settings.DisableTargetInfo || timestamp == 0 { @@ -606,10 +628,10 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta return } + ts := convertTimeStamp(timestamp) sample := &prompb.Sample{ - Value: float64(1), - // convert ns to ms - Timestamp: convertTimeStamp(timestamp), + Value: float64(1), + Timestamp: ts, } converter.addSample(sample, labels) } diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 0afd2ad57e4..190f80769c8 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -32,13 +32,14 @@ import ( ) type Settings struct { - Namespace string - ExternalLabels map[string]string - DisableTargetInfo bool - ExportCreatedMetric bool - AddMetricSuffixes bool - SendMetadata bool - PromoteResourceAttributes []string + Namespace string + ExternalLabels map[string]string + DisableTargetInfo bool + ExportCreatedMetric bool + AddMetricSuffixes bool + SendMetadata bool + PromoteResourceAttributes []string + EnableCreatedTimestampZeroIngestion bool } // PrometheusConverter converts from OTel write 
format to Prometheus remote write format. diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go index 6cdab450e1a..f0f873234b3 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go @@ -45,9 +45,9 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data model.MetricNameLabel, name, ) + timestamp := convertTimeStamp(pt.Timestamp()) sample := &prompb.Sample{ - // convert ns to ms - Timestamp: convertTimeStamp(pt.Timestamp()), + Timestamp: timestamp, } switch pt.ValueType() { case pmetric.NumberDataPointValueTypeInt: @@ -72,6 +72,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo } pt := dataPoints.At(x) + startTimestampNs := pt.StartTimestamp() lbls := createAttributes( resource, pt.Attributes(), @@ -81,9 +82,9 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo model.MetricNameLabel, name, ) + timestamp := convertTimeStamp(pt.Timestamp()) sample := &prompb.Sample{ - // convert ns to ms - Timestamp: convertTimeStamp(pt.Timestamp()), + Timestamp: timestamp, } switch pt.ValueType() { case pmetric.NumberDataPointValueTypeInt: @@ -94,6 +95,10 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo if pt.Flags().NoRecordedValue() { sample.Value = math.Float64frombits(value.StaleNaN) } + isMonotonic := metric.Sum().IsMonotonic() + if isMonotonic { + c.handleStartTime(convertTimeStamp(startTimestampNs), timestamp, sample.Value, lbls, settings) + } ts := c.addSample(sample, lbls) if ts != nil { exemplars, err := getPromExemplars[pmetric.NumberDataPoint](ctx, &c.everyN, pt) @@ 
-104,9 +109,8 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo } // add created time series if needed - if settings.ExportCreatedMetric && metric.Sum().IsMonotonic() { - startTimestamp := pt.StartTimestamp() - if startTimestamp == 0 { + if settings.ExportCreatedMetric && isMonotonic { + if startTimestampNs == 0 { return nil } @@ -118,7 +122,7 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo break } } - c.addTimeSeriesIfNeeded(createdLabels, startTimestamp, pt.Timestamp()) + c.addTimeSeriesIfNeeded(createdLabels, startTimestampNs, pt.Timestamp()) } } diff --git a/vendor/github.com/prometheus/prometheus/storage/remote/write_handler.go b/vendor/github.com/prometheus/prometheus/storage/remote/write_handler.go index 736bc8eff3c..54c1b9db49f 100644 --- a/vendor/github.com/prometheus/prometheus/storage/remote/write_handler.go +++ b/vendor/github.com/prometheus/prometheus/storage/remote/write_handler.go @@ -482,23 +482,25 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // writes them to the provided appendable. 
-func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config) http.Handler { +func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config, enableCTZeroIngestion bool) http.Handler { rwHandler := &writeHandler{ logger: logger, appendable: appendable, } return &otlpWriteHandler{ - logger: logger, - rwHandler: rwHandler, - configFunc: configFunc, + logger: logger, + rwHandler: rwHandler, + configFunc: configFunc, + enableCTZeroIngestion: enableCTZeroIngestion, } } type otlpWriteHandler struct { - logger log.Logger - rwHandler *writeHandler - configFunc func() config.Config + logger log.Logger + rwHandler *writeHandler + configFunc func() config.Config + enableCTZeroIngestion bool } func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -513,8 +515,9 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { converter := otlptranslator.NewPrometheusConverter() annots, err := converter.FromMetrics(r.Context(), req.Metrics(), otlptranslator.Settings{ - AddMetricSuffixes: true, - PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes, + AddMetricSuffixes: true, + PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes, + EnableCreatedTimestampZeroIngestion: h.enableCTZeroIngestion, }) if err != nil { level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) diff --git a/vendor/github.com/prometheus/prometheus/web/api/v1/api.go b/vendor/github.com/prometheus/prometheus/web/api/v1/api.go index d58be211f21..48283a6c5ef 100644 --- a/vendor/github.com/prometheus/prometheus/web/api/v1/api.go +++ b/vendor/github.com/prometheus/prometheus/web/api/v1/api.go @@ -251,6 +251,7 @@ func NewAPI( rwEnabled bool, acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg, otlpEnabled bool, + enableCTZeroIngestion bool, ) *API { a := &API{ QueryEngine: qe, @@ -295,7 +296,7 @@ func NewAPI( 
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs) } if otlpEnabled { - a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc) + a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc, enableCTZeroIngestion) } return a diff --git a/vendor/modules.txt b/vendor/modules.txt index 2ccabf11588..ef9bbb2c82e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -997,7 +997,7 @@ github.com/prometheus/exporter-toolkit/web github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util -# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240830123921-fdf902dd68d9 +# github.com/prometheus/prometheus v1.99.0 => github.com/grafana/mimir-prometheus v0.0.0-20240830150301-6b342fac9c48 ## explicit; go 1.21.0 github.com/prometheus/prometheus/config github.com/prometheus/prometheus/discovery @@ -1648,7 +1648,7 @@ sigs.k8s.io/kustomize/kyaml/yaml/walk sigs.k8s.io/yaml sigs.k8s.io/yaml/goyaml.v2 sigs.k8s.io/yaml/goyaml.v3 -# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240830123921-fdf902dd68d9 +# github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20240830150301-6b342fac9c48 # github.com/prometheus/client_golang => github.com/prometheus/client_golang v1.19.1 # github.com/hashicorp/memberlist => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe # gopkg.in/yaml.v3 => github.com/colega/go-yaml-yaml v0.0.0-20220720105220-255a8d16d094