From 5632bab2fe7002350e7ae336ff72fc426585dbb5 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Mon, 2 Oct 2023 14:38:49 -0400 Subject: [PATCH 1/6] stub: accept context for clean shutdown Previously if there was an error in process/peercontroller, the stub would continue to run and block on sending new htlcs into channels that have nothing listening on the other end. This change passes in the context used by process so that the stub can shut down cleanly. --- run.go | 6 +++--- stub.go | 39 ++++++++++++++++++++++++++++----------- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/run.go b/run.go index c71639b..e25a849 100644 --- a/run.go +++ b/run.go @@ -54,10 +54,12 @@ func run(c *cli.Context) error { } }() + group, ctx := errgroup.WithContext(ctx) + stub := c.Bool(stubFlag.Name) var client lndclient if stub { - stubClient := newStubClient() + stubClient := newStubClient(ctx) client = stubClient } else { @@ -145,8 +147,6 @@ func run(c *cli.Context) error { ReadHeaderTimeout: time.Second * 10, } - group, ctx := errgroup.WithContext(ctx) - // Run circuitbreaker core. group.Go(func() error { return p.Run(ctx) diff --git a/stub.go b/stub.go index 2172dde..3ce3b9a 100644 --- a/stub.go +++ b/stub.go @@ -81,7 +81,7 @@ type stubPeer struct { alias string } -func newStubClient() *stubLndClient { +func newStubClient(ctx context.Context) *stubLndClient { peers := make(map[route.Vertex]*stubPeer) chanMap := make(map[uint64]route.Vertex) pendingHtlcs := make(map[circuitKey]*stubInFlight) @@ -154,21 +154,21 @@ func newStubClient() *stubLndClient { channels = append(channels, channel) } - go client.generateHtlcs(key, peer, channels) + go client.generateHtlcs(ctx, key, peer, channels) } - go client.run() + go client.run(ctx) return client } -func (s *stubLndClient) run() { +func (s *stubLndClient) run(ctx context.Context) { for resp := range s.interceptResponseChan { - go s.resolveHtlc(resp) + go s.resolveHtlc(ctx, resp) } } -func (s *stubLndClient) resolveHtlc(resp *interceptResponse) { +func (s *stubLndClient) resolveHtlc(ctx context.Context, resp *interceptResponse) { s.pendingHtlcsLock.Lock() inFlight, ok := s.pendingHtlcs[resp.key] s.pendingHtlcsLock.Unlock() @@ -177,11 +177,15 @@ func (s *stubLndClient) resolveHtlc(resp *interceptResponse) { } if !resp.resume { - s.eventChan <- &resolvedEvent{ + select { + case s.eventChan <- &resolvedEvent{ incomingCircuitKey: resp.key, settled: false, timestamp: time.Now(), outgoingCircuitKey: inFlight.keyOut, + }: + case <-ctx.Done(): + return } return @@ -213,11 +217,15 @@ func (s *stubLndClient) resolveHtlc(resp *interceptResponse) { settled := rand.Int31n(100) < settledPerc //nolint: gosec - s.eventChan <- &resolvedEvent{ + select { + case s.eventChan <- &resolvedEvent{ incomingCircuitKey: resp.key, outgoingCircuitKey: inFlight.keyOut, settled: settled, timestamp: time.Now(), + }: + case <-ctx.Done(): + return } s.pendingHtlcsLock.Lock() @@ -242,7 +250,7 @@ func randomDelay(profile int) time.Duration { time.Millisecond } -func (s *stubLndClient) generateHtlcs(key route.Vertex, peer *stubPeer, +func (s *stubLndClient) generateHtlcs(ctx context.Context, key route.Vertex, peer *stubPeer, outgoingChannels []uint64) { log.Infow("Starting stub", "peer", peer.alias) @@ -284,15 +292,24 @@ func (s *stubLndClient) generateHtlcs(key route.Vertex, peer *stubPeer, outgoingAmount = incomingAmount } - s.interceptRequestChan <- &interceptedEvent{ + select { + case s.interceptRequestChan <- &interceptedEvent{ circuitKey: circuitKeyIn, incomingMsat: lnwire.MilliSatoshi(incomingAmount), outgoingMsat: lnwire.MilliSatoshi(outgoingAmount), + }: + case <-ctx.Done(): + return } htlcId++ - time.Sleep(randomDelay(delayProfile)) + select { + case <-time.After(randomDelay(delayProfile)): + + case <-ctx.Done(): + break + } } } From e43aa4112184c6facb2aad43652ff7bd483f5dcd Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Mon, 2 Oct 2023 13:06:28 -0400 Subject: [PATCH 2/6] multi: make forwarding history limit configurable --- db.go | 13 ++++++++----- db_test.go | 27 +++++++++++++++++---------- main.go | 5 +++++ process_test.go | 12 ++++++------ run.go | 2 +- 5 files changed, 37 insertions(+), 22 deletions(-) diff --git a/db.go b/db.go index 491f343..e5f837e 100644 --- a/db.go +++ b/db.go @@ -96,7 +96,7 @@ type Db struct { fwdHistoryLimit int } -func NewDb(dbPath string, opts ...func(*Db)) (*Db, error) { +func NewDb(dbPath string, fwdHistoryLimit int) (*Db, error) { const busyTimeoutMs = 5000 dsn := dbPath + fmt.Sprintf("?_pragma=busy_timeout=%d", busyTimeoutMs) @@ -116,10 +116,7 @@ func NewDb(dbPath string, opts ...func(*Db)) (*Db, error) { database := &Db{ db: db, - fwdHistoryLimit: defaultFwdHistoryLimit, - } - for _, opt := range opts { - opt(database) + fwdHistoryLimit: fwdHistoryLimit, } return database, nil @@ -244,6 +241,12 @@ type HtlcInfo struct { func (d *Db) RecordHtlcResolution(ctx context.Context, htlc *HtlcInfo) error { + // If the database is configured to not store any records, save the hassle of + // writing and deleting a record by returning early. + if d.fwdHistoryLimit == 0 { + return nil + } + if err := d.insertHtlcResolution(ctx, htlc); err != nil { return err } diff --git a/db_test.go b/db_test.go index b6e1406..94f0435 100644 --- a/db_test.go +++ b/db_test.go @@ -10,11 +10,11 @@ import ( "github.com/stretchr/testify/require" ) -func setupTestDb(t *testing.T, dbOpts ...func(*Db)) (*Db, func()) { +func setupTestDb(t *testing.T, fwdingHistoryLimit int) (*Db, func()) { file, err := os.CreateTemp("", "test_db_") require.NoError(t, err) - db, err := NewDb(file.Name(), dbOpts...) + db, err := NewDb(file.Name(), fwdingHistoryLimit) require.NoError(t, err) return db, func() { @@ -24,7 +24,7 @@ func setupTestDb(t *testing.T, dbOpts ...func(*Db)) (*Db, func()) { func TestDb(t *testing.T) { ctx := context.Background() - db, cleanup := setupTestDb(t) + db, cleanup := setupTestDb(t, defaultFwdHistoryLimit) defer cleanup() expectedDefaultLimit := Limit{ @@ -68,18 +68,12 @@ func TestDb(t *testing.T) { defer db.Close() } -func dbWithCustomForwardingHistoryLimit(limit int) func(d *Db) { - return func(d *Db) { - d.fwdHistoryLimit = limit - } -} - func TestDbForwardingHistory(t *testing.T) { limit := 20 // Create a test DB that will limit to 10 forwarding history records. ctx := context.Background() - db, cleanup := setupTestDb(t, dbWithCustomForwardingHistoryLimit(limit)) + db, cleanup := setupTestDb(t, limit) defer cleanup() // Insert HTLCs just up until our limit. @@ -122,3 +116,16 @@ func testHtlc(i uint64) *HtlcInfo { }, } } + +func TestDbNoForwardingHistory(t *testing.T) { + ctx := context.Background() + db, cleanup := setupTestDb(t, 0) + defer cleanup() + + htlc := testHtlc(1) + require.NoError(t, db.RecordHtlcResolution(ctx, htlc)) + + fwds, err := db.ListForwardingHistory(ctx, time.Time{}, time.Unix(1000000, 0)) + require.NoError(t, err) + require.Len(t, fwds, 0) +} diff --git a/main.go b/main.go index 75d715e..2a417ef 100644 --- a/main.go +++ b/main.go @@ -129,6 +129,11 @@ func main() { Value: "127.0.0.1:9234", Usage: "grpc server listen address", }, + cli.Uint64Flag{ + Name: "fwdhistorylimit", + Usage: "limit the number of htlc forwards that are persisted", + Value: defaultFwdHistoryLimit, + }, httpListenFlag, stubFlag, } diff --git a/process_test.go b/process_test.go index e48a476..0feb159 100644 --- a/process_test.go +++ b/process_test.go @@ -37,7 +37,7 @@ func testProcess(t *testing.T, event resolveEvent) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - db, cleanup := setupTestDb(t) + db, cleanup := setupTestDb(t, defaultFwdHistoryLimit) defer cleanup() log := zaptest.NewLogger(t).Sugar() @@ -114,7 +114,7 @@ func TestLimits(t *testing.T) { func testRateLimit(t *testing.T, mode Mode) { defer Timeout()() - db, cleanup := setupTestDb(t) + db, cleanup := setupTestDb(t, defaultFwdHistoryLimit) defer cleanup() cfg := &Limits{ @@ -204,7 +204,7 @@ func testRateLimit(t *testing.T, mode Mode) { func testMaxPending(t *testing.T, mode Mode) { defer Timeout()() - db, cleanup := setupTestDb(t) + db, cleanup := setupTestDb(t, defaultFwdHistoryLimit) defer cleanup() cfg := &Limits{ @@ -283,7 +283,7 @@ func TestNewPeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - db, cleanup := setupTestDb(t) + db, cleanup := setupTestDb(t, defaultFwdHistoryLimit) defer cleanup() log := zaptest.NewLogger(t).Sugar() @@ -323,7 +323,7 @@ func TestNewPeer(t *testing.T) { func TestBlocked(t *testing.T) { defer Timeout()() - db, cleanup := setupTestDb(t) + db, cleanup := setupTestDb(t, defaultFwdHistoryLimit) defer cleanup() cfg := &Limits{ @@ -375,7 +375,7 @@ func TestChannelNotFound(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - db, cleanup := setupTestDb(t) + db, cleanup := setupTestDb(t, defaultFwdHistoryLimit) defer cleanup() log := zaptest.NewLogger(t).Sugar() diff --git a/run.go b/run.go index e25a849..0dccd87 100644 --- a/run.go +++ b/run.go @@ -43,7 +43,7 @@ func run(c *cli.Context) error { log.Infow("Opening database", "path", dbPath) // Open database. - db, err := NewDb(dbPath) + db, err := NewDb(dbPath, c.Int("fwdhistorylimit")) if err != nil { return err } From 85a1d48bf42a4964805fbd684bc75c7b0fbf27d5 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Fri, 20 Oct 2023 10:14:03 -0400 Subject: [PATCH 3/6] db: once off clean up forwarding records on db creation --- db.go | 8 +++++++- db_test.go | 27 ++++++++++++++++++++++++++- run.go | 2 +- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/db.go b/db.go index e5f837e..9e41802 100644 --- a/db.go +++ b/db.go @@ -96,7 +96,7 @@ type Db struct { fwdHistoryLimit int } -func NewDb(dbPath string, fwdHistoryLimit int) (*Db, error) { +func NewDb(ctx context.Context, dbPath string, fwdHistoryLimit int) (*Db, error) { const busyTimeoutMs = 5000 dsn := dbPath + fmt.Sprintf("?_pragma=busy_timeout=%d", busyTimeoutMs) @@ -119,6 +119,12 @@ func NewDb(dbPath string, fwdHistoryLimit int) (*Db, error) { fwdHistoryLimit: fwdHistoryLimit, } + // Perform a once-off cleanup of the records in the db to update to a potential + // change in limit value. + if err := database.limitHTLCRecords(ctx); err != nil { + return nil, err + } + return database, nil } diff --git a/db_test.go b/db_test.go index 94f0435..ac12e0b 100644 --- a/db_test.go +++ b/db_test.go @@ -14,7 +14,7 @@ func setupTestDb(t *testing.T, fwdingHistoryLimit int) (*Db, func()) { file, err := os.CreateTemp("", "test_db_") require.NoError(t, err) - db, err := NewDb(file.Name(), fwdingHistoryLimit) + db, err := NewDb(context.Background(), file.Name(), fwdingHistoryLimit) require.NoError(t, err) return db, func() { @@ -129,3 +129,28 @@ func TestDbNoForwardingHistory(t *testing.T) { require.NoError(t, err) require.Len(t, fwds, 0) } + +func TestForwadingHistoryDelete(t *testing.T) { + // Create a db that will store HTLCs. + ctx := context.Background() + db, cleanup := setupTestDb(t, 5) + defer cleanup() + + // Write a test HTLC and assert that it's stored. + htlc := testHtlc(1) + require.NoError(t, db.RecordHtlcResolution(ctx, htlc)) + + fwds, err := db.ListForwardingHistory(ctx, time.Time{}, time.Unix(1000000, 0)) + require.NoError(t, err) + require.Len(t, fwds, 1) + + // Modify the db to have a zero limit on forwarding history. We don't recreate + // the test db because it would re-create the file. Run limitHTLCRecords once + // (as we would on NewDb) to assert that we clean up our records. + db.fwdHistoryLimit = 0 + require.NoError(t, db.limitHTLCRecords(ctx)) + + fwds, err = db.ListForwardingHistory(ctx, time.Time{}, time.Unix(1000000, 0)) + require.NoError(t, err) + require.Len(t, fwds, 0) +} diff --git a/run.go b/run.go index 0dccd87..ab700ea 100644 --- a/run.go +++ b/run.go @@ -43,7 +43,7 @@ func run(c *cli.Context) error { log.Infow("Opening database", "path", dbPath) // Open database. - db, err := NewDb(dbPath, c.Int("fwdhistorylimit")) + db, err := NewDb(ctx, dbPath, c.Int("fwdhistorylimit")) if err != nil { return err } From 50b32abce1a5e730fb9c22760a69eb8e4cc259b3 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Wed, 11 Oct 2023 12:53:37 -0400 Subject: [PATCH 4/6] multi: surface closedchannels on lndclient --- lndclient.go | 43 +++++++++++++++++++++++++++++++++++++++++++ lndclient_mock.go | 4 ++++ process.go | 2 ++ stub.go | 4 ++++ 4 files changed, 53 insertions(+) diff --git a/lndclient.go b/lndclient.go index d67fbb3..944918f 100644 --- a/lndclient.go +++ b/lndclient.go @@ -255,6 +255,49 @@ func (l *lndclientGrpc) listChannels() (map[uint64]*channel, error) { return chans, nil } +func (l *lndclientGrpc) listClosedChannels() (map[uint64]*channel, error) { + ctx, cancel := context.WithTimeout(ctxb, rpcTimeout) + defer cancel() + + resp, err := l.main.ClosedChannels(ctx, &lnrpc.ClosedChannelsRequest{}) + if err != nil { + return nil, err + } + + chans := make(map[uint64]*channel) + for _, rpcChan := range resp.Channels { + peer, err := route.NewVertexFromStr(rpcChan.RemotePubkey) + if err != nil { + return nil, err + } + + channel := &channel{ + peer: peer, + } + + // LND didn't always store who initiated the channel, so in some cases + // we don't know who initiated the channel (for very old channels). We're + // unlikely to hit this case since we're dealing with channels related + // to current forwards, so we just log that we don't know this value and + // allow initiator to be true. + switch rpcChan.OpenInitiator { + case lnrpc.Initiator_INITIATOR_LOCAL: + channel.initiator = true + + case lnrpc.Initiator_INITIATOR_REMOTE: + + default: + channel.initiator = true + log.Debugf("Channel initiator for %v with %v unknown", + rpcChan.ChanId, peer) + } + + chans[rpcChan.ChanId] = channel + } + + return chans, nil +} + func (l *lndclientGrpc) subscribeHtlcEvents(ctx context.Context) ( htlcEventsClient, error) { diff --git a/lndclient_mock.go b/lndclient_mock.go index 3b73e61..b626510 100644 --- a/lndclient_mock.go +++ b/lndclient_mock.go @@ -50,6 +50,10 @@ func (l *lndclientMock) listChannels() (map[uint64]*channel, error) { return l.channels, nil } +func (l *lndclientMock) listClosedChannels() (map[uint64]*channel, error) { + return make(map[uint64]*channel), nil +} + func (l *lndclientMock) subscribeHtlcEvents(ctx context.Context) ( htlcEventsClient, error) { diff --git a/process.go b/process.go index e03dd2d..c80621b 100644 --- a/process.go +++ b/process.go @@ -26,6 +26,8 @@ type lndclient interface { listChannels() (map[uint64]*channel, error) + listClosedChannels() (map[uint64]*channel, error) + getNodeAlias(key route.Vertex) (string, error) subscribeHtlcEvents(ctx context.Context) (htlcEventsClient, error) diff --git a/stub.go b/stub.go index 3ce3b9a..10fd746 100644 --- a/stub.go +++ b/stub.go @@ -335,6 +335,10 @@ func (s *stubLndClient) listChannels() (map[uint64]*channel, error) { return allChannels, nil } +func (s *stubLndClient) listClosedChannels() (map[uint64]*channel, error) { + return make(map[uint64]*channel), nil +} + func (s *stubLndClient) getNodeAlias(key route.Vertex) (string, error) { peer, ok := s.peers[key] if !ok { From a09ce25a41d21739c11d57582630804778582a6e Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Wed, 11 Oct 2023 11:58:05 -0400 Subject: [PATCH 5/6] stub: add closed channels 10% of the time --- stub.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/stub.go b/stub.go index 10fd746..2a2d0ed 100644 --- a/stub.go +++ b/stub.go @@ -43,6 +43,7 @@ var stubNodes = []string{ type stubChannel struct { initiator bool + closed bool } type stubInFlight struct { @@ -101,8 +102,13 @@ func newStubClient(ctx context.Context) *stubLndClient { channelCount := int(key[5]%5) + 1 for i := 0; i < channelCount; i++ { initiator := key[6+i]%2 == 0 + + // Make this a closed channel 10% of the time. + closed := rand.Intn(10) == 0 //nolint: gosec + channels[chanId] = &stubChannel{ initiator: initiator, + closed: closed, } chanMap[chanId] = key @@ -322,9 +328,17 @@ func (s *stubLndClient) getInfo() (*info, error) { } func (s *stubLndClient) listChannels() (map[uint64]*channel, error) { + return s.getChannels(false), nil +} + +func (s *stubLndClient) getChannels(closed bool) map[uint64]*channel { allChannels := make(map[uint64]*channel) for key, peer := range s.peers { for chanId, ch := range peer.channels { + if (ch.closed || closed) && !(ch.closed && closed) { + continue + } + allChannels[chanId] = &channel{ peer: key, initiator: ch.initiator, @@ -332,11 +346,11 @@ func (s *stubLndClient) listChannels() (map[uint64]*channel, error) { } } - return allChannels, nil + return allChannels } func (s *stubLndClient) listClosedChannels() (map[uint64]*channel, error) { - return make(map[uint64]*channel), nil + return s.getChannels(true), nil } func (s *stubLndClient) getNodeAlias(key route.Vertex) (string, error) { From fa9ee24552481961575fb108575c2f4242f0542e Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Mon, 2 Oct 2023 13:50:25 -0400 Subject: [PATCH 6/6] process: include closed channels in getChanInfo --- lndclient_mock.go | 10 +++++---- process.go | 17 ++++++++++++++ process_test.go | 56 ++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 73 insertions(+), 10 deletions(-) diff --git a/lndclient_mock.go b/lndclient_mock.go index b626510..f10c6b3 100644 --- a/lndclient_mock.go +++ b/lndclient_mock.go @@ -27,16 +27,18 @@ type lndclientMock struct { htlcInterceptorRequests chan *interceptedEvent htlcInterceptorResponses chan *interceptResponse - channels map[uint64]*channel + channels map[uint64]*channel + closedChannels map[uint64]*channel } -func newLndclientMock(channels map[uint64]*channel) *lndclientMock { +func newLndclientMock(channels, closedChannels map[uint64]*channel) *lndclientMock { return &lndclientMock{ htlcEvents: make(chan *resolvedEvent), htlcInterceptorRequests: make(chan *interceptedEvent), htlcInterceptorResponses: make(chan *interceptResponse), - channels: channels, + channels: channels, + closedChannels: closedChannels, } } @@ -51,7 +53,7 @@ func (l *lndclientMock) listChannels() (map[uint64]*channel, error) { } func (l *lndclientMock) listClosedChannels() (map[uint64]*channel, error) { - return make(map[uint64]*channel), nil + return l.closedChannels, nil } func (l *lndclientMock) subscribeHtlcEvents(ctx context.Context) ( diff --git a/process.go b/process.go index c80621b..a88ad14 100644 --- a/process.go +++ b/process.go @@ -526,6 +526,23 @@ func (p *process) getChanInfo(channel uint64) (*channel, error) { return ch, nil } + // If the channel is not open, fall back to checking our closed + // channels. + closedChannels, err := p.client.listClosedChannels() + if err != nil { + return nil, err + } + + // Add to cache and try again. + for chanId, ch := range closedChannels { + p.chanMap[chanId] = ch + } + + ch, ok = p.chanMap[channel] + if ok { + return ch, nil + } + // Channel not found. return nil, fmt.Errorf("%w: %v", errChannelNotFound, channel) } diff --git a/process_test.go b/process_test.go index 0feb159..3706470 100644 --- a/process_test.go +++ b/process_test.go @@ -33,7 +33,7 @@ const ( ) func testProcess(t *testing.T, event resolveEvent) { - client := newLndclientMock(testChannels) + client := newLndclientMock(testChannels, nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -130,7 +130,7 @@ func testRateLimit(t *testing.T, mode Mode) { }, } - client := newLndclientMock(testChannels) + client := newLndclientMock(testChannels, nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -222,7 +222,7 @@ func testMaxPending(t *testing.T, mode Mode) { }, } - client := newLndclientMock(testChannels) + client := newLndclientMock(testChannels, nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -278,7 +278,7 @@ func testMaxPending(t *testing.T, mode Mode) { func TestNewPeer(t *testing.T) { // Initialize lnd with test channels. - client := newLndclientMock(testChannels) + client := newLndclientMock(testChannels, nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -335,7 +335,7 @@ func TestBlocked(t *testing.T) { }, } - client := newLndclientMock(testChannels) + client := newLndclientMock(testChannels, nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -370,7 +370,7 @@ func TestBlocked(t *testing.T) { // TestChannelNotFound tests that we'll successfully exit when we cannot lookup the // channel that a htlc belongs to. func TestChannelNotFound(t *testing.T) { - client := newLndclientMock(testChannels) + client := newLndclientMock(testChannels, nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -407,3 +407,47 @@ func TestChannelNotFound(t *testing.T) { t.Fatalf("timeout on process error") } } + +// TestClosedChannelHtlc tests that we can handle intercepted htlcs that are associated +// with closed channels. +func TestClosedChannelHtlc(t *testing.T) { + // Initialize lnd with a closed channel. + var testClosedChannels = map[uint64]*channel{ + 5: {peer: route.Vertex{2}}, + } + client := newLndclientMock(testChannels, testClosedChannels) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, cleanup := setupTestDb(t, defaultFwdHistoryLimit) + defer cleanup() + + log := zaptest.NewLogger(t).Sugar() + + cfg := &Limits{} + + p := NewProcess(client, log, cfg, db) + + exit := make(chan error) + + go func() { + exit <- p.Run(ctx) + }() + + // Send a htlc that is from a closed channel, it should be given the go-ahead to + // resume. + key := circuitKey{ + channel: 5, + htlc: 3, + } + client.htlcInterceptorRequests <- &interceptedEvent{ + circuitKey: key, + } + + resp := <-client.htlcInterceptorResponses + require.Equal(t, key, resp.key) + + cancel() + require.ErrorIs(t, <-exit, context.Canceled) +}