Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query-tee improvements #8419

Merged
merged 15 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@
* [ENHANCEMENT] Add two new metrics for measuring the relative duration between backends: #7782 #8013 #8330
* `cortex_querytee_backend_response_relative_duration_seconds`
* `cortex_querytee_backend_response_relative_duration_proportional`
* [ENHANCEMENT] Emit trace spans from query-tee. #8419
* [ENHANCEMENT] Log trace ID (if present) with all log messages written while processing a request. #8419
* [ENHANCEMENT] Log user agent when processing a request. #8419
* [ENHANCEMENT] Add `time` parameter to proxied instant queries if it is not included in the incoming request. This is optional but enabled by default, and can be disabled with `-proxy.add-missing-time-parameter-to-instant-queries=false`. #8419
* [BUGFIX] Ensure any errors encountered while forwarding a request to a backend (e.g. DNS resolution failures) are logged. #8419

### Documentation

Expand Down
31 changes: 30 additions & 1 deletion cmd/query-tee/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ package main
import (
"flag"
"fmt"
"io"
"os"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/log"
"github.com/grafana/dskit/tracing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
jaegercfg "github.com/uber/jaeger-client-go/config"

"github.com/grafana/mimir/pkg/util/instrumentation"
util_log "github.com/grafana/mimir/pkg/util/log"
Expand Down Expand Up @@ -44,6 +47,10 @@ func main() {

util_log.InitLogger(log.LogfmtFormat, cfg.LogLevel, false, util_log.RateLimitedLoggerCfg{})

if closer := initTracing(); closer != nil {
defer closer.Close()
}

// Run the instrumentation server.
registry := prometheus.NewRegistry()
registry.MustRegister(collectors.NewGoCollector())
Expand Down Expand Up @@ -72,6 +79,21 @@ func main() {
proxy.Await()
}

// initTracing sets up tracing from the environment and returns the closer
// for the created tracer.
//
// The service name is taken from the JAEGER_SERVICE_NAME environment variable,
// falling back to "query-tee" when that variable is unset or empty.
//
// If tracing cannot be set up, the error is logged and nil is returned, so
// callers must check the result before closing it.
func initTracing() io.Closer {
	const defaultServiceName = "query-tee"

	serviceName := defaultServiceName
	if fromEnv := os.Getenv("JAEGER_SERVICE_NAME"); fromEnv != "" {
		serviceName = fromEnv
	}

	closer, err := tracing.NewFromEnv(serviceName, jaegercfg.MaxTagValueLength(16e3))
	if err == nil {
		return closer
	}

	// Tracing is best-effort: report the failure and carry on without it.
	level.Error(util_log.Logger).Log("msg", "Failed to setup tracing", "err", err.Error())
	return nil
}

func mimirReadRoutes(cfg Config) []querytee.Route {
prefix := cfg.PathPrefix

Expand All @@ -85,8 +107,15 @@ func mimirReadRoutes(cfg Config) []querytee.Route {
UseRelativeError: cfg.ProxyConfig.UseRelativeError,
SkipRecentSamples: cfg.ProxyConfig.SkipRecentSamples,
})

var instantQueryTransformers []querytee.RequestTransformer

if cfg.ProxyConfig.AddMissingTimeParamToInstantQueries {
instantQueryTransformers = append(instantQueryTransformers, querytee.AddMissingTimeParam)
}

return []querytee.Route{
{Path: prefix + "/api/v1/query", RouteName: "api_v1_query", Methods: []string{"GET", "POST"}, ResponseComparator: samplesComparator},
{Path: prefix + "/api/v1/query", RouteName: "api_v1_query", Methods: []string{"GET", "POST"}, ResponseComparator: samplesComparator, RequestTransformers: instantQueryTransformers},
{Path: prefix + "/api/v1/query_range", RouteName: "api_v1_query_range", Methods: []string{"GET", "POST"}, ResponseComparator: samplesComparator},
{Path: prefix + "/api/v1/query_exemplars", RouteName: "api_v1_query_exemplars", Methods: []string{"GET", "POST"}, ResponseComparator: nil},
{Path: prefix + "/api/v1/labels", RouteName: "api_v1_labels", Methods: []string{"GET", "POST"}, ResponseComparator: nil},
Expand Down
8 changes: 7 additions & 1 deletion development/mimir-microservices-mode/compose-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ cd "$SCRIPT_DIR" && make
# -gcflags "all=-N -l" disables compiler optimizations (and inlining), which makes the binary easier to debug with Delve.
# GOARCH is not changed.
CGO_ENABLED=0 GOOS=linux go build -mod=vendor -tags=netgo,stringlabels -gcflags "all=-N -l" -o "${SCRIPT_DIR}"/mimir "${SCRIPT_DIR}"/../../cmd/mimir

# Build the image for the distributor-1 service, passing BUILD_IMAGE through as a build argument.
docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml build --build-arg BUILD_IMAGE="${BUILD_IMAGE}" distributor-1

# query-tee is treated as enabled when yq reports anything other than "null"
# for the .services.query-tee entry of docker-compose.yml.
if [ "$(yq '.services.query-tee' "${SCRIPT_DIR}"/docker-compose.yml)" != "null" ]; then
# If query-tee is enabled, build its binary and image as well.
# The build uses the same flags as the Mimir binary above; the output path is the cmd/query-tee directory.
CGO_ENABLED=0 GOOS=linux go build -mod=vendor -tags=netgo,stringlabels -gcflags "all=-N -l" -o "${SCRIPT_DIR}"/../../cmd/query-tee "${SCRIPT_DIR}"/../../cmd/query-tee
docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml build --build-arg BUILD_IMAGE="${BUILD_IMAGE}" query-tee
fi

# Start the services, forwarding this script's arguments ("$@") to `up`.
docker_compose -f "${SCRIPT_DIR}"/docker-compose.yml up "$@"
49 changes: 36 additions & 13 deletions development/mimir-microservices-mode/docker-compose.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ std.manifestYamlDoc({
enable_grafana_agent: false,
enable_prometheus: true, // If Prometheus is disabled, recording rules will not be evaluated and so dashboards in Grafana that depend on these recorded series will display no data.
enable_otel_collector: false,

// If true, a query-tee instance with a single backend is started.
enable_query_tee: false,
},

// We explicitly list all important services here, so that it's easy to disable them by commenting out.
Expand All @@ -51,6 +54,7 @@ std.manifestYamlDoc({
(if $._config.ring == 'consul' || $._config.ring == 'multi' then self.consul else {}) +
(if $._config.cache_backend == 'redis' then self.redis else self.memcached + self.memcached_exporter) +
(if $._config.enable_load_generator then self.load_generator else {}) +
(if $._config.enable_query_tee then self.query_tee else {}) +
{},

distributor:: {
Expand Down Expand Up @@ -175,6 +179,21 @@ std.manifestYamlDoc({

local all_rings = ['-ingester.ring', '-distributor.ring', '-compactor.ring', '-store-gateway.sharding-ring', '-ruler.ring', '-alertmanager.sharding-ring'],

// Returns the Jaeger-related environment variables for a service, as an object keyed by variable name.
// appName is embedded in JAEGER_TAGS as `app=<appName>`.
// The agent is addressed as host 'jaeger' on port 6831.
local jaegerEnv(appName) = {
JAEGER_AGENT_HOST: 'jaeger',
JAEGER_AGENT_PORT: 6831,
// NOTE(review): a 'const' sampler with param 1 presumably samples every trace — confirm against the Jaeger client docs.
JAEGER_SAMPLER_TYPE: 'const',
JAEGER_SAMPLER_PARAM: 1,
JAEGER_TAGS: 'app=%s' % appName,
JAEGER_REPORTER_MAX_QUEUE_SIZE: 1000,
},

// Converts an environment object into a list of 'KEY=value' strings,
// one per field, leaving out any field whose value is null.
local formatEnv(env) = std.map(
  function(name) '%s=%s' % [name, env[name]],
  std.filter(function(name) env[name] != null, std.objectFields(env))
),

// This function builds docker-compose declaration for Mimir service.
// Default grpcPort is (httpPort + 1000), and default debug port is (httpPort + 10000)
local mimirService(serviceOptions) = {
Expand All @@ -189,14 +208,7 @@ std.manifestYamlDoc({
// Extra arguments passed to Mimir command line.
extraArguments: '',
dependsOn: ['minio'] + (if $._config.ring == 'consul' || $._config.ring == 'multi' then ['consul'] else if s.target != 'distributor' then ['distributor-1'] else []),
env: {
JAEGER_AGENT_HOST: 'jaeger',
JAEGER_AGENT_PORT: 6831,
JAEGER_SAMPLER_TYPE: 'const',
JAEGER_SAMPLER_PARAM: 1,
JAEGER_TAGS: 'app=%s' % s.jaegerApp,
JAEGER_REPORTER_MAX_QUEUE_SIZE: 1000,
},
env: jaegerEnv(s.jaegerApp),
extraVolumes: [],
memberlistNodeName: self.jaegerApp,
memberlistBindPort: self.httpPort + 2000,
Expand All @@ -223,11 +235,7 @@ std.manifestYamlDoc({
std.join(' ', if $._config.cache_backend == 'redis' then [x + '.backend=redis' for x in all_caches] + [x + '.redis.endpoint=redis:6379' for x in all_caches] else [x + '.backend=memcached' for x in all_caches] + [x + '.memcached.addresses=dns+memcached:11211' for x in all_caches]),
]),
],
environment: [
'%s=%s' % [key, options.env[key]]
for key in std.objectFields(options.env)
if options.env[key] != null
],
environment: formatEnv(options.env),
hostname: options.name,
// Only publish HTTP and debug port, but not gRPC one.
ports: ['%d:%d' % [options.httpPort, options.httpPort]] +
Expand Down Expand Up @@ -385,6 +393,21 @@ std.manifestYamlDoc({
},
},

// Hidden field holding the docker-compose declaration of the 'query-tee' service.
query_tee:: {
'query-tee': {
// Jaeger settings, with traces tagged as app=query-tee.
local env = jaegerEnv('query-tee'),

image: 'query-tee',
build: {
// The image is built from the cmd/query-tee directory.
context: '../../cmd/query-tee',
},
// A single backend (http://nginx:8080) that is also the preferred one; non-registered routes are
// passed through, and the server path prefix is /prometheus.
command: '-backend.endpoints=http://nginx:8080 -backend.preferred=nginx -proxy.passthrough-non-registered-routes=true -server.path-prefix=/prometheus',
environment: formatEnv(env),
hostname: 'query-tee',
// Host port 9999 is mapped to container port 80.
ports: ['9999:80'],
},
},

// docker-compose YAML output version.
version: '3.4',

Expand Down
59 changes: 59 additions & 0 deletions tools/querytee/instant_query_transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querytee

import (
"net/http"
"strconv"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/spanlogger"
)

var timeNow = time.Now // Interception point to allow tests to set the current time for AddMissingTimeParam.

// AddMissingTimeParam ensures a query request carries an explicit 'time' parameter.
//
// When an instant query omits 'time', each backend evaluates it at its own
// current time. Backends may therefore disagree for the very same request,
// which leads to spurious comparison failures. Setting the parameter once in
// the proxy makes every backend evaluate the query at the same instant.
//
// If the request already has a 'time' parameter (in the URL or the body), the
// request and body are returned untouched. Otherwise the current time, formatted
// as RFC 3339, is added to the body when one is present, or to the URL query
// string when it is not. An error is returned only if the form cannot be parsed,
// in which case the returned request and body are nil.
func AddMissingTimeParam(r *http.Request, body []byte, logger *spanlogger.SpanLogger) (*http.Request, []byte, error) {
	// The form is expected to have been parsed by now; parsing again is harmless
	// because ParseForm is idempotent.
	err := r.ParseForm()
	if err != nil {
		return nil, nil, err
	}

	if _, alreadySet := r.Form["time"]; alreadySet {
		return r, body, nil
	}

	// Neither the URL nor the body specifies 'time', so pin it here.
	now := timeNow().Format(time.RFC3339)
	level.Debug(logger).Log("msg", "instant query had no explicit time parameter, adding it based on the current time", "time", now)

	// Form is the merged view of URL and body parameters and is not refreshed
	// when PostForm or URL change below, so keep it in sync by hand.
	r.Form.Set("time", now)

	if len(body) == 0 {
		// No body: the parameter goes into the URL query string.
		query := r.URL.Query()
		query.Set("time", now)
		r.URL.RawQuery = query.Encode()
		return r, body, nil
	}

	// The request has a body: the parameter goes there, re-encoded from PostForm.
	r.PostForm.Set("time", now)
	encoded := []byte(r.PostForm.Encode())

	// The body length changed, so the advertised content length must follow.
	size := len(encoded)
	r.ContentLength = int64(size)
	r.Header.Set("Content-Length", strconv.Itoa(size))

	return r, encoded, nil
}
Loading
Loading