querytee: Add support to log comparatively slow queries
This adds `proxy.log-slow-query-response-threshold` which, when enabled, logs any query whose response from one backend is slower than the response from the fastest backend by at least the configured threshold.
jhesketh committed Feb 9, 2024
1 parent 77d4882 commit 01565f8
Showing 4 changed files with 167 additions and 15 deletions.
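The change boils down to timing each backend's response, tracking the fastest and slowest, and logging a warning when the gap meets the configured threshold. The following is a minimal, standalone Go sketch of that idea only; the names backendTiming and checkSlowResponses are illustrative and do not appear in the diff below.

// Simplified illustration of the slow-response check this commit adds to querytee.
// Not the actual querytee code; types and function names are made up for the sketch.
package main

import (
	"log"
	"time"
)

type backendTiming struct {
	name    string
	elapsed time.Duration
}

// checkSlowResponses returns the fastest and slowest backends and whether the
// difference between them meets or exceeds the threshold (0 disables the check).
func checkSlowResponses(timings []backendTiming, threshold time.Duration) (fastest, slowest backendTiming, exceeded bool) {
	if threshold <= 0 || len(timings) == 0 {
		return
	}
	fastest, slowest = timings[0], timings[0]
	for _, t := range timings[1:] {
		if t.elapsed < fastest.elapsed {
			fastest = t
		}
		if t.elapsed > slowest.elapsed {
			slowest = t
		}
	}
	return fastest, slowest, slowest.elapsed-fastest.elapsed >= threshold
}

func main() {
	timings := []backendTiming{
		{name: "preferred-backend", elapsed: 120 * time.Millisecond},
		{name: "secondary-backend", elapsed: 11 * time.Second},
	}
	if fastest, slowest, exceeded := checkSlowResponses(timings, 10*time.Second); exceeded {
		log.Printf("response time between backends exceeded threshold: slowest=%s (%s) fastest=%s (%s)",
			slowest.name, slowest.elapsed, fastest.name, fastest.elapsed)
	}
}

In the real proxy endpoint below, the same comparison is done under a mutex while the per-backend requests run concurrently.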
4 changes: 3 additions & 1 deletion tools/querytee/proxy.go
@@ -34,6 +34,7 @@ type ProxyConfig struct {
PreferredBackend string
BackendReadTimeout time.Duration
CompareResponses bool
LogSlowQueryResponseThreshold time.Duration
ValueComparisonTolerance float64
UseRelativeError bool
PassThroughNonRegisteredRoutes bool
@@ -51,6 +52,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.")
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 150*time.Second, "The timeout when reading the response from a backend.")
f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.")
f.DurationVar(&cfg.LogSlowQueryResponseThreshold, "proxy.log-slow-query-response-threshold", 10*time.Second, "The relative elapsed time between backends for when to log queries. 0 to disable.")
f.Float64Var(&cfg.ValueComparisonTolerance, "proxy.value-comparison-tolerance", 0.000001, "The tolerance to apply when comparing floating point values in the responses. 0 to disable tolerance and require exact match (not recommended).")
f.BoolVar(&cfg.UseRelativeError, "proxy.compare-use-relative-error", false, "Use relative error tolerance when comparing floating point values.")
f.DurationVar(&cfg.SkipRecentSamples, "proxy.compare-skip-recent-samples", 2*time.Minute, "The window from now to skip comparing samples. 0 to disable.")
@@ -204,7 +206,7 @@ func (p *Proxy) Start() error {
if p.cfg.CompareResponses {
comparator = route.ResponseComparator
}
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator))
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator, p.cfg.LogSlowQueryResponseThreshold))
}

if p.cfg.PassThroughNonRegisteredRoutes {
66 changes: 55 additions & 11 deletions tools/querytee/proxy_endpoint.go
@@ -23,10 +23,11 @@ type ResponsesComparator interface {
}

type ProxyEndpoint struct {
backends []*ProxyBackend
metrics *ProxyMetrics
logger log.Logger
comparator ResponsesComparator
backends []*ProxyBackend
metrics *ProxyMetrics
logger log.Logger
comparator ResponsesComparator
slowResponseThreshold time.Duration

// Whether for this endpoint there's a preferred backend configured.
hasPreferredBackend bool
@@ -35,7 +36,7 @@ type ProxyEndpoint struct {
routeName string
}

func NewProxyEndpoint(backends []*ProxyBackend, routeName string, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator) *ProxyEndpoint {
func NewProxyEndpoint(backends []*ProxyBackend, routeName string, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator, slowResponseThreshold time.Duration) *ProxyEndpoint {
hasPreferredBackend := false
for _, backend := range backends {
if backend.preferred {
@@ -45,12 +46,13 @@ func NewProxyEndpoint(backends []*ProxyBackend, routeName string, metrics *Proxy
}

return &ProxyEndpoint{
backends: backends,
routeName: routeName,
metrics: metrics,
logger: logger,
comparator: comparator,
hasPreferredBackend: hasPreferredBackend,
backends: backends,
routeName: routeName,
metrics: metrics,
logger: logger,
comparator: comparator,
slowResponseThreshold: slowResponseThreshold,
hasPreferredBackend: hasPreferredBackend,
}
}

@@ -82,6 +84,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba
body []byte
responses = make([]*backendResponse, 0, len(p.backends))
responsesMtx = sync.Mutex{}
timingMtx = sync.Mutex{}
query = req.URL.RawQuery
)

@@ -104,6 +107,14 @@

level.Debug(p.logger).Log("msg", "Received request", "path", req.URL.Path, "query", query)

// Keep track of the fastest and slowest backends
var (
fastestDuration time.Duration
fastestBackend *ProxyBackend
slowestDuration time.Duration
slowestBackend *ProxyBackend
)

wg.Add(len(p.backends))
for _, b := range p.backends {
b := b
@@ -122,6 +133,19 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba
elapsed := time.Since(start)
contentType := ""

if p.slowResponseThreshold > 0 {
timingMtx.Lock()
if elapsed > slowestDuration {
slowestDuration = elapsed
slowestBackend = b
}
if fastestDuration == 0 || elapsed < fastestDuration {
fastestDuration = elapsed
fastestBackend = b
}
timingMtx.Unlock()
}

if resp != nil {
contentType = resp.Header.Get("Content-Type")
}
@@ -132,6 +156,7 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba
contentType: contentType,
body: body,
err: err,
elapsedTime: elapsed,
}

// Log with a level based on the backend response.
@@ -174,6 +199,8 @@ func (p *ProxyEndpoint) executeBackendRequests(req *http.Request, resCh chan *ba
"query", query,
"user", req.Header.Get("X-Scope-OrgID"),
"err", err,
"expected-response-duration", expectedResponse.elapsedTime,
"actual-response-duration", actualResponse.elapsedTime,
)
} else if result == ComparisonSkipped {
level.Warn(p.logger).Log(
@@ -182,6 +209,22 @@
"query", query,
"user", req.Header.Get("X-Scope-OrgID"),
"err", err,
"expected-response-duration", expectedResponse.elapsedTime,
"actual-response-duration", actualResponse.elapsedTime,
)
}

// Log queries that are slower in some backends than others
if p.slowResponseThreshold > 0 && slowestDuration-fastestDuration >= p.slowResponseThreshold {
level.Warn(p.logger).Log(
"msg", "response time between backends exceeded threshold",
"route-name", p.routeName,
"query", query,
"user", req.Header.Get("X-Scope-OrgID"),
"slowest-duration", slowestDuration,
"slowest-backend", slowestBackend.name,
"fastest-duration", fastestDuration,
"fastest-backend", fastestBackend.name,
)
}

@@ -254,6 +297,7 @@ type backendResponse struct {
contentType string
body []byte
err error
elapsedTime time.Duration
}

func (r *backendResponse) succeeded() bool {
110 changes: 107 additions & 3 deletions tools/querytee/proxy_endpoint_test.go
@@ -104,7 +104,7 @@ func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) {
testData := testData

t.Run(testName, func(t *testing.T) {
endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil)
endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, 0)

// Send the responses from a dedicated goroutine.
resCh := make(chan *backendResponse)
@@ -148,7 +148,7 @@ func Test_ProxyEndpoint_Requests(t *testing.T) {
NewProxyBackend("backend-1", backendURL1, time.Second, true, false),
NewProxyBackend("backend-2", backendURL2, time.Second, false, false),
}
endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil)
endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, 0)

for _, tc := range []struct {
name string
@@ -322,7 +322,7 @@ func Test_ProxyEndpoint_Comparison(t *testing.T) {
comparisonError: scenario.comparatorError,
}

endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(reg), logger, comparator)
endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(reg), logger, comparator, 0)

resp := httptest.NewRecorder()
req, err := http.NewRequest("GET", "http://test/api/v1/test", nil)
@@ -350,6 +350,97 @@ func Test_ProxyEndpoint_Comparison(t *testing.T) {
}
}

func Test_ProxyEndpoint_LogSlowQueries(t *testing.T) {
scenarios := map[string]struct {
slowResponseThreshold time.Duration
preferredResponseLatency time.Duration
secondaryResponseLatency time.Duration
}{
"responses are below threshold": {
slowResponseThreshold: 100 * time.Millisecond,
preferredResponseLatency: 0 * time.Millisecond,
secondaryResponseLatency: 0 * time.Millisecond,
},
"one response above threshold": {
slowResponseThreshold: 100 * time.Millisecond,
preferredResponseLatency: 0 * time.Millisecond,
secondaryResponseLatency: 101 * time.Millisecond,
},
"responses are both above threshold, but lower than threshold between themselves": {
slowResponseThreshold: 100 * time.Millisecond,
preferredResponseLatency: 101 * time.Millisecond,
secondaryResponseLatency: 150 * time.Millisecond,
},
"responses are both above threshold, and above threshold between themselves": {
slowResponseThreshold: 100 * time.Millisecond,
preferredResponseLatency: 101 * time.Millisecond,
secondaryResponseLatency: 202 * time.Millisecond,
},
}

for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
preferredBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
time.Sleep(scenario.preferredResponseLatency)
_, err := w.Write([]byte("preferred response"))
require.NoError(t, err)
}))

defer preferredBackend.Close()
preferredBackendURL, err := url.Parse(preferredBackend.URL)
require.NoError(t, err)

secondaryBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
time.Sleep(scenario.secondaryResponseLatency)
_, err := w.Write([]byte("preferred response"))
require.NoError(t, err)
}))

defer secondaryBackend.Close()
secondaryBackendURL, err := url.Parse(secondaryBackend.URL)
require.NoError(t, err)

backends := []*ProxyBackend{
NewProxyBackend("preferred-backend", preferredBackendURL, time.Second, true, false),
NewProxyBackend("secondary-backend", secondaryBackendURL, time.Second, false, false),
}

logger := newMockLogger()
reg := prometheus.NewPedanticRegistry()
comparator := &mockComparator{
comparisonResult: ComparisonSuccess,
}

endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(reg), logger, comparator, scenario.slowResponseThreshold)

resp := httptest.NewRecorder()
req, err := http.NewRequest("GET", "http://test/api/v1/test", nil)
require.NoError(t, err)
endpoint.ServeHTTP(resp, req)

// The HTTP request above will return as soon as the primary response is received, but this doesn't guarantee that the response comparison has been completed.
// Wait for the response comparison to complete before checking the logged messages.
waitForResponseComparisonMetric(t, reg, ComparisonSuccess)

for _, m := range logger.messages {
fmt.Println(m)
}
if (scenario.preferredResponseLatency >= scenario.secondaryResponseLatency &&
scenario.preferredResponseLatency-scenario.secondaryResponseLatency < scenario.slowResponseThreshold) ||
(scenario.secondaryResponseLatency >= scenario.preferredResponseLatency &&
scenario.secondaryResponseLatency-scenario.preferredResponseLatency < scenario.slowResponseThreshold) {
requireNoLogMessages(t, logger.messages, "response time between backends exceeded threshold")
} else {
requireLogMessage(t, logger.messages, "response time between backends exceeded threshold")
}
})
}
}

func waitForResponseComparisonMetric(t *testing.T, g prometheus.Gatherer, expectedResult ComparisonResult) {
started := time.Now()
timeoutAt := started.Add(2 * time.Second)
@@ -374,6 +465,19 @@ func waitForResponseComparisonMetric(t *testing.T, g prometheus.Gatherer, expect
}
}

func requireLogMessage(t *testing.T, messages []map[string]interface{}, expectedMessage string) {
sawMessage := false

for _, m := range messages {
if m["msg"] == expectedMessage {
sawMessage = true
break
}
}

require.True(t, sawMessage, "expected to find a '%s' message logged, but only these messages were logged: %v", expectedMessage, messages)
}

func requireNoLogMessages(t *testing.T, messages []map[string]interface{}, forbiddenMessages ...string) {
for _, m := range messages {
msg := m["msg"]
2 changes: 2 additions & 0 deletions tools/querytee/proxy_metrics.go
@@ -43,6 +43,8 @@ func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
Name: "responses_compared_total",
Help: "Total number of responses compared per route name by result.",
}, []string{"route", "result"}),
/// add in metrics for latency
// can have a label per backend
}

return m
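The new comments in NewProxyMetrics hint at per-backend latency metrics as a follow-up; this commit does not add any. A hypothetical sketch of such a metric is shown below — the namespace, metric name, labels, and the newBackendDurationHistogram helper are all illustrative, not part of querytee.

// Hypothetical follow-up to the TODO comments above, not part of this commit:
// a per-backend response-duration histogram using prometheus/client_golang.
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func newBackendDurationHistogram(reg prometheus.Registerer) *prometheus.HistogramVec {
	return promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "querytee",
		Name:      "backend_response_duration_seconds",
		Help:      "Time taken to receive a response from each backend.",
		Buckets:   prometheus.DefBuckets,
	}, []string{"backend", "route"})
}

func main() {
	reg := prometheus.NewRegistry()
	hist := newBackendDurationHistogram(reg)

	// After a backend responds, record how long it took for that backend and route.
	elapsed := 250 * time.Millisecond
	hist.WithLabelValues("preferred-backend", "api_v1_query").Observe(elapsed.Seconds())
}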
