Skip to content

Commit

Permalink
kv: don't clobber commit status when heartbeat fails
Browse files Browse the repository at this point in the history
When a heartbeat failed while a request was in-flight, two proto updates
could race and produce illegal behavior. For example, it was possible
for a client to successfully commit but receive an ABORTED proto (and no
error).

This commit removes the ad-hoc manipulation of the state during
heartbeat failures. Instead, before each new request sent through the
heartbeat interceptor we check on the heartbeat loop's status; if it
indicates an error, we refuse new requests.

Fixes #39658.

Release note: None
  • Loading branch information
tbg committed Aug 14, 2019
1 parent e445511 commit 1da9b85
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 29 deletions.
61 changes: 34 additions & 27 deletions pkg/kv/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)

type txnHeartbeaterStatus byte

const (
txnHeartbeaterStatusReady txnHeartbeaterStatus = iota
txnHeartbeaterStatusRunning
txnHeartbeaterStatusRejecting
)

// txnHeartbeater is a txnInterceptor in charge of a transaction's heartbeat
// loop. Transaction coordinators heartbeat their transaction record
// periodically to indicate the liveness of their transaction. Other actors like
Expand Down Expand Up @@ -84,9 +92,9 @@ type txnHeartbeater struct {
mu struct {
sync.Locker

// loopStarted indicates whether the heartbeat loop has been launched
// status indicates whether the heartbeat loop has been launched
// for the transaction or not. It remains true once the loop terminates.
loopStarted bool
status txnHeartbeaterStatus

// txnEnd is closed when the transaction is aborted or committed, terminating
// the heartbeat loop. Nil if the heartbeat loop is not running.
Expand Down Expand Up @@ -122,20 +130,21 @@ func (h *txnHeartbeater) init(
h.metrics = metrics
h.mu.Locker = mu
h.mu.txn = txn
h.mu.status = txnHeartbeaterStatusReady
h.gatekeeper = gatekeeper
h.asyncAbortCallbackLocked = asyncAbortCallbackLocked
}

// SendLocked is part of the txnInteceptor interface.
// SendLocked is part of the txnInterceptor interface.
func (h *txnHeartbeater) SendLocked(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
// If finalErr is set, we reject everything but rollbacks.
if h.mu.finalErr != nil {
if h.mu.status == txnHeartbeaterStatusRejecting {
singleRollback := ba.IsSingleEndTransactionRequest() &&
!ba.Requests[0].GetInner().(*roachpb.EndTransactionRequest).Commit
if !singleRollback {
return nil, h.mu.finalErr
return nil, roachpb.NewErrorf("programming error: txnHeartbeater is finalized")
}
}

Expand All @@ -161,12 +170,9 @@ func (h *txnHeartbeater) SendLocked(
// Note that we don't do it for 1PC txns: they only leave intents around on
// retriable errors if the batch has been split between ranges. We consider
// that unlikely enough so we prefer to not pay for a goroutine.
if !h.mu.loopStarted {
if h.mu.status == txnHeartbeaterStatusReady {
if _, haveEndTxn := ba.GetArg(roachpb.EndTransaction); !haveEndTxn {
if err := h.startHeartbeatLoopLocked(ctx); err != nil {
h.mu.finalErr = roachpb.NewError(err)
return nil, h.mu.finalErr
}
h.startHeartbeatLoopLocked(ctx)
}
}
}
Expand Down Expand Up @@ -200,13 +206,13 @@ func (h *txnHeartbeater) closeLocked() {
}

// startHeartbeatLoopLocked starts a heartbeat loop in a different goroutine.
func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error {
if h.mu.loopStarted || h.mu.txnEnd != nil {
func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) {
if h.mu.status != txnHeartbeaterStatusReady || h.mu.txnEnd != nil {
log.Fatal(ctx, "attempting to start a second heartbeat loop ")
}

log.VEventf(ctx, 2, "coordinator spawns heartbeat loop")
h.mu.loopStarted = true
h.mu.status = txnHeartbeaterStatusRunning
h.mu.txnEnd = make(chan struct{})

// Create a new context so that the heartbeat loop doesn't inherit the
Expand All @@ -217,10 +223,12 @@ func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error {
hbCtx := h.AnnotateCtx(context.Background())
hbCtx = opentracing.ContextWithSpan(hbCtx, opentracing.SpanFromContext(ctx))

return h.stopper.RunAsyncTask(
if err := h.stopper.RunAsyncTask(
hbCtx, "kv.TxnCoordSender: heartbeat loop", func(ctx context.Context) {
h.heartbeatLoop(ctx)
})
}); err != nil {
h.mu.status = txnHeartbeaterStatusRejecting
}
}

// heartbeatLoop periodically sends a HeartbeatTxn request to the transaction
Expand All @@ -234,13 +242,10 @@ func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) {
defer ticker.Stop()
}

var finalErr *roachpb.Error
defer func() {
h.mu.Lock()
// Prevent future SendLocked() calls.
if finalErr != nil {
h.mu.finalErr = finalErr
}
// Prevent future SendLocked() calls (except rollbacks).
h.mu.status = txnHeartbeaterStatusRejecting
if h.mu.txnEnd != nil {
h.mu.txnEnd = nil
}
Expand All @@ -264,15 +269,12 @@ func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) {
// This error we're generating here should not be seen by clients. Since
// the transaction is aborted, they should be rejected before they reach
// this interceptor.
finalErr = roachpb.NewErrorf("heartbeat failed fatally")
return
}
case <-closer:
// Transaction finished normally.
finalErr = roachpb.NewErrorf("txnHeartbeater already closed")
return
case <-h.stopper.ShouldQuiesce():
finalErr = roachpb.NewErrorf("node already quiescing")
return
}
}
Expand Down Expand Up @@ -337,8 +339,16 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
// TODO(nvanbenschoten): Make this the only case where we get back an
// Aborted txn.
if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok {
h.mu.txn.Status = roachpb.ABORTED
// Note that it's possible that the txn actually committed but its
// record got GC'ed. In that case, aborting won't hurt anyone though,
// since all intents have already been resolved.
// The only thing we must ascertain is that we don't tell the client
// about this error - it will get either a definitive result of
// its commit or an ambiguous one and we have nothing to offer that
// provides more clarity. We do however prevent it from running more
// requests in case it isn't aware that the transaction is over.
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
h.mu.status = txnHeartbeaterStatusRejecting
h.abortTxnAsyncLocked(ctx)
return false
}
Expand Down Expand Up @@ -382,9 +392,6 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
// abortTxnAsyncLocked send an EndTransaction(commmit=false) asynchronously.
// The asyncAbortCallbackLocked callback is also called.
func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
if h.mu.txn.Status != roachpb.ABORTED {
log.Fatalf(ctx, "abortTxnAsyncLocked called for non-aborted txn: %s", h.mu.txn)
}
h.asyncAbortCallbackLocked(ctx)
txn := h.mu.txn.Clone()

Expand Down
2 changes: 0 additions & 2 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,8 +1091,6 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
func TestConcurrentRaftSnapshots(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Skip("https://github.com/cockroachdb/cockroach/issues/39652")

mtc := &multiTestContext{
// This test was written before the multiTestContext started creating many
// system ranges at startup, and hasn't been update to take that into
Expand Down

0 comments on commit 1da9b85

Please sign in to comment.