Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kad): reuse outbound streams instead of closing them #3474

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@

- Bump MSRV to 1.65.0.

- Outgoing stream reuse. See [PR 3474].
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved

[PR 3239]: https://github.com/libp2p/rust-libp2p/pull/3239
[PR 3287]: https://github.com/libp2p/rust-libp2p/pull/3287
[PR 3474]: https://github.com/libp2p/rust-libp2p/pull/3474

# 0.42.1

Expand Down
4 changes: 4 additions & 0 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2276,6 +2276,10 @@ where
}
}
}

KademliaHandlerEvent::Available { .. } => {
unreachable!("Processed by handler internally")
}
};
}

Expand Down
104 changes: 61 additions & 43 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ pub struct KademliaHandler<TUserData> {
/// Number of outbound streams being upgraded right now.
num_requested_outbound_streams: usize,

/// List of outbound substreams that are waiting to become active next.
/// List of outbound substreams that are waiting to be created next.
/// Contains the request we want to send, and the user data if we expect an answer.
requested_streams:
VecDeque<SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>>,
pending_substream_requests: VecDeque<(KadRequestMsg, Option<TUserData>)>,

/// Outgoing substreams available for future reuse.
reusable_outgoing_substreams: Vec<KadOutStreamSink<NegotiatedSubstream>>,

/// List of active inbound substreams with the state they are in.
inbound_substreams: SelectAll<InboundSubstreamState<TUserData>>,
Expand Down Expand Up @@ -161,8 +163,8 @@ enum OutboundSubstreamState<TUserData> {
WaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
/// An error happened on the substream and we should report the error to the user.
ReportError(KademliaHandlerQueryErr, TUserData),
/// The substream is being closed.
Closing(KadOutStreamSink<NegotiatedSubstream>),
/// The substream is available for future reuse.
Available(KadOutStreamSink<NegotiatedSubstream>),
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved
/// The substream is complete and will not perform any more work.
Done,
Poisoned,
Expand Down Expand Up @@ -352,6 +354,12 @@ pub enum KademliaHandlerEvent<TUserData> {
/// The user data passed to the `PutValue`.
user_data: TUserData,
},

/// The substream is available for future reuse.
Available {
/// Substream to be reused for outgoing requests.
substream: KadOutStreamSink<NegotiatedSubstream>,
},
}

/// Error that can happen when requesting an RPC query.
Expand Down Expand Up @@ -531,7 +539,8 @@ where
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
num_requested_outbound_streams: 0,
requested_streams: Default::default(),
pending_substream_requests: Default::default(),
reusable_outgoing_substreams: Default::default(),
keep_alive,
protocol_status: ProtocolStatus::Unconfirmed,
}
Expand Down Expand Up @@ -677,21 +686,17 @@ where
}
KademliaHandlerIn::FindNodeReq { key, user_data } => {
let msg = KadRequestMsg::FindNode { key };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::FindNodeRes {
closer_peers,
request_id,
} => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }),
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
let msg = KadRequestMsg::GetProviders { key };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::GetProvidersRes {
closer_peers,
Expand All @@ -706,24 +711,17 @@ where
),
KademliaHandlerIn::AddProvider { key, provider } => {
let msg = KadRequestMsg::AddProvider { key, provider };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, None),
));
self.pending_substream_requests.push_back((msg, None));
}
KademliaHandlerIn::GetRecord { key, user_data } => {
let msg = KadRequestMsg::GetValue { key };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::PutRecord { record, user_data } => {
let msg = KadRequestMsg::PutValue { record };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::GetRecordRes {
record,
Expand Down Expand Up @@ -772,20 +770,41 @@ where
));
}

if let Poll::Ready(Some(event)) = self.outbound_substreams.poll_next_unpin(cx) {
return Poll::Ready(event);
while let Poll::Ready(Some(event)) = self.outbound_substreams.poll_next_unpin(cx) {
if let ConnectionHandlerEvent::Custom(KademliaHandlerEvent::Available { substream }) =
event
{
self.reusable_outgoing_substreams.push(substream);
} else {
return Poll::Ready(event);
}
}

if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) {
return Poll::Ready(event);
}

let num_in_progress_outbound_substreams =
self.outbound_substreams.len() + self.num_requested_outbound_streams;
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved
if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS {
if let Some(protocol) = self.requested_streams.pop_front() {
self.num_requested_outbound_streams += 1;
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol });
while !self.pending_substream_requests.is_empty()
&& self.outbound_substreams.len() + self.num_requested_outbound_streams
< MAX_NUM_SUBSTREAMS
{
if let Some((msg, user_data)) = self.pending_substream_requests.pop_front() {
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved
if let Some(substream) = self.reusable_outgoing_substreams.pop() {
self.outbound_substreams
.push(OutboundSubstreamState::PendingSend(
substream, msg, user_data,
));
cx.waker().wake_by_ref();
} else {
self.num_requested_outbound_streams += 1;
let protocol = SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, user_data),
);
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol,
});
}
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -910,7 +929,7 @@ where
if let Some(user_data) = user_data {
*this = OutboundSubstreamState::WaitingAnswer(substream, user_data);
} else {
*this = OutboundSubstreamState::Closing(substream);
*this = OutboundSubstreamState::Available(substream);
}
}
Poll::Pending => {
Expand All @@ -933,7 +952,7 @@ where
OutboundSubstreamState::WaitingAnswer(mut substream, user_data) => {
match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(msg))) => {
*this = OutboundSubstreamState::Closing(substream);
*this = OutboundSubstreamState::Available(substream);
let event = process_kad_response(msg, user_data);

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
Expand Down Expand Up @@ -970,13 +989,12 @@ where

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
}
OutboundSubstreamState::Closing(mut stream) => match stream.poll_close_unpin(cx) {
Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None),
Poll::Pending => {
*this = OutboundSubstreamState::Closing(stream);
return Poll::Pending;
}
},
OutboundSubstreamState::Available(substream) => {
*this = OutboundSubstreamState::Done;
let event = KademliaHandlerEvent::Available { substream };
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
}
OutboundSubstreamState::Done => {
*this = OutboundSubstreamState::Done;
return Poll::Ready(None);
Expand Down
55 changes: 48 additions & 7 deletions protocols/kad/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use instant::Instant;
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_core::{Multiaddr, PeerId};
use prost::Message;
use std::{borrow::Cow, convert::TryFrom, time::Duration};
use std::ops::{Deref, DerefMut};
use std::{borrow::Cow, convert::TryFrom, fmt, time::Duration};
use std::{io, iter};
use unsigned_varint::codec;

Expand Down Expand Up @@ -198,7 +199,7 @@ where
let mut codec = UviBytes::default();
codec.set_max_len(self.max_packet_size);

future::ok(
future::ok(KadInStreamSink(
Framed::new(incoming, codec)
.err_into()
.with::<_, _, fn(_) -> _, _>(|response| {
Expand All @@ -216,7 +217,7 @@ where
};
future::ready(proto_to_req_msg(request))
}),
)
))
}
}

Expand All @@ -232,7 +233,7 @@ where
let mut codec = UviBytes::default();
codec.set_max_len(self.max_packet_size);

future::ok(
future::ok(KadOutStreamSink(
Framed::new(incoming, codec)
.err_into()
.with::<_, _, fn(_) -> _, _>(|request| {
Expand All @@ -250,15 +251,55 @@ where
};
future::ready(proto_to_resp_msg(response))
}),
)
))
}
}

/// Sink of responses and stream of requests.
pub type KadInStreamSink<S> = KadStreamSink<S, KadResponseMsg, KadRequestMsg>;
pub struct KadInStreamSink<S>(KadStreamSink<S, KadResponseMsg, KadRequestMsg>);
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved

impl<S> Deref for KadInStreamSink<S> {
type Target = KadStreamSink<S, KadResponseMsg, KadRequestMsg>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<S> DerefMut for KadInStreamSink<S> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl<S> fmt::Debug for KadInStreamSink<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("KadInStreamSink")
}
}

/// Sink of requests and stream of responses.
pub type KadOutStreamSink<S> = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;
pub struct KadOutStreamSink<S>(KadStreamSink<S, KadRequestMsg, KadResponseMsg>);

impl<S> Deref for KadOutStreamSink<S> {
type Target = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<S> DerefMut for KadOutStreamSink<S> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl<S> fmt::Debug for KadOutStreamSink<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("KadOutStreamSink")
}
}

pub type KadStreamSink<S, A, B> = stream::AndThen<
sink::With<
Expand Down