Skip to content

Commit

Permalink
Extend ring.DoBatch's error class tracking to all errors (#427)
Browse files Browse the repository at this point in the history
* Extend ring.DoBatch's error class tracking to errors implementing ring.Error

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing review findings

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Remove itemTracker.done

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing review findings 2

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing additional review findings

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing last review findings

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

---------

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic authored Nov 13, 2023
1 parent 620b5f1 commit 0ae8e2e
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 117 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
* `middleware.StreamClientInstrumentInterceptor`
* `middleware.Instrument`
* [ENHANCEMENT] Server: Added new `-server.http-log-closed-connections-without-response-enabled` option to log details about closed connections that received no response. #426
* [ENHANCEMENT] ring: Added new function `DoBatchWithClientError()` that extends the existing `DoBatch()`. `DoBatchWithClientError()` distinguishes between client and server errors using the filtering function passed as a parameter, so the two error classes can be tracked separately. The function returns only once there is a quorum of either error class. #427
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
99 changes: 58 additions & 41 deletions ring/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,55 @@ type instance struct {
}

// itemTracker tracks the replication state of a single item: the quorum
// thresholds it must satisfy and atomic counters for the outcomes seen so
// far. Client- and server-class errors are counted separately so that a
// quorum can be reached on either error class independently (see
// batchTracker.record).
type itemTracker struct {
	minSuccess   int          // successful calls required before the item is considered done
	maxFailures  int          // failures tolerated per error class before the batch errors out
	succeeded    atomic.Int32 // successful calls observed so far
	failedClient atomic.Int32 // calls that failed with a client-class error (isClientError == true)
	failedServer atomic.Int32 // calls that failed with a server-class error
	remaining    atomic.Int32 // instances not yet heard from
	err          atomic.Error // last error recorded for this item
}

func (i *itemTracker) recordError(err error) int32 {
func (i *itemTracker) recordError(err error, isClientError func(error) bool) int32 {
i.err.Store(err)

if isClientError(err) {
return i.failedClient.Inc()
}
return i.failedServer.Inc()
}

// isHTTPStatus4xx reports whether err carries a gRPC status whose code,
// read as an HTTP status, falls in the 4xx range. It is the default
// client-error classifier used by DoBatch.
func isHTTPStatus4xx(err error) bool {
	if s, ok := status.FromError(err); ok && s.Code()/100 == 4 {
		return true
	}
	return false
}

return i.failed5xx.Inc()
// DoBatch is a special case of DoBatchWithClientError where errors
// containing HTTP status code 4xx are treated as client errors.
func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(InstanceDesc, []int) error, cleanup func()) error {
return DoBatchWithClientError(ctx, op, r, keys, callback, cleanup, isHTTPStatus4xx)
}

// DoBatch request against a set of keys in the ring, handling replication and
// failures. For example if we want to write N items where they may all
// hit different instances, and we want them all replicated R ways with
// quorum writes, we track the relationship between batch RPCs and the items
// within them.
// DoBatchWithClientError makes requests against a set of keys in the
// ring, handling replication and failures. For example, if we want to
// write N items where they may all hit different instances, and we want
// them all replicated R ways with quorum writes, we track the
// relationship between batch RPCs and the items within them.
//
// Callback is passed the instance to target, and the indexes of the keys
// callback() is passed the instance to target, and the indexes of the keys
// to send to that instance.
//
// cleanup() is always called, either on an error before starting the batches or after they all finish.
// cleanup() is always called, either on an error before starting the batches
// or after they all finish.
//
// Not implemented as a method on Ring so we can test separately.
func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(InstanceDesc, []int) error, cleanup func()) error {
// isClientError() classifies errors returned by `callback()` into client or
// server errors. See `batchTracker.record()` function for details about how
// errors are combined into final error returned by DoBatchWithClientError.
//
// Not implemented as a method on Ring, so we can test separately.
func DoBatchWithClientError(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(InstanceDesc, []int) error, cleanup func(), isClientError func(error) bool) error {
if r.InstancesCount() <= 0 {
cleanup()
return fmt.Errorf("DoBatch: InstancesCount <= 0")
Expand Down Expand Up @@ -106,7 +123,7 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb
for _, i := range instances {
go func(i instance) {
err := callback(i.desc, i.indexes)
tracker.record(i.itemTrackers, err)
tracker.record(i.itemTrackers, err, isClientError)
wg.Done()
}(i)
}
Expand All @@ -128,55 +145,55 @@ func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callb
}
}

func (b *batchTracker) record(itemTrackers []*itemTracker, err error) {
func (b *batchTracker) record(itemTrackers []*itemTracker, err error, isClientError func(error) bool) {
// If we reach the required number of successful puts on this item, then decrement the
// number of pending items by one.
//
// The use of atomic increments here is needed as:
// * rpcsPending and rpcsFailed guarantee only a single goroutine will write to either channel
// * succeeded, failed4xx, failed5xx and remaining guarantee that the "return decision" is made atomically
// * succeeded, failedClient, failedServer and remaining guarantee that the "return decision" is made atomically
// avoiding race condition
for i := range itemTrackers {
for _, it := range itemTrackers {
if err != nil {
// Track the number of errors by error family, and if it exceeds maxFailures
// shortcut the waiting rpc.
errCount := itemTrackers[i].recordError(err)
errCount := it.recordError(err, isClientError)
// We should return an error if we reach the maxFailure (quorum) on a given error family OR
// we don't have any remaining instances to try.
// we don't have any remaining instances to try. In the following we use ClientError and ServerError
// to denote errors, for which isClientError() returns true and false respectively.
//
// Ex: 2xx, 4xx, 5xx -> return 5xx
// Ex: 4xx, 4xx, _ -> return 4xx
// Ex: 5xx, _, 5xx -> return 5xx
// Ex: Success, ClientError, ServerError -> return ServerError
// Ex: ClientError, ClientError, Success -> return ClientError
// Ex: ServerError, Success, ServerError -> return ServerError
//
// The reason for searching for quorum in 4xx and 5xx errors separately is to give a more accurate
// response to the initial request. So if a quorum of instances rejects the request with 4xx, then the request should be rejected
// even if less-than-quorum instances indicated a failure to process the request (via 5xx).
// The reason for searching for quorum in ClientError and ServerError errors separately is to give a more accurate
// response to the initial request. So if a quorum of instances rejects the request with ClientError, then the request should be rejected
// even if less-than-quorum instances indicated a failure to process the request (via ServerError).
// The speculation is that had the unavailable instances been available,
// they would have rejected the request with a 4xx as well.
// Conversely, if a quorum of instances failed to process the request via 5xx and less-than-quorum
// instances rejected it with 4xx, then we do not have quorum to reject the request as a 4xx. Instead,
// we return the last 5xx error for debuggability.
if errCount > int32(itemTrackers[i].maxFailures) || itemTrackers[i].remaining.Dec() == 0 {
// they would have rejected the request with a ClientError as well.
// Conversely, if a quorum of instances failed to process the request via ServerError and less-than-quorum
// instances rejected it with ClientError, then we do not have quorum to reject the request as a ClientError. Instead,
// we return the last ServerError error for debuggability.
if errCount > int32(it.maxFailures) || it.remaining.Dec() == 0 {
if b.rpcsFailed.Inc() == 1 {
b.err <- err
}
}
} else {
// If we successfully process items in minSuccess instances,
// then wake up the waiting rpc, so it can return early.
if itemTrackers[i].succeeded.Inc() >= int32(itemTrackers[i].minSuccess) {
if it.succeeded.Inc() >= int32(it.minSuccess) {
if b.rpcsPending.Dec() == 0 {
b.done <- struct{}{}
}
continue
}

// If we successfully called this particular instance, but we don't have any remaining instances to try,
// and we failed to call minSuccess instances, then we need to return the last error
// Ex: 4xx, 5xx, 2xx
if itemTrackers[i].remaining.Dec() == 0 {
// and we failed to call minSuccess instances, then we need to return the last error.
if it.remaining.Dec() == 0 {
if b.rpcsFailed.Inc() == 1 {
b.err <- itemTrackers[i].err.Load()
b.err <- it.err.Load()
}
}
}
Expand Down
Loading

0 comments on commit 0ae8e2e

Please sign in to comment.