Skip to content

Commit

Permalink
add metrics for both otlp and remote write
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Jun 5, 2024
1 parent a63fb85 commit 16cbd34
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 21 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* [FEATURE] Query-frontend, querier: new experimental `/cardinality/active_native_histogram_metrics` API to get active native histogram metric names with statistics about active native histogram buckets. #7982 #7986 #8008
* [FEATURE] Alertmanager: Added `-alertmanager.max-silences-count` and `-alertmanager.max-silence-size-bytes` to set limits on per tenant silences. Disabled by default. #6898
* [FEATURE] Ingester: add experimental support for the server-side circuit breakers when writing to ingesters. This can be enabled using `-ingester.circuit-breaker.enabled` option. Further `-ingester.circuit-breaker.*` options for configuring circuit-breaker are available. Added metrics `cortex_ingester_circuit_breaker_results_total`, `cortex_ingester_circuit_breaker_transitions_total` and `cortex_ingester_circuit_breaker_current_state`. #8180
* [ENHANCEMENT] Distributor: add metrics `cortex_distributor_otlp_samples_per_batch` to track samples per batch in otlp request. #8265
* [ENHANCEMENT] Distributor: add metrics `cortex_distributor_samples_per_request` and `cortex_distributor_exemplars_per_request` to track samples per request. #8265
* [ENHANCEMENT] Reduced memory allocations in functions used to propagate contextual information between gRPC calls. #7529
* [ENHANCEMENT] Distributor: add experimental limit for exemplars per series per request, enabled with `-distributor.max-exemplars-per-series-per-request`, the number of discarded exemplars are tracked with `cortex_discarded_exemplars_total{reason="too_many_exemplars_per_series_per_request"}` #7989 #8010
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
Expand Down
47 changes: 29 additions & 18 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type Distributor struct {
dedupedSamples *prometheus.CounterVec
labelsHistogram prometheus.Histogram
sampleDelayHistogram prometheus.Histogram
incomingSamplesPerRequest *prometheus.HistogramVec
incomingExemplarsPerRequest *prometheus.HistogramVec
latestSeenSampleTimestampPerUser *prometheus.GaugeVec
hashCollisionCount prometheus.Counter

Expand Down Expand Up @@ -264,9 +266,8 @@ const (
)

type PushMetrics struct {
otlpRequestCounter *prometheus.CounterVec
uncompressedBodySize *prometheus.HistogramVec
otlpIncomingSamplesPerBatch *prometheus.HistogramVec
otlpRequestCounter *prometheus.CounterVec
uncompressedBodySize *prometheus.HistogramVec
}

func newPushMetrics(reg prometheus.Registerer) *PushMetrics {
Expand All @@ -282,13 +283,6 @@ func newPushMetrics(reg prometheus.Registerer) *PushMetrics {
NativeHistogramMinResetDuration: 1 * time.Hour,
NativeHistogramMaxBucketNumber: 100,
}, []string{"user"}),
otlpIncomingSamplesPerBatch: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_distributor_otlp_samples_per_batch",
Help: "Number of samples per batch in otlp request.",
NativeHistogramBucketFactor: 2,
NativeHistogramMinResetDuration: 1 * time.Hour,
NativeHistogramMaxBucketNumber: 100,
}, []string{"user"}),
}
}

Expand All @@ -304,16 +298,9 @@ func (m *PushMetrics) ObserveUncompressedBodySize(user string, size float64) {
}
}

func (m *PushMetrics) ObserveOtlpIncomingSamplesPerBatch(user string, count float64) {
if m != nil {
m.otlpIncomingSamplesPerBatch.WithLabelValues(user).Observe(count)
}
}

func (m *PushMetrics) deleteUserMetrics(user string) {
m.otlpRequestCounter.DeleteLabelValues(user)
m.uncompressedBodySize.DeleteLabelValues(user)
m.otlpIncomingSamplesPerBatch.DeleteLabelValues(user)
}

// New constructs a new Distributor
Expand Down Expand Up @@ -425,6 +412,20 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
60 * 60 * 24, // 24h
},
}),
incomingSamplesPerRequest: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_distributor_samples_per_request",
Help: "Number of samples per request.",
NativeHistogramBucketFactor: 2,
NativeHistogramMinResetDuration: 1 * time.Hour,
NativeHistogramMaxBucketNumber: 100,
}, []string{"user"}),
incomingExemplarsPerRequest: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_distributor_exemplars_per_request",
Help: "Number of exemplars per request.",
NativeHistogramBucketFactor: 2,
NativeHistogramMinResetDuration: 1 * time.Hour,
NativeHistogramMaxBucketNumber: 100,
}, []string{"user"}),
latestSeenSampleTimestampPerUser: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
Help: "Unix timestamp of latest received sample per user.",
Expand Down Expand Up @@ -664,6 +665,8 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
d.incomingSamples.DeleteLabelValues(userID)
d.incomingExemplars.DeleteLabelValues(userID)
d.incomingMetadata.DeleteLabelValues(userID)
d.incomingSamplesPerRequest.DeleteLabelValues(userID)
d.incomingExemplarsPerRequest.DeleteLabelValues(userID)
d.nonHASamples.DeleteLabelValues(userID)
d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)

Expand Down Expand Up @@ -730,7 +733,6 @@ func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeser
}

now := model.TimeFromUnixNano(nowt.UnixNano())

for _, s := range ts.Samples {

delta := now - model.Time(s.TimestampMs)
Expand Down Expand Up @@ -1002,6 +1004,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
}

now := mtime.Now()

d.receivedRequests.WithLabelValues(userID).Add(1)
d.activeUsers.UpdateUserTimestamp(userID, now)

Expand Down Expand Up @@ -1042,7 +1045,11 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {

var firstPartialErr error
var removeIndexes []int
totalSamples, totalExemplars := 0, 0

for tsIdx, ts := range req.Timeseries {
totalSamples += len(ts.Samples)
totalExemplars += len(ts.Exemplars)
if len(ts.Labels) == 0 {
removeIndexes = append(removeIndexes, tsIdx)
continue
Expand All @@ -1068,6 +1075,10 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
validatedSamples += len(ts.Samples) + len(ts.Histograms)
validatedExemplars += len(ts.Exemplars)
}

d.incomingSamplesPerRequest.WithLabelValues(userID).Observe(float64(totalSamples))
d.incomingExemplarsPerRequest.WithLabelValues(userID).Observe(float64(totalExemplars))

if len(removeIndexes) > 0 {
for _, removeIndex := range removeIndexes {
mimirpb.ReusePreallocTimeseries(&req.Timeseries[removeIndex])
Expand Down
2 changes: 0 additions & 2 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ func OTLPHandler(
"exemplar_count", exemplarCount,
)

pushMetrics.ObserveOtlpIncomingSamplesPerBatch(tenantID, float64(sampleCount))

req.Timeseries = metrics

if enableOtelMetadataStorage {
Expand Down

0 comments on commit 16cbd34

Please sign in to comment.