diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 5dcfe53b40b..383d440d473 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -39,7 +39,6 @@ import ( statusapi "github.com/thanos-io/thanos/pkg/api/status" "github.com/thanos-io/thanos/pkg/logging" - "github.com/thanos-io/thanos/pkg/errutil" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/server/http/middleware" @@ -79,6 +78,7 @@ var ( errBadReplica = errors.New("request replica exceeds receiver replication factor") errNotReady = errors.New("target not ready") errUnavailable = errors.New("target not available") + errInternal = errors.New("internal error") ) // Options for the web Handler. @@ -361,7 +361,6 @@ type trackedSeries struct { type writeResponse struct { seriesIDs []int - replica uint64 err error } @@ -532,7 +531,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { responseStatusCode := http.StatusOK if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil { level.Debug(tLogger).Log("msg", "failed to handle request", "err", err) - switch determineWriteErrorCause(err, 1) { + switch errors.Cause(err) { case errNotReady: responseStatusCode = http.StatusServiceUnavailable case errUnavailable: @@ -624,11 +623,11 @@ func quorumReached(successes []int, successThreshold int) bool { // fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of // requests succeeds or fails or if context is canceled. func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, seriesReplicated bool) error { - var errs errutil.MultiError + var errs writeErrors fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) defer func() { - if errs.Err() != nil { + if errs.ErrOrNil() != nil { // NOTICE: The cancel function is not used on all paths intentionally, // if there is no error when quorum is reached, // let forward requests to optimistically run until timeout. @@ -668,10 +667,8 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }) }) if err != nil { - // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. - // To avoid breaking the counting logic, we need to flatten the error. level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error()) - ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", er.endpoint)) + ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", er.endpoint)) return } ec <- newWriteResponse(wreqs[er].seriesIDs, nil) @@ -770,9 +767,8 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }() quorum := h.writeQuorum() - failureThreshold := h.options.ReplicationFactor - uint64(quorum) successes := make([]int, numSeries) - seriesErrs := make([]errutil.MultiError, numSeries) + seriesErrs := make([]writeErrors, numSeries) for { select { case <-fctx.Done(): @@ -781,21 +777,21 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e if !more { for _, rerr := range seriesErrs { if seriesReplicated { - errs.Add(rerr.Err()) - } else if uint64(len(rerr)) >= failureThreshold { - cause := determineWriteErrorCause(rerr.Err(), quorum) - errs.Add(errors.Wrapf(cause, "failed to replicate series")) + errs.Add(rerr.Cause()) + } else { + cause := rerr.replicationErr(quorum) + errs.Add(errors.Wrapf(cause, rerr.Error())) } } - return errs.Err() + return errs.ErrOrNil() } + if wresp.err != nil { for _, tsID := range wresp.seriesIDs { seriesErrs[tsID].Add(wresp.err) } continue } - for _, tsID := range wresp.seriesIDs { successes[tsID]++ } @@ -815,7 +811,7 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st if err != nil { level.Debug(h.logger).Log("msg", "failed to handle request", "err", err) } - switch determineWriteErrorCause(err, 1) { + switch errors.Cause(err) { case nil: return &storepb.WriteResponse{}, nil case errNotReady: @@ -916,25 +912,82 @@ func (a expectedErrors) Len() int { return len(a) } func (a expectedErrors) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a expectedErrors) Less(i, j int) bool { return a[i].count < a[j].count } -// determineWriteErrorCause extracts a sentinel error that has occurred more than the given threshold from a given fan-out error. -// It will inspect the error's cause if the error is a MultiError, -// It will return cause of each contained error but will not traverse any deeper. -func determineWriteErrorCause(err error, threshold int) error { +// writeErrors contains all errors that have +// occurred during a remote-write request. +type writeErrors struct { + reasonSet map[string]struct{} + errs []error +} + +// Add adds an error to the writeErrors. +func (es *writeErrors) Add(err error) { if err == nil { + return + } + if len(es.errs) == 0 { + es.errs = []error{err} + } else { + es.errs = append(es.errs, err) + } + if es.reasonSet == nil { + es.reasonSet = make(map[string]struct{}) + } + es.reasonSet[err.Error()] = struct{}{} +} + +// ErrOrNil returns the writeErrors instance if any +// errors are contained in it. +// Otherwise it returns nil. +func (es *writeErrors) ErrOrNil() error { + if len(es.errs) == 0 { + return nil + } + return es +} + +// Cause returns the primary cause for a write failure. +// If multiple errors have occurred, Cause will prefer +// recoverable over non-recoverable errors. +func (es *writeErrors) Cause() error { + if len(es.errs) == 0 { return nil } - unwrappedErr := errors.Cause(err) - errs, ok := unwrappedErr.(errutil.NonNilMultiError) - if !ok { - errs = []error{unwrappedErr} + var recoverableErr error + var nonRecoverableErr error + for _, werr := range es.errs { + cause := errors.Cause(werr) + if isUnavailable(cause) { + return errUnavailable + } + if isNotReady(cause) { + return errNotReady + } + if isConflict(cause) { + nonRecoverableErr = errConflict + continue + } + recoverableErr = errInternal } - if len(errs) == 0 { + + if recoverableErr != nil { + return recoverableErr + } + return nonRecoverableErr +} + +// replicationErr extracts a sentinel error with the highest occurrence +// that has happened more than the given threshold. +// If no single error has occurred more than the threshold, but the +// total number of errors meets the threshold, +// replicationErr will return errInternal. +func (es *writeErrors) replicationErr(threshold int) error { + if es == nil { return nil } - if threshold < 1 { - return err + if len(es.errs) == 0 { + return nil } expErrs := expectedErrors{ @@ -944,7 +997,7 @@ func determineWriteErrorCause(err error, threshold int) error { } for _, exp := range expErrs { exp.count = 0 - for _, err := range errs { + for _, err := range es.errs { if exp.cause(errors.Cause(err)) { exp.count++ } @@ -956,7 +1009,39 @@ func determineWriteErrorCause(err error, threshold int) error { return exp.err } - return err + if len(es.errs) >= threshold { + return errInternal + } + + return nil +} + +// Error returns a string containing a deduplicated set of reasons. +func (es *writeErrors) Error() string { + if es == nil || len(es.reasonSet) == 0 { + return "" + } + reasons := make([]string, 0, len(es.reasonSet)) + for reason := range es.reasonSet { + reasons = append(reasons, reason) + } + sort.Strings(reasons) + + var buf bytes.Buffer + if len(reasons) > 1 { + fmt.Fprintf(&buf, "%d errors: ", len(es.reasonSet)) + } + + var more bool + for _, reason := range reasons { + if more { + buf.WriteString("; ") + } + buf.WriteString(reason) + more = true + } + + return buf.String() } func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index eeb4cd6cbaf..594bc29a01b 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -36,213 +36,16 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/testutil" + "google.golang.org/grpc" ) -func TestDetermineWriteErrorCause(t *testing.T) { - for _, tc := range []struct { - name string - err error - threshold int - forReplication bool - exp error - }{ - { - name: "nil", - }, - { - name: "nil multierror", - err: errutil.NonNilMultiError([]error{}), - }, - { - name: "matching simple", - err: errConflict, - threshold: 1, - exp: errConflict, - }, - { - name: "non-matching multierror", - err: errutil.NonNilMultiError([]error{ - errors.New("foo"), - errors.New("bar"), - }), - exp: errors.New("2 errors: foo; bar"), - }, - { - name: "nested non-matching multierror", - err: errors.Wrap(errutil.NonNilMultiError([]error{ - errors.New("foo"), - errors.New("bar"), - }), "baz"), - threshold: 1, - exp: errors.New("baz: 2 errors: foo; bar"), - }, - { - name: "deep nested non-matching multierror", - err: errors.Wrap(errutil.NonNilMultiError([]error{ - errors.New("foo"), - errutil.NonNilMultiError([]error{ - errors.New("bar"), - errors.New("qux"), - }), - }), "baz"), - threshold: 1, - exp: errors.New("baz: 2 errors: foo; 2 errors: bar; qux"), - }, - { - name: "matching multierror", - err: errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errors.New("foo"), - errors.New("bar"), - }), - threshold: 1, - exp: errConflict, - }, - { - name: "matching multierror (exemplar error)", - err: errutil.NonNilMultiError([]error{ - storage.ErrExemplarLabelLength, - errors.New("foo"), - errors.New("bar"), - }), - threshold: 1, - exp: errConflict, - }, - { - name: "matching multierror (labels error)", - err: errutil.NonNilMultiError([]error{ - labelpb.ErrEmptyLabels, - errors.New("foo"), - errors.New("bar"), - }), - threshold: 1, - exp: errConflict, - }, - { - name: "matching but below threshold multierror", - err: errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errors.New("foo"), - errors.New("bar"), - }), - threshold: 2, - exp: errors.New("3 errors: out of order sample; foo; bar"), - }, - { - name: "matching multierror many", - err: errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errConflict, - status.Error(codes.AlreadyExists, "conflict"), - errors.New("foo"), - errors.New("bar"), - }), - threshold: 1, - exp: errConflict, - }, - { - name: "matching multierror many, one above threshold", - err: errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errConflict, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - errors.New("foo"), - }), - threshold: 2, - exp: errNotReady, - }, - { - name: "matching multierror many, one above threshold (exemplar error)", - err: errutil.NonNilMultiError([]error{ - tsdb.ErrNotReady, - tsdb.ErrNotReady, - storage.ErrDuplicateExemplar, - storage.ErrDuplicateSampleForTimestamp, - storage.ErrExemplarLabelLength, - errors.New("foo"), - }), - threshold: 2, - exp: errConflict, - }, - { - name: "matching multierror many, both above threshold, conflict has precedence", - err: errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errConflict, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - status.Error(codes.AlreadyExists, "conflict"), - errors.New("foo"), - }), - threshold: 2, - exp: errConflict, - }, - { - name: "matching multierror many, both above threshold, conflict has precedence (labels error)", - err: errutil.NonNilMultiError([]error{ - labelpb.ErrDuplicateLabels, - labelpb.ErrDuplicateLabels, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - labelpb.ErrDuplicateLabels, - errors.New("foo"), - }), - threshold: 2, - exp: errConflict, - }, - { - name: "nested matching multierror", - err: errors.Wrap(errors.Wrap(errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errors.New("foo"), - errors.New("bar"), - }), "baz"), "qux"), - threshold: 1, - exp: errConflict, - }, - { - name: "deep nested matching multierror", - err: errors.Wrap(errutil.NonNilMultiError([]error{ - errutil.NonNilMultiError([]error{ - errors.New("qux"), - status.Error(codes.AlreadyExists, "conflict"), - status.Error(codes.AlreadyExists, "conflict"), - }), - errors.New("foo"), - errors.New("bar"), - }), "baz"), - threshold: 1, - exp: errors.New("baz: 3 errors: 3 errors: qux; rpc error: code = AlreadyExists desc = conflict; rpc error: code = AlreadyExists desc = conflict; foo; bar"), - }, - } { - t.Run(tc.name, func(t *testing.T) { - err := determineWriteErrorCause(tc.err, tc.threshold) - if tc.exp != nil { - testutil.NotOk(t, err) - testutil.Equals(t, tc.exp.Error(), err.Error()) - return - } - testutil.Ok(t, err) - }) - } -} - type fakeTenantAppendable struct { f *fakeAppendable } @@ -375,7 +178,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin TenantHeader: DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, - ForwardTimeout: 5 * time.Second, + ForwardTimeout: 5 * time.Minute, Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), Limiter: limiter, }) @@ -567,7 +370,7 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist }, { name: "size 3 conflict and commit error with replication", - status: http.StatusConflict, + status: http.StatusInternalServerError, replicationFactor: 3, wreq: wreq, appendables: []*fakeAppendable{ @@ -695,7 +498,7 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist }, { name: "size 6 with replication 3 one commit and two conflict error", - status: http.StatusConflict, + status: http.StatusInternalServerError, replicationFactor: 3, wreq: wreq, appendables: []*fakeAppendable{ @@ -761,6 +564,7 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist if rec.Code != tc.status { t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i+1, tc.status, rec.Code, rec.Body.String()) } + fmt.Println(rec.Body.String()) } if withConsistencyDelay { diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index dbd37515369..4c9223a4e20 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -13,7 +13,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -73,7 +72,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR var ( ref storage.SeriesRef - errs errutil.MultiError + errs writeErrors ) for _, t := range wreq.Timeseries { // Check if time series labels are valid. If not, skip the time series @@ -210,5 +209,5 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR if err := app.Commit(); err != nil { errs.Add(errors.Wrap(err, "commit samples")) } - return errs.Err() + return errs.Cause() }