Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/gate: Prefix gate metrics for selects #3154

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/extgrpc"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
Expand Down Expand Up @@ -304,8 +305,14 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
queryableCreator = query.NewQueryableCreator(logger, reg, proxy, maxConcurrentSelects, queryTimeout)
engine = promql.NewEngine(
queryableCreator = query.NewQueryableCreator(
logger,
extprom.WrapRegistererWithPrefix("thanos_query_", reg),
proxy,
maxConcurrentSelects,
queryTimeout,
)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Reg: reg,
Expand Down Expand Up @@ -420,11 +427,10 @@ func runQuery(

ins := extpromhttp.NewInstrumentationMiddleware(reg)
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, reg, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)
ui.NewQueryUI(logger, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewQueryAPI(
logger,
reg,
stores,
engine,
queryableCreator,
Expand All @@ -436,7 +442,10 @@ func runQuery(
queryReplicaLabels,
flagsMap,
instantDefaultMaxSourceResolution,
maxConcurrentQueries,
gate.New(
extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg),
maxConcurrentQueries,
),
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)
Expand Down
7 changes: 1 addition & 6 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"

blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks"
Expand Down Expand Up @@ -286,11 +285,7 @@ func runStore(
return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrency)
}

queriesGate := gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrency)
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_concurrent_max",
Help: "Number of maximum concurrent queries.",
}).Set(float64(maxConcurrency))
queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), maxConcurrency)

bs, err := store.NewBucketStore(
logger,
Expand Down
9 changes: 2 additions & 7 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
Expand All @@ -40,7 +39,6 @@ import (
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/api"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
Expand All @@ -64,7 +62,6 @@ const (
type QueryAPI struct {
baseAPI *api.BaseAPI
logger log.Logger
reg prometheus.Registerer
gate gate.Gate
queryableCreate query.QueryableCreator
queryEngine *promql.Engine
Expand All @@ -82,7 +79,6 @@ type QueryAPI struct {
// NewQueryAPI returns an initialized QueryAPI type.
func NewQueryAPI(
logger log.Logger,
reg *prometheus.Registry,
storeSet *query.StoreSet,
qe *promql.Engine,
c query.QueryableCreator,
Expand All @@ -93,15 +89,14 @@ func NewQueryAPI(
replicaLabels []string,
flagsMap map[string]string,
defaultInstantQueryMaxSourceResolution time.Duration,
maxConcurrentQueries int,
gate gate.Gate,
) *QueryAPI {
return &QueryAPI{
baseAPI: api.NewBaseAPI(logger, flagsMap),
logger: logger,
reg: reg,
queryEngine: qe,
queryableCreate: c,
gate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_query_concurrent_", reg)).NewGate(maxConcurrentQueries),
gate: gate,
ruleGroups: ruleGroups,

enableAutodownsampling: enableAutodownsampling,
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/prometheus/common/route"
promgate "github.com/prometheus/prometheus/pkg/gate"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -116,7 +117,7 @@ func TestEndpoints(t *testing.T) {
MaxSamples: 10000,
Timeout: timeout,
}),
gate: gate.NewKeeper(nil).NewGate(4),
gate: gate.New(nil, 4),
}

start := time.Unix(0, 0)
Expand Down Expand Up @@ -1053,7 +1054,7 @@ func TestParseDownsamplingParamMillis(t *testing.T) {
for i, test := range tests {
api := QueryAPI{
enableAutodownsampling: test.enableAutodownsampling,
gate: gate.NewKeeper(nil).NewGate(4),
gate: gate.New(nil, 4),
}
v := url.Values{}
v.Set(MaxSourceResolutionParam, test.maxSourceResolutionParam)
Expand Down Expand Up @@ -1101,7 +1102,7 @@ func TestParseStoreMatchersParam(t *testing.T) {
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
api := QueryAPI{
gate: gate.NewKeeper(nil).NewGate(4),
gate: promgate.New(4),
}
v := url.Values{}
v.Set(StoreMatcherParam, tc.storeMatchers)
Expand Down
6 changes: 4 additions & 2 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,10 @@ func newMemcachedClient(
dnsProvider: dnsProvider,
asyncQueue: make(chan func(), config.MaxAsyncBufferSize),
stop: make(chan struct{}, 1),
getMultiGate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg)).
NewGate(config.MaxGetMultiConcurrency),
getMultiGate: gate.New(
extprom.WrapRegistererWithPrefix("thanos_memcached_getmulti_", reg),
config.MaxGetMultiConcurrency,
),
}

c.clientInfo = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Expand Down
143 changes: 110 additions & 33 deletions pkg/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,67 +12,144 @@ import (
promgate "github.com/prometheus/prometheus/pkg/gate"
)

// Gate is an interface that mimics prometheus/pkg/gate behavior.
var (
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
MaxGaugeOpts = prometheus.GaugeOpts{
Name: "gate_queries_max",
Help: "Maximum number of concurrent queries.",
}
InFlightGaugeOpts = prometheus.GaugeOpts{
Name: "gate_queries_in_flight",
Help: "Number of queries that are currently in flight.",
}
DurationHistogramOpts = prometheus.HistogramOpts{
Name: "gate_duration_seconds",
Help: "How many seconds it took for queries to wait at the gate.",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
}
)

// Gate controls the maximum number of concurrently running and waiting queries.
//
// Every successful Start is meant to be paired with one Done, typically via
// defer as shown below. When Start returns an error the request was not
// started, so Done is not called for it.
//
// Example of use:
//
// g := gate.New(r, 5)
//
// if err := g.Start(ctx); err != nil {
// return
// }
// defer g.Done()
//
type Gate interface {
// Start initiates a new request and waits until it's our turn to fulfill a request.
// A non-nil error means the request did not get its turn.
Start(ctx context.Context) error
// Done finishes a query that was previously admitted by Start.
Done()
}

// Gate wraps the Prometheus gate with extra metrics.
type gate struct {
g *promgate.Gate
m *metrics
}

type metrics struct {
inflightQueries prometheus.Gauge
gateTiming prometheus.Histogram
}

// Keeper is used to create multiple gates sharing the same metrics.
//
// Deprecated: when Keeper is used to create several gates, the metric tracking
// the number of in-flight requests isn't meaningful because it is hard to say
// whether requests are being blocked or not. For clients that call
// gate.(*Keeper).NewGate only once, it is recommended to use gate.New()
// instead. Otherwise it is recommended to use the
// github.com/prometheus/prometheus/pkg/gate package directly and wrap the
// returned gate with gate.InstrumentGateDuration().
type Keeper struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still used anywhere in the codebase? If it's not the case, why not remove it directly? I don't think we have any downstream dependents.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and given that Thanos imports Cortex too, it creates a circular dependency hence the need to keep Keeper :)

m *metrics
reg prometheus.Registerer
}

// NewKeeper creates a new Keeper.
//
// Deprecated: see Keeper.
func NewKeeper(reg prometheus.Registerer) *Keeper {
return &Keeper{
m: &metrics{
inflightQueries: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "gate_queries_in_flight",
Help: "Number of queries that are currently in flight.",
}),
gateTiming: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "gate_duration_seconds",
Help: "How many seconds it took for queries to wait at the gate.",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720},
}),
},
reg: reg,
}
}

// NewGate returns a new Gate that collects metrics.
// NewGate returns a new Gate ready for use.
//
// Deprecated: see Keeper.
func (k *Keeper) NewGate(maxConcurrent int) Gate {
return &gate{g: promgate.New(maxConcurrent), m: k.m}
return New(k.reg, maxConcurrent)
}

// New builds a Gate that lets at most maxConcurrent requests run at the same
// time and reports metrics about them through reg.
//
// The limiting itself is done by a gate from the
// github.com/prometheus/prometheus/pkg/gate package. That gate is wrapped so
// that three metrics are exposed: the configured maximum (MaxGaugeOpts), the
// number of requests currently in flight (InFlightGaugeOpts) and the time
// requests spent waiting at the gate (DurationHistogramOpts).
//
// New may be called several times, but every call needs its own registerer:
// registering the same metrics twice on one registerer panics.
func New(reg prometheus.Registerer, maxConcurrent int) Gate {
	factory := promauto.With(reg)

	// The limit never changes after construction, so it is set once here.
	limit := factory.NewGauge(MaxGaugeOpts)
	limit.Set(float64(maxConcurrent))

	// Create the histogram before the in-flight gauge so that metrics are
	// registered in the same order as the nested-call form of this function.
	waitDuration := factory.NewHistogram(DurationHistogramOpts)
	inFlight := factory.NewGauge(InFlightGaugeOpts)

	// The in-flight wrapper sits closest to the real gate; the duration
	// wrapper is the outermost layer and therefore times the whole Start.
	limited := InstrumentGateInFlight(inFlight, promgate.New(maxConcurrent))
	return InstrumentGateDuration(waitDuration, limited)
}

// instrumentedDurationGate decorates a Gate so that the time spent in each
// Start call is reported to an observer.
type instrumentedDurationGate struct {
// g is the wrapped gate that Start and Done are forwarded to.
g Gate
// duration receives, in seconds, how long each Start call took.
duration prometheus.Observer
}

// Start initiates a new request and waits until it's our turn to fulfill a request.
func (g *gate) Start(ctx context.Context) error {
// InstrumentGateDuration wraps g so that the time each request spends in
// Start is reported to duration, in seconds. Done calls are passed through to
// g unchanged.
func InstrumentGateDuration(duration prometheus.Observer, g Gate) Gate {
	wrapped := new(instrumentedDurationGate)
	wrapped.duration = duration
	wrapped.g = g
	return wrapped
}

// Start implements the Gate interface.
func (g *instrumentedDurationGate) Start(ctx context.Context) error {
start := time.Now()
defer func() {
g.m.gateTiming.Observe(time.Since(start).Seconds())
g.duration.Observe(time.Since(start).Seconds())
}()

return g.g.Start(ctx)
}

// Done implements the Gate interface. It only forwards to the wrapped gate;
// nothing is observed when a request finishes.
func (g *instrumentedDurationGate) Done() {
g.g.Done()
}

// instrumentedInFlightGate decorates a Gate so that a gauge reflects how many
// requests have been started and not yet finished.
type instrumentedInFlightGate struct {
// g is the wrapped gate that Start and Done are forwarded to.
g Gate
// inflight is incremented after a successful Start and decremented in Done.
inflight prometheus.Gauge
}

// InstrumentGateInFlight wraps g so that inflight tracks the number of
// requests that are currently in flight, i.e. started through the returned
// Gate and not yet finished.
func InstrumentGateInFlight(inflight prometheus.Gauge, g Gate) Gate {
	wrapped := new(instrumentedInFlightGate)
	wrapped.inflight = inflight
	wrapped.g = g
	return wrapped
}

// Start implements the Gate interface.
func (g *instrumentedInFlightGate) Start(ctx context.Context) error {
if err := g.g.Start(ctx); err != nil {
return err
}

g.m.inflightQueries.Inc()
g.inflight.Inc()
return nil
}

// Done finishes a query.
func (g *gate) Done() {
g.m.inflightQueries.Dec()
// Done implements the Gate interface.
func (g *instrumentedInFlightGate) Done() {
g.inflight.Dec()
g.g.Done()
}
Loading