Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connectd throttle #7366

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Prev Previous commit
connectd: tie gossip query responses into ratelimiting code.
A bit tricky, since we get more than one message at a time.  However,
this just means we go over quota for a bit, and will get caught when
those are sent (we do this for a single message already, so it's not
that much worse).

Note: this not only limits sending, but it limits the actual query
processing, which is nice.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
  • Loading branch information
rustyrussell committed Jun 4, 2024
commit 8a52e9e0adde274f7d01786ebc433af39bd8ea04
53 changes: 32 additions & 21 deletions connectd/multiplex.c
Original file line number Diff line number Diff line change
Expand Up @@ -445,24 +445,14 @@ static void wake_gossip(struct peer *peer)
peer->gs.gossip_timer = gossip_stream_timer(peer);
}

/* If we are streaming gossip, get something from gossip store */
static const u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
/* Gossip response or something from gossip store */
static const u8 *maybe_gossip_msg(const tal_t *ctx, struct peer *peer)
{
const u8 *msg;
struct timemono now;
struct gossmap *gossmap;
u32 timestamp;

/* dev-mode can suppress all gossip */
if (peer->daemon->dev_suppress_gossip)
return NULL;

/* Not streaming right now? */
if (!peer->gs.active)
return NULL;

/* This should be around to kick us every 60 seconds */
assert(peer->gs.gossip_timer);
const u8 **msgs;

/* If it's been over a second, make a fresh start. */
now = time_mono();
Expand All @@ -487,6 +477,31 @@ static const u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)

gossmap = get_gossmap(peer->daemon);

/* This can return more than one. */
msgs = maybe_create_query_responses(tmpctx, peer, gossmap);
if (tal_count(msgs) > 0) {
/* We return the first one for immediate sending, and queue
* others for future. We add all the lengths now though! */
for (size_t i = 0; i < tal_count(msgs); i++) {
peer->gs.bytes_this_second += tal_bytelen(msgs[i]);
status_peer_io(LOG_IO_OUT, &peer->id, msgs[i]);
if (i > 0)
msg_enqueue(peer->peer_outq, take(msgs[i]));
}
return msgs[0];
}

/* dev-mode can suppress all gossip */
if (peer->daemon->dev_suppress_gossip)
return NULL;

/* Not streaming right now? */
if (!peer->gs.active)
return NULL;

/* This should be around to kick us every 60 seconds */
assert(peer->gs.gossip_timer);

again:
msg = gossmap_stream_next(ctx, gossmap, peer->gs.iter, &timestamp);
if (msg) {
Expand All @@ -507,6 +522,7 @@ static const u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
return msg;
}

/* No gossip left to send */
peer->gs.active = false;
return NULL;
}
Expand Down Expand Up @@ -970,14 +986,9 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
return io_sock_shutdown(peer_conn);

/* If they want us to send gossip, do so now. */
if (!peer->draining) {
/* FIXME: make it return the message? */
if (maybe_send_query_responses(peer, get_gossmap(peer->daemon))) {
msg = msg_dequeue(peer->peer_outq);
} else {
msg = maybe_from_gossip_store(NULL, peer);
}
}
if (!peer->draining)
msg = maybe_gossip_msg(NULL, peer);

if (!msg) {
/* Tell them to read again, */
io_wake(&peer->subds);
Expand Down
38 changes: 18 additions & 20 deletions connectd/queries.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,22 @@ static void uniquify_node_ids(struct node_id **ids)
}

/* We are fairly careful to avoid the peer DoSing us with channel queries:
* this routine sends information about a single short_channel_id, unless
* this routine creates messages about a single short_channel_id, unless
* it's finished all of them. */
bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
const u8 **maybe_create_query_responses(const tal_t *ctx,
struct peer *peer,
struct gossmap *gossmap)
{
size_t i, num;
bool sent = false;
const u8 *msg;
const u8 **msgs = tal_arr(ctx, const u8 *, 0);

/* BOLT #7:
*
* - MUST respond to each known `short_channel_id`:
*/
/* Search for next short_channel_id we know about. */
num = tal_count(peer->scid_queries);
for (i = peer->scid_query_idx; !sent && i < num; i++) {
for (i = peer->scid_query_idx; tal_count(msgs) == 0 && i < num; i++) {
struct gossmap_chan *chan;
struct gossmap_node *node;
struct node_id node_id;
Expand All @@ -136,9 +137,8 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
* - MUST reply with a `channel_announcement`
*/
if (peer->scid_query_flags[i] & SCID_QF_ANNOUNCE) {
msg = gossmap_chan_get_announce(NULL, gossmap, chan);
inject_peer_msg(peer, take(msg));
sent = true;
tal_arr_expand(&msgs,
gossmap_chan_get_announce(msgs, gossmap, chan));
}

/* BOLT #7:
Expand All @@ -152,15 +152,13 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
* `node_id_2` */
if ((peer->scid_query_flags[i] & SCID_QF_UPDATE1)
&& gossmap_chan_set(chan, 0)) {
msg = gossmap_chan_get_update(NULL, gossmap, chan, 0);
inject_peer_msg(peer, take(msg));
sent = true;
tal_arr_expand(&msgs,
gossmap_chan_get_update(msgs, gossmap, chan, 0));
}
if ((peer->scid_query_flags[i] & SCID_QF_UPDATE2)
&& gossmap_chan_set(chan, 1)) {
msg = gossmap_chan_get_update(NULL, gossmap, chan, 1);
inject_peer_msg(peer, take(msg));
sent = true;
tal_arr_expand(&msgs,
gossmap_chan_get_update(msgs, gossmap, chan, 1));
}

/* BOLT #7:
Expand Down Expand Up @@ -212,7 +210,7 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
/* If we haven't sent anything above, we look for the next
* node_announcement to send. */
num = tal_count(peer->scid_query_nodes);
for (i = peer->scid_query_nodes_idx; !sent && i < num; i++) {
for (i = peer->scid_query_nodes_idx; tal_count(msgs) == 0 && i < num; i++) {
const struct gossmap_node *n;

/* Not every node announces itself (we know it exists because
Expand All @@ -221,9 +219,8 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
if (!n || !gossmap_node_announced(n))
continue;

msg = gossmap_node_get_announce(NULL, gossmap, n);
inject_peer_msg(peer, take(msg));
sent = true;
tal_arr_expand(&msgs,
gossmap_node_get_announce(msgs, gossmap, n));
}
peer->scid_query_nodes_idx = i;

Expand All @@ -245,7 +242,7 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
u8 *end = towire_reply_short_channel_ids_end(peer,
&chainparams->genesis_blockhash,
true);
inject_peer_msg(peer, take(end));
tal_arr_expand(&msgs, end);

/* We're done! Clean up so we simply pass-through next time. */
peer->scid_queries = tal_free(peer->scid_queries);
Expand All @@ -254,7 +251,8 @@ bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap)
peer->scid_query_nodes = tal_free(peer->scid_query_nodes);
peer->scid_query_nodes_idx = 0;
}
return sent;

return msgs;
}

/* The peer can ask about an array of short channel ids: we don't assemble the
Expand Down
6 changes: 4 additions & 2 deletions connectd/queries.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
#define LIGHTNING_CONNECTD_QUERIES_H
#include "config.h"

/* See if there's a query to respond to, if so, do it and return true */
bool maybe_send_query_responses(struct peer *peer, struct gossmap *gossmap);
/* See if there's a query to respond to: if so, return some msgs */
const u8 **maybe_create_query_responses(const tal_t *ctx,
struct peer *peer,
struct gossmap *gossmap);

void handle_query_short_channel_ids(struct peer *peer, const u8 *msg);
void handle_query_channel_range(struct peer *peer, const u8 *msg);
Expand Down
64 changes: 60 additions & 4 deletions tests/test_gossip.py
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,8 @@ def test_node_reannounce(node_factory, bitcoind, chainparams):
bitcoind.generate_block(5)
genesis_blockhash = chainparams['chain_hash']



# Wait for node_announcement for l1.
l2.daemon.wait_for_log(r'\[IN\] 0101.*{}'.format(l1.info['id']))
# Wait for it to process it.
Expand Down Expand Up @@ -2043,7 +2045,7 @@ def test_listchannels_deprecated_local(node_factory, bitcoind):
assert vals == [(True, True, l1l2)] * 2 + [(True, False, l2l3)] * 2 or vals == [(True, False, l2l3)] * 2 + [(True, True, l1l2)] * 2


def test_gossip_throttle(node_factory, bitcoind):
def test_gossip_throttle(node_factory, bitcoind, chainparams):
"""Make some gossip, test it gets throttled"""
l1, l2, l3, l4 = node_factory.line_graph(4, wait_for_announce=True,
opts=[{}, {}, {}, {'dev-throttle-gossip': None}])
Expand All @@ -2063,9 +2065,11 @@ def test_gossip_throttle(node_factory, bitcoind):
'--max-messages={}'.format(expected),
'{}@localhost:{}'.format(l1.info['id'], l1.port)],
check=True,
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
time_fast = time.time() - start_fast
assert time_fast < 2
# Remove timestamp filter, since timestamp will change!
out1 = [m for m in out1 if not m.startswith(b'0109')]

# l4 is throttled
start_slow = time.time()
Expand All @@ -2076,10 +2080,62 @@ def test_gossip_throttle(node_factory, bitcoind):
'--max-messages={}'.format(expected),
'{}@localhost:{}'.format(l4.info['id'], l4.port)],
check=True,
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
time_slow = time.time() - start_slow
assert time_slow > 3

# Remove timestamp filter, since timestamp will change!
out2 = [m for m in out2 if not m.startswith(b'0109')]

# Contents should be identical (once uniquified, since each
# doubles-up on its own gossip)
assert set(out1.split()) == set(out2.split())
assert set(out1) == set(out2)

encoded = subprocess.run(['devtools/mkencoded', '--scids', '00',
first_scid(l1, l2),
first_scid(l2, l3),
first_scid(l3, l4)],
check=True,
timeout=TIMEOUT,
stdout=subprocess.PIPE).stdout.strip().decode()

query = subprocess.run(['devtools/mkquery',
'query_short_channel_ids',
chainparams['chain_hash'],
encoded,
# We want channel announce, updates and node ann.
'00', '1F1F1F'],
check=True,
timeout=TIMEOUT,
stdout=subprocess.PIPE).stdout.strip()

# Queries should also be ratelimited, so compare l1 vs l4.
start_fast = time.time()
out3 = subprocess.run(['devtools/gossipwith',
'--no-gossip',
'--hex',
'--network={}'.format(TEST_NETWORK),
'--max-messages={}'.format(expected),
'{}@localhost:{}'.format(l1.info['id'], l1.port),
query],
check=True,
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
time_fast = time.time() - start_fast
assert time_fast < 2
out3 = [m for m in out3 if not m.startswith(b'0109')]
assert set(out1) == set(out3)

start_slow = time.time()
out4 = subprocess.run(['devtools/gossipwith',
'--no-gossip',
'--hex',
'--network={}'.format(TEST_NETWORK),
'--max-messages={}'.format(expected),
'{}@localhost:{}'.format(l4.info['id'], l4.port),
query],
check=True,
timeout=TIMEOUT, stdout=subprocess.PIPE).stdout.split()
time_slow = time.time() - start_slow
assert time_slow > 3
out4 = [m for m in out4 if not m.startswith(b'0109')]
assert set(out2) == set(out4)