From 5cdb850c70a603a3b6468ef14029561fbb4eb0b0 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 18 Jun 2024 12:11:45 +1000 Subject: [PATCH 01/15] Initialise tracing from environment variables in query-tee --- cmd/query-tee/main.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/cmd/query-tee/main.go b/cmd/query-tee/main.go index f56f4069546..f1327d8a6ef 100644 --- a/cmd/query-tee/main.go +++ b/cmd/query-tee/main.go @@ -8,13 +8,16 @@ package main import ( "flag" "fmt" + "io" "os" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/log" + "github.com/grafana/dskit/tracing" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" + jaegercfg "github.com/uber/jaeger-client-go/config" "github.com/grafana/mimir/pkg/util/instrumentation" util_log "github.com/grafana/mimir/pkg/util/log" @@ -44,6 +47,10 @@ func main() { util_log.InitLogger(log.LogfmtFormat, cfg.LogLevel, false, util_log.RateLimitedLoggerCfg{}) + if closer := initTracing(); closer != nil { + defer closer.Close() + } + // Run the instrumentation server. registry := prometheus.NewRegistry() registry.MustRegister(collectors.NewGoCollector()) @@ -72,6 +79,21 @@ func main() { proxy.Await() } +func initTracing() io.Closer { + name := os.Getenv("JAEGER_SERVICE_NAME") + if name == "" { + name = "query-tee" + } + + trace, err := tracing.NewFromEnv(name, jaegercfg.MaxTagValueLength(16e3)) + if err != nil { + level.Error(util_log.Logger).Log("msg", "Failed to setup tracing", "err", err.Error()) + return nil + } + + return trace +} + func mimirReadRoutes(cfg Config) []querytee.Route { prefix := cfg.PathPrefix From 81946eb56a0ce1dff709323bd48efd08643ae77f Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 18 Jun 2024 13:31:03 +1000 Subject: [PATCH 02/15] Emit trace spans for incoming and outgoing requests --- tools/querytee/proxy_backend.go | 49 ++++++++++++++++----------- tools/querytee/proxy_backend_test.go | 3 +- tools/querytee/proxy_endpoint.go | 8 ++++- tools/querytee/proxy_endpoint_test.go | 3 +- 4 files changed, 40 insertions(+), 23 deletions(-) diff --git a/tools/querytee/proxy_backend.go b/tools/querytee/proxy_backend.go index 6459e81fe27..301c6b4cdc6 100644 --- a/tools/querytee/proxy_backend.go +++ b/tools/querytee/proxy_backend.go @@ -15,6 +15,8 @@ import ( "path" "time" + "github.com/opentracing-contrib/go-stdlib/nethttp" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" ) @@ -22,7 +24,7 @@ type ProxyBackendInterface interface { Name() string Endpoint() *url.URL Preferred() bool - ForwardRequest(orig *http.Request, body io.ReadCloser) (time.Duration, int, []byte, *http.Response, error) + ForwardRequest(ctx context.Context, orig *http.Request, body io.ReadCloser) (time.Duration, int, []byte, *http.Response, error) } // ProxyBackend holds the information of a single backend. @@ -39,6 +41,23 @@ type ProxyBackend struct { // NewProxyBackend makes a new ProxyBackend func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, preferred bool, skipTLSVerify bool) ProxyBackendInterface { + innerTransport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: skipTLSVerify, + }, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, // see https://github.com/golang/go/issues/13801 + IdleConnTimeout: 90 * time.Second, + DisableCompression: true, + } + + tracingTransport := &nethttp.Transport{RoundTripper: innerTransport} + return &ProxyBackend{ name: name, endpoint: endpoint, @@ -48,20 +67,7 @@ func NewProxyBackend(name string, endpoint *url.URL, timeout time.Duration, pref CheckRedirect: func(_ *http.Request, _ []*http.Request) error { return errors.New("the query-tee proxy does not follow redirects") }, - Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: skipTLSVerify, - }, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, - MaxIdleConns: 100, - MaxIdleConnsPerHost: 100, // see https://github.com/golang/go/issues/13801 - IdleConnTimeout: 90 * time.Second, - DisableCompression: true, - }, + Transport: tracingTransport, }, } } @@ -78,12 +84,15 @@ func (b *ProxyBackend) Preferred() bool { return b.preferred } -func (b *ProxyBackend) ForwardRequest(orig *http.Request, body io.ReadCloser) (time.Duration, int, []byte, *http.Response, error) { - req, err := b.createBackendRequest(orig, body) +func (b *ProxyBackend) ForwardRequest(ctx context.Context, orig *http.Request, body io.ReadCloser) (time.Duration, int, []byte, *http.Response, error) { + req, err := b.createBackendRequest(ctx, orig, body) if err != nil { return 0, 0, nil, nil, err } + req, ht := nethttp.TraceRequest(opentracing.GlobalTracer(), req) + defer ht.Finish() + start := time.Now() status, responseBody, resp, err := b.doBackendRequest(req) elapsed := time.Since(start) @@ -91,8 +100,8 @@ func (b *ProxyBackend) ForwardRequest(orig *http.Request, body io.ReadCloser) (t return elapsed, status, responseBody, resp, err } -func (b *ProxyBackend) createBackendRequest(orig *http.Request, body io.ReadCloser) (*http.Request, error) { - req := orig.Clone(context.Background()) +func (b *ProxyBackend) createBackendRequest(ctx context.Context, orig *http.Request, body io.ReadCloser) (*http.Request, error) { + req := orig.Clone(ctx) req.Body = body // RequestURI can't be set on a cloned request. It's only for handlers. req.RequestURI = "" @@ -135,7 +144,7 @@ func (b *ProxyBackend) createBackendRequest(orig *http.Request, body io.ReadClos func (b *ProxyBackend) doBackendRequest(req *http.Request) (int, []byte, *http.Response, error) { // Honor the read timeout. - ctx, cancel := context.WithTimeout(context.Background(), b.timeout) + ctx, cancel := context.WithTimeout(req.Context(), b.timeout) defer cancel() // Execute the request. diff --git a/tools/querytee/proxy_backend_test.go b/tools/querytee/proxy_backend_test.go index 2d66ec80936..4a892286f8f 100644 --- a/tools/querytee/proxy_backend_test.go +++ b/tools/querytee/proxy_backend_test.go @@ -6,6 +6,7 @@ package querytee import ( + "context" "fmt" "net/http/httptest" "net/url" @@ -88,7 +89,7 @@ func Test_ProxyBackend_createBackendRequest_HTTPBasicAuthentication(t *testing.T if !ok { t.Fatalf("Type assertion to *ProxyBackend failed") } - r, err := bp.createBackendRequest(orig, nil) + r, err := bp.createBackendRequest(context.Background(), orig, nil) require.NoError(t, err) actualUser, actualPass, _ := r.BasicAuth() diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index d3d1cf3fad9..5451fcfeb3f 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -7,6 +7,7 @@ package querytee import ( "bytes" + "context" "fmt" "io" "net/http" @@ -121,12 +122,17 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba go func() { defer wg.Done() + + // Don't cancel the child request's context when the parent context (from the incoming HTTP request) is cancelled after we return a response. + // This allows us to continue running slower requests after returning a response to the caller. + requestCtx := context.WithoutCancel(req.Context()) + var bodyReader io.ReadCloser if len(body) > 0 { bodyReader = io.NopCloser(bytes.NewReader(body)) } - elapsed, status, body, resp, err := b.ForwardRequest(req, bodyReader) + elapsed, status, body, resp, err := b.ForwardRequest(requestCtx, req, bodyReader) contentType := "" if p.slowResponseThreshold > 0 { diff --git a/tools/querytee/proxy_endpoint_test.go b/tools/querytee/proxy_endpoint_test.go index d31e73f93a5..8bb2868e265 100644 --- a/tools/querytee/proxy_endpoint_test.go +++ b/tools/querytee/proxy_endpoint_test.go @@ -7,6 +7,7 @@ package querytee import ( "bytes" + "context" "fmt" "io" "net/http" @@ -762,7 +763,7 @@ func (b *mockProxyBackend) Preferred() bool { return b.preferred } -func (b *mockProxyBackend) ForwardRequest(_ *http.Request, _ io.ReadCloser) (time.Duration, int, []byte, *http.Response, error) { +func (b *mockProxyBackend) ForwardRequest(_ context.Context, _ *http.Request, _ io.ReadCloser) (time.Duration, int, []byte, *http.Response, error) { resp := &http.Response{ StatusCode: 200, Header: make(http.Header), From a6c55e5298278675d3a53003b1b9a78c0e70671a Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 18 Jun 2024 13:47:50 +1000 Subject: [PATCH 03/15] Set common log fields in one place so all log messages get them, and add them to trace spans too --- tools/querytee/proxy_endpoint.go | 47 +++++++++++++++++++------------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index 5451fcfeb3f..5c741f46998 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -17,6 +17,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + + "github.com/grafana/mimir/pkg/util/spanlogger" ) type ResponsesComparator interface { @@ -87,26 +89,38 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba responsesMtx = sync.Mutex{} timingMtx = sync.Mutex{} query = req.URL.RawQuery + logger, ctx = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming request") ) + defer logger.Finish() + + setSpanAndLogTags := func(logger *spanlogger.SpanLogger) { + logger.SetSpanAndLogTag("path", req.URL.Path) + logger.SetSpanAndLogTag("query", query) + logger.SetSpanAndLogTag("route_name", p.routeName) + logger.SetSpanAndLogTag("user", req.Header.Get("X-Scope-OrgID")) + } + + setSpanAndLogTags(logger) + if req.Body != nil { body, err = io.ReadAll(req.Body) if err != nil { - level.Warn(p.logger).Log("msg", "Unable to read request body", "err", err) + level.Warn(logger).Log("msg", "Unable to read request body", "err", err) return } if err := req.Body.Close(); err != nil { - level.Warn(p.logger).Log("msg", "Unable to close request body", "err", err) + level.Warn(logger).Log("msg", "Unable to close request body", "err", err) } req.Body = io.NopCloser(bytes.NewReader(body)) if err := req.ParseForm(); err != nil { - level.Warn(p.logger).Log("msg", "Unable to parse form", "err", err) + level.Warn(logger).Log("msg", "Unable to parse form", "err", err) } query = req.Form.Encode() } - level.Debug(p.logger).Log("msg", "Received request", "path", req.URL.Path, "query", query) + level.Debug(logger).Log("msg", "Received request") // Keep track of the fastest and slowest backends var ( @@ -125,14 +139,18 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba // Don't cancel the child request's context when the parent context (from the incoming HTTP request) is cancelled after we return a response. // This allows us to continue running slower requests after returning a response to the caller. - requestCtx := context.WithoutCancel(req.Context()) + ctx := context.WithoutCancel(ctx) + logger, ctx := spanlogger.NewWithLogger(ctx, p.logger, fmt.Sprintf("Outgoing request to %s", b.Name())) + defer logger.Finish() + setSpanAndLogTags(logger) + logger.SetSpanAndLogTag("backend", b.Name()) var bodyReader io.ReadCloser if len(body) > 0 { bodyReader = io.NopCloser(bytes.NewReader(body)) } - elapsed, status, body, resp, err := b.ForwardRequest(requestCtx, req, bodyReader) + elapsed, status, body, resp, err := b.ForwardRequest(ctx, req, bodyReader) contentType := "" if p.slowResponseThreshold > 0 { @@ -167,7 +185,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba lvl = level.Warn } - lvl(p.logger).Log("msg", "Backend response", "path", req.URL.Path, "query", query, "backend", b.Name(), "status", status, "elapsed", elapsed) + lvl(logger).Log("msg", "Backend response", "status", status, "elapsed", elapsed) p.metrics.requestDuration.WithLabelValues(res.backend.Name(), req.Method, p.routeName, strconv.Itoa(res.statusCode())).Observe(elapsed.Seconds()) // Keep track of the response if required. @@ -195,21 +213,15 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba result, err := p.compareResponses(expectedResponse, actualResponse) if result == ComparisonFailed { - level.Error(p.logger).Log( + level.Error(logger).Log( "msg", "response comparison failed", - "route_name", p.routeName, - "query", query, - "user", req.Header.Get("X-Scope-OrgID"), "err", err, "expected_response_duration", expectedResponse.elapsedTime, "actual_response_duration", actualResponse.elapsedTime, ) } else if result == ComparisonSkipped { - level.Warn(p.logger).Log( + level.Warn(logger).Log( "msg", "response comparison skipped", - "route_name", p.routeName, - "query", query, - "user", req.Header.Get("X-Scope-OrgID"), "err", err, "expected_response_duration", expectedResponse.elapsedTime, "actual_response_duration", actualResponse.elapsedTime, @@ -218,11 +230,8 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba // Log queries that are slower in some backends than others if p.slowResponseThreshold > 0 && slowestDuration-fastestDuration >= p.slowResponseThreshold { - level.Warn(p.logger).Log( + level.Warn(logger).Log( "msg", "response time difference between backends exceeded threshold", - "route_name", p.routeName, - "query", query, - "user", req.Header.Get("X-Scope-OrgID"), "slowest_duration", slowestDuration, "slowest_backend", slowestBackend.Name(), "fastest_duration", fastestDuration, From 394b742541ba217354b6dc09ac08083998bbbd73 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 18 Jun 2024 13:48:35 +1000 Subject: [PATCH 04/15] Add user agent to logs and spans --- tools/querytee/proxy_endpoint.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index 5c741f46998..f754a5b2ea5 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -99,6 +99,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba logger.SetSpanAndLogTag("query", query) logger.SetSpanAndLogTag("route_name", p.routeName) logger.SetSpanAndLogTag("user", req.Header.Get("X-Scope-OrgID")) + logger.SetSpanAndLogTag("user_agent", req.Header.Get("User-Agent")) } setSpanAndLogTags(logger) From e0e1686aa21817326506ac2b7adc969819a58dd6 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 18 Jun 2024 13:49:24 +1000 Subject: [PATCH 05/15] Set status on outgoing request span --- tools/querytee/proxy_endpoint.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index f754a5b2ea5..daae485f3d5 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -188,6 +188,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba lvl(logger).Log("msg", "Backend response", "status", status, "elapsed", elapsed) p.metrics.requestDuration.WithLabelValues(res.backend.Name(), req.Method, p.routeName, strconv.Itoa(res.statusCode())).Observe(elapsed.Seconds()) + logger.SetTag("status", status) // Keep track of the response if required. if p.comparator != nil { From d706b5f0c397cc7d8b03971959b72883f4068b7d Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 18 Jun 2024 13:50:40 +1000 Subject: [PATCH 06/15] Set error on span for failed outgoing requests --- tools/querytee/proxy_endpoint.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index daae485f3d5..5634e517261 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -184,6 +184,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba lvl := level.Debug if !res.succeeded() { lvl = level.Warn + logger.Error(err) } lvl(logger).Log("msg", "Backend response", "status", status, "elapsed", elapsed) From e4f02203561bd547bd809fde13a8f0f327857f12 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 18 Jun 2024 14:00:49 +1000 Subject: [PATCH 07/15] Always log errors from backends, rather than relying on comparator to do it --- tools/querytee/proxy_endpoint.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index 5634e517261..056bbb84c65 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -17,6 +17,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/opentracing/opentracing-go/ext" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -184,10 +185,17 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba lvl := level.Debug if !res.succeeded() { lvl = level.Warn - logger.Error(err) } - lvl(logger).Log("msg", "Backend response", "status", status, "elapsed", elapsed) + l := lvl(logger) + + // If we got an error (rather than just a non-2xx response), log that and mark the span as failed. + if err != nil { + l = log.With(l, "err", err) + ext.Error.Set(logger.Span, true) + } + + l.Log("msg", "Backend response", "status", status, "elapsed", elapsed) p.metrics.requestDuration.WithLabelValues(res.backend.Name(), req.Method, p.routeName, strconv.Itoa(res.statusCode())).Observe(elapsed.Seconds()) logger.SetTag("status", status) From 70b62fb9d0ed25ce0144db6835381d5c663ac170 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 18 Jun 2024 14:01:43 +1000 Subject: [PATCH 08/15] Make span names clearer and constants --- tools/querytee/proxy_endpoint.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index 056bbb84c65..65366be68fb 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -90,7 +90,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba responsesMtx = sync.Mutex{} timingMtx = sync.Mutex{} query = req.URL.RawQuery - logger, ctx = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming request") + logger, ctx = spanlogger.NewWithLogger(req.Context(), p.logger, "Incoming proxied request") ) defer logger.Finish() @@ -142,7 +142,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba // Don't cancel the child request's context when the parent context (from the incoming HTTP request) is cancelled after we return a response. // This allows us to continue running slower requests after returning a response to the caller. ctx := context.WithoutCancel(ctx) - logger, ctx := spanlogger.NewWithLogger(ctx, p.logger, fmt.Sprintf("Outgoing request to %s", b.Name())) + logger, ctx := spanlogger.NewWithLogger(ctx, p.logger, "Outgoing proxied request") defer logger.Finish() setSpanAndLogTags(logger) logger.SetSpanAndLogTag("backend", b.Name()) From 80165895c909877fcf95e97be88f9127e6fcc383 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 19 Jun 2024 13:59:26 +1000 Subject: [PATCH 09/15] Optionally add missing `time` parameter to instant queries --- cmd/query-tee/main.go | 9 +- tools/querytee/instant_query_transform.go | 62 +++++++ .../querytee/instant_query_transform_test.go | 171 ++++++++++++++++++ tools/querytee/proxy.go | 47 +++-- tools/querytee/proxy_endpoint.go | 38 +++- tools/querytee/proxy_endpoint_test.go | 18 +- tools/querytee/proxy_test.go | 84 ++++++++- 7 files changed, 391 insertions(+), 38 deletions(-) create mode 100644 tools/querytee/instant_query_transform.go create mode 100644 tools/querytee/instant_query_transform_test.go diff --git a/cmd/query-tee/main.go b/cmd/query-tee/main.go index f1327d8a6ef..44c1e5277bb 100644 --- a/cmd/query-tee/main.go +++ b/cmd/query-tee/main.go @@ -107,8 +107,15 @@ func mimirReadRoutes(cfg Config) []querytee.Route { UseRelativeError: cfg.ProxyConfig.UseRelativeError, SkipRecentSamples: cfg.ProxyConfig.SkipRecentSamples, }) + + var instantQueryTransformers []querytee.RequestTransformer + + if cfg.ProxyConfig.AddMissingTimeParamToInstantQueries { + instantQueryTransformers = append(instantQueryTransformers, querytee.AddMissingTimeParam) + } + return []querytee.Route{ - {Path: prefix + "/api/v1/query", RouteName: "api_v1_query", Methods: []string{"GET", "POST"}, ResponseComparator: samplesComparator}, + {Path: prefix + "/api/v1/query", RouteName: "api_v1_query", Methods: []string{"GET", "POST"}, ResponseComparator: samplesComparator, RequestTransformers: instantQueryTransformers}, {Path: prefix + "/api/v1/query_range", RouteName: "api_v1_query_range", Methods: []string{"GET", "POST"}, ResponseComparator: samplesComparator}, {Path: prefix + "/api/v1/query_exemplars", RouteName: "api_v1_query_exemplars", Methods: []string{"GET", "POST"}, ResponseComparator: nil}, {Path: prefix + "/api/v1/labels", RouteName: "api_v1_labels", Methods: []string{"GET", "POST"}, ResponseComparator: nil}, diff --git a/tools/querytee/instant_query_transform.go b/tools/querytee/instant_query_transform.go new file mode 100644 index 00000000000..3feacfe38eb --- /dev/null +++ b/tools/querytee/instant_query_transform.go @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package querytee + +import ( + "net/http" + "net/url" + "strconv" + "time" + + "github.com/go-kit/log/level" + "github.com/grafana/dskit/spanlogger" +) + +var timeNow = time.Now // Interception point to allow tests to set the current time for AddMissingTimeParam. + +// AddMissingTimeParam adds a 'time' parameter to any query request that does not have one. +// +// Instant queries without a 'time' parameter use the current time on the executing backend. +// However, this can vary between backends for the same request, which can comparison failures. +// +// So, to make comparisons more reliable, we add the 'time' parameter in the proxy to ensure all +// backends use the same value. +func AddMissingTimeParam(r *http.Request, body []byte, logger *spanlogger.SpanLogger) (*http.Request, []byte, error) { + parsedBody, err := url.ParseQuery(string(body)) + if err != nil { + return nil, nil, err + } + + if parsedBody.Has("time") { + return r, body, nil + } + + queryParams := r.URL.Query() + + if queryParams.Has("time") { + return r, body, nil + } + + // No 'time' parameter in either the body or URL. Add it. + level.Debug(logger).Log("msg", "instant query had no explicit time parameter, adding it based on the current time") + + if len(body) > 0 { + // Request has a body, add the 'time' parameter there. + parsedBody.Set("time", timeNow().Format(time.RFC3339)) + body = []byte(parsedBody.Encode()) + + // Outgoing requests should only rely on the request body, but we update Form here for consistency. + r.Form = parsedBody + + // Update the content length to reflect the new body. + r.ContentLength = int64(len(body)) + r.Header.Set("Content-Length", strconv.Itoa(len(body))) + + return r, body, nil + } + + // Otherwise, add it to the URL. + queryParams.Set("time", timeNow().Format(time.RFC3339)) + r.URL.RawQuery = queryParams.Encode() + return r, body, nil +} diff --git a/tools/querytee/instant_query_transform_test.go b/tools/querytee/instant_query_transform_test.go new file mode 100644 index 00000000000..c1a6a270c74 --- /dev/null +++ b/tools/querytee/instant_query_transform_test.go @@ -0,0 +1,171 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package querytee + +import ( + "context" + "net/http" + "net/url" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/util/spanlogger" +) + +func TestAddMissingTimeParam(t *testing.T) { + // Mock the current time. + originalTimeNow := timeNow + timeNow = func() time.Time { return time.Date(2024, 6, 10, 20, 30, 40, 0, time.UTC) } + t.Cleanup(func() { timeNow = originalTimeNow }) + + testCases := map[string]struct { + method string + urlParams url.Values + form url.Values + + expectedURLParams url.Values + expectedForm url.Values + }{ + "GET with time in URL": { + method: http.MethodGet, + urlParams: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + "time": []string{"2024-06-19T01:23:45Z"}, + }, + + expectedURLParams: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + "time": []string{"2024-06-19T01:23:45Z"}, + }, + expectedForm: url.Values{}, + }, + "GET without time in URL": { + method: http.MethodGet, + urlParams: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + }, + + expectedURLParams: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + "time": []string{"2024-06-10T20:30:40Z"}, + }, + expectedForm: url.Values{}, + }, + "POST with time in body": { + method: http.MethodPost, + form: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + "time": []string{"2024-06-19T01:23:45Z"}, + }, + + expectedURLParams: url.Values{}, + expectedForm: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + "time": []string{"2024-06-19T01:23:45Z"}, + }, + }, + "POST without time in body": { + method: http.MethodPost, + form: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + }, + + expectedURLParams: url.Values{}, + expectedForm: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + "time": []string{"2024-06-10T20:30:40Z"}, + }, + }, + "POST with time in URL": { + method: http.MethodPost, + urlParams: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + "time": []string{"2024-06-19T01:23:45Z"}, + }, + + expectedURLParams: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + "time": []string{"2024-06-19T01:23:45Z"}, + }, + expectedForm: url.Values{}, + }, + "POST without time in URL": { + method: http.MethodPost, + urlParams: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + }, + + expectedURLParams: url.Values{ + "query": []string{"sum(abc)"}, + "timeout": []string{"60s"}, + "time": []string{"2024-06-10T20:30:40Z"}, + }, + expectedForm: url.Values{}, + }, + } + + logger, _ := spanlogger.New(context.Background(), "test") + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + var req *http.Request + var err error + var body []byte + + if testCase.method == http.MethodGet { + require.Nil(t, testCase.form, "invalid test case: GET request should not have body") + req, err = http.NewRequest(testCase.method, "/blah?"+testCase.urlParams.Encode(), nil) + require.NoError(t, err) + body = nil + } else { + encoded := testCase.form.Encode() + reader := strings.NewReader(encoded) + req, err = http.NewRequest(testCase.method, "/blah?"+testCase.urlParams.Encode(), reader) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Content-Length", strconv.Itoa(reader.Len())) + + // ProxyEndpoint.executeBackendRequests parses the form body, so we do the same, for consistency. + require.NoError(t, req.ParseForm()) + + body = []byte(encoded) + } + + transformed, transformedBody, err := AddMissingTimeParam(req, body, logger) + require.NoError(t, err) + require.Equal(t, testCase.method, transformed.Method) + require.Equal(t, "/blah", transformed.URL.Path) + require.Equal(t, testCase.expectedURLParams, transformed.URL.Query()) + + if len(testCase.expectedForm) == 0 { + require.Empty(t, transformedBody) + } else { + parsedBody, err := url.ParseQuery(string(transformedBody)) + require.NoError(t, err) + require.Equal(t, testCase.expectedForm, parsedBody) + + // Make sure we've updated both places, for consistency. + require.Equal(t, testCase.expectedForm, transformed.Form) + + // Make sure we've updated the Content-Length header to match the new request body length. + require.Equal(t, int64(len(transformedBody)), transformed.ContentLength) + require.Equal(t, strconv.Itoa(len(transformedBody)), transformed.Header.Get("Content-Length")) + } + }) + } +} diff --git a/tools/querytee/proxy.go b/tools/querytee/proxy.go index d0a3693a4ab..9bafb09a534 100644 --- a/tools/querytee/proxy.go +++ b/tools/querytee/proxy.go @@ -19,6 +19,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/server" + "github.com/grafana/dskit/spanlogger" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -26,20 +27,21 @@ import ( var errMinBackends = errors.New("at least 1 backend is required") type ProxyConfig struct { - ServerHTTPServiceAddress string - ServerHTTPServicePort int - ServerGRPCServiceAddress string - ServerGRPCServicePort int - BackendEndpoints string - PreferredBackend string - BackendReadTimeout time.Duration - CompareResponses bool - LogSlowQueryResponseThreshold time.Duration - ValueComparisonTolerance float64 - UseRelativeError bool - PassThroughNonRegisteredRoutes bool - SkipRecentSamples time.Duration - BackendSkipTLSVerify bool + ServerHTTPServiceAddress string + ServerHTTPServicePort int + ServerGRPCServiceAddress string + ServerGRPCServicePort int + BackendEndpoints string + PreferredBackend string + BackendReadTimeout time.Duration + CompareResponses bool + LogSlowQueryResponseThreshold time.Duration + ValueComparisonTolerance float64 + UseRelativeError bool + PassThroughNonRegisteredRoutes bool + SkipRecentSamples time.Duration + BackendSkipTLSVerify bool + AddMissingTimeParamToInstantQueries bool } func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { @@ -62,15 +64,22 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.") f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 2*time.Minute, "The window from now to skip comparing samples. 0 to disable.") f.BoolVar(&cfg.PassThroughNonRegisteredRoutes, "proxy.passthrough-non-registered-routes", false, "Passthrough requests for non-registered routes to preferred backend.") + f.BoolVar(&cfg.AddMissingTimeParamToInstantQueries, "proxy.add-missing-time-parameter-to-instant-queries", true, "Add a 'time' parameter to proxied instant query requests if they do not have one.") } type Route struct { - Path string - RouteName string - Methods []string - ResponseComparator ResponsesComparator + Path string + RouteName string + Methods []string + ResponseComparator ResponsesComparator + RequestTransformers []RequestTransformer } +// RequestTransformer manipulates a proxied request before it is sent to downstream endpoints. +// +// r.Body is ignored, use body instead. +type RequestTransformer func(r *http.Request, body []byte, logger *spanlogger.SpanLogger) (*http.Request, []byte, error) + type Proxy struct { cfg ProxyConfig backends []ProxyBackendInterface @@ -211,7 +220,7 @@ func (p *Proxy) Start() error { if p.cfg.CompareResponses { comparator = route.ResponseComparator } - router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator, p.cfg.LogSlowQueryResponseThreshold)) + router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route, p.metrics, p.logger, comparator, p.cfg.LogSlowQueryResponseThreshold)) } if p.cfg.PassThroughNonRegisteredRoutes { diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index 65366be68fb..b7bf42ad754 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -36,11 +36,10 @@ type ProxyEndpoint struct { // Whether for this endpoint there's a preferred backend configured. hasPreferredBackend bool - // The route name used to track metrics. - routeName string + route Route } -func NewProxyEndpoint(backends []ProxyBackendInterface, routeName string, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator, slowResponseThreshold time.Duration) *ProxyEndpoint { +func NewProxyEndpoint(backends []ProxyBackendInterface, route Route, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator, slowResponseThreshold time.Duration) *ProxyEndpoint { hasPreferredBackend := false for _, backend := range backends { if backend.Preferred() { @@ -51,7 +50,7 @@ func NewProxyEndpoint(backends []ProxyBackendInterface, routeName string, metric return &ProxyEndpoint{ backends: backends, - routeName: routeName, + route: route, metrics: metrics, logger: logger, comparator: comparator, @@ -78,7 +77,7 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - p.metrics.responsesTotal.WithLabelValues(downstreamRes.backend.Name(), r.Method, p.routeName).Inc() + p.metrics.responsesTotal.WithLabelValues(downstreamRes.backend.Name(), r.Method, p.route.RouteName).Inc() } func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *backendResponse) { @@ -98,7 +97,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba setSpanAndLogTags := func(logger *spanlogger.SpanLogger) { logger.SetSpanAndLogTag("path", req.URL.Path) logger.SetSpanAndLogTag("query", query) - logger.SetSpanAndLogTag("route_name", p.routeName) + logger.SetSpanAndLogTag("route_name", p.route.RouteName) logger.SetSpanAndLogTag("user", req.Header.Get("X-Scope-OrgID")) logger.SetSpanAndLogTag("user_agent", req.Header.Get("User-Agent")) } @@ -109,6 +108,8 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba body, err = io.ReadAll(req.Body) if err != nil { level.Warn(logger).Log("msg", "Unable to read request body", "err", err) + resCh <- &backendResponse{err: err} + close(resCh) return } if err := req.Body.Close(); err != nil { @@ -124,6 +125,23 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba level.Debug(logger).Log("msg", "Received request") + for _, transform := range p.route.RequestTransformers { + req, body, err = transform(req, body, logger) + + if err != nil { + level.Error(logger).Log("msg", "Transforming request failed", "err", err) + resCh <- &backendResponse{err: err} + close(resCh) + return + } + + // Update the query used in logging based on the updated request. + query = req.URL.RawQuery + if body != nil { + query = req.Form.Encode() + } + } + // Keep track of the fastest and slowest backends var ( fastestDuration time.Duration @@ -196,7 +214,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba } l.Log("msg", "Backend response", "status", status, "elapsed", elapsed) - p.metrics.requestDuration.WithLabelValues(res.backend.Name(), req.Method, p.routeName, strconv.Itoa(res.statusCode())).Observe(elapsed.Seconds()) + p.metrics.requestDuration.WithLabelValues(res.backend.Name(), req.Method, p.route.RouteName, strconv.Itoa(res.statusCode())).Observe(elapsed.Seconds()) logger.SetTag("status", status) // Keep track of the response if required. @@ -252,9 +270,9 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba relativeDuration := actualResponse.elapsedTime - expectedResponse.elapsedTime proportionalDurationDifference := relativeDuration.Seconds() / expectedResponse.elapsedTime.Seconds() - p.metrics.relativeDuration.WithLabelValues(p.routeName).Observe(relativeDuration.Seconds()) - p.metrics.proportionalDuration.WithLabelValues(p.routeName).Observe(proportionalDurationDifference) - p.metrics.responsesComparedTotal.WithLabelValues(p.routeName, string(result)).Inc() + p.metrics.relativeDuration.WithLabelValues(p.route.RouteName).Observe(relativeDuration.Seconds()) + p.metrics.proportionalDuration.WithLabelValues(p.route.RouteName).Observe(proportionalDurationDifference) + p.metrics.responsesComparedTotal.WithLabelValues(p.route.RouteName, string(result)).Inc() } } diff --git a/tools/querytee/proxy_endpoint_test.go b/tools/querytee/proxy_endpoint_test.go index 8bb2868e265..dbcca2a464a 100644 --- a/tools/querytee/proxy_endpoint_test.go +++ b/tools/querytee/proxy_endpoint_test.go @@ -29,6 +29,7 @@ import ( ) func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) { + testRoute := Route{RouteName: "test"} backendURL1, err := url.Parse("http://backend-1/") require.NoError(t, err) backendURL2, err := url.Parse("http://backend-2/") @@ -106,7 +107,7 @@ func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) { testData := testData t.Run(testName, func(t *testing.T) { - endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, 0) + endpoint := NewProxyEndpoint(testData.backends, testRoute, NewProxyMetrics(nil), log.NewNopLogger(), nil, 0) // Send the responses from a dedicated goroutine. resCh := make(chan *backendResponse) @@ -131,6 +132,7 @@ func Test_ProxyEndpoint_Requests(t *testing.T) { testHandler http.HandlerFunc ) + testRoute := Route{RouteName: "test"} handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { defer wg.Done() defer requestCount.Add(1) @@ -150,7 +152,7 @@ func Test_ProxyEndpoint_Requests(t *testing.T) { NewProxyBackend("backend-1", backendURL1, time.Second, true, false), NewProxyBackend("backend-2", backendURL2, time.Second, false, false), } - endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, 0) + endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(nil), log.NewNopLogger(), nil, 0) for _, tc := range []struct { name string @@ -234,6 +236,8 @@ func Test_ProxyEndpoint_Requests(t *testing.T) { } func Test_ProxyEndpoint_Comparison(t *testing.T) { + testRoute := Route{RouteName: "test"} + scenarios := map[string]struct { preferredResponseStatusCode int secondaryResponseStatusCode int @@ -324,7 +328,7 @@ func Test_ProxyEndpoint_Comparison(t *testing.T) { comparisonError: scenario.comparatorError, } - endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(reg), logger, comparator, 0) + endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(reg), logger, comparator, 0) resp := httptest.NewRecorder() req, err := http.NewRequest("GET", "http://test/api/v1/test", nil) @@ -353,6 +357,8 @@ func Test_ProxyEndpoint_Comparison(t *testing.T) { } func Test_ProxyEndpoint_LogSlowQueries(t *testing.T) { + testRoute := Route{RouteName: "test"} + scenarios := map[string]struct { slowResponseThreshold time.Duration preferredResponseLatency time.Duration @@ -424,7 +430,7 @@ func Test_ProxyEndpoint_LogSlowQueries(t *testing.T) { comparisonResult: ComparisonSuccess, } - endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(reg), logger, comparator, scenario.slowResponseThreshold) + endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(reg), logger, comparator, scenario.slowResponseThreshold) resp := httptest.NewRecorder() req, err := http.NewRequest("GET", "http://test/api/v1/test", nil) @@ -449,6 +455,8 @@ func Test_ProxyEndpoint_LogSlowQueries(t *testing.T) { } func Test_ProxyEndpoint_RelativeDurationMetric(t *testing.T) { + testRoute := Route{RouteName: "test"} + scenarios := map[string]struct { latencyPairs []latencyPair expectedDurationSampleSum float64 @@ -491,7 +499,7 @@ func Test_ProxyEndpoint_RelativeDurationMetric(t *testing.T) { comparisonResult: ComparisonSuccess, } - endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(reg), logger, comparator, 0) + endpoint := NewProxyEndpoint(backends, testRoute, NewProxyMetrics(reg), logger, comparator, 0) resp := httptest.NewRecorder() req, err := http.NewRequest("GET", "http://test/api/v1/test", nil) diff --git a/tools/querytee/proxy_test.go b/tools/querytee/proxy_test.go index 4b53376d702..078dc519777 100644 --- a/tools/querytee/proxy_test.go +++ b/tools/querytee/proxy_test.go @@ -25,10 +25,24 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + + "github.com/grafana/mimir/pkg/util/spanlogger" ) var testRoutes = []Route{ - {Path: "/api/v1/query", RouteName: "api_v1_query", Methods: []string{"GET"}, ResponseComparator: &testComparator{}}, + { + Path: "/api/v1/query", + RouteName: "api_v1_query", + Methods: []string{"GET"}, + ResponseComparator: &testComparator{}, + }, + { + Path: "/api/v1/query_with_transform", + RouteName: "api_v1_query_with_transform", + Methods: []string{"POST"}, + ResponseComparator: &testComparator{}, + RequestTransformers: []RequestTransformer{testRequestTransformer1, testRequestTransformer2}, + }, } type testComparator struct{} @@ -37,6 +51,22 @@ func (testComparator) Compare(_, _ []byte) (ComparisonResult, error) { return ComparisonSuccess, nil } +func testRequestTransformer1(r *http.Request, body []byte, _ *spanlogger.SpanLogger) (*http.Request, []byte, error) { + r = r.Clone(r.Context()) + r.URL.Path = r.URL.Path + "/transformed_1" + body = append(body, []byte("from 1\n")...) + + return r, body, nil +} + +func testRequestTransformer2(r *http.Request, body []byte, _ *spanlogger.SpanLogger) (*http.Request, []byte, error) { + r = r.Clone(r.Context()) + r.URL.Path = r.URL.Path + "/transformed_2" + body = append(body, []byte("from 2\n")...) + + return r, body, nil +} + func Test_NewProxy(t *testing.T) { cfg := ProxyConfig{} @@ -57,12 +87,16 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { } tests := map[string]struct { + requestPath string + requestMethod string backends []mockedBackend preferredBackendIdx int expectedStatus int expectedRes string }{ "one backend returning 2xx": { + requestPath: "/api/v1/query", + requestMethod: http.MethodGet, backends: []mockedBackend{ {handler: mockQueryResponse("/api/v1/query", 200, querySingleMetric1)}, }, @@ -70,6 +104,8 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { expectedRes: querySingleMetric1, }, "one backend returning 5xx": { + requestPath: "/api/v1/query", + requestMethod: http.MethodGet, backends: []mockedBackend{ {handler: mockQueryResponse("/api/v1/query", 500, "")}, }, @@ -77,6 +113,8 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { expectedRes: "", }, "two backends without path prefix": { + requestPath: "/api/v1/query", + requestMethod: http.MethodGet, backends: []mockedBackend{ {handler: mockQueryResponse("/api/v1/query", 200, querySingleMetric1)}, {handler: mockQueryResponse("/api/v1/query", 200, querySingleMetric2)}, @@ -86,6 +124,8 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { expectedRes: querySingleMetric1, }, "two backends with the same path prefix": { + requestPath: "/api/v1/query", + requestMethod: http.MethodGet, backends: []mockedBackend{ { pathPrefix: "/prometheus", @@ -101,6 +141,8 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { expectedRes: querySingleMetric1, }, "two backends with different path prefix": { + requestPath: "/api/v1/query", + requestMethod: http.MethodGet, backends: []mockedBackend{ { pathPrefix: "/prefix-1", @@ -116,6 +158,8 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { expectedRes: querySingleMetric1, }, "preferred backend returns 4xx": { + requestPath: "/api/v1/query", + requestMethod: http.MethodGet, backends: []mockedBackend{ {handler: mockQueryResponse("/api/v1/query", 400, "")}, {handler: mockQueryResponse("/api/v1/query", 200, querySingleMetric1)}, @@ -125,6 +169,8 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { expectedRes: "", }, "preferred backend returns 5xx": { + requestPath: "/api/v1/query", + requestMethod: http.MethodGet, backends: []mockedBackend{ {handler: mockQueryResponse("/api/v1/query", 500, "")}, {handler: mockQueryResponse("/api/v1/query", 200, querySingleMetric1)}, @@ -134,6 +180,8 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { expectedRes: querySingleMetric1, }, "non-preferred backend returns 5xx": { + requestPath: "/api/v1/query", + requestMethod: http.MethodGet, backends: []mockedBackend{ {handler: mockQueryResponse("/api/v1/query", 200, querySingleMetric1)}, {handler: mockQueryResponse("/api/v1/query", 500, "")}, @@ -143,6 +191,8 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { expectedRes: querySingleMetric1, }, "all backends returns 5xx": { + requestPath: "/api/v1/query", + requestMethod: http.MethodGet, backends: []mockedBackend{ {handler: mockQueryResponse("/api/v1/query", 500, "")}, {handler: mockQueryResponse("/api/v1/query", 500, "")}, @@ -151,6 +201,15 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { expectedStatus: 500, expectedRes: "", }, + "request to route with outgoing request transformer": { + requestPath: "/api/v1/query_with_transform", + requestMethod: http.MethodPost, + backends: []mockedBackend{ + {handler: mockQueryResponseWithExpectedBody("/api/v1/query_with_transform/transformed_1/transformed_2", "from 1\nfrom 2\n", 200, querySingleMetric1)}, + }, + expectedStatus: 200, + expectedRes: querySingleMetric1, + }, } for testName, testData := range tests { @@ -188,8 +247,10 @@ func Test_Proxy_RequestsForwarding(t *testing.T) { require.NoError(t, p.Start()) // Send a query request to the proxy. - endpoint := fmt.Sprintf("http://%s/api/v1/query", p.server.HTTPListenAddr()) - res, err := http.Get(endpoint) + endpoint := fmt.Sprintf("http://%s"+testData.requestPath, p.server.HTTPListenAddr()) + req, err := http.NewRequest(testData.requestMethod, endpoint, nil) + require.NoError(t, err) + res, err := http.DefaultClient.Do(req) require.NoError(t, err) defer res.Body.Close() @@ -497,6 +558,10 @@ func TestProxyHTTPGRPC(t *testing.T) { } func mockQueryResponse(path string, status int, res string) http.HandlerFunc { + return mockQueryResponseWithExpectedBody(path, "", status, res) +} + +func mockQueryResponseWithExpectedBody(path string, expectedBody string, status int, res string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // Ensure the path is the expected one. if r.URL.Path != path { @@ -504,6 +569,19 @@ func mockQueryResponse(path string, status int, res string) http.HandlerFunc { return } + if expectedBody != "" { + actualBody, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + if string(actualBody) != expectedBody { + w.WriteHeader(http.StatusBadRequest) + return + } + } + // Send back the mocked response. w.WriteHeader(status) if status == http.StatusOK { From 44ae290ba4dc44aad6c8582d8e785e66de33f921 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 19 Jun 2024 14:10:41 +1000 Subject: [PATCH 10/15] Set span tags after parsing request body --- tools/querytee/proxy_endpoint.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index b7bf42ad754..1ebdb963774 100644 --- a/tools/querytee/proxy_endpoint.go +++ b/tools/querytee/proxy_endpoint.go @@ -94,16 +94,6 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba defer logger.Finish() - setSpanAndLogTags := func(logger *spanlogger.SpanLogger) { - logger.SetSpanAndLogTag("path", req.URL.Path) - logger.SetSpanAndLogTag("query", query) - logger.SetSpanAndLogTag("route_name", p.route.RouteName) - logger.SetSpanAndLogTag("user", req.Header.Get("X-Scope-OrgID")) - logger.SetSpanAndLogTag("user_agent", req.Header.Get("User-Agent")) - } - - setSpanAndLogTags(logger) - if req.Body != nil { body, err = io.ReadAll(req.Body) if err != nil { @@ -123,6 +113,16 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba query = req.Form.Encode() } + setSpanAndLogTags := func(logger *spanlogger.SpanLogger) { + logger.SetSpanAndLogTag("path", req.URL.Path) + logger.SetSpanAndLogTag("query", query) + logger.SetSpanAndLogTag("route_name", p.route.RouteName) + logger.SetSpanAndLogTag("user", req.Header.Get("X-Scope-OrgID")) + logger.SetSpanAndLogTag("user_agent", req.Header.Get("User-Agent")) + } + + setSpanAndLogTags(logger) + level.Debug(logger).Log("msg", "Received request") for _, transform := range p.route.RequestTransformers { From 39b65244e06e9eb3cc2a9556e0dac20cff3e1a55 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 19 Jun 2024 20:14:52 +1000 Subject: [PATCH 11/15] Add query-tee to microservices Docker Compose setup --- .../mimir-microservices-mode/compose-up.sh | 8 ++- .../docker-compose.jsonnet | 49 ++++++++++++++----- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/development/mimir-microservices-mode/compose-up.sh b/development/mimir-microservices-mode/compose-up.sh index cb303144361..229d82c13db 100755 --- a/development/mimir-microservices-mode/compose-up.sh +++ b/development/mimir-microservices-mode/compose-up.sh @@ -23,6 +23,12 @@ cd "$SCRIPT_DIR" && make # -gcflags "all=-N -l" disables optimizations that allow for better run with combination with Delve debugger. # GOARCH is not changed. CGO_ENABLED=0 GOOS=linux go build -mod=vendor -tags=netgo,stringlabels -gcflags "all=-N -l" -o "${SCRIPT_DIR}"/mimir "${SCRIPT_DIR}"/../../cmd/mimir - docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml build --build-arg BUILD_IMAGE="${BUILD_IMAGE}" distributor-1 + +if [ "$(yq '.services.query-tee' "${SCRIPT_DIR}"/docker-compose.yml)" != "null" ]; then + # If query-tee is enabled, build its binary and image as well. + CGO_ENABLED=0 GOOS=linux go build -mod=vendor -tags=netgo,stringlabels -gcflags "all=-N -l" -o "${SCRIPT_DIR}"/../../cmd/query-tee "${SCRIPT_DIR}"/../../cmd/query-tee + docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml build --build-arg BUILD_IMAGE="${BUILD_IMAGE}" query-tee +fi + docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml up "$@" diff --git a/development/mimir-microservices-mode/docker-compose.jsonnet b/development/mimir-microservices-mode/docker-compose.jsonnet index 4840c2a05bb..98b03d133a3 100644 --- a/development/mimir-microservices-mode/docker-compose.jsonnet +++ b/development/mimir-microservices-mode/docker-compose.jsonnet @@ -30,6 +30,9 @@ std.manifestYamlDoc({ enable_grafana_agent: false, enable_prometheus: true, // If Prometheus is disabled, recording rules will not be evaluated and so dashboards in Grafana that depend on these recorded series will display no data. enable_otel_collector: false, + + // If true, a query-tee instance with a single backend is started. + enable_query_tee: false, }, // We explicitely list all important services here, so that it's easy to disable them by commenting out. @@ -51,6 +54,7 @@ std.manifestYamlDoc({ (if $._config.ring == 'consul' || $._config.ring == 'multi' then self.consul else {}) + (if $._config.cache_backend == 'redis' then self.redis else self.memcached + self.memcached_exporter) + (if $._config.enable_load_generator then self.load_generator else {}) + + (if $._config.enable_query_tee then self.query_tee else {}) + {}, distributor:: { @@ -175,6 +179,21 @@ std.manifestYamlDoc({ local all_rings = ['-ingester.ring', '-distributor.ring', '-compactor.ring', '-store-gateway.sharding-ring', '-ruler.ring', '-alertmanager.sharding-ring'], + local jaegerEnv(appName) = { + JAEGER_AGENT_HOST: 'jaeger', + JAEGER_AGENT_PORT: 6831, + JAEGER_SAMPLER_TYPE: 'const', + JAEGER_SAMPLER_PARAM: 1, + JAEGER_TAGS: 'app=%s' % appName, + JAEGER_REPORTER_MAX_QUEUE_SIZE: 1000, + }, + + local formatEnv(env) = [ + '%s=%s' % [key, env[key]] + for key in std.objectFields(env) + if env[key] != null + ], + // This function builds docker-compose declaration for Mimir service. // Default grpcPort is (httpPort + 1000), and default debug port is (httpPort + 10000) local mimirService(serviceOptions) = { @@ -189,14 +208,7 @@ std.manifestYamlDoc({ // Extra arguments passed to Mimir command line. extraArguments: '', dependsOn: ['minio'] + (if $._config.ring == 'consul' || $._config.ring == 'multi' then ['consul'] else if s.target != 'distributor' then ['distributor-1'] else []), - env: { - JAEGER_AGENT_HOST: 'jaeger', - JAEGER_AGENT_PORT: 6831, - JAEGER_SAMPLER_TYPE: 'const', - JAEGER_SAMPLER_PARAM: 1, - JAEGER_TAGS: 'app=%s' % s.jaegerApp, - JAEGER_REPORTER_MAX_QUEUE_SIZE: 1000, - }, + env: jaegerEnv(s.jaegerApp), extraVolumes: [], memberlistNodeName: self.jaegerApp, memberlistBindPort: self.httpPort + 2000, @@ -223,11 +235,7 @@ std.manifestYamlDoc({ std.join(' ', if $._config.cache_backend == 'redis' then [x + '.backend=redis' for x in all_caches] + [x + '.redis.endpoint=redis:6379' for x in all_caches] else [x + '.backend=memcached' for x in all_caches] + [x + '.memcached.addresses=dns+memcached:11211' for x in all_caches]), ]), ], - environment: [ - '%s=%s' % [key, options.env[key]] - for key in std.objectFields(options.env) - if options.env[key] != null - ], + environment: formatEnv(options.env), hostname: options.name, // Only publish HTTP and debug port, but not gRPC one. ports: ['%d:%d' % [options.httpPort, options.httpPort]] + @@ -385,6 +393,21 @@ std.manifestYamlDoc({ }, }, + query_tee:: { + 'query-tee': { + local env = jaegerEnv('query-tee'), + + image: 'query-tee', + build: { + context: '../../cmd/query-tee', + }, + command: '-backend.endpoints=http://nginx:8080 -backend.preferred=nginx -proxy.passthrough-non-registered-routes=true -server.path-prefix=/prometheus', + environment: formatEnv(env), + hostname: 'query-tee', + ports: ['9999:80'], + }, + }, + // docker-compose YAML output version. version: '3.4', From 5e72af56adcf671b9ffb627f7b8a7ccc8d884404 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 19 Jun 2024 20:28:17 +1000 Subject: [PATCH 12/15] Add changelog entry. --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b9f09f950e..1afdb19d4a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -174,6 +174,11 @@ * [ENHANCEMENT] Add two new metrics for measuring the relative duration between backends: #7782 #8013 #8330 * `cortex_querytee_backend_response_relative_duration_seconds` * `cortex_querytee_backend_response_relative_duration_proportional` +* [ENHANCEMENT] Emit trace spans from query-tee. #8419 +* [ENHANCEMENT] Log trace ID (if present) with all log messages written while processing a request. #8419 +* [ENHANCEMENT] Log user agent when processing a request. #8419 +* [ENHANCEMENT] Add `time` parameter to proxied instant queries if it is not included in the incoming request. This is optional but enabled by default, and can be disabled with `-proxy.add-missing-time-parameter-to-instant-queries=false`. #8419 +* [BUGFIX] Ensure any errors encountered while forwarding a request to a backend (eg. DNS resolution failures) are logged. #8419 ### Documentation From 33a7b52f88fe9e840afc2ce516369e217c0e5e29 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Thu, 20 Jun 2024 14:40:00 +1000 Subject: [PATCH 13/15] Address PR feedback: fix comment Co-authored-by: Joshua Hesketh --- tools/querytee/instant_query_transform.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/querytee/instant_query_transform.go b/tools/querytee/instant_query_transform.go index 3feacfe38eb..037970534b1 100644 --- a/tools/querytee/instant_query_transform.go +++ b/tools/querytee/instant_query_transform.go @@ -17,7 +17,7 @@ var timeNow = time.Now // Interception point to allow tests to set the current t // AddMissingTimeParam adds a 'time' parameter to any query request that does not have one. // // Instant queries without a 'time' parameter use the current time on the executing backend. -// However, this can vary between backends for the same request, which can comparison failures. +// However, this can vary between backends for the same request, which can cause comparison failures. // // So, to make comparisons more reliable, we add the 'time' parameter in the proxy to ensure all // backends use the same value. From a8ef9ff630b9654282ac2ea679a4b40c4de9173f Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Thu, 20 Jun 2024 14:54:24 +1000 Subject: [PATCH 14/15] Simplify logic and ensure `Form` and `PostForm` are kept consistent with changes --- tools/querytee/instant_query_transform.go | 31 ++++++------- .../querytee/instant_query_transform_test.go | 45 ++++++++++--------- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/tools/querytee/instant_query_transform.go b/tools/querytee/instant_query_transform.go index 037970534b1..a0ddaf054e8 100644 --- a/tools/querytee/instant_query_transform.go +++ b/tools/querytee/instant_query_transform.go @@ -4,7 +4,6 @@ package querytee import ( "net/http" - "net/url" "strconv" "time" @@ -22,31 +21,28 @@ var timeNow = time.Now // Interception point to allow tests to set the current t // So, to make comparisons more reliable, we add the 'time' parameter in the proxy to ensure all // backends use the same value. func AddMissingTimeParam(r *http.Request, body []byte, logger *spanlogger.SpanLogger) (*http.Request, []byte, error) { - parsedBody, err := url.ParseQuery(string(body)) - if err != nil { + // ParseForm should have already been called, but we call it again to be sure. + // ParseForm is idempotent. + if err := r.ParseForm(); err != nil { return nil, nil, err } - if parsedBody.Has("time") { - return r, body, nil - } - - queryParams := r.URL.Query() - - if queryParams.Has("time") { + if r.Form.Has("time") { return r, body, nil } // No 'time' parameter in either the body or URL. Add it. - level.Debug(logger).Log("msg", "instant query had no explicit time parameter, adding it based on the current time") + t := timeNow().Format(time.RFC3339) + level.Debug(logger).Log("msg", "instant query had no explicit time parameter, adding it based on the current time", "time", t) + + // Form should contain URL parameters + parameters from the body, and isn't updated automatically when we set PostForm or URL below, + // so update it here to ensure everything remains consistent. + r.Form.Set("time", t) if len(body) > 0 { // Request has a body, add the 'time' parameter there. - parsedBody.Set("time", timeNow().Format(time.RFC3339)) - body = []byte(parsedBody.Encode()) - - // Outgoing requests should only rely on the request body, but we update Form here for consistency. - r.Form = parsedBody + r.PostForm.Set("time", t) + body = []byte(r.PostForm.Encode()) // Update the content length to reflect the new body. r.ContentLength = int64(len(body)) @@ -56,7 +52,8 @@ func AddMissingTimeParam(r *http.Request, body []byte, logger *spanlogger.SpanLo } // Otherwise, add it to the URL. - queryParams.Set("time", timeNow().Format(time.RFC3339)) + queryParams := r.URL.Query() + queryParams.Set("time", t) r.URL.RawQuery = queryParams.Encode() return r, body, nil } diff --git a/tools/querytee/instant_query_transform_test.go b/tools/querytee/instant_query_transform_test.go index c1a6a270c74..d6d988fec6d 100644 --- a/tools/querytee/instant_query_transform_test.go +++ b/tools/querytee/instant_query_transform_test.go @@ -25,10 +25,10 @@ func TestAddMissingTimeParam(t *testing.T) { testCases := map[string]struct { method string urlParams url.Values - form url.Values + body url.Values expectedURLParams url.Values - expectedForm url.Values + expectedBody url.Values }{ "GET with time in URL": { method: http.MethodGet, @@ -43,7 +43,7 @@ func TestAddMissingTimeParam(t *testing.T) { "timeout": []string{"60s"}, "time": []string{"2024-06-19T01:23:45Z"}, }, - expectedForm: url.Values{}, + expectedBody: url.Values{}, }, "GET without time in URL": { method: http.MethodGet, @@ -57,18 +57,18 @@ func TestAddMissingTimeParam(t *testing.T) { "timeout": []string{"60s"}, "time": []string{"2024-06-10T20:30:40Z"}, }, - expectedForm: url.Values{}, + expectedBody: url.Values{}, }, "POST with time in body": { method: http.MethodPost, - form: url.Values{ + body: url.Values{ "query": []string{"sum(abc)"}, "timeout": []string{"60s"}, "time": []string{"2024-06-19T01:23:45Z"}, }, expectedURLParams: url.Values{}, - expectedForm: url.Values{ + expectedBody: url.Values{ "query": []string{"sum(abc)"}, "timeout": []string{"60s"}, "time": []string{"2024-06-19T01:23:45Z"}, @@ -76,13 +76,13 @@ func TestAddMissingTimeParam(t *testing.T) { }, "POST without time in body": { method: http.MethodPost, - form: url.Values{ + body: url.Values{ "query": []string{"sum(abc)"}, "timeout": []string{"60s"}, }, expectedURLParams: url.Values{}, - expectedForm: url.Values{ + expectedBody: url.Values{ "query": []string{"sum(abc)"}, "timeout": []string{"60s"}, "time": []string{"2024-06-10T20:30:40Z"}, @@ -101,7 +101,7 @@ func TestAddMissingTimeParam(t *testing.T) { "timeout": []string{"60s"}, "time": []string{"2024-06-19T01:23:45Z"}, }, - expectedForm: url.Values{}, + expectedBody: url.Values{}, }, "POST without time in URL": { method: http.MethodPost, @@ -115,7 +115,7 @@ func TestAddMissingTimeParam(t *testing.T) { "timeout": []string{"60s"}, "time": []string{"2024-06-10T20:30:40Z"}, }, - expectedForm: url.Values{}, + expectedBody: url.Values{}, }, } @@ -128,21 +128,17 @@ func TestAddMissingTimeParam(t *testing.T) { var body []byte if testCase.method == http.MethodGet { - require.Nil(t, testCase.form, "invalid test case: GET request should not have body") + require.Nil(t, testCase.body, "invalid test case: GET request should not have body") req, err = http.NewRequest(testCase.method, "/blah?"+testCase.urlParams.Encode(), nil) require.NoError(t, err) body = nil } else { - encoded := testCase.form.Encode() + encoded := testCase.body.Encode() reader := strings.NewReader(encoded) req, err = http.NewRequest(testCase.method, "/blah?"+testCase.urlParams.Encode(), reader) require.NoError(t, err) req.Header.Set("Content-Type", "application/x-www-form-urlencoded") req.Header.Set("Content-Length", strconv.Itoa(reader.Len())) - - // ProxyEndpoint.executeBackendRequests parses the form body, so we do the same, for consistency. - require.NoError(t, req.ParseForm()) - body = []byte(encoded) } @@ -152,20 +148,29 @@ func TestAddMissingTimeParam(t *testing.T) { require.Equal(t, "/blah", transformed.URL.Path) require.Equal(t, testCase.expectedURLParams, transformed.URL.Query()) - if len(testCase.expectedForm) == 0 { + if len(testCase.expectedBody) == 0 { require.Empty(t, transformedBody) + require.Empty(t, transformed.PostForm) } else { parsedBody, err := url.ParseQuery(string(transformedBody)) require.NoError(t, err) - require.Equal(t, testCase.expectedForm, parsedBody) + require.Equal(t, testCase.expectedBody, parsedBody) - // Make sure we've updated both places, for consistency. - require.Equal(t, testCase.expectedForm, transformed.Form) + // Make sure we've updated both the body and PostForm, for consistency. + require.Equal(t, testCase.expectedBody, transformed.PostForm) // Make sure we've updated the Content-Length header to match the new request body length. require.Equal(t, int64(len(transformedBody)), transformed.ContentLength) require.Equal(t, strconv.Itoa(len(transformedBody)), transformed.Header.Get("Content-Length")) } + + expectedForm := testCase.expectedURLParams + + for k, v := range testCase.expectedBody { + expectedForm[k] = v + } + + require.Equal(t, expectedForm, transformed.Form) }) } } From 527c4161f1266a1f4083f847191db4f12ab7b727 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 21 Jun 2024 12:25:55 +1000 Subject: [PATCH 15/15] Address PR feedback: add tests for case where some parameters are in body and some are in URL --- .../querytee/instant_query_transform_test.go | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/tools/querytee/instant_query_transform_test.go b/tools/querytee/instant_query_transform_test.go index d6d988fec6d..fe6550ba8b4 100644 --- a/tools/querytee/instant_query_transform_test.go +++ b/tools/querytee/instant_query_transform_test.go @@ -117,6 +117,59 @@ func TestAddMissingTimeParam(t *testing.T) { }, expectedBody: url.Values{}, }, + "POST with parameters in both URL and body, and no time in either place": { + method: http.MethodPost, + urlParams: url.Values{ + "query": []string{"sum(abc)"}, + }, + body: url.Values{ + "timeout": []string{"60s"}, + }, + + expectedURLParams: url.Values{ + "query": []string{"sum(abc)"}, + }, + expectedBody: url.Values{ + "timeout": []string{"60s"}, + "time": []string{"2024-06-10T20:30:40Z"}, + }, + }, + "POST with parameters in both URL and body, and time in URL": { + method: http.MethodPost, + urlParams: url.Values{ + "query": []string{"sum(abc)"}, + "time": []string{"2024-06-19T01:23:45Z"}, + }, + body: url.Values{ + "timeout": []string{"60s"}, + }, + + expectedURLParams: url.Values{ + "query": []string{"sum(abc)"}, + "time": []string{"2024-06-19T01:23:45Z"}, + }, + expectedBody: url.Values{ + "timeout": []string{"60s"}, + }, + }, + "POST with parameters in both URL and body, and time in body": { + method: http.MethodPost, + urlParams: url.Values{ + "query": []string{"sum(abc)"}, + }, + body: url.Values{ + "time": []string{"2024-06-19T01:23:45Z"}, + "timeout": []string{"60s"}, + }, + + expectedURLParams: url.Values{ + "query": []string{"sum(abc)"}, + }, + expectedBody: url.Values{ + "time": []string{"2024-06-19T01:23:45Z"}, + "timeout": []string{"60s"}, + }, + }, } logger, _ := spanlogger.New(context.Background(), "test")