Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mimir query engine: add HTTP header to force use of Prometheus' engine #8454

Merged
merged 8 commits into from
Jun 21, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* [FEATURE] Continuous-test: now runable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291 #8303 #8340 #8256 #8348 #8422 #8430 #8455
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #7693 #7898 #7899 #8023 #8058 #8096 #8121 #8197 #8230 #8247 #8270 #8276 #8277 #8291 #8303 #8340 #8256 #8348 #8422 #8430 #8454 #8455
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [FEATURE] mimirtool: Add `runtime-config verify` sub-command, for verifying Mimir runtime config files. #8123
Expand Down
44 changes: 24 additions & 20 deletions pkg/api/error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ const (
TypeNotAcceptable Type = "not_acceptable"
)

type apiError struct {
type APIError struct {
Type Type
Message string
}

func (e *apiError) Error() string {
func (e *APIError) Error() string {
return e.Message
}

// adapted from https://github.com/prometheus/prometheus/blob/fdbc40a9efcc8197a94f23f0e479b0b56e52d424/web/api/v1/api.go#L1508-L1521
func (e *apiError) statusCode() int {
func (e *APIError) StatusCode() int {
switch e.Type {
case TypeBadData:
return http.StatusBadRequest
Expand All @@ -67,30 +67,34 @@ func (e *apiError) statusCode() int {
return http.StatusInternalServerError
}

// HTTPResponseFromError converts an apiError into a JSON HTTP response
func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) {
var apiErr *apiError
if !errors.As(err, &apiErr) {
return nil, false
}

body, err := json.Marshal(
func (e *APIError) EncodeJSON() ([]byte, error) {
return json.Marshal(
struct {
Status string `json:"status"`
ErrorType Type `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
}{
Status: "error",
Error: apiErr.Message,
ErrorType: apiErr.Type,
Error: e.Message,
ErrorType: e.Type,
},
)
}

// HTTPResponseFromError converts an APIError into a JSON HTTP response
func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) {
var apiErr *APIError
if !errors.As(err, &apiErr) {
return nil, false
}

body, err := apiErr.EncodeJSON()
if err != nil {
return nil, false
}

return &httpgrpc.HTTPResponse{
Code: int32(apiErr.statusCode()),
Code: int32(apiErr.StatusCode()),
Body: body,
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/json"}},
Expand All @@ -99,29 +103,29 @@ func HTTPResponseFromError(err error) (*httpgrpc.HTTPResponse, bool) {
}

// New creates a new apiError with a static string message
func New(typ Type, msg string) error {
return &apiError{
func New(typ Type, msg string) *APIError {
return &APIError{
Message: msg,
Type: typ,
}
}

// Newf creates a new apiError with a formatted message
func Newf(typ Type, tmpl string, args ...interface{}) error {
func Newf(typ Type, tmpl string, args ...interface{}) *APIError {
return New(typ, fmt.Sprintf(tmpl, args...))
}

// IsAPIError returns true if the error provided is an apiError.
// This implies that HTTPResponseFromError will succeed.
func IsAPIError(err error) bool {
apiErr := &apiError{}
apiErr := &APIError{}
return errors.As(err, &apiErr)
}

// AddDetails adds details to an existing apiError, but keeps the type and handling.
// If the error is not an apiError, it will wrap the error with the details.
func AddDetails(err error, details string) error {
apiErr := &apiError{}
apiErr := &APIError{}
if !errors.As(err, &apiErr) {
return errors.Wrap(err, details)
}
Expand All @@ -131,7 +135,7 @@ func AddDetails(err error, details string) error {

// IsNonRetryableAPIError returns true if err is an apiError which should be failed and not retried.
func IsNonRetryableAPIError(err error) bool {
apiErr := &apiError{}
apiErr := &APIError{}
// Reasoning:
// TypeNone and TypeNotFound are not used anywhere in Mimir nor Prometheus;
// TypeTimeout, TypeTooManyRequests, TypeNotAcceptable, TypeUnavailable we presume a retry of the same request will fail in the same way.
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/error/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ func TestAllPrometheusErrorTypeValues(t *testing.T) {

for _, prometheusErrorTypeString := range prometheusErrorTypeStrings {
errorType := Type(prometheusErrorTypeString)
apiError := New(errorType, "").(*apiError)
apiError := New(errorType, "")

if errorType == TypeUnavailable {
require.Equal(t, http.StatusServiceUnavailable, apiError.statusCode())
require.Equal(t, http.StatusServiceUnavailable, apiError.StatusCode())
} else if errorType == TypeInternal || errorType == TypeNone {
require.Equal(t, http.StatusInternalServerError, apiError.statusCode())
require.Equal(t, http.StatusInternalServerError, apiError.StatusCode())
} else {
// If this assertion fails, it probably means a new error type has been added to Prometheus' API.
require.NotEqual(t, http.StatusInternalServerError, apiError.statusCode(), "unrecognised Prometheus error type constant '%s'", prometheusErrorTypeString)
require.NotEqual(t, http.StatusInternalServerError, apiError.StatusCode(), "unrecognised Prometheus error type constant '%s'", prometheusErrorTypeString)
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/grafana/mimir/pkg/querier"
querierapi "github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/streamingpromql/compat"
"github.com/grafana/mimir/pkg/usagestats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/validation"
Expand Down Expand Up @@ -285,7 +286,8 @@ func NewQuerierHandler(

router := mux.NewRouter()
routeInjector := middleware.RouteInjector{RouteMatcher: router}
router.Use(routeInjector.Wrap)
fallbackInjector := compat.EngineFallbackInjector{}
router.Use(routeInjector.Wrap, fallbackInjector.Wrap)

// Use a separate metric for the querier in order to differentiate requests from the query-frontend when
// running Mimir in monolithic mode.
Expand Down
10 changes: 10 additions & 0 deletions pkg/frontend/querymiddleware/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/streamingpromql/compat"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
Expand Down Expand Up @@ -579,6 +580,15 @@ func (c prometheusCodec) EncodeMetricsQueryRequest(ctx context.Context, r Metric
req.Header.Add(api.ReadConsistencyHeader, consistency)
}

for _, h := range r.GetHeaders() {
if h.Name == compat.ForceFallbackHeaderName {
for _, v := range h.Values {
// There should only be one value, but add all of them for completeness.
req.Header.Add(compat.ForceFallbackHeaderName, v)
}
}
}

return req.WithContext(ctx), nil
}

Expand Down
62 changes: 40 additions & 22 deletions pkg/streamingpromql/compat/fallback_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type EngineWithFallback struct {
logger log.Logger
}

const fallbackForcedByHTTPHeader = "fallback forced by HTTP header"

func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheus.Registerer, logger log.Logger) promql.QueryEngine {
return &EngineWithFallback{
preferred: preferred,
Expand All @@ -46,43 +48,59 @@ func NewEngineWithFallback(preferred, fallback promql.QueryEngine, reg prometheu
}

func (e EngineWithFallback) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
query, err := e.preferred.NewInstantQuery(ctx, q, opts, qs, ts)
reason := ""

if err == nil {
e.supportedQueries.Inc()
return query, nil
}
if isForceFallbackEnabled(ctx) {
reason = fallbackForcedByHTTPHeader
} else {
query, err := e.preferred.NewInstantQuery(ctx, q, opts, qs, ts)

if err == nil {
e.supportedQueries.Inc()
return query, nil
}

notSupportedErr := NotSupportedError{}
if !errors.As(err, &notSupportedErr) {
// Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported.
return nil, err
}

notSupportedErr := NotSupportedError{}
if !errors.As(err, &notSupportedErr) {
// Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported.
return nil, err
reason = notSupportedErr.reason
}

logger := spanlogger.FromContext(ctx, e.logger)
level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs)
e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc()
level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", reason, "expr", qs)
e.unsupportedQueries.WithLabelValues(reason).Inc()

return e.fallback.NewInstantQuery(ctx, q, opts, qs, ts)
}

func (e EngineWithFallback) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
query, err := e.preferred.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
reason := ""

if err == nil {
e.supportedQueries.Inc()
return query, nil
}
if isForceFallbackEnabled(ctx) {
reason = fallbackForcedByHTTPHeader
} else {
query, err := e.preferred.NewRangeQuery(ctx, q, opts, qs, start, end, interval)

if err == nil {
e.supportedQueries.Inc()
return query, nil
}

notSupportedErr := NotSupportedError{}
if !errors.As(err, &notSupportedErr) {
// Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported.
return nil, err
}

notSupportedErr := NotSupportedError{}
if !errors.As(err, &notSupportedErr) {
// Don't bother trying the fallback engine if we failed for a reason other than the expression not being supported.
return nil, err
reason = notSupportedErr.reason
}

logger := spanlogger.FromContext(ctx, e.logger)
level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", notSupportedErr.reason, "expr", qs)
e.unsupportedQueries.WithLabelValues(notSupportedErr.reason).Inc()
level.Info(logger).Log("msg", "falling back to Prometheus' PromQL engine", "reason", reason, "expr", qs)
e.unsupportedQueries.WithLabelValues(reason).Inc()

return e.fallback.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
}
37 changes: 30 additions & 7 deletions pkg/streamingpromql/compat/fallback_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,27 @@ import (
)

func TestEngineWithFallback(t *testing.T) {
ctx := context.Background()
logger := log.NewNopLogger()

generators := map[string]func(engine promql.QueryEngine, expr string) (promql.Query, error){
"instant query": func(engine promql.QueryEngine, expr string) (promql.Query, error) {
generators := map[string]func(ctx context.Context, engine promql.QueryEngine, expr string) (promql.Query, error){
"instant query": func(ctx context.Context, engine promql.QueryEngine, expr string) (promql.Query, error) {
return engine.NewInstantQuery(ctx, nil, nil, expr, time.Now())
},
"range query": func(engine promql.QueryEngine, expr string) (promql.Query, error) {
"range query": func(ctx context.Context, engine promql.QueryEngine, expr string) (promql.Query, error) {
return engine.NewRangeQuery(ctx, nil, nil, expr, time.Now(), time.Now().Add(-time.Minute), time.Second)
},
}

for name, createQuery := range generators {
t.Run(name, func(t *testing.T) {
t.Run("should not fall back for supported expressions", func(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
preferredEngine := newFakeEngineThatSupportsLimitedQueries()
fallbackEngine := newFakeEngineThatSupportsAllQueries()
engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger)

query, err := createQuery(engineWithFallback, "a_supported_expression")
query, err := createQuery(ctx, engineWithFallback, "a_supported_expression")
require.NoError(t, err)
require.Equal(t, preferredEngine.query, query, "should return query from preferred engine")
require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if expression is supported by preferred engine")
Expand All @@ -53,12 +53,13 @@ func TestEngineWithFallback(t *testing.T) {
})

t.Run("should fall back for unsupported expressions", func(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
preferredEngine := newFakeEngineThatSupportsLimitedQueries()
fallbackEngine := newFakeEngineThatSupportsAllQueries()
engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger)

query, err := createQuery(engineWithFallback, "a_non_supported_expression")
query, err := createQuery(ctx, engineWithFallback, "a_non_supported_expression")
require.NoError(t, err)
require.Equal(t, fallbackEngine.query, query, "should return query from fallback engine if expression is not supported by preferred engine")

Expand All @@ -73,15 +74,37 @@ func TestEngineWithFallback(t *testing.T) {
})

t.Run("should not fall back if creating query fails for another reason", func(t *testing.T) {
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()
preferredEngine := newFakeEngineThatSupportsLimitedQueries()
fallbackEngine := newFakeEngineThatSupportsAllQueries()
engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger)

_, err := createQuery(engineWithFallback, "an_invalid_expression")
_, err := createQuery(ctx, engineWithFallback, "an_invalid_expression")
require.EqualError(t, err, "the query is invalid")
require.False(t, fallbackEngine.wasCalled, "should not call fallback engine if creating query fails for another reason")
})

t.Run("should fall back if falling back has been explicitly requested, even if the expression is supported", func(t *testing.T) {
ctx := withForceFallbackEnabled(context.Background())
reg := prometheus.NewPedanticRegistry()
preferredEngine := newFakeEngineThatSupportsLimitedQueries()
fallbackEngine := newFakeEngineThatSupportsAllQueries()
engineWithFallback := NewEngineWithFallback(preferredEngine, fallbackEngine, reg, logger)

query, err := createQuery(ctx, engineWithFallback, "a_supported_expression")
require.NoError(t, err)
require.Equal(t, fallbackEngine.query, query, "should return query from fallback engine if expression is not supported by preferred engine")

require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_mimir_query_engine_supported_queries_total Total number of queries that were supported by the Mimir query engine.
# TYPE cortex_mimir_query_engine_supported_queries_total counter
cortex_mimir_query_engine_supported_queries_total 0
# HELP cortex_mimir_query_engine_unsupported_queries_total Total number of queries that were not supported by the Mimir query engine and so fell back to Prometheus' engine.
# TYPE cortex_mimir_query_engine_unsupported_queries_total counter
cortex_mimir_query_engine_unsupported_queries_total{reason="fallback forced by HTTP header"} 1
`), "cortex_mimir_query_engine_supported_queries_total", "cortex_mimir_query_engine_unsupported_queries_total"))
})
})
}
}
Expand Down
Loading
Loading