From 7c97c107c9c113b696093468b373918ea6d6fefd Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Thu, 20 Jun 2024 16:03:15 +1000 Subject: [PATCH 1/7] Add support for falling back to Prometheus' engine by setting a HTTP header --- pkg/api/handlers.go | 4 +- pkg/streamingpromql/compat/fallback_engine.go | 48 ++++++++----- .../compat/fallback_engine_test.go | 37 ++++++++-- pkg/streamingpromql/compat/fallback_header.go | 40 +++++++++++ .../compat/fallback_header_test.go | 70 +++++++++++++++++++ 5 files changed, 173 insertions(+), 26 deletions(-) create mode 100644 pkg/streamingpromql/compat/fallback_header.go create mode 100644 pkg/streamingpromql/compat/fallback_header_test.go diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 4c7e1e81e94..ad5003168ab 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/mimir/pkg/querier" querierapi "github.com/grafana/mimir/pkg/querier/api" "github.com/grafana/mimir/pkg/querier/stats" + "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/validation" @@ -285,7 +286,8 @@ func NewQuerierHandler( router := mux.NewRouter() routeInjector := middleware.RouteInjector{RouteMatcher: router} - router.Use(routeInjector.Wrap) + fallbackInjector := compat.EngineFallbackInjector{} + router.Use(routeInjector.Wrap, fallbackInjector.Wrap) // Use a separate metric for the querier in order to differentiate requests from the query-frontend when // running Mimir in monolithic mode. diff --git a/pkg/streamingpromql/compat/fallback_engine.go b/pkg/streamingpromql/compat/fallback_engine.go index 7dda25583a4..7e70e287055 100644 --- a/pkg/streamingpromql/compat/fallback_engine.go +++ b/pkg/streamingpromql/compat/fallback_engine.go @@ -27,6 +27,8 @@ type EngineWithFallback struct { logger log.Logger } +var errFallbackForcedByHTTPHeader = NotSupportedError{"fallback forced by HTTP header"} + func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheus.Registerer, logger log.Logger) promql.QueryEngine { return &EngineWithFallback{ preferred: preferred, @@ -46,17 +48,22 @@ func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheu } func (e EngineWithFallback) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { - query, err := e.preferred.NewInstantQuery(ctx, q, opts, qs, ts) + notSupportedErr := NotSupportedError{} - if err == nil { - e.supportedQueries.Inc() - return query, nil - } + if isForceFallbackEnabled(ctx) { + notSupportedErr = errFallbackForcedByHTTPHeader + } else { + query, err := e.preferred.NewInstantQuery(ctx, q, opts, qs, ts) - notSupportedErr := NotSupportedError{} - if !errors.As(err, ¬SupportedErr) { - // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. - return nil, err + if err == nil { + e.supportedQueries.Inc() + return query, nil + } + + if !errors.As(err, ¬SupportedErr) { + // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. + return nil, err + } } logger := spanlogger.FromContext(ctx, e.logger) @@ -67,17 +74,22 @@ func (e EngineWithFallback) NewInstantQuery(ctx context.Context, q storage.Query } func (e EngineWithFallback) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { - query, err := e.preferred.NewRangeQuery(ctx, q, opts, qs, start, end, interval) + notSupportedErr := NotSupportedError{} - if err == nil { - e.supportedQueries.Inc() - return query, nil - } + if isForceFallbackEnabled(ctx) { + notSupportedErr = errFallbackForcedByHTTPHeader + } else { + query, err := e.preferred.NewRangeQuery(ctx, q, opts, qs, start, end, interval) - notSupportedErr := NotSupportedError{} - if !errors.As(err, ¬SupportedErr) { - // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. - return nil, err + if err == nil { + e.supportedQueries.Inc() + return query, nil + } + + if !errors.As(err, ¬SupportedErr) { + // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. + return nil, err + } } logger := spanlogger.FromContext(ctx, e.logger) diff --git a/pkg/streamingpromql/compat/fallback_engine_test.go b/pkg/streamingpromql/compat/fallback_engine_test.go index 35c7889f363..91c4cfb85a4 100644 --- a/pkg/streamingpromql/compat/fallback_engine_test.go +++ b/pkg/streamingpromql/compat/fallback_engine_test.go @@ -20,14 +20,13 @@ import ( ) func TestEngineWithFallback(t *testing.T) { - ctx := context.Background() logger := log.NewNopLogger() - generators := map[string]func(engine promql.QueryEngine, expr string) (promql.Query, error){ - "instant query": func(engine promql.QueryEngine, expr string) (promql.Query, error) { + generators := map[string]func(ctx context.Context, engine promql.QueryEngine, expr string) (promql.Query, error){ + "instant query": func(ctx context.Context, engine promql.QueryEngine, expr string) (promql.Query, error) { return engine.NewInstantQuery(ctx, nil, nil, expr, time.Now()) }, - "range query": func(engine promql.QueryEngine, expr string) (promql.Query, error) { + "range query": func(ctx context.Context, engine promql.QueryEngine, expr string) (promql.Query, error) { return engine.NewRangeQuery(ctx, nil, nil, expr, time.Now(), time.Now().Add(-time.Minute), time.Second) }, } @@ -35,12 +34,13 @@ func TestEngineWithFallback(t *testing.T) { for name, createQuery := range generators { t.Run(name, func(t *testing.T) { t.Run("should not fall back for supported expressions", func(t *testing.T) { + ctx := context.Background() reg := prometheus.NewPedanticRegistry() preferredEngine := newFakeEngineThatSupportsLimitedQueries() fallbackEngine := newFakeEngineThatSupportsAllQueries() engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) - query, err := createQuery(engineWithFallback, "a_supported_expression") + query, err := createQuery(ctx, engineWithFallback, "a_supported_expression") require.NoError(t, err) require.Equal(t, preferredEngine.query, query, "should return query from preferred engine") require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if expression is supported by preferred engine") @@ -53,12 +53,13 @@ func TestEngineWithFallback(t *testing.T) { }) t.Run("should fall back for unsupported expressions", func(t *testing.T) { + ctx := context.Background() reg := prometheus.NewPedanticRegistry() preferredEngine := newFakeEngineThatSupportsLimitedQueries() fallbackEngine := newFakeEngineThatSupportsAllQueries() engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) - query, err := createQuery(engineWithFallback, "a_non_supported_expression") + query, err := createQuery(ctx, engineWithFallback, "a_non_supported_expression") require.NoError(t, err) require.Equal(t, fallbackEngine.query, query, "should return query from fallback engine if expression is not supported by preferred engine") @@ -73,15 +74,37 @@ func TestEngineWithFallback(t *testing.T) { }) t.Run("should not fall back if creating query fails for another reason", func(t *testing.T) { + ctx := context.Background() reg := prometheus.NewPedanticRegistry() preferredEngine := newFakeEngineThatSupportsLimitedQueries() fallbackEngine := newFakeEngineThatSupportsAllQueries() engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) - _, err := createQuery(engineWithFallback, "an_invalid_expression") + _, err := createQuery(ctx, engineWithFallback, "an_invalid_expression") require.EqualError(t, err, "the query is invalid") require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if creating query fails for another reason") }) + + t.Run("should fall back if falling back has been explicitly requested, even if the expression is supported", func(t *testing.T) { + ctx := withForceFallbackEnabled(context.Background()) + reg := prometheus.NewPedanticRegistry() + preferredEngine := newFakeEngineThatSupportsLimitedQueries() + fallbackEngine := newFakeEngineThatSupportsAllQueries() + engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger) + + query, err := createQuery(ctx, engineWithFallback, "a_supported_expression") + require.NoError(t, err) + require.Equal(t, fallbackEngine.query, query, "should return query from fallback engine if expression is not supported by preferred engine") + + require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_mimir_query_engine_supported_queries_total Total number of queries that were supported by the Mimir query engine. + # TYPE cortex_mimir_query_engine_supported_queries_total counter + cortex_mimir_query_engine_supported_queries_total 0 + # HELP cortex_mimir_query_engine_unsupported_queries_total Total number of queries that were not supported by the Mimir query engine and so fell back to Prometheus' engine. + # TYPE cortex_mimir_query_engine_unsupported_queries_total counter + cortex_mimir_query_engine_unsupported_queries_total{reason="fallback forced by HTTP header"} 1 + `), "cortex_mimir_query_engine_supported_queries_total", "cortex_mimir_query_engine_unsupported_queries_total")) + }) }) } } diff --git a/pkg/streamingpromql/compat/fallback_header.go b/pkg/streamingpromql/compat/fallback_header.go new file mode 100644 index 00000000000..844c6403393 --- /dev/null +++ b/pkg/streamingpromql/compat/fallback_header.go @@ -0,0 +1,40 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package compat + +import ( + "context" + "fmt" + "net/http" +) + +type engineFallbackContextKey int + +const forceFallbackEnabledContextKey = engineFallbackContextKey(0) +const forceFallbackHeaderName = "X-Mimir-Force-Prometheus-Engine" + +type EngineFallbackInjector struct{} + +func (i EngineFallbackInjector) Wrap(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if value := r.Header.Get(forceFallbackHeaderName); value != "" { + if value != "true" { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(fmt.Sprintf("invalid value '%s' for '%s' header, must be exactly 'true' or not set", value, forceFallbackHeaderName))) + return + } + + r = r.WithContext(withForceFallbackEnabled(r.Context())) + } + + handler.ServeHTTP(w, r) + }) +} + +func withForceFallbackEnabled(ctx context.Context) context.Context { + return context.WithValue(ctx, forceFallbackEnabledContextKey, true) +} + +func isForceFallbackEnabled(ctx context.Context) bool { + return ctx.Value(forceFallbackEnabledContextKey) != nil +} diff --git a/pkg/streamingpromql/compat/fallback_header_test.go b/pkg/streamingpromql/compat/fallback_header_test.go new file mode 100644 index 00000000000..363cf527eee --- /dev/null +++ b/pkg/streamingpromql/compat/fallback_header_test.go @@ -0,0 +1,70 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package compat + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEngineFallbackInjector(t *testing.T) { + testCases := map[string]struct { + headers http.Header + + forceFallback bool + expectError bool + }{ + "no headers": { + headers: http.Header{}, + forceFallback: false, + }, + "unrelated header": { + headers: http.Header{ + "Content-Type": []string{"application/blah"}, + }, + forceFallback: false, + }, + "force fallback header is present, but does not have expected value": { + headers: http.Header{ + "X-Mimir-Force-Prometheus-Engine": []string{"blah"}, + }, + expectError: true, + }, + "force fallback header is present, and does have expected value": { + headers: http.Header{ + "X-Mimir-Force-Prometheus-Engine": []string{"true"}, + }, + forceFallback: true, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + injector := EngineFallbackInjector{} + handlerCalled := false + handler := injector.Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + handlerCalled = true + require.Equal(t, testCase.forceFallback, isForceFallbackEnabled(req.Context())) + w.WriteHeader(http.StatusOK) + })) + + req, err := http.NewRequest(http.MethodGet, "/blah", nil) + require.NoError(t, err) + req.Header = testCase.headers + + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + + if testCase.expectError { + require.False(t, handlerCalled) + require.Equal(t, http.StatusBadRequest, resp.Code) + } else { + require.True(t, handlerCalled) + require.Equal(t, http.StatusOK, resp.Code) + } + }) + } +} From ffecb623f6088ef18a9034d67b40a1b4c78dcd9e Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 21 Jun 2024 14:36:14 +1000 Subject: [PATCH 2/7] Propagate header from query-frontend to querier --- pkg/frontend/querymiddleware/codec.go | 10 ++++++++++ pkg/streamingpromql/compat/fallback_header.go | 6 +++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/frontend/querymiddleware/codec.go b/pkg/frontend/querymiddleware/codec.go index 23818b3ab00..48bdc5c9fd4 100644 --- a/pkg/frontend/querymiddleware/codec.go +++ b/pkg/frontend/querymiddleware/codec.go @@ -35,6 +35,7 @@ import ( apierror "github.com/grafana/mimir/pkg/api/error" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/querier/api" + "github.com/grafana/mimir/pkg/streamingpromql/compat" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -579,6 +580,15 @@ func (c prometheusCodec) EncodeMetricsQueryRequest(ctx context.Context, r Metric req.Header.Add(api.ReadConsistencyHeader, consistency) } + for _, h := range r.GetHeaders() { + if h.Name == compat.ForceFallbackHeaderName { + for _, v := range h.Values { + // There should only be one value, but add all of them for completeness. + req.Header.Add(compat.ForceFallbackHeaderName, v) + } + } + } + return req.WithContext(ctx), nil } diff --git a/pkg/streamingpromql/compat/fallback_header.go b/pkg/streamingpromql/compat/fallback_header.go index 844c6403393..bfa9a529995 100644 --- a/pkg/streamingpromql/compat/fallback_header.go +++ b/pkg/streamingpromql/compat/fallback_header.go @@ -11,16 +11,16 @@ import ( type engineFallbackContextKey int const forceFallbackEnabledContextKey = engineFallbackContextKey(0) -const forceFallbackHeaderName = "X-Mimir-Force-Prometheus-Engine" +const ForceFallbackHeaderName = "X-Mimir-Force-Prometheus-Engine" type EngineFallbackInjector struct{} func (i EngineFallbackInjector) Wrap(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if value := r.Header.Get(forceFallbackHeaderName); value != "" { + if value := r.Header.Get(ForceFallbackHeaderName); value != "" { if value != "true" { w.WriteHeader(http.StatusBadRequest) - _, _ = w.Write([]byte(fmt.Sprintf("invalid value '%s' for '%s' header, must be exactly 'true' or not set", value, forceFallbackHeaderName))) + _, _ = w.Write([]byte(fmt.Sprintf("invalid value '%s' for '%s' header, must be exactly 'true' or not set", value, ForceFallbackHeaderName))) return } From 4b239de8f251e545e195d31501f851177a1b5796 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 21 Jun 2024 14:49:08 +1000 Subject: [PATCH 3/7] Send a Prometheus API-style JSON-encoded response when an invalid header value is received --- pkg/api/error/error.go | 44 ++++++++++--------- pkg/api/error/error_test.go | 8 ++-- pkg/streamingpromql/compat/fallback_header.go | 12 ++++- .../compat/fallback_header_test.go | 17 ++++--- 4 files changed, 49 insertions(+), 32 deletions(-) diff --git a/pkg/api/error/error.go b/pkg/api/error/error.go index 6a6b4ca9d15..848d114242c 100644 --- a/pkg/api/error/error.go +++ b/pkg/api/error/error.go @@ -31,17 +31,17 @@ const ( TypeNotAcceptable Type = "not_acceptable" ) -type apiError struct { +type ApiError struct { Type Type Message string } -func (e *apiError) Error() string { +func (e *ApiError) Error() string { return e.Message } // adapted from https://github.com/prometheus/prometheus/blob/fdbc40a9efcc8197a94f23f0e479b0b56e52d424/web/api/v1/api.go#L1508-L1521 -func (e *apiError) statusCode() int { +func (e *ApiError) StatusCode() int { switch e.Type { case TypeBadData: return http.StatusBadRequest @@ -67,30 +67,34 @@ func (e *apiError) statusCode() int { return http.StatusInternalServerError } -// HTTPResponseFromError converts an apiError into a JSON HTTP response -func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) { - var apiErr *apiError - if !errors.As(err, &apiErr) { - return nil, false - } - - body, err := json.Marshal( +func (e *ApiError) EncodeJSON() ([]byte, error) { + return json.Marshal( struct { Status string `json:"status"` ErrorType Type `json:"errorType,omitempty"` Error string `json:"error,omitempty"` }{ Status: "error", - Error: apiErr.Message, - ErrorType: apiErr.Type, + Error: e.Message, + ErrorType: e.Type, }, ) +} + +// HTTPResponseFromError converts an ApiError into a JSON HTTP response +func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) { + var apiErr *ApiError + if !errors.As(err, &apiErr) { + return nil, false + } + + body, err := apiErr.EncodeJSON() if err != nil { return nil, false } return &httpgrpc.HTTPResponse{ - Code: int32(apiErr.statusCode()), + Code: int32(apiErr.StatusCode()), Body: body, Headers: []*httpgrpc.Header{ {Key: "Content-Type", Values: []string{"application/json"}}, @@ -99,29 +103,29 @@ func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) { } // New creates a new apiError with a static string message -func New(typ Type, msg string) error { - return &apiError{ +func New(typ Type, msg string) *ApiError { + return &ApiError{ Message: msg, Type: typ, } } // Newf creates a new apiError with a formatted message -func Newf(typ Type, tmpl string, args ...interface{}) error { +func Newf(typ Type, tmpl string, args ...interface{}) *ApiError { return New(typ, fmt.Sprintf(tmpl, args...)) } // IsAPIError returns true if the error provided is an apiError. // This implies that HTTPResponseFromError will succeed. func IsAPIError(err error) bool { - apiErr := &apiError{} + apiErr := &ApiError{} return errors.As(err, &apiErr) } // AddDetails adds details to an existing apiError, but keeps the type and handling. // If the error is not an apiError, it will wrap the error with the details. func AddDetails(err error, details string) error { - apiErr := &apiError{} + apiErr := &ApiError{} if !errors.As(err, &apiErr) { return errors.Wrap(err, details) } @@ -131,7 +135,7 @@ func AddDetails(err error, details string) error { // IsNonRetryableAPIError returns true if err is an apiError which should be failed and not retried. func IsNonRetryableAPIError(err error) bool { - apiErr := &apiError{} + apiErr := &ApiError{} // Reasoning: // TypeNone and TypeNotFound are not used anywhere in Mimir nor Prometheus; // TypeTimeout, TypeTooManyRequests, TypeNotAcceptable, TypeUnavailable we presume a retry of the same request will fail in the same way. diff --git a/pkg/api/error/error_test.go b/pkg/api/error/error_test.go index 0e0bd217790..dd86557e0af 100644 --- a/pkg/api/error/error_test.go +++ b/pkg/api/error/error_test.go @@ -17,15 +17,15 @@ func TestAllPrometheusErrorTypeValues(t *testing.T) { for _, prometheusErrorTypeString := range prometheusErrorTypeStrings { errorType := Type(prometheusErrorTypeString) - apiError := New(errorType, "").(*apiError) + apiError := New(errorType, "") if errorType == TypeUnavailable { - require.Equal(t, http.StatusServiceUnavailable, apiError.statusCode()) + require.Equal(t, http.StatusServiceUnavailable, apiError.StatusCode()) } else if errorType == TypeInternal || errorType == TypeNone { - require.Equal(t, http.StatusInternalServerError, apiError.statusCode()) + require.Equal(t, http.StatusInternalServerError, apiError.StatusCode()) } else { // If this assertion fails, it probably means a new error type has been added to Prometheus' API. - require.NotEqual(t, http.StatusInternalServerError, apiError.statusCode(), "unrecognised Prometheus error type constant '%s'", prometheusErrorTypeString) + require.NotEqual(t, http.StatusInternalServerError, apiError.StatusCode(), "unrecognised Prometheus error type constant '%s'", prometheusErrorTypeString) } } } diff --git a/pkg/streamingpromql/compat/fallback_header.go b/pkg/streamingpromql/compat/fallback_header.go index bfa9a529995..dba8e412c32 100644 --- a/pkg/streamingpromql/compat/fallback_header.go +++ b/pkg/streamingpromql/compat/fallback_header.go @@ -4,8 +4,9 @@ package compat import ( "context" - "fmt" "net/http" + + apierror "github.com/grafana/mimir/pkg/api/error" ) type engineFallbackContextKey int @@ -19,8 +20,15 @@ func (i EngineFallbackInjector) Wrap(handler http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if value := r.Header.Get(ForceFallbackHeaderName); value != "" { if value != "true" { + // Send a Prometheus API-style JSON error response. + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusBadRequest) - _, _ = w.Write([]byte(fmt.Sprintf("invalid value '%s' for '%s' header, must be exactly 'true' or not set", value, ForceFallbackHeaderName))) + e := apierror.Newf(apierror.TypeBadData, "invalid value '%s' for '%s' header, must be exactly 'true' or not set", value, ForceFallbackHeaderName) + + if body, err := e.EncodeJSON(); err == nil { + _, _ = w.Write(body) + } + return } diff --git a/pkg/streamingpromql/compat/fallback_header_test.go b/pkg/streamingpromql/compat/fallback_header_test.go index 363cf527eee..1fdc9120102 100644 --- a/pkg/streamingpromql/compat/fallback_header_test.go +++ b/pkg/streamingpromql/compat/fallback_header_test.go @@ -15,7 +15,7 @@ func TestEngineFallbackInjector(t *testing.T) { headers http.Header forceFallback bool - expectError bool + expectedError string }{ "no headers": { headers: http.Header{}, @@ -31,7 +31,7 @@ func TestEngineFallbackInjector(t *testing.T) { headers: http.Header{ "X-Mimir-Force-Prometheus-Engine": []string{"blah"}, }, - expectError: true, + expectedError: "invalid value 'blah' for 'X-Mimir-Force-Prometheus-Engine' header, must be exactly 'true' or not set", }, "force fallback header is present, and does have expected value": { headers: http.Header{ @@ -58,12 +58,17 @@ func TestEngineFallbackInjector(t *testing.T) { resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) - if testCase.expectError { - require.False(t, handlerCalled) - require.Equal(t, http.StatusBadRequest, resp.Code) - } else { + if testCase.expectedError == "" { require.True(t, handlerCalled) require.Equal(t, http.StatusOK, resp.Code) + } else { + require.False(t, handlerCalled) + require.Equal(t, http.StatusBadRequest, resp.Code) + require.Equal(t, "application/json", resp.Header().Get("Content-Type")) + + body := resp.Body.String() + expectedBody := `{"status": "error", "errorType": "bad_data", "error": "` + testCase.expectedError + `"}` + require.JSONEq(t, expectedBody, body) } }) } From d5d5fad805423f1f52b7339441d9030094ff7604 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 21 Jun 2024 14:52:14 +1000 Subject: [PATCH 4/7] Add changelog entry --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 550fb08fe4e..560aff52ed5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ * [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747 * [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653 * [FEATURE] New `-.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291 #8303 #8340 #8256 #8348 #8422 #8430 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291 #8303 #8340 #8256 #8348 #8422 #8430 #8454 * [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739 * [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698 * [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123 From 0356324cd0b361c0b454bcbf8854eb1b1b317beb Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 21 Jun 2024 15:21:28 +1000 Subject: [PATCH 5/7] Rename type to make linter happy --- pkg/api/error/error.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/api/error/error.go b/pkg/api/error/error.go index 848d114242c..c30947b0c40 100644 --- a/pkg/api/error/error.go +++ b/pkg/api/error/error.go @@ -31,17 +31,17 @@ const ( TypeNotAcceptable Type = "not_acceptable" ) -type ApiError struct { +type APIError struct { Type Type Message string } -func (e *ApiError) Error() string { +func (e *APIError) Error() string { return e.Message } // adapted from https://github.com/prometheus/prometheus/blob/fdbc40a9efcc8197a94f23f0e479b0b56e52d424/web/api/v1/api.go#L1508-L1521 -func (e *ApiError) StatusCode() int { +func (e *APIError) StatusCode() int { switch e.Type { case TypeBadData: return http.StatusBadRequest @@ -67,7 +67,7 @@ func (e *ApiError) StatusCode() int { return http.StatusInternalServerError } -func (e *ApiError) EncodeJSON() ([]byte, error) { +func (e *APIError) EncodeJSON() ([]byte, error) { return json.Marshal( struct { Status string `json:"status"` @@ -81,9 +81,9 @@ func (e *ApiError) EncodeJSON() ([]byte, error) { ) } -// HTTPResponseFromError converts an ApiError into a JSON HTTP response +// HTTPResponseFromError converts an APIError into a JSON HTTP response func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) { - var apiErr *ApiError + var apiErr *APIError if !errors.As(err, &apiErr) { return nil, false } @@ -103,29 +103,29 @@ func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) { } // New creates a new apiError with a static string message -func New(typ Type, msg string) *ApiError { - return &ApiError{ +func New(typ Type, msg string) *APIError { + return &APIError{ Message: msg, Type: typ, } } // Newf creates a new apiError with a formatted message -func Newf(typ Type, tmpl string, args ...interface{}) *ApiError { +func Newf(typ Type, tmpl string, args ...interface{}) *APIError { return New(typ, fmt.Sprintf(tmpl, args...)) } // IsAPIError returns true if the error provided is an apiError. // This implies that HTTPResponseFromError will succeed. func IsAPIError(err error) bool { - apiErr := &ApiError{} + apiErr := &APIError{} return errors.As(err, &apiErr) } // AddDetails adds details to an existing apiError, but keeps the type and handling. // If the error is not an apiError, it will wrap the error with the details. func AddDetails(err error, details string) error { - apiErr := &ApiError{} + apiErr := &APIError{} if !errors.As(err, &apiErr) { return errors.Wrap(err, details) } @@ -135,7 +135,7 @@ func AddDetails(err error, details string) error { // IsNonRetryableAPIError returns true if err is an apiError which should be failed and not retried. func IsNonRetryableAPIError(err error) bool { - apiErr := &ApiError{} + apiErr := &APIError{} // Reasoning: // TypeNone and TypeNotFound are not used anywhere in Mimir nor Prometheus; // TypeTimeout, TypeTooManyRequests, TypeNotAcceptable, TypeUnavailable we presume a retry of the same request will fail in the same way. From ef553903f53e5970415f63a4cfb255c298631083 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 21 Jun 2024 16:22:00 +1000 Subject: [PATCH 6/7] Address PR feedback: make name clearer --- pkg/streamingpromql/compat/fallback_header_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/streamingpromql/compat/fallback_header_test.go b/pkg/streamingpromql/compat/fallback_header_test.go index 1fdc9120102..5c5fdb97148 100644 --- a/pkg/streamingpromql/compat/fallback_header_test.go +++ b/pkg/streamingpromql/compat/fallback_header_test.go @@ -14,18 +14,18 @@ func TestEngineFallbackInjector(t *testing.T) { testCases := map[string]struct { headers http.Header - forceFallback bool - expectedError string + expectFallback bool + expectedError string }{ "no headers": { - headers: http.Header{}, - forceFallback: false, + headers: http.Header{}, + expectFallback: false, }, "unrelated header": { headers: http.Header{ "Content-Type": []string{"application/blah"}, }, - forceFallback: false, + expectFallback: false, }, "force fallback header is present, but does not have expected value": { headers: http.Header{ @@ -37,7 +37,7 @@ func TestEngineFallbackInjector(t *testing.T) { headers: http.Header{ "X-Mimir-Force-Prometheus-Engine": []string{"true"}, }, - forceFallback: true, + expectFallback: true, }, } @@ -47,7 +47,7 @@ func TestEngineFallbackInjector(t *testing.T) { handlerCalled := false handler := injector.Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { handlerCalled = true - require.Equal(t, testCase.forceFallback, isForceFallbackEnabled(req.Context())) + require.Equal(t, testCase.expectFallback, isForceFallbackEnabled(req.Context())) w.WriteHeader(http.StatusOK) })) From 28c4bc2fb0813a6be90d1f9004287e9ef192931f Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 21 Jun 2024 16:24:55 +1000 Subject: [PATCH 7/7] Address PR feedback: don't awkwardly use NotSupportedError --- pkg/streamingpromql/compat/fallback_engine.go | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/pkg/streamingpromql/compat/fallback_engine.go b/pkg/streamingpromql/compat/fallback_engine.go index 7e70e287055..504aa31cf6e 100644 --- a/pkg/streamingpromql/compat/fallback_engine.go +++ b/pkg/streamingpromql/compat/fallback_engine.go @@ -27,7 +27,7 @@ type EngineWithFallback struct { logger log.Logger } -var errFallbackForcedByHTTPHeader = NotSupportedError{"fallback forced by HTTP header"} +const fallbackForcedByHTTPHeader = "fallback forced by HTTP header" func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheus.Registerer, logger log.Logger) promql.QueryEngine { return &EngineWithFallback{ @@ -48,10 +48,10 @@ func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheu } func (e EngineWithFallback) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { - notSupportedErr := NotSupportedError{} + reason := "" if isForceFallbackEnabled(ctx) { - notSupportedErr = errFallbackForcedByHTTPHeader + reason = fallbackForcedByHTTPHeader } else { query, err := e.preferred.NewInstantQuery(ctx, q, opts, qs, ts) @@ -60,24 +60,27 @@ func (e EngineWithFallback) NewInstantQuery(ctx context.Context, q storage.Query return query, nil } + notSupportedErr := NotSupportedError{} if !errors.As(err, ¬SupportedErr) { // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. return nil, err } + + reason = notSupportedErr.reason } logger := spanlogger.FromContext(ctx, e.logger) - level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs) - e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc() + level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", reason, "expr", qs) + e.unsupportedQueries.WithLabelValues(reason).Inc() return e.fallback.NewInstantQuery(ctx, q, opts, qs, ts) } func (e EngineWithFallback) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { - notSupportedErr := NotSupportedError{} + reason := "" if isForceFallbackEnabled(ctx) { - notSupportedErr = errFallbackForcedByHTTPHeader + reason = fallbackForcedByHTTPHeader } else { query, err := e.preferred.NewRangeQuery(ctx, q, opts, qs, start, end, interval) @@ -86,15 +89,18 @@ func (e EngineWithFallback) NewRangeQuery(ctx context.Context, q storage.Queryab return query, nil } + notSupportedErr := NotSupportedError{} if !errors.As(err, ¬SupportedErr) { // Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported. return nil, err } + + reason = notSupportedErr.reason } logger := spanlogger.FromContext(ctx, e.logger) - level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs) - e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc() + level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", reason, "expr", qs) + e.unsupportedQueries.WithLabelValues(reason).Inc() return e.fallback.NewRangeQuery(ctx, q, opts, qs, start, end, interval) }