Fix ketama quorum #5910

Merged · 7 commits · Dec 15, 2022

Changes from 1 commit
241 changes: 112 additions & 129 deletions pkg/receive/handler.go
@@ -31,13 +31,14 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"

"github.com/thanos-io/thanos/pkg/errutil"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/runutil"
@@ -350,7 +351,25 @@ type replica struct {
// endpointReplica is a pair of a receive endpoint and a write request replica.
type endpointReplica struct {
endpoint string
replica replica
replica uint64
}

type trackedSeries struct {
seriesIDs []int
timeSeries []prompb.TimeSeries
}

type writeResponse struct {
seriesIDs []int
replica uint64
err error
}

func newWriteResponse(seriesIDs []int, err error) writeResponse {
return writeResponse{
seriesIDs: seriesIDs,
err: err,
}
}
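Taken together, these two types are what make per-series accounting possible: trackedSeries remembers each series' index in the incoming request, and writeResponse reports those indices back alongside the outcome, so a failure can be charged to exactly the series an endpoint carried. A minimal fragment to illustrate (hypothetical endpoints and errors, not part of the diff):

	// Endpoint A carried series 0 and 1; endpoint B carried only series 1.
	okResp := newWriteResponse([]int{0, 1}, nil)
	failResp := newWriteResponse([]int{1}, errors.New("endpoint B unavailable"))

	// When these are drained from the response channel, only series 1
	// accumulates an error; series 0 is unaffected by B's failure.
	_, _ = okResp, failResp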

func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
@@ -552,47 +571,66 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
return errors.New("hashring is not ready")
}

// Batch all of the time series in the write request
// into several smaller write requests that are
// grouped by target endpoint. This ensures that
// for any incoming write request to a node,
// at most one outgoing write request will be made
// to every other node in the hashring, rather than
// one request per time series.
wreqs := make(map[endpointReplica]*prompb.WriteRequest)
for i := range wreq.Timeseries {
endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n)
if err != nil {
h.mtx.RUnlock()
return err
var replicas []uint64
if r.replicated {
replicas = []uint64{r.n}
} else {
for rn := uint64(0); rn < h.options.ReplicationFactor; rn++ {
replicas = append(replicas, rn)
}
key := endpointReplica{endpoint: endpoint, replica: r}
if _, ok := wreqs[key]; !ok {
wreqs[key] = &prompb.WriteRequest{}
}

wreqs := make(map[endpointReplica]trackedSeries)
for tsID, ts := range wreq.Timeseries {
for _, rn := range replicas {
endpoint, err := h.hashring.GetN(tenant, &ts, rn)
if err != nil {
h.mtx.RUnlock()
return err
}
key := endpointReplica{endpoint: endpoint, replica: rn}
er, ok := wreqs[key]
Contributor:

Could this variable named er receive a better name? I have no clue what an er is, and it's easy to mistake it for err and even endpointReplica (variables of that type are often named er, which is another habit I think we have to slowly move away from).

Contributor Author:

Makes sense, I renamed this variable to writeTarget for clarity.

if !ok {
er = trackedSeries{
seriesIDs: make([]int, 0),
timeSeries: make([]prompb.TimeSeries, 0),
}
}
er.timeSeries = append(wreqs[key].timeSeries, ts)
er.seriesIDs = append(wreqs[key].seriesIDs, tsID)
wreqs[key] = er
}
wr := wreqs[key]
wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i])
}
h.mtx.RUnlock()

return h.fanoutForward(ctx, tenant, wreqs, len(wreqs))
return h.fanoutForward(ctx, tenant, wreqs, len(wreq.Timeseries), r.replicated)
}
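Note what changed in the grouping: the hashring is now consulted once per series per replica number up front, so a single incoming request still produces at most one outgoing request per (endpoint, replica) pair. A standalone sketch of that bucketing, with a stub standing in for the real hashring's GetN (the signature below is illustrative, not the actual interface):

	type writeTargetKey struct {
		endpoint string
		replica  uint64
	}

	// groupSeries buckets series indices by (endpoint, replica) for a given
	// replication factor, mirroring the loop above in miniature.
	func groupSeries(numSeries int, rf uint64, getN func(series int, rn uint64) string) map[writeTargetKey][]int {
		buckets := make(map[writeTargetKey][]int)
		for ts := 0; ts < numSeries; ts++ {
			for rn := uint64(0); rn < rf; rn++ {
				k := writeTargetKey{endpoint: getN(ts, rn), replica: rn}
				buckets[k] = append(buckets[k], ts)
			}
		}
		return buckets
	}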

// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
func (h *Handler) writeQuorum() int {
return int((h.options.ReplicationFactor / 2) + 1)
}

func quorumReached(successes []int, successThreshold int) bool {
for _, success := range successes {
if success < successThreshold {
return false
}
}

return true
}
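To make the arithmetic concrete (the numbers are illustrative): with ReplicationFactor = 3, writeQuorum returns 3/2 + 1 = 2, and the failureThreshold used later is 3 - 2 = 1, so each series tolerates exactly one failed replica. quorumReached only reports success once every series has met the threshold:

	// quorum = 2: series 2 is still one confirmation short.
	successes := []int{2, 3, 1}
	fmt.Println(quorumReached(successes, 2)) // false

	successes[2]++
	fmt.Println(quorumReached(successes, 2)) // true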

// fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of
// requests succeeds or fails or if context is canceled.
func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*prompb.WriteRequest, successThreshold int) error {
func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, seriesReplicated bool) error {
var errs errutil.MultiError

fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout)
defer func() {
if errs.Err() != nil {
// NOTICE: The cancel function is not used on all paths intentionally,
// if there is no error when quorum successThreshold is reached,
// if there is no error when quorum is reached,
// let forward requests to optimistically run until timeout.
cancel()
}
@@ -607,68 +645,43 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
tLogger = log.With(h.logger, logTags)
}

ec := make(chan error)
ec := make(chan writeResponse)
Contributor:

Could ec receive a better name? It's used many times in the next few hundred lines and the name isn't clear. Suggestion: errorChannel, if that's even what it actually is. 😅

Contributor Author:

Renamed to responses.


var wg sync.WaitGroup
for er := range wreqs {
er := er
r := er.replica
endpoint := er.endpoint

wg.Add(1)
// If the request is not yet replicated, let's replicate it.
// If the replication factor isn't greater than 1, let's
// just forward the requests.
if !r.replicated && h.options.ReplicationFactor > 1 {
go func(endpoint string) {
defer wg.Done()

var err error
tracing.DoInSpan(fctx, "receive_replicate", func(ctx context.Context) {
err = h.replicate(ctx, tenant, wreqs[er])
})
if err != nil {
h.replications.WithLabelValues(labelError).Inc()
ec <- errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)
return
}

h.replications.WithLabelValues(labelSuccess).Inc()
ec <- nil
}(endpoint)

continue
}

// If the endpoint for the write request is the
// local node, then don't make a request but store locally.
// By handing replication to the local node in the same
// function as replication to other nodes, we can treat
// a failure to write locally as just another error that
// can be ignored if the replication factor is met.
if endpoint == h.options.Endpoint {
go func(endpoint string) {
if er.endpoint == h.options.Endpoint {
go func(er endpointReplica) {
Contributor:

Similar comment here about the er variable name, which also applies to other occurrences: it gives no clue what it is in this context and can easily be confused with err. Could we rename it? Some suggestions: replicationKey, replicaKey, replicationID, endpointReplica.

defer wg.Done()

var err error
tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) {
err = h.writer.Write(fctx, tenant, wreqs[er])
err = h.writer.Write(fctx, tenant, &prompb.WriteRequest{
Timeseries: wreqs[er].timeSeries,
})
})
if err != nil {
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", er.endpoint))
return
}
ec <- nil
}(endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, nil)
}(er)

continue
}

// Make a request to the specified endpoint.
go func(endpoint string) {
go func(er endpointReplica) {
defer wg.Done()

var (
@@ -684,18 +697,18 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
h.forwardRequests.WithLabelValues(labelSuccess).Inc()
}()

cl, err = h.peers.get(fctx, endpoint)
cl, err = h.peers.get(fctx, er.endpoint)
if err != nil {
ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", er.endpoint))
return
}

h.mtx.RLock()
b, ok := h.peerStates[endpoint]
b, ok := h.peerStates[er.endpoint]
if ok {
if time.Now().Before(b.nextAllowed) {
h.mtx.RUnlock()
ec <- errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", er.endpoint))
return
}
}
@@ -705,37 +718,38 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) {
// Actually make the request against the endpoint we determined should handle these time series.
_, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
Timeseries: wreqs[er].Timeseries,
Timeseries: wreqs[er].timeSeries,
Tenant: tenant,
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
Replica: int64(r.n + 1),
Replica: int64(er.replica + 1),
})
})
if err != nil {
// Check if peer connection is unavailable, don't attempt to send requests constantly.
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unavailable {
h.mtx.Lock()
if b, ok := h.peerStates[endpoint]; ok {
if b, ok := h.peerStates[er.endpoint]; ok {
b.attempt++
dur := h.expBackoff.ForAttempt(b.attempt)
b.nextAllowed = time.Now().Add(dur)
level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur)
} else {
h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
h.peerStates[er.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
}
h.mtx.Unlock()
}
}
ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)
werr := errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, werr)
return
}
h.mtx.Lock()
delete(h.peerStates, endpoint)
delete(h.peerStates, er.endpoint)
h.mtx.Unlock()

ec <- nil
}(endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, nil)
}(er)
}
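The peerStates bookkeeping above implements a simple per-endpoint backoff gate: an unavailable peer is skipped until its nextAllowed deadline, and each consecutive failure pushes the deadline further out. Reduced to its essentials (a sketch under those assumptions; backoff stands in for the handler's expBackoff.ForAttempt):

	// allowed reports whether a request to the endpoint may be attempted now.
	func allowed(states map[string]*retryState, endpoint string, now time.Time) bool {
		s, ok := states[endpoint]
		return !ok || !now.Before(s.nextAllowed)
	}

	// recordFailure extends the endpoint's backoff window after a failed attempt.
	func recordFailure(states map[string]*retryState, endpoint string, now time.Time, backoff func(attempt float64) time.Duration) {
		if s, ok := states[endpoint]; ok {
			s.attempt++
			s.nextAllowed = now.Add(backoff(s.attempt))
			return
		}
		states[endpoint] = &retryState{nextAllowed: now.Add(backoff(0))}
	}

A successful forward then deletes the endpoint's entry, as the code above does, which resets the backoff.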

go func() {
@@ -747,82 +761,49 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
// This is needed if context is canceled or if we reached success or fail quorum faster.
defer func() {
go func() {
for err := range ec {
if err != nil {
level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
for wresp := range ec {
if wresp.err != nil {
level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", wresp.err)
}
}
}()
}()

var success int
quorum := h.writeQuorum()
failureThreshold := h.options.ReplicationFactor - uint64(quorum)
successes := make([]int, numSeries)
seriesErrs := make([]errutil.MultiError, numSeries)
for {
select {
case <-fctx.Done():
return fctx.Err()
case err, more := <-ec:
case wresp, more := <-ec:
if !more {
for _, rerr := range seriesErrs {
if seriesReplicated {
errs.Add(rerr.Err())
} else if uint64(len(rerr)) >= failureThreshold {
cause := determineWriteErrorCause(rerr.Err(), quorum)
Collaborator:

I think we'll also have to change how we determine the HTTP error returned to the client when this cause error bubbles back up to handleRequest. Right now we return the error that occurs the most, or the original multi-error, since we use threshold 1. But this might be incorrect: if the cause error for any individual series replication is a server error, we have to retry the whole request. I think the solution would be:

  • Return a server error if any of the cause errors is an unknown error / unavailable / not ready (cases when we have to retry). A tricky but less important part is exactly which error to return if we have a mixed bag of server errors; the client's behavior should be the same regardless of the error message we decide to return.
  • Otherwise we should only have conflict errors and can return conflict.

Collaborator:

Another thing we should be mindful of here: when cause returns the original multi-error (and the same actually applies to the if branch above), we are putting a multi-error inside the errs multi-error, which can lead to erroneous 5xx as described in #5407 (comment).

Contributor Author:

Yes, you are correct. However, I wonder if we already have this issue in main, because we calculate the top-level cause the same way, using threshold=1. So if we have 2 batches with conflict and 1 batch with a server error, we will return conflict to the user and not retry the request.

In any case, I would also prefer to solve this problem now, since it can lead to data loss.

Contributor Author:

One thing I am not sure about is what the error code should be when we try to replicate a series and get one success, one server error, and one client error. Right now I believe we return a client error, but if we change the rules, we would return a server error. It also means that in the case of 2 conflicts (samples already exist in TSDB) and 1 server error, we would still return a server error even though that might not be necessary.

Maybe for replicating an individual series we can treat client errors as success and only return 5xx when two replicas fail. For the overall response, we can return 5xx if any series has a 5xx.

Collaborator:

Yes, I believe we basically have to treat conflict as if we had success. It's just important to return the correct status upstream: if we have any conflicts in the replication, we'll want to return that to the client. Otherwise 5xx and OK should be clear (5xx if any series fails quorum; OK if there are no failed quorums or conflicts).

Contributor Author (fpetkovski, Nov 26, 2022):

Makes sense. I think MultiError and determineWriteErrorCause are not good abstractions for this. The determineWriteErrorCause function is overloaded and tries to determine the error for both cases.

Because of this, I added two error types, writeErrors and replicationErrors, each with its own Cause() method. The writeErrors cause prioritizes server errors, while the one from replicationErrors is mostly identical to determineWriteErrorCause and is used for determining the error of replicating a single series.

This way we always use the Cause method, and depending on the error type we bubble up the appropriate error.
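The writeErrors type described above lands in a later commit of this PR, so the following is only a sketch of the stated rule, not the shipped code (errConflict, errNotReady, and errUnavailable are the handler's existing sentinel errors; the real method may differ):

	type writeErrors []error

	// Cause prioritizes retryable server-side errors: if any underlying error
	// means the write must be retried, that outweighs conflicts.
	func (es writeErrors) Cause() error {
		var cause error
		for _, err := range es {
			switch {
			case errors.Is(err, errUnavailable), errors.Is(err, errNotReady):
				return errUnavailable
			case errors.Is(err, errConflict):
				cause = errConflict
			}
		}
		return cause
	}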

errs.Add(errors.Wrapf(cause, "failed to replicate series"))
}
}
return errs.Err()
}
if err == nil {
success++
if success >= successThreshold {
// In case the success threshold is lower than the total
// number of requests, then we can finish early here. This
// is the case for quorum writes for example.
return nil
if wresp.err != nil {
for _, tsID := range wresp.seriesIDs {
seriesErrs[tsID].Add(wresp.err)
}
continue
}
errs.Add(err)
}
}
}

// replicate replicates a write request to (replication-factor) nodes
// selected by the tenant and time series.
// The function only returns when all replication requests have finished
// or the context is canceled.
func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.WriteRequest) error {
// It is possible that hashring is ready in testReady() but unready now,
// so need to lock here.
h.mtx.RLock()
if h.hashring == nil {
h.mtx.RUnlock()
return errors.New("hashring is not ready")
}

replicatedRequests := make(map[endpointReplica]*prompb.WriteRequest)
for i := uint64(0); i < h.options.ReplicationFactor; i++ {
for _, ts := range wreq.Timeseries {
endpoint, err := h.hashring.GetN(tenant, &ts, i)
if err != nil {
h.mtx.RUnlock()
return err
}

er := endpointReplica{
endpoint: endpoint,
replica: replica{n: i, replicated: true},
for _, tsID := range wresp.seriesIDs {
successes[tsID]++
}
replicatedRequest, ok := replicatedRequests[er]
if !ok {
replicatedRequest = &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, 0),
}
replicatedRequests[er] = replicatedRequest
if quorumReached(successes, quorum) {
return nil
}
replicatedRequest.Timeseries = append(replicatedRequest.Timeseries, ts)
}
}
h.mtx.RUnlock()

quorum := h.writeQuorum()
// fanoutForward only returns an error if successThreshold (quorum) is not reached.
if err := h.fanoutForward(ctx, tenant, replicatedRequests, quorum); err != nil {
return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached")
}
return nil
}
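Read together with the select loop above, a concrete run (illustrative values only) shows why per-series tallying is the fix for ketama, where each series can map to a different set of endpoints: with ReplicationFactor = 3 and quorum = 2, every series must be confirmed by two of its three target endpoints, regardless of how the outgoing batches were cut:

	// Four responses for 2 series fanned out to 3 replicas each (quorum = 2).
	responses := []writeResponse{
		{seriesIDs: []int{0, 1}, err: nil},                     // replica 0 ok for both
		{seriesIDs: []int{0}, err: errors.New("conn refused")}, // replica 1 fails series 0
		{seriesIDs: []int{1}, err: nil},                        // replica 1 ok for series 1
		{seriesIDs: []int{0, 1}, err: nil},                     // replica 2 ok for both
	}
	successes := make([]int, 2)
	for _, r := range responses {
		if r.err != nil {
			continue // recorded in seriesErrs in the real loop
		}
		for _, id := range r.seriesIDs {
			successes[id]++
		}
	}
	fmt.Println(quorumReached(successes, 2)) // true: successes == [2 3]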

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
@@ -923,7 +904,9 @@ type retryState struct {
nextAllowed time.Time
}

type expectedErrors []*struct {
type expectedErrors []*expectedError

type expectedError struct {
err error
cause func(error) bool
count int