Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gossipsub): bound send-queue #4756

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
See [PR 4642](https://github.com/libp2p/rust-libp2p/pull/4642).
- Return typed error from config builder.
See [PR 4445].
- Bound send_queue with an `ArrayDeque` and replace frontmost item if full.
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

[PR 4445]: https://github.com/libp2p/rust-libp2p/pull/4445

Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ rand = "0.8"
regex = "1.10.2"
serde = { version = "1", optional = true, features = ["derive"] }
sha2 = "0.10.8"
smallvec = "1.11.1"
unsigned-varint = { version = "0.7.2", features = ["asynchronous_codec"] }
void = "1.0.2"

# Metrics dependencies
prometheus-client = { workspace = true }
arraydeque = "0.5.1"

[dev-dependencies]
async-std = { version = "1.6.3", features = ["unstable"] }
Expand Down
11 changes: 5 additions & 6 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::protocol::{GossipsubCodec, ProtocolConfig};
use crate::rpc_proto::proto;
use crate::types::{PeerKind, RawMessage, Rpc};
use crate::ValidationError;
use arraydeque::{ArrayDeque, Wrapping};
use asynchronous_codec::Framed;
use futures::future::Either;
use futures::prelude::*;
Expand All @@ -33,7 +34,6 @@ use libp2p_swarm::handler::{
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
};
use libp2p_swarm::Stream;
use smallvec::SmallVec;
use std::{
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -95,7 +95,7 @@ pub struct EnabledHandler {
inbound_substream: Option<InboundSubstreamState>,

/// Queue of values that we want to send to the remote.
send_queue: SmallVec<[proto::RPC; 16]>,
send_queue: ArrayDeque<proto::RPC, 16, Wrapping>,

/// Flag indicating that an outbound substream is being established to prevent duplicate
/// requests.
Expand Down Expand Up @@ -167,7 +167,7 @@ impl Handler {
outbound_substream_establishing: false,
outbound_substream_attempts: 0,
inbound_substream_attempts: 0,
send_queue: SmallVec::new(),
send_queue: ArrayDeque::new(),
peer_kind: None,
peer_kind_sent: false,
last_io_activity: Instant::now(),
Expand Down Expand Up @@ -315,8 +315,7 @@ impl EnabledHandler {
) {
// outbound idle state
Some(OutboundSubstreamState::WaitingOutput(substream)) => {
if let Some(message) = self.send_queue.pop() {
self.send_queue.shrink_to_fit();
if let Some(message) = self.send_queue.pop_front() {
self.outbound_substream =
Some(OutboundSubstreamState::PendingSend(substream, message));
continue;
Expand Down Expand Up @@ -409,7 +408,7 @@ impl ConnectionHandler for Handler {
fn on_behaviour_event(&mut self, message: HandlerIn) {
match self {
Handler::Enabled(handler) => match message {
HandlerIn::Message(m) => handler.send_queue.push(m),
HandlerIn::Message(m) => _ = handler.send_queue.push_back(m), // drops frontmost element if full
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
HandlerIn::JoinedMesh => {
handler.in_mesh = true;
}
Expand Down