Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kad): reuse outbound streams instead of closing them #3474

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@

- Bump MSRV to 1.65.0.

- Reuse outbound streams instead of closing them after one message. See [PR 3474].

[PR 3239]: https://github.com/libp2p/rust-libp2p/pull/3239
[PR 3287]: https://github.com/libp2p/rust-libp2p/pull/3287
[PR 3474]: https://github.com/libp2p/rust-libp2p/pull/3474

# 0.42.1

Expand Down
83 changes: 44 additions & 39 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,9 @@ pub struct KademliaHandler<TUserData> {
/// Number of outbound streams being upgraded right now.
num_requested_outbound_streams: usize,

/// List of outbound substreams that are waiting to become active next.
/// List of outbound substreams that are waiting to be created next.
/// Contains the request we want to send, and the user data if we expect an answer.
requested_streams:
VecDeque<SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>>,
pending_substream_requests: VecDeque<(KadRequestMsg, Option<TUserData>)>,

/// List of active inbound substreams with the state they are in.
inbound_substreams: SelectAll<InboundSubstreamState<TUserData>>,
Expand Down Expand Up @@ -161,8 +160,8 @@ enum OutboundSubstreamState<TUserData> {
WaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
/// An error happened on the substream and we should report the error to the user.
ReportError(KademliaHandlerQueryErr, TUserData),
/// The substream is being closed.
Closing(KadOutStreamSink<NegotiatedSubstream>),
/// The substream is available for future reuse.
Idle(KadOutStreamSink<NegotiatedSubstream>, Waker),
/// The substream is complete and will not perform any more work.
Done,
Poisoned,
Expand Down Expand Up @@ -531,7 +530,7 @@ where
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
num_requested_outbound_streams: 0,
requested_streams: Default::default(),
pending_substream_requests: Default::default(),
keep_alive,
protocol_status: ProtocolStatus::Unconfirmed,
}
Expand Down Expand Up @@ -677,21 +676,17 @@ where
}
KademliaHandlerIn::FindNodeReq { key, user_data } => {
let msg = KadRequestMsg::FindNode { key };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::FindNodeRes {
closer_peers,
request_id,
} => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }),
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
let msg = KadRequestMsg::GetProviders { key };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::GetProvidersRes {
closer_peers,
Expand All @@ -706,24 +701,17 @@ where
),
KademliaHandlerIn::AddProvider { key, provider } => {
let msg = KadRequestMsg::AddProvider { key, provider };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, None),
));
self.pending_substream_requests.push_back((msg, None));
}
KademliaHandlerIn::GetRecord { key, user_data } => {
let msg = KadRequestMsg::GetValue { key };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::PutRecord { record, user_data } => {
let msg = KadRequestMsg::PutValue { record };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::GetRecordRes {
record,
Expand Down Expand Up @@ -780,12 +768,32 @@ where
return Poll::Ready(event);
}

let num_in_progress_outbound_substreams =
self.outbound_substreams.len() + self.num_requested_outbound_streams;
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved
if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS {
if let Some(protocol) = self.requested_streams.pop_front() {
'outer: while self.outbound_substreams.len() + self.num_requested_outbound_streams
< MAX_NUM_SUBSTREAMS
{
if let Some((msg, user_data)) = self.pending_substream_requests.pop_front() {
// Search for outbound substream waiting to be reused first.
for outbound_substream in self.outbound_substreams.iter_mut() {
match std::mem::replace(outbound_substream, OutboundSubstreamState::Poisoned) {
OutboundSubstreamState::Idle(substream, waker) => {
*outbound_substream =
OutboundSubstreamState::PendingSend(substream, msg, user_data);
waker.wake();
continue 'outer;
}
other => {
*outbound_substream = other;
}
}
}
nazar-pc marked this conversation as resolved.
Show resolved Hide resolved

// If not found request new substream.
self.num_requested_outbound_streams += 1;
let protocol =
SubstreamProtocol::new(self.config.protocol_config.clone(), (msg, user_data));
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol });
} else {
break;
}
}

Expand Down Expand Up @@ -910,7 +918,7 @@ where
if let Some(user_data) = user_data {
*this = OutboundSubstreamState::WaitingAnswer(substream, user_data);
} else {
*this = OutboundSubstreamState::Closing(substream);
*this = OutboundSubstreamState::Idle(substream, cx.waker().clone());
}
}
Poll::Pending => {
Expand All @@ -933,7 +941,7 @@ where
OutboundSubstreamState::WaitingAnswer(mut substream, user_data) => {
match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(msg))) => {
*this = OutboundSubstreamState::Closing(substream);
*this = OutboundSubstreamState::Idle(substream, cx.waker().clone());
let event = process_kad_response(msg, user_data);

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
Expand Down Expand Up @@ -970,13 +978,10 @@ where

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
}
OutboundSubstreamState::Closing(mut stream) => match stream.poll_close_unpin(cx) {
Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None),
Poll::Pending => {
*this = OutboundSubstreamState::Closing(stream);
return Poll::Pending;
}
},
OutboundSubstreamState::Idle(substream, _waker) => {
*this = OutboundSubstreamState::Idle(substream, cx.waker().clone());
return Poll::Pending;
}
OutboundSubstreamState::Done => {
*this = OutboundSubstreamState::Done;
return Poll::Ready(None);
Expand Down