Skip to content

Commit

Permalink
Implement outgoing stream reuse in Kademlia
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Feb 15, 2023
1 parent 3cc9708 commit e2e9d41
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 44 deletions.
4 changes: 4 additions & 0 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2276,6 +2276,10 @@ where
}
}
}

KademliaHandlerEvent::Available { .. } => {
unreachable!("Processed by handler internally")
}
};
}

Expand Down
105 changes: 61 additions & 44 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ pub struct KademliaHandler<TUserData> {
/// Number of outbound streams being upgraded right now.
num_requested_outbound_streams: usize,

/// List of outbound substreams that are waiting to become active next.
/// List of outbound substreams that are waiting to be created next.
/// Contains the request we want to send, and the user data if we expect an answer.
requested_streams:
VecDeque<SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>>,
pending_substream_requests: VecDeque<(KadRequestMsg, Option<TUserData>)>,

/// Outgoing substreams available for future reuse.
reusable_outgoing_substreams: Vec<KadOutStreamSink<NegotiatedSubstream>>,

/// List of active inbound substreams with the state they are in.
inbound_substreams: SelectAll<InboundSubstreamState<TUserData>>,
Expand Down Expand Up @@ -161,8 +163,8 @@ enum OutboundSubstreamState<TUserData> {
WaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
/// An error happened on the substream and we should report the error to the user.
ReportError(KademliaHandlerQueryErr, TUserData),
/// The substream is being closed.
Closing(KadOutStreamSink<NegotiatedSubstream>),
/// The substream is available for future reuse.
Available(KadOutStreamSink<NegotiatedSubstream>),
/// The substream is complete and will not perform any more work.
Done,
Poisoned,
Expand Down Expand Up @@ -352,6 +354,12 @@ pub enum KademliaHandlerEvent<TUserData> {
/// The user data passed to the `PutValue`.
user_data: TUserData,
},

/// The substream is available for future reuse.
Available {
/// Substream to be reused for outgoing requests.
substream: KadOutStreamSink<NegotiatedSubstream>,
},
}

/// Error that can happen when requesting an RPC query.
Expand Down Expand Up @@ -531,7 +539,8 @@ where
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
num_requested_outbound_streams: 0,
requested_streams: Default::default(),
pending_substream_requests: Default::default(),
reusable_outgoing_substreams: Default::default(),
keep_alive,
protocol_status: ProtocolStatus::Unconfirmed,
}
Expand Down Expand Up @@ -677,21 +686,17 @@ where
}
KademliaHandlerIn::FindNodeReq { key, user_data } => {
let msg = KadRequestMsg::FindNode { key };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::FindNodeRes {
closer_peers,
request_id,
} => self.answer_pending_request(request_id, KadResponseMsg::FindNode { closer_peers }),
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
let msg = KadRequestMsg::GetProviders { key };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::GetProvidersRes {
closer_peers,
Expand All @@ -706,24 +711,17 @@ where
),
KademliaHandlerIn::AddProvider { key, provider } => {
let msg = KadRequestMsg::AddProvider { key, provider };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, None),
));
self.pending_substream_requests.push_back((msg, None));
}
KademliaHandlerIn::GetRecord { key, user_data } => {
let msg = KadRequestMsg::GetValue { key };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::PutRecord { record, user_data } => {
let msg = KadRequestMsg::PutValue { record };
self.requested_streams.push_back(SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, Some(user_data)),
));
self.pending_substream_requests
.push_back((msg, Some(user_data)));
}
KademliaHandlerIn::GetRecordRes {
record,
Expand Down Expand Up @@ -772,20 +770,40 @@ where
));
}

if let Poll::Ready(Some(event)) = self.outbound_substreams.poll_next_unpin(cx) {
return Poll::Ready(event);
while let Poll::Ready(Some(event)) = self.outbound_substreams.poll_next_unpin(cx) {
if let ConnectionHandlerEvent::Custom(KademliaHandlerEvent::Available { substream }) =
event
{
self.reusable_outgoing_substreams.push(substream);
} else {
return Poll::Ready(event);
}
}

if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) {
return Poll::Ready(event);
}

let num_in_progress_outbound_substreams =
self.outbound_substreams.len() + self.num_requested_outbound_streams;
if num_in_progress_outbound_substreams < MAX_NUM_SUBSTREAMS {
if let Some(protocol) = self.requested_streams.pop_front() {
self.num_requested_outbound_streams += 1;
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol });
while !self.pending_substream_requests.is_empty()
&& self.outbound_substreams.len() + self.num_requested_outbound_streams
< MAX_NUM_SUBSTREAMS
{
if let Some((msg, user_data)) = self.pending_substream_requests.pop_front() {
if let Some(substream) = self.reusable_outgoing_substreams.pop() {
self.outbound_substreams
.push(OutboundSubstreamState::PendingSend(
substream, msg, user_data,
));
} else {
self.num_requested_outbound_streams += 1;
let protocol = SubstreamProtocol::new(
self.config.protocol_config.clone(),
(msg, user_data),
);
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol,
});
}
}
}

Expand Down Expand Up @@ -869,7 +887,7 @@ where
match std::mem::replace(this, OutboundSubstreamState::Poisoned) {
OutboundSubstreamState::PendingSend(mut substream, msg, user_data) => {
match substream.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => match substream.start_send_unpin(msg) {
Poll::Ready(Ok(())) => match substream.start_send_unpin(msg.clone()) {
Ok(()) => {
*this = OutboundSubstreamState::PendingFlush(substream, user_data);
}
Expand Down Expand Up @@ -910,7 +928,7 @@ where
if let Some(user_data) = user_data {
*this = OutboundSubstreamState::WaitingAnswer(substream, user_data);
} else {
*this = OutboundSubstreamState::Closing(substream);
*this = OutboundSubstreamState::Available(substream);
}
}
Poll::Pending => {
Expand All @@ -933,7 +951,7 @@ where
OutboundSubstreamState::WaitingAnswer(mut substream, user_data) => {
match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(msg))) => {
*this = OutboundSubstreamState::Closing(substream);
*this = OutboundSubstreamState::Available(substream);
let event = process_kad_response(msg, user_data);

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
Expand Down Expand Up @@ -970,13 +988,12 @@ where

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
}
OutboundSubstreamState::Closing(mut stream) => match stream.poll_close_unpin(cx) {
Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => return Poll::Ready(None),
Poll::Pending => {
*this = OutboundSubstreamState::Closing(stream);
return Poll::Pending;
}
},
OutboundSubstreamState::Available(substream) => {
*this = OutboundSubstreamState::Done;
let event = KademliaHandlerEvent::Available { substream };

return Poll::Ready(Some(ConnectionHandlerEvent::Custom(event)));
}
OutboundSubstreamState::Done => {
*this = OutboundSubstreamState::Done;
return Poll::Ready(None);
Expand Down

0 comments on commit e2e9d41

Please sign in to comment.