Skip to content

Commit

Permalink
Merge pull request #118630 from cockroachdb/blathers/backport-staging…
Browse files Browse the repository at this point in the history
…-v22.2.18-118470

staging-v22.2.18: release-22.2: rangefeed: add `TestProcessorContextCancellation`
  • Loading branch information
erikgrinaker authored Feb 2, 2024
2 parents 0803c71 + e859ee2 commit c9b1fed
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 8 deletions.
73 changes: 69 additions & 4 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func writeIntentOpWithDetails(
})
}

func writeIntentOpFromMeta(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp {
return writeIntentOpWithDetails(txn.ID, txn.Key, txn.MinTimestamp, txn.WriteTimestamp)
}

func writeIntentOpWithKey(txnID uuid.UUID, key []byte, ts hlc.Timestamp) enginepb.MVCCLogicalOp {
return writeIntentOpWithDetails(txnID, key, ts /* minTS */, ts)
}
Expand Down Expand Up @@ -798,7 +802,9 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Create a TxnPusher that performs assertions during the first 3 uses.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
tp.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
// The txns are not in a sorted order. Enforce one.
sort.Slice(txns, func(i, j int) bool {
return bytes.Compare(txns[i].Key, txns[j].Key) < 0
Expand Down Expand Up @@ -861,9 +867,6 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
defer stopper.Stop(ctx)

// Add a few intents and move the closed timestamp forward.
writeIntentOpFromMeta := func(txn enginepb.TxnMeta) enginepb.MVCCLogicalOp {
return writeIntentOpWithDetails(txn.ID, txn.Key, txn.MinTimestamp, txn.WriteTimestamp)
}
p.ConsumeLogicalOps(ctx,
writeIntentOpFromMeta(txn1Meta),
writeIntentOpFromMeta(txn2Meta),
Expand Down Expand Up @@ -1466,3 +1469,65 @@ func BenchmarkProcessorWithBudget(b *testing.B) {
require.NoError(b, err.GoError())
}
}

// TestProcessorContextCancellation tests that the processor cancels the
// contexts of async tasks when stopped. It does not, however, cancel the
// process() context -- it probably should, but this should arguably also be
// handled by the scheduler.
func TestProcessorContextCancellation(t *testing.T) {
defer leaktest.AfterTest(t)()

// Try stopping both via the stopper and via Processor.Stop().
testutils.RunTrueAndFalse(t, "stopper", func(t *testing.T, useStopper bool) {

// Set up a transaction to push.
txnTS := hlc.Timestamp{WallTime: 10} // after resolved timestamp
txnMeta := enginepb.TxnMeta{
ID: uuid.MakeV4(), Key: keyA, WriteTimestamp: txnTS, MinTimestamp: txnTS}

// Set up a transaction pusher that will block until the context cancels.
pushReadyC := make(chan struct{})
pushDoneC := make(chan struct{})

var pusher testTxnPusher
pusher.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
pushReadyC <- struct{}{}
<-ctx.Done()
close(pushDoneC)
return nil, ctx.Err()
})
pusher.mockResolveIntentsFn(func(ctx context.Context, intents []roachpb.LockUpdate) error {
return nil
})

// Start a test processor.
p, stopper := newTestProcessorWithTxnPusher(t, nil /* rtsIter */, &pusher)
ctx := context.Background()
defer stopper.Stop(ctx)

// Add an intent and move the closed timestamp past it. This should trigger a
// txn push attempt, wait for that to happen.
p.ConsumeLogicalOps(ctx, writeIntentOpFromMeta(txnMeta))
p.ForwardClosedTS(ctx, txnTS.Add(1, 0))
p.syncEventC()
select {
case <-pushReadyC:
case <-time.After(3 * time.Second):
t.Fatal("txn push did not arrive")
}

// Now, stop the processor, and wait for the txn pusher to exit.
if useStopper {
stopper.Stop(ctx)
} else {
p.Stop()
}
select {
case <-pushDoneC:
case <-time.After(3 * time.Second):
t.Fatal("txn pusher did not exit")
}
})
}
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/rangefeed/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,22 +359,22 @@ func TestInitResolvedTSScan(t *testing.T) {
}

type testTxnPusher struct {
pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
pushTxnsFn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
resolveIntentsFn func(ctx context.Context, intents []roachpb.LockUpdate) error
}

func (tp *testTxnPusher) PushTxns(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
return tp.pushTxnsFn(txns, ts)
return tp.pushTxnsFn(ctx, txns, ts)
}

func (tp *testTxnPusher) ResolveIntents(ctx context.Context, intents []roachpb.LockUpdate) error {
return tp.resolveIntentsFn(ctx, intents)
}

func (tp *testTxnPusher) mockPushTxns(
fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error),
fn func(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error),
) {
tp.pushTxnsFn = fn
}
Expand Down Expand Up @@ -431,7 +431,9 @@ func TestTxnPushAttempt(t *testing.T) {

// Run a txnPushAttempt.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
tp.mockPushTxns(func(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]*roachpb.Transaction, error) {
require.Equal(t, 4, len(txns))
require.Equal(t, txn1Meta, txns[0])
require.Equal(t, txn2Meta, txns[1])
Expand Down

0 comments on commit c9b1fed

Please sign in to comment.