diff --git a/CHANGELOG.md b/CHANGELOG.md
index 72d25acb230..002fb3af4d8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,7 +21,7 @@
 * [CHANGE] Distributor, ruler: remove deprecated `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled`. #8700
 * [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
 * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8671 #8677 #8747
-* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842
+* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845
   * What it is:
     * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
   * New configuration options:
diff --git a/pkg/frontend/querymiddleware/read_consistency.go b/pkg/frontend/querymiddleware/read_consistency.go
index 2f3eebc9b40..7455a924af0 100644
--- a/pkg/frontend/querymiddleware/read_consistency.go
+++ b/pkg/frontend/querymiddleware/read_consistency.go
@@ -22,10 +22,10 @@ type readConsistencyRoundTripper struct {
 	offsetsReader *ingest.TopicOffsetsReader
 	limits        Limits
 	logger        log.Logger
-	metrics       *ingest.StrongReadConsistencyInstrumentation[*http.Response]
+	metrics       *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]
 }
 
-func newReadConsistencyRoundTripper(next http.RoundTripper, offsetsReader *ingest.TopicOffsetsReader, limits Limits, logger log.Logger, metrics *ingest.StrongReadConsistencyInstrumentation[*http.Response]) http.RoundTripper {
+func newReadConsistencyRoundTripper(next http.RoundTripper, offsetsReader *ingest.TopicOffsetsReader, limits Limits, logger log.Logger, metrics *ingest.StrongReadConsistencyInstrumentation[map[int32]int64]) http.RoundTripper {
 	return &readConsistencyRoundTripper{
 		next:          next,
 		limits:        limits,
@@ -57,17 +57,17 @@ func (r *readConsistencyRoundTripper) RoundTrip(req *http.Request) (_ *http.Resp
 		return r.next.RoundTrip(req)
 	}
 
-	return r.metrics.Observe(false, func() (*http.Response, error) {
-		// Fetch last produced offsets.
-		offsets, err := r.offsetsReader.WaitNextFetchLastProducedOffset(ctx)
-		if err != nil {
-			return nil, errors.Wrap(err, "wait for last produced offsets")
-		}
+	// Fetch last produced offsets.
+	offsets, err := r.metrics.Observe(false, func() (map[int32]int64, error) {
+		return r.offsetsReader.WaitNextFetchLastProducedOffset(ctx)
+	})
+	if err != nil {
+		return nil, errors.Wrap(err, "wait for last produced offsets")
+	}
 
-		req.Header.Add(querierapi.ReadConsistencyOffsetsHeader, string(querierapi.EncodeOffsets(offsets)))
+	req.Header.Add(querierapi.ReadConsistencyOffsetsHeader, string(querierapi.EncodeOffsets(offsets)))
 
-		return r.next.RoundTrip(req)
-	})
+	return r.next.RoundTrip(req)
 }
 
 // getDefaultReadConsistency returns the default read consistency for the input tenantIDs,
@@ -82,7 +82,7 @@ func getDefaultReadConsistency(tenantIDs []string, limits Limits) string {
 	return querierapi.ReadConsistencyEventual
 }
 
-func newReadConsistencyMetrics(reg prometheus.Registerer) *ingest.StrongReadConsistencyInstrumentation[*http.Response] {
+func newReadConsistencyMetrics(reg prometheus.Registerer) *ingest.StrongReadConsistencyInstrumentation[map[int32]int64] {
 	const component = "query-frontend"
-	return ingest.NewStrongReadConsistencyInstrumentation[*http.Response](component, reg)
+	return ingest.NewStrongReadConsistencyInstrumentation[map[int32]int64](component, reg)
 }