Skip to content

Commit

Permalink
refactor: Introduce context cause to our code (#13224)
Browse files Browse the repository at this point in the history
Introduce the usage of Go's new `WithCause` context functions to make our context cancellation errors more descriptive.
  • Loading branch information
DylanGuedes committed Jun 26, 2024
1 parent f661593 commit 4eb45cc
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 20 deletions.
8 changes: 4 additions & 4 deletions clients/pkg/promtail/discovery/consulagent/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
ticker := time.NewTicker(d.refreshInterval)

// Watched services and their cancellation functions.
services := make(map[string]func())
services := make(map[string]func(error))

for {
select {
Expand All @@ -340,7 +340,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
// Watch the catalog for new services we would like to watch. This is called only
// when we do not yet know the names of the services and need to ask Consul for the
// entire list of services.
func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, services map[string]func()) {
func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, services map[string]func(error)) {
agent := d.client.Agent()
level.Debug(d.logger).Log("msg", "Watching services", "tags", strings.Join(d.watchedTags, ","))

Expand Down Expand Up @@ -378,7 +378,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.
continue // We are already watching the service.
}

wctx, cancel := context.WithCancel(ctx)
wctx, cancel := context.WithCancelCause(ctx)
d.watchService(wctx, ch, name)
services[name] = cancel
}
Expand All @@ -390,7 +390,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.
"msg", "removing service since consul no longer has a record of it",
"name", name)
// Call the watch cancellation function.
cancel()
cancel(errors.New("canceling service since consul no longer has a record of it"))
delete(services, name)

// Send clearing target group.
Expand Down
6 changes: 3 additions & 3 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ func newBinOpStepEvaluator(

var lse, rse StepEvaluator

ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancelCause(ctx)
g := errgroup.Group{}

// We have two non-literal legs,
Expand All @@ -832,15 +832,15 @@ func newBinOpStepEvaluator(
var err error
lse, err = evFactory.NewStepEvaluator(ctx, evFactory, expr.SampleExpr, q)
if err != nil {
cancel()
cancel(fmt.Errorf("new step evaluator for left leg errored: %w", err))
}
return err
})
g.Go(func() error {
var err error
rse, err = evFactory.NewStepEvaluator(ctx, evFactory, expr.RHS, q)
if err != nil {
cancel()
cancel(fmt.Errorf("new step evaluator for right leg errored: %w", err))
}
return err
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/middleware"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql/parser"

Expand Down Expand Up @@ -474,7 +475,7 @@ func WrapQuerySpanAndTimeout(call string, limits Limits) middleware.Interface {

timeoutCapture := func(id string) time.Duration { return limits.QueryTimeout(ctx, id) }
timeout := util_validation.SmallestPositiveNonZeroDurationPerTenant(tenants, timeoutCapture)
newCtx, cancel := context.WithTimeout(ctx, timeout)
newCtx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New("query timeout reached"))
defer cancel()

newReq := req.WithContext(newCtx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func (q *SingleTenantQuerier) Series(ctx context.Context, req *logproto.SeriesRe

// Enforce the query timeout while querying backends
queryTimeout := q.limits.QueryTimeout(ctx, userID)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
ctx, cancel := context.WithDeadlineCause(ctx, time.Now().Add(queryTimeout), errors.New("query timeout reached"))
defer cancel()

return q.awaitSeries(ctx, req)
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/grafana/dskit/httpgrpc"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -102,8 +103,8 @@ func (h *splitByInterval) Process(
maxSeries int,
) ([]queryrangebase.Response, error) {
var responses []queryrangebase.Response
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("split by interval process canceled"))

ch := h.Feed(ctx, input)

Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/worker/frontend_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/httpgrpc"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/lokifrontend/frontend/v1/frontendv1pb"
Expand Down Expand Up @@ -83,8 +84,8 @@ func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, c
// process loops processing requests on an established stream.
func (fp *frontendProcessor) process(c frontendv1pb.Frontend_ProcessClient) error {
// Build a child context so we can cancel a query when the stream is closed.
ctx, cancel := context.WithCancel(c.Context())
defer cancel()
ctx, cancel := context.WithCancelCause(c.Context())
defer cancel(errors.New("frontend processor process finished"))

for {
request, err := c.Recv()
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/worker/processor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/pkg/errors"
"go.uber.org/atomic"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -44,7 +45,7 @@ func newProcessorManager(ctx context.Context, p processor, conn *grpc.ClientConn
func (pm *processorManager) stop() {
// Notify the remote query-frontend or query-scheduler we're shutting down.
// We use a new context to make sure it's not cancelled.
notifyCtx, cancel := context.WithTimeout(context.Background(), notifyShutdownTimeout)
notifyCtx, cancel := context.WithTimeoutCause(context.Background(), notifyShutdownTimeout, errors.New("notify shutdown timeout reached"))
defer cancel()
pm.p.notifyShutdown(notifyCtx, pm.conn, pm.address)

Expand Down
7 changes: 4 additions & 3 deletions pkg/querier/worker/scheduler_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/grafana/dskit/user"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con
// Run the querier loop (and so all the queries) in a dedicated context that we call the "execution context".
// The execution context is cancelled once the workerCtx is cancelled AND there's no inflight query executing.
execCtx, execCancel, inflightQuery := newExecutionContext(workerCtx, sp.log)
defer execCancel()
defer execCancel(errors.New("scheduler processor execution context canceled"))

backoff := backoff.New(execCtx, processorBackoffConfig)
for backoff.Ongoing() {
Expand Down Expand Up @@ -121,8 +122,8 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con
// process loops processing requests on an established stream.
func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool, workerID string) error {
// Build a child context so we can cancel a query when the stream is closed.
ctx, cancel := context.WithCancel(c.Context())
defer cancel()
ctx, cancel := context.WithCancelCause(c.Context())
defer cancel(errors.New("querier loop canceled"))

for {
start := time.Now()
Expand Down
7 changes: 4 additions & 3 deletions pkg/querier/worker/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/grafana/dskit/httpgrpc"
"github.com/pkg/errors"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"

Expand All @@ -32,8 +33,8 @@ import (
// - The execution context is canceled when the worker context gets cancelled (ie. querier is shutting down)
// and there's no inflight query execution. In case there's an inflight query, the execution context is canceled
// once the inflight query terminates and the response has been sent.
func newExecutionContext(workerCtx context.Context, logger log.Logger) (execCtx context.Context, execCancel context.CancelFunc, inflightQuery *atomic.Bool) {
execCtx, execCancel = context.WithCancel(context.Background())
func newExecutionContext(workerCtx context.Context, logger log.Logger) (execCtx context.Context, execCancel context.CancelCauseFunc, inflightQuery *atomic.Bool) {
execCtx, execCancel = context.WithCancelCause(context.Background())
inflightQuery = atomic.NewBool(false)

go func() {
Expand Down Expand Up @@ -76,7 +77,7 @@ func newExecutionContext(workerCtx context.Context, logger log.Logger) (execCtx
}

level.Debug(logger).Log("msg", "querier worker context has been canceled and there's no inflight query, canceling the execution context too")
execCancel()
execCancel(errors.New("querier worker context has been canceled and there's no inflight query"))
case <-execCtx.Done():
// Nothing to do. The execution context has been explicitly canceled.
}
Expand Down

0 comments on commit 4eb45cc

Please sign in to comment.