storage: refactor ambiguous errors
This patch refactors the way ambiguous errors are returned from the
evaluation of requests. Before this patch, the situation was confusing
and buggy: the responsibility for generating these errors was split
between Replica.executeWriteBatch() and the lower-level
Replica.tryExecuteWriteBatch(). The latter was returning both errors
and so-called "retry reasons", except that some retry reasons did not
actually result in retries. The reasons in question were returned
together with an error, in blatant violation of the proposalResult
comment saying that exactly one of these fields can be set.
I believe the fact that both an error and a retry reason were set was
also making the timestamp cache updating code buggy:
https://github.com/cockroachdb/cockroach/blob/a5eddb23cbb7db5cbb9655b8ed7a1875779b3c62/pkg/storage/replica.go#L2214
The code says that we update the ts cache if the request is not to be
retried, but it was missing cases.
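
For reference, a minimal sketch (not part of this patch) of the invariant
that the proposalResult comment describes, written against the post-patch
names from the diff below; the helper name is hypothetical:

    // assertProposalResultInvariant checks that at most one of the reply,
    // the error, and the re-evaluation reason is set. Hypothetical helper,
    // assumed to live in pkg/storage next to the types in the diff below.
    func assertProposalResultInvariant(pr proposalResult) {
        set := 0
        if pr.Reply != nil {
            set++
        }
        if pr.Err != nil {
            set++
        }
        if pr.ProposalRetry != proposalNoReevaluation {
            set++
        }
        if set > 1 {
            panic(fmt.Sprintf("proposalResult violates its contract: %+v", pr))
        }
    }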

The actual ambiguous errors were sometimes generated in
tryExecuteWriteBatch (for reason proposalRangeNoLongerExists) and
sometimes in executeWriteBatch (for reason proposalErrorReproposing),
which was confusing.

This patch does away with the troublesome retry reasons and removes any
responsibility around creating ambiguous errors from executeWriteBatch.
The lower layers know whether the conditions they're reporting should
result in ambiguous errors or not, and so they create the ambiguous
errors directly.
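
As a sketch of the resulting pattern (mirroring the refreshProposalsLocked
hunk in the diff below): the layer that detects a condition with an unknown
outcome wraps it in an AmbiguousResultError itself, instead of handing a
retry reason back to executeWriteBatch. The helper below is hypothetical,
not code from this commit:

    // finishWithAmbiguity finishes a proposal with an ambiguous result
    // wrapping the underlying cause. Hypothetical helper for illustration.
    func finishWithAmbiguity(p *ProposalData, cause error) {
        p.finishApplication(proposalResult{
            Err: roachpb.NewError(roachpb.NewAmbiguousResultError(cause.Error())),
        })
    }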

This patch changes the errors returned for some non-commit requests when
the reason is proposalErrorReproposing: it used to be a non-ambiguous
error and now it will be an ambiguous one. executeWriteBatch used to
discriminate between commit and non-commit batches, but that was
inconsistent (it didn't take effect for proposalRangeNoLongerExists)
and, more importantly, it was the wrong layer to take commits into
consideration (in my opinion).
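
For callers, the practical difference is whether the error can be treated
as a definite failure. A minimal sketch of such a check, using the
GetDetail pattern already present in this file; the helper is hypothetical:

    // isAmbiguous reports whether pErr indicates that the write may or may
    // not have been applied. Hypothetical helper for illustration.
    func isAmbiguous(pErr *roachpb.Error) bool {
        if pErr == nil {
            return false
        }
        _, ok := pErr.GetDetail().(*roachpb.AmbiguousResultError)
        return ok
    }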

Release note: None
andreimatei committed Jan 13, 2019
1 parent 2e7dac6 commit 80eea9e
Showing 1 changed file with 47 additions and 81 deletions.
128 changes: 47 additions & 81 deletions pkg/storage/replica.go
@@ -130,23 +130,13 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting(
false,
)

type proposalRetryReason int
type proposalReevaluationReason int

const (
proposalNoRetry proposalRetryReason = iota
proposalNoReevaluation proposalReevaluationReason = iota
// proposalIllegalLeaseIndex indicates the proposal failed to apply at
// a Lease index it was not legal for. The command should be retried.
// a Lease index it was not legal for. The command should be re-evaluated.
proposalIllegalLeaseIndex
// proposalErrorReproposing indicates that re-proposal
// failed. Because the original proposal may have succeeded, an
// AmbiguousResultError must be returned. The command should not be
// retried.
proposalErrorReproposing
// proposalRangeNoLongerExists indicates the proposal was for a
// range that no longer exists. Because the original proposal may
// have succeeded, an AmbiguousResultError must be returned. The
// command should not be retried.
proposalRangeNoLongerExists
)

// proposalResult indicates the result of a proposal. Exactly one of
@@ -155,7 +145,7 @@ const (
type proposalResult struct {
Reply *roachpb.BatchResponse
Err *roachpb.Error
ProposalRetry proposalRetryReason
ProposalRetry proposalReevaluationReason
Intents []result.IntentsWithArg
EndTxns []result.EndTxnIntents
}
@@ -922,8 +912,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
// NB: each proposal needs its own version of the error (i.e. don't try to
// share the error across proposals).
p.finishApplication(proposalResult{
Err: roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")),
ProposalRetry: proposalRangeNoLongerExists,
Err: roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")),
})
}
}
@@ -2201,12 +2190,13 @@ type endCmds struct {

// done releases the latches acquired by the command and updates
// the timestamp cache using the final timestamp of each command.
func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalRetryReason) {
// Update the timestamp cache if the command is not being
// retried. Each request is considered in turn; only those marked as
// affecting the cache are processed. Inconsistent reads are
// excluded.
if retry == proposalNoRetry && ec.ba.ReadConsistency == roachpb.CONSISTENT {
func (ec *endCmds) done(
br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalReevaluationReason,
) {
// Update the timestamp cache if the request is not being re-evaluated. Each
// request is considered in turn; only those marked as affecting the cache are
// processed. Inconsistent reads are excluded.
if retry == proposalNoReevaluation && ec.ba.ReadConsistency == roachpb.CONSISTENT {
ec.repl.updateTimestampCache(&ec.ba, br, pErr)
}

@@ -2773,7 +2763,7 @@ func (r *Replica) executeReadOnlyBatch(
// timestamp cache update is synchronized. This is wrapped to delay
// pErr evaluation to its value when returning.
defer func() {
endCmds.done(br, pErr, proposalNoRetry)
endCmds.done(br, pErr, proposalNoReevaluation)
}()

// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
@@ -2845,50 +2835,23 @@ func (r *Replica) executeReadOnlyBatch(
func (r *Replica) executeWriteBatch(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
for count := 0; ; count++ {
var ambiguousResult bool
for {
// TODO(andrei): export some metric about re-evaluations.
br, pErr, retry := r.tryExecuteWriteBatch(ctx, ba)
switch retry {
case proposalIllegalLeaseIndex:
if retry == proposalIllegalLeaseIndex {
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")
continue // retry
case proposalRangeNoLongerExists, proposalErrorReproposing:
ambiguousResult = true
}
if pErr != nil {
// If this is a transactional request but doesn't include an
// EndTransaction with commit=true, return error immediately.
if etArg, ok := ba.GetArg(roachpb.EndTransaction); ba.Txn != nil &&
(!ok || !etArg.(*roachpb.EndTransactionRequest).Commit) {
return nil, pErr
}
// If we've gotten an indication of possible ambiguous result,
// we must return an AmbiguousResultError to prevent callers
// from retrying thinking this batch could not have succeeded.
//
// TODO(spencer): add metrics for how often the re-proposed
// commands succeed and how often we return errors.
if _, ok := pErr.GetDetail().(*roachpb.AmbiguousResultError); !ok && ambiguousResult {
log.VEventf(ctx, 2, "received error after %d retries; returning ambiguous result: %s",
count, pErr)
are := roachpb.NewAmbiguousResultError(
fmt.Sprintf("Raft re-proposal failed: %s", pErr))
are.WrappedErr = pErr
return nil, roachpb.NewError(are)
if pErr != nil {
log.Fatalf(ctx, "both error and retry returned: %s", pErr)
}
continue // retry
}
return br, pErr
}
}

// tryExecuteWriteBatch is invoked by executeWriteBatch, which will
// call this method until it returns a non-retryable result. Retries
// may happen if either the proposal was submitted to Raft but did not
// end up in a legal log position, or the proposal was submitted to
// Raft and then was re-proposed. On re-proposals, the proposal may
// have applied successfully and so the caller must be careful to
// indicate an ambiguous result to the caller in the event
// proposalReproposed is returned.
// call this method until it returns a non-retryable result (i.e. no
// proposalRetryReason is returned).
//
// Concretely,
//
@@ -2908,26 +2871,23 @@ func (r *Replica) executeWriteBatch(
// registered with the timestamp cache, its latches are released, and
// its result (which could be an error) is returned to the client.
//
// TODO(tschottdorf): take special care with "special" commands and their
// reorderings. For example, a batch of writes and a split could be in flight
// in parallel without overlap, but if the writes hit the RHS, something must
// prevent them from writing outside of the key range when they apply.
// Returns exactly one of a response, an error or re-evaluation reason.
//
// NB: changing BatchRequest to a pointer here would have to be done cautiously
// as this method makes the assumption that it operates on a shallow copy (see
// call to applyTimestampCache).
func (r *Replica) tryExecuteWriteBatch(
ctx context.Context, ba roachpb.BatchRequest,
) (br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalRetryReason) {
) (br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalReevaluationReason) {
startTime := timeutil.Now()

if err := r.maybeBackpressureWriteBatch(ctx, ba); err != nil {
return nil, roachpb.NewError(err), proposalNoRetry
return nil, roachpb.NewError(err), proposalNoReevaluation
}

spans, err := r.collectSpans(&ba)
if err != nil {
return nil, roachpb.NewError(err), proposalNoRetry
return nil, roachpb.NewError(err), proposalNoReevaluation
}

var endCmds *endCmds
@@ -2940,7 +2900,7 @@ func (r *Replica) tryExecuteWriteBatch(
var err error
endCmds, err = r.beginCmds(ctx, &ba, spans)
if err != nil {
return nil, roachpb.NewError(err), proposalNoRetry
return nil, roachpb.NewError(err), proposalNoReevaluation
}
}

@@ -2961,7 +2921,7 @@ func (r *Replica) tryExecuteWriteBatch(
// Other write commands require that this replica has the range
// lease.
if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil {
return nil, pErr, proposalNoRetry
return nil, pErr, proposalNoReevaluation
}
lease = status.Lease
}
@@ -2975,7 +2935,7 @@ func (r *Replica) tryExecuteWriteBatch(
// forward. Or, in the case of a transactional write, the txn
// timestamp and possible write-too-old bool.
if bumped, pErr := r.applyTimestampCache(ctx, &ba, minTS); pErr != nil {
return nil, pErr, proposalNoRetry
return nil, pErr, proposalNoReevaluation
} else if bumped {
// If we bump the transaction's timestamp, we must absolutely
// tell the client in a response transaction (for otherwise it
@@ -3005,7 +2965,7 @@ func (r *Replica) tryExecuteWriteBatch(
maxLeaseIndex, ba, pErr,
)
}
return nil, pErr, proposalNoRetry
return nil, pErr, proposalNoReevaluation
}
// A max lease index of zero is returned when no proposal was made or a lease was proposed.
// In both cases, we don't need to communicate a MLAI.
@@ -3071,7 +3031,7 @@ func (r *Replica) tryExecuteWriteBatch(
if tryAbandon() {
log.VEventf(ctx, 2, "context cancellation after %0.1fs of attempting command %s",
timeutil.Since(startTime).Seconds(), ba)
return nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error())), proposalNoRetry
return nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error())), proposalNoReevaluation
}
ctxDone = nil
case <-shouldQuiesce:
@@ -3084,7 +3044,7 @@ func (r *Replica) tryExecuteWriteBatch(
if tryAbandon() {
log.VEventf(ctx, 2, "shutdown cancellation after %0.1fs of attempting command %s",
timeutil.Since(startTime).Seconds(), ba)
return nil, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown")), proposalNoRetry
return nil, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown")), proposalNoReevaluation
}
shouldQuiesce = nil
}
@@ -4651,7 +4611,9 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
// https://github.com/cockroachdb/cockroach/issues/21849
} else if err != nil {
r.cleanupFailedProposalLocked(p)
p.finishApplication(proposalResult{Err: roachpb.NewError(err), ProposalRetry: proposalErrorReproposing})
p.finishApplication(proposalResult{
Err: roachpb.NewError(roachpb.NewAmbiguousResultError(err.Error())),
})
}
}
}
@@ -4882,7 +4844,7 @@ func (r *Replica) checkForcedErrLocked(
raftCmd storagepb.RaftCommand,
proposal *ProposalData,
proposedLocally bool,
) (uint64, proposalRetryReason, *roachpb.Error) {
) (uint64, proposalReevaluationReason, *roachpb.Error) {
leaseIndex := r.mu.state.LeaseAppliedIndex

isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest
@@ -4896,7 +4858,7 @@ func (r *Replica) checkForcedErrLocked(
// Nothing to do here except making sure that the corresponding batch
// (which is bogus) doesn't get executed (for it is empty and so
// properties like key range are undefined).
return leaseIndex, proposalNoRetry, roachpb.NewErrorf("no-op on empty Raft entry")
return leaseIndex, proposalNoReevaluation, roachpb.NewErrorf("no-op on empty Raft entry")
}

// Verify the lease matches the proposer's expectation. We rely on
@@ -4959,7 +4921,7 @@ func (r *Replica) checkForcedErrLocked(
// For lease requests we return a special error that
// redirectOnOrAcquireLease() understands. Note that these
// requests don't go through the DistSender.
return leaseIndex, proposalNoRetry, roachpb.NewError(&roachpb.LeaseRejectedError{
return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: *r.mu.state.Lease,
Requested: requestedLease,
Message: "proposed under invalid lease",
@@ -4971,7 +4933,7 @@ func (r *Replica) checkForcedErrLocked(
nlhe.CustomMsg = fmt.Sprintf(
"stale proposal: command was proposed under lease #%d but is being applied "+
"under lease: %s", raftCmd.ProposerLeaseSequence, r.mu.state.Lease)
return leaseIndex, proposalNoRetry, roachpb.NewError(nlhe)
return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe)
}

if isLeaseRequest {
@@ -4983,7 +4945,7 @@ func (r *Replica) checkForcedErrLocked(
// However, leases get special vetting to make sure we don't give one to a replica that was
// since removed (see #15385 and a comment in redirectOnOrAcquireLease).
if _, ok := r.mu.state.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok {
return leaseIndex, proposalNoRetry, roachpb.NewError(&roachpb.LeaseRejectedError{
return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: *r.mu.state.Lease,
Requested: requestedLease,
Message: "replica not part of range",
@@ -5005,7 +4967,7 @@ func (r *Replica) checkForcedErrLocked(
// The command is trying to apply at a past log position. That's
// unfortunate and hopefully rare; the client on the proposer will try
// again. Note that in this situation, the leaseIndex does not advance.
retry := proposalNoRetry
retry := proposalNoReevaluation
if proposedLocally {
log.VEventf(
ctx, 1,
@@ -5018,7 +4980,7 @@ func (r *Replica) checkForcedErrLocked(
"command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex,
)
}
return leaseIndex, proposalNoRetry, nil
return leaseIndex, proposalNoReevaluation, nil
}

// processRaftCommand processes a raft command by unpacking the
@@ -5259,7 +5221,7 @@ func (r *Replica) processRaftCommand(

var lResult *result.LocalResult
if proposedLocally {
if proposalRetry != proposalNoRetry {
if proposalRetry != proposalNoReevaluation {
response.ProposalRetry = proposalRetry
if pErr == nil {
log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal)
@@ -5269,7 +5231,11 @@ func (r *Replica) processRaftCommand(
// A forced error was set (i.e. we did not apply the proposal,
// for instance due to its log position) or the Replica is now
// corrupted.
response.Err = pErr
// If proposalRetry is set, we don't also return an error, as per the
// proposalResult contract.
if proposalRetry == proposalNoReevaluation {
response.Err = pErr
}
} else if proposal.Local.Reply != nil {
response.Reply = proposal.Local.Reply
} else {