Skip to content

Commit

Permalink
event: fix Resubscribe deadlock when unsubscribing after inner sub en…
Browse files Browse the repository at this point in the history
…ds (#28359)

A goroutine is used to manage the lifetime of subscriptions managed by
resubscriptions. When the subscription ends with no error, the resub
goroutine ends as well. However, the resub goroutine needs to live
long enough to read from the unsub channel. Otheriwse, an Unsubscribe
call deadlocks when writing to the unsub channel.

This is fixed by adding a buffer to the unsub channel.
  • Loading branch information
Inphi authored Oct 22, 2023
1 parent a6a0ae4 commit ffc6a0f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
2 changes: 1 addition & 1 deletion event/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscriptio
backoffMax: backoffMax,
fn: fn,
err: make(chan error),
unsub: make(chan struct{}),
unsub: make(chan struct{}, 1),
}
go s.loop()
return s
Expand Down
24 changes: 24 additions & 0 deletions event/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,27 @@ func TestResubscribeWithErrorHandler(t *testing.T) {
t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs)
}
}

func TestResubscribeWithCompletedSubscription(t *testing.T) {
t.Parallel()

quitProducerAck := make(chan struct{})
quitProducer := make(chan struct{})

sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) {
return NewSubscription(func(unsubscribed <-chan struct{}) error {
select {
case <-quitProducer:
quitProducerAck <- struct{}{}
return nil
case <-unsubscribed:
return nil
}
}), nil
})

// Ensure producer has started and exited before Unsubscribe
close(quitProducer)
<-quitProducerAck
sub.Unsubscribe()
}

0 comments on commit ffc6a0f

Please sign in to comment.