Skip to content

Commit

Permalink
ring: add support for hedging to DoUntilQuorum when request minimis…
Browse files Browse the repository at this point in the history
…ation is enabled (#330)

* Refactor DoUntilQuorum to take a configuration struct.

* Introduce support for hedging when request minimisation is enabled.

* Add changelog entry.
  • Loading branch information
charleskorn authored Jun 28, 2023
1 parent 5fd81c8 commit 9db715a
Show file tree
Hide file tree
Showing 5 changed files with 376 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
* [ENHANCEMENT] Added `TokenGenerator` interface with `RandomTokenGenerator` (generating random tokens), and `SpreadMinimizingTokenGenerator` (generating tokens with almost even distribution) implementation. By default `RandomTokenGenerator` is used. #321
* [ENHANCEMENT] Lifecycler: Added `RingTokenGenerator` configuration that specifies the `TokenGenerator` implementation that is used for token generation. Default value is nil, meaning that `RandomTokenGenerator` is used. #323
* [ENHANCEMENT] BasicLifecycler: Added `RingTokenGenerator` configuration that specifies the `TokenGenerator` implementation that is used for token generation. Default value is nil, meaning that `RandomTokenGenerator` is used. #323
* [ENHANCEMENT] Ring: add support for hedging to `DoUntilQuorum` when request minimization is enabled. #330
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
77 changes: 62 additions & 15 deletions ring/replication_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ring

import (
"context"
"errors"
"sort"
"time"
)
Expand Down Expand Up @@ -89,37 +90,69 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
return results, nil
}

type DoUntilQuorumConfig struct {
// If true, enable request minimization.
// See docs for DoUntilQuorum for more information.
MinimizeRequests bool

// If non-zero and MinimizeRequests is true, enables hedging.
// See docs for DoUntilQuorum for more information.
HedgingDelay time.Duration
}

func (c DoUntilQuorumConfig) Validate() error {
if c.HedgingDelay < 0 {
return errors.New("invalid DoUntilQuorumConfig: HedgingDelay must be non-negative")
}

return nil
}

// DoUntilQuorum runs function f in parallel for all replicas in r.
//
// # Result selection
//
// If r.MaxUnavailableZones is greater than zero:
// If r.MaxUnavailableZones is greater than zero, DoUntilQuorum operates in zone-aware mode:
// - DoUntilQuorum returns an error if calls to f for instances in more than r.MaxUnavailableZones zones return errors
// - Otherwise, DoUntilQuorum returns all results from all replicas in the first zones for which f succeeds
// for every instance in that zone (eg. if there are 3 zones and r.MaxUnavailableZones is 1, DoUntilQuorum will
// return the results from all instances in 2 zones, even if all calls to f succeed).
//
// Otherwise:
// Otherwise, DoUntilQuorum operates in non-zone-aware mode:
// - DoUntilQuorum returns an error if more than r.MaxErrors calls to f return errors
// - Otherwise, DoUntilQuorum returns all results from the first len(r.Instances) - r.MaxErrors instances
// (eg. if there are 6 replicas and r.MaxErrors is 2, DoUntilQuorum will return the results from the first 4
// successful calls to f, even if all 6 calls to f succeed).
//
// # Request minimisation
// # Request minimization
//
// cfg.MinimizeRequests enables or disables request minimization.
//
// Regardless of the value of cfg.MinimizeRequests, if one of the termination conditions above is satisfied or ctx is
// cancelled before f is called for an instance, f may not be called for that instance at all.
//
// ## When disabled
//
// If request minimization is disabled, DoUntilQuorum will call f for each instance in r. The value of cfg.HedgingDelay
// is ignored.
//
// If minimizeRequests is false, DoUntilQuorum will call f for each instance in r.
// ## When enabled
//
// If minimizeRequests is true, DoUntilQuorum will initially call f for the minimum number of instances needed to reach
// the termination conditions above, and later call f for further instances if required. For example, if
// If request minimization is enabled, DoUntilQuorum will initially call f for the minimum number of instances needed to
// reach the termination conditions above, and later call f for further instances if required. For example, if
// r.MaxUnavailableZones is 1 and there are three zones, DoUntilQuorum will initially only call f for instances in two
// zones, and only call f for instances in the remaining zone if a request in the initial two zones fails.
//
// If minimizeRequests is true, DoUntilQuorum will randomly select available zones / instances such that calling
// DoUntilQuorum multiple times with the same ReplicationSet should evenly distribute requests across all zones /
// instances.
// DoUntilQuorum will randomly select available zones / instances such that calling DoUntilQuorum multiple times with
// the same ReplicationSet should evenly distribute requests across all zones / instances.
//
// Regardless of the value of minimizeRequests, if one of the termination conditions above is satisfied or ctx is
// cancelled before f is called for an instance, f may not be called for that instance at all.
// If cfg.HedgingDelay is non-zero, DoUntilQuorum will call f for an additional zone's instances (if zone-aware) / an
// additional instance (if not zone-aware) every cfg.HedgingDelay until one of the termination conditions above is
// reached. For example, if r.MaxUnavailableZones is 2, cfg.HedgingDelay is 4 seconds and there are fives zones,
// DoUntilQuorum will initially only call f for instances in three zones, and unless one of the termination conditions
// is reached earlier, will then call f for instances in a fourth zone approximately 4 seconds later, and then call f
// for instances in the final zone approximately 4 seconds after that (ie. roughly 8 seconds since the call to
// DoUntilQuorum began).
//
// # Cleanup
//
Expand All @@ -135,15 +168,15 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
// f will not be used.
//
// DoUntilQuorum cancels the context.Context passed to each invocation of f before DoUntilQuorum returns.
func DoUntilQuorum[T any](ctx context.Context, r ReplicationSet, minimizeRequests bool, f func(context.Context, *InstanceDesc) (T, error), cleanupFunc func(T)) ([]T, error) {
func DoUntilQuorum[T any](ctx context.Context, r ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc) (T, error), cleanupFunc func(T)) ([]T, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

wrappedF := func(ctx context.Context, desc *InstanceDesc, _ context.CancelFunc) (T, error) {
return f(ctx, desc)
}

return DoUntilQuorumWithoutSuccessfulContextCancellation(ctx, r, minimizeRequests, wrappedF, cleanupFunc)
return DoUntilQuorumWithoutSuccessfulContextCancellation(ctx, r, cfg, wrappedF, cleanupFunc)
}

// DoUntilQuorumWithoutSuccessfulContextCancellation behaves the same as DoUntilQuorum, except it does not cancel
Expand All @@ -158,7 +191,11 @@ func DoUntilQuorum[T any](ctx context.Context, r ReplicationSet, minimizeRequest
// DoUntilQuorumWithoutSuccessfulContextCancellation
//
// Failing to do this may result in a memory leak.
func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Context, r ReplicationSet, minimizeRequests bool, f func(context.Context, *InstanceDesc, context.CancelFunc) (T, error), cleanupFunc func(T)) ([]T, error) {
func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Context, r ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc, context.CancelFunc) (T, error), cleanupFunc func(T)) ([]T, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}

resultsChan := make(chan instanceResult[T], len(r.Instances))
resultsRemaining := len(r.Instances)

Expand All @@ -185,7 +222,7 @@ func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Contex
contextTracker = newDefaultContextTracker(ctx, r.Instances)
}

if minimizeRequests {
if cfg.MinimizeRequests {
resultTracker.startMinimumRequests()
} else {
resultTracker.startAllRequests()
Expand Down Expand Up @@ -222,13 +259,23 @@ func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Contex
}
}

var hedgingTrigger <-chan time.Time

if cfg.HedgingDelay > 0 {
ticker := time.NewTicker(cfg.HedgingDelay)
defer ticker.Stop()
hedgingTrigger = ticker.C
}

for !resultTracker.succeeded() {
select {
case <-ctx.Done():
// No need to cancel individual instance contexts, as they inherit the cancellation from ctx.
cleanupResultsAlreadyReceived()

return nil, ctx.Err()
case <-hedgingTrigger:
resultTracker.startAdditionalRequests()
case result := <-resultsChan:
resultsRemaining--
resultTracker.done(result.instance, result.err)
Expand Down
Loading

0 comments on commit 9db715a

Please sign in to comment.