Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

querytee: Add support to log comparatively slow queries #7346

Merged
merged 13 commits into from
Mar 17, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@

* [BUGFIX] Fix issue where `Host` HTTP header was not being correctly changed for the proxy targets. #7386
* [ENHANCEMENT] Allow using the value of X-Scope-OrgID for basic auth username in the forwarded request if URL username is set as `__REQUEST_HEADER_X_SCOPE_ORGID__`. #7452
* [ENHANCEMENT] Log queries that take longer than `proxy.log-slow-query-response-threshold` when compared to other backends. #7346

### Documentation

Expand Down
8 changes: 8 additions & 0 deletions docs/sources/mimir/manage/tools/query-tee.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ This prevents false positives due to racing with ingestion, and, if the query se
If either Mimir cluster is running with a non-default value of `-ruler.evaluation-delay-duration`, you should set `-proxy.compare-skip-recent-samples` to one minute more than the value of `-ruler.evaluation-delay-duration`.
{{< /admonition >}}

### Slow query log

You can configure query-tee to log requests that take longer than the fastest backend by setting the flag `-proxy.log-slow-query-response-threshold`.

The default value is `10s` which logs requests that are ten seconds slower than the fastest backend.

To disable slow query logging, set `-proxy.log-slow-query-response-threshold` to `0`.

### Exported metrics

The query-tee exposes the following Prometheus metrics at the `/metrics` endpoint listening on the port configured via the flag `-server.metrics-port`:
Expand Down
4 changes: 3 additions & 1 deletion tools/querytee/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ProxyConfig struct {
PreferredBackend string
BackendReadTimeout time.Duration
CompareResponses bool
LogSlowQueryResponseThreshold time.Duration
ValueComparisonTolerance float64
UseRelativeError bool
PassThroughNonRegisteredRoutes bool
Expand All @@ -56,6 +57,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.")
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 150*time.Second, "The timeout when reading the response from a backend.")
f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.")
f.DurationVar(&cfg.LogSlowQueryResponseThreshold, "proxy.log-slow-query-response-threshold", 10*time.Second, "The minimum difference in response time between slowest and fastest back-end over which to log the query. 0 to disable.")
f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).")
f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.")
f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 2*time.Minute, "The window from now to skip comparing samples. 0 to disable.")
Expand Down Expand Up @@ -209,7 +211,7 @@ func (p *Proxy) Start() error {
if p.cfg.CompareResponses {
comparator = route.ResponseComparator
}
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator))
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator, p.cfg.LogSlowQueryResponseThreshold))
}

if p.cfg.PassThroughNonRegisteredRoutes {
Expand Down
70 changes: 57 additions & 13 deletions tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ type ResponsesComparator interface {
}

type ProxyEndpoint struct {
backends []*ProxyBackend
metrics *ProxyMetrics
logger log.Logger
comparator ResponsesComparator
backends []*ProxyBackend
metrics *ProxyMetrics
logger log.Logger
comparator ResponsesComparator
slowResponseThreshold time.Duration

// Whether for this endpoint there's a preferred backend configured.
hasPreferredBackend bool
Expand All @@ -35,7 +36,7 @@ type ProxyEndpoint struct {
routeName string
}

func NewProxyEndpoint(backends []*ProxyBackend, routeName string, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator) *ProxyEndpoint {
func NewProxyEndpoint(backends []*ProxyBackend, routeName string, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator, slowResponseThreshold time.Duration) *ProxyEndpoint {
hasPreferredBackend := false
for _, backend := range backends {
if backend.preferred {
Expand All @@ -45,12 +46,13 @@ func NewProxyEndpoint(backends []*ProxyBackend, routeName string, metrics *Proxy
}

return &ProxyEndpoint{
backends: backends,
routeName: routeName,
metrics: metrics,
logger: logger,
comparator: comparator,
hasPreferredBackend: hasPreferredBackend,
backends: backends,
routeName: routeName,
metrics: metrics,
logger: logger,
comparator: comparator,
slowResponseThreshold: slowResponseThreshold,
hasPreferredBackend: hasPreferredBackend,
}
}

Expand Down Expand Up @@ -82,6 +84,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba
body []byte
responses = make([]*backendResponse, 0, len(p.backends))
responsesMtx = sync.Mutex{}
timingMtx = sync.Mutex{}
query = req.URL.RawQuery
)

Expand All @@ -104,6 +107,14 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba

level.Debug(p.logger).Log("msg", "Received request", "path", req.URL.Path, "query", query)

// Keep track of the fastest and slowest backends
var (
fastestDuration time.Duration
fastestBackend *ProxyBackend
slowestDuration time.Duration
slowestBackend *ProxyBackend
)

wg.Add(len(p.backends))
for _, b := range p.backends {
b := b
Expand All @@ -122,6 +133,19 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba
elapsed := time.Since(start)
contentType := ""

if p.slowResponseThreshold > 0 {
timingMtx.Lock()
if elapsed > slowestDuration {
slowestDuration = elapsed
slowestBackend = b
}
if fastestDuration == 0 || elapsed < fastestDuration {
fastestDuration = elapsed
fastestBackend = b
}
timingMtx.Unlock()
}

if resp != nil {
contentType = resp.Header.Get("Content-Type")
}
Expand All @@ -132,6 +156,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba
contentType: contentType,
body: body,
err: err,
elapsedTime: elapsed,
}

// Log with a level based on the backend response.
Expand Down Expand Up @@ -170,18 +195,36 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba
if result == ComparisonFailed {
level.Error(p.logger).Log(
"msg", "response comparison failed",
"route-name", p.routeName,
"route_name", p.routeName,
"query", query,
"user", req.Header.Get("X-Scope-OrgID"),
"err", err,
"expected_response_duration", expectedResponse.elapsedTime,
"actual_response_duration", actualResponse.elapsedTime,
)
} else if result == ComparisonSkipped {
level.Warn(p.logger).Log(
"msg", "response comparison skipped",
"route-name", p.routeName,
"route_name", p.routeName,
"query", query,
"user", req.Header.Get("X-Scope-OrgID"),
"err", err,
"expected_response_duration", expectedResponse.elapsedTime,
"actual_response_duration", actualResponse.elapsedTime,
)
}

// Log queries that are slower in some backends than others
if p.slowResponseThreshold > 0 && slowestDuration-fastestDuration >= p.slowResponseThreshold {
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
level.Warn(p.logger).Log(
"msg", "response time difference between backends exceeded threshold",
"route_name", p.routeName,
"query", query,
"user", req.Header.Get("X-Scope-OrgID"),
"slowest_duration", slowestDuration,
"slowest_backend", slowestBackend.name,
"fastest_duration", fastestDuration,
"fastest_backend", fastestBackend.name,
)
}

Expand Down Expand Up @@ -254,6 +297,7 @@ type backendResponse struct {
contentType string
body []byte
err error
elapsedTime time.Duration
}

func (r *backendResponse) succeeded() bool {
Expand Down
121 changes: 118 additions & 3 deletions tools/querytee/proxy_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) {
testData := testData

t.Run(testName, func(t *testing.T) {
endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil)
endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, 0)

// Send the responses from a dedicated goroutine.
resCh := make(chan *backendResponse)
Expand Down Expand Up @@ -148,7 +148,7 @@ func Test_ProxyEndpoint_Requests(t *testing.T) {
NewProxyBackend("backend-1", backendURL1, time.Second, true, false),
NewProxyBackend("backend-2", backendURL2, time.Second, false, false),
}
endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil)
endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, 0)

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -322,7 +322,7 @@ func Test_ProxyEndpoint_Comparison(t *testing.T) {
comparisonError: scenario.comparatorError,
}

endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(reg), logger, comparator)
endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(reg), logger, comparator, 0)

resp := httptest.NewRecorder()
req, err := http.NewRequest("GET", "http://test/api/v1/test", nil)
Expand Down Expand Up @@ -350,6 +350,108 @@ func Test_ProxyEndpoint_Comparison(t *testing.T) {
}
}

func Test_ProxyEndpoint_LogSlowQueries(t *testing.T) {
scenarios := map[string]struct {
slowResponseThreshold time.Duration
preferredResponseLatency time.Duration
secondaryResponseLatency time.Duration
expectLatencyExceedsThreshold bool
}{
"responses are below threshold": {
slowResponseThreshold: 100 * time.Millisecond,
preferredResponseLatency: 0 * time.Millisecond,
secondaryResponseLatency: 0 * time.Millisecond,
expectLatencyExceedsThreshold: false,
},
"one response above threshold": {
slowResponseThreshold: 100 * time.Millisecond,
preferredResponseLatency: 0 * time.Millisecond,
secondaryResponseLatency: 101 * time.Millisecond,
expectLatencyExceedsThreshold: true,
},
"responses are both above threshold, but lower than threshold between themselves": {
slowResponseThreshold: 100 * time.Millisecond,
preferredResponseLatency: 101 * time.Millisecond,
secondaryResponseLatency: 150 * time.Millisecond,
expectLatencyExceedsThreshold: false,
},
"responses are both above threshold, and above threshold between themselves": {
slowResponseThreshold: 100 * time.Millisecond,
preferredResponseLatency: 101 * time.Millisecond,
secondaryResponseLatency: 202 * time.Millisecond,
expectLatencyExceedsThreshold: true,
},
"secondary latency is faster than primary, and difference is below threshold": {
slowResponseThreshold: 100 * time.Millisecond,
preferredResponseLatency: 50 * time.Millisecond,
secondaryResponseLatency: 0 * time.Millisecond,
expectLatencyExceedsThreshold: false,
},
"secondary latency is faster than primary, and difference is above threshold": {
slowResponseThreshold: 100 * time.Millisecond,
preferredResponseLatency: 101 * time.Millisecond,
secondaryResponseLatency: 0 * time.Millisecond,
expectLatencyExceedsThreshold: true,
},
}

for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
preferredBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
time.Sleep(scenario.preferredResponseLatency)
_, err := w.Write([]byte("preferred response"))
require.NoError(t, err)
}))

defer preferredBackend.Close()
preferredBackendURL, err := url.Parse(preferredBackend.URL)
require.NoError(t, err)

secondaryBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
time.Sleep(scenario.secondaryResponseLatency)
_, err := w.Write([]byte("preferred response"))
require.NoError(t, err)
}))

defer secondaryBackend.Close()
secondaryBackendURL, err := url.Parse(secondaryBackend.URL)
require.NoError(t, err)

backends := []*ProxyBackend{
NewProxyBackend("preferred-backend", preferredBackendURL, time.Second, true, false),
NewProxyBackend("secondary-backend", secondaryBackendURL, time.Second, false, false),
}

logger := newMockLogger()
reg := prometheus.NewPedanticRegistry()
comparator := &mockComparator{
comparisonResult: ComparisonSuccess,
}

endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(reg), logger, comparator, scenario.slowResponseThreshold)

resp := httptest.NewRecorder()
req, err := http.NewRequest("GET", "http://test/api/v1/test", nil)
require.NoError(t, err)
endpoint.ServeHTTP(resp, req)

// The HTTP request above will return as soon as the primary response is received, but this doesn't guarantee that the response comparison has been completed.
// Wait for the response comparison to complete before checking the logged messages.
waitForResponseComparisonMetric(t, reg, ComparisonSuccess)

if scenario.expectLatencyExceedsThreshold {
requireLogMessage(t, logger.messages, "response time difference between backends exceeded threshold")
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
} else {
requireNoLogMessages(t, logger.messages, "response time difference between backends exceeded threshold")
}
})
}
}

func waitForResponseComparisonMetric(t *testing.T, g prometheus.Gatherer, expectedResult ComparisonResult) {
started := time.Now()
timeoutAt := started.Add(2 * time.Second)
Expand All @@ -374,6 +476,19 @@ func waitForResponseComparisonMetric(t *testing.T, g prometheus.Gatherer, expect
}
}

func requireLogMessage(t *testing.T, messages []map[string]interface{}, expectedMessage string) {
sawMessage := false

for _, m := range messages {
if m["msg"] == expectedMessage {
sawMessage = true
break
}
}

require.True(t, sawMessage, "expected to find a '%s' message logged, but only these messages were logged: %v", expectedMessage, messages)
}

func requireNoLogMessages(t *testing.T, messages []map[string]interface{}, forbiddenMessages ...string) {
for _, m := range messages {
msg := m["msg"]
Expand Down
Loading