Skip to content

Commit

Permalink
Merge pull request #82 from carlaKC/eventloop-err
Browse files Browse the repository at this point in the history
deadlock: Error in eventloop deadlock with peerController shutdown
  • Loading branch information
carlaKC authored Oct 20, 2023
2 parents 0b86f48 + 66b6bcf commit cefc7ee
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 7 deletions.
23 changes: 16 additions & 7 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
var (
rpcTimeout = 10 * time.Second
defaultPeerRefreshInterval = 10 * time.Minute

errChannelNotFound = errors.New("channel not found")
)

const burstSize = 10
Expand Down Expand Up @@ -180,7 +182,7 @@ func (p *process) Run(ctx context.Context) error {
})

group.Go(func() error {
return p.eventLoop(ctx)
return p.runEventLoop(ctx)
})

return group.Wait()
Expand Down Expand Up @@ -280,13 +282,20 @@ func (p *process) createPeerController(ctx context.Context, peer route.Vertex,
return ctrl
}

func (p *process) eventLoop(ctx context.Context) error {
// Create a group to attach peer goroutines to.
func (p *process) runEventLoop(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx)
defer func() {
_ = group.Wait()
}()

// Event loop will spin up new goroutines using the group that is passed in here.
// We run it in the same group so that both errors in eventLoop and those in
// the goroutines that is spins will will prompt exit.
group.Go(func() error {
return p.eventLoop(ctx, group)
})

return group.Wait()
}

func (p *process) eventLoop(ctx context.Context, group *errgroup.Group) error {
// Retrieve all pending htlcs from lnd.
htlcsPerPeer, err := p.client.getPendingIncomingHtlcs(ctx, nil)
if err != nil {
Expand Down Expand Up @@ -516,5 +525,5 @@ func (p *process) getChanInfo(channel uint64) (*channel, error) {
}

// Channel not found.
return nil, fmt.Errorf("incoming channel %v not found", channel)
return nil, fmt.Errorf("%w: %v", errChannelNotFound, channel)
}
41 changes: 41 additions & 0 deletions process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,44 @@ func TestBlocked(t *testing.T) {
cancel()
require.ErrorIs(t, <-exit, context.Canceled)
}

// TestChannelNotFound tests that we'll successfully exit when we cannot lookup the
// channel that a htlc belongs to.
func TestChannelNotFound(t *testing.T) {
client := newLndclientMock(testChannels)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

db, cleanup := setupTestDb(t)
defer cleanup()

log := zaptest.NewLogger(t).Sugar()

cfg := &Limits{}

p := NewProcess(client, log, cfg, db)

exit := make(chan error)

go func() {
exit <- p.Run(ctx)
}()

// Next, send a htlc that is from an unknown channel.
key := circuitKey{
channel: 99,
htlc: 4,
}
client.htlcInterceptorRequests <- &interceptedEvent{
circuitKey: key,
}

select {
case err := <-exit:
require.ErrorIs(t, err, errChannelNotFound)

case <-time.After(time.Second * 10):
t.Fatalf("timeout on process error")
}
}

0 comments on commit cefc7ee

Please sign in to comment.