Skip to content

Commit

Permalink
core/transport: impl Stream for transport::Boxed
Browse files Browse the repository at this point in the history
  • Loading branch information
elenaf9 committed May 22, 2022
1 parent 45f9c96 commit bfd5fb0
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 27 deletions.
16 changes: 15 additions & 1 deletion core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::transport::{ListenerId, Transport, TransportError, TransportEvent};
use futures::prelude::*;
use futures::{prelude::*, stream::FusedStream};
use multiaddr::Multiaddr;
use std::{
error::Error,
Expand Down Expand Up @@ -157,6 +157,20 @@ impl<O> Transport for Boxed<O> {
}
}

impl<O> Stream for Boxed<O> {
type Item = TransportEvent<ListenerUpgrade<O>, io::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Transport::poll(self, cx).map(Some)
}
}

impl<O> FusedStream for Boxed<O> {
fn is_terminated(&self) -> bool {
false
}
}

fn box_err<E: Error + Send + Sync + 'static>(e: E) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
16 changes: 6 additions & 10 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,6 @@ impl<T> Drop for Chan<T> {

#[cfg(test)]
mod tests {
use futures::future::poll_fn;

use super::*;

#[test]
Expand Down Expand Up @@ -495,12 +493,12 @@ mod tests {
let t1_addr: Multiaddr = format!("/memory/{}", rand_port).parse().unwrap();
let cloned_t1_addr = t1_addr.clone();

let mut t1 = MemoryTransport::default();
let mut t1 = MemoryTransport::default().boxed();

let listener = async move {
t1.listen_on(ListenerId::new(1), t1_addr.clone()).unwrap();
let upgrade = loop {
let event = poll_fn(|cx| Pin::new(&mut t1).poll(cx)).await;
let event = t1.select_next_some().await;
if let Some(upgrade) = event.into_upgrade() {
break upgrade;
}
Expand Down Expand Up @@ -533,16 +531,14 @@ mod tests {
Protocol::Memory(rand::random::<u64>().saturating_add(1)).into();
let listener_addr_cloned = listener_addr.clone();

let mut listener_transport = MemoryTransport::default();
let mut listener_transport = MemoryTransport::default().boxed();

let listener = async move {
listener_transport
.listen_on(ListenerId::new(1), listener_addr.clone())
.unwrap();
loop {
if let TransportEvent::Incoming { send_back_addr, .. } =
poll_fn(|cx| Pin::new(&mut listener_transport).poll(cx)).await
{
if let TransportEvent::Incoming { send_back_addr, .. } = listener_transport.select_next_some().await {
assert!(
send_back_addr != listener_addr,
"Expect dialer address not to equal listener address."
Expand Down Expand Up @@ -572,15 +568,15 @@ mod tests {
Protocol::Memory(rand::random::<u64>().saturating_add(1)).into();
let listener_addr_cloned = listener_addr.clone();

let mut listener_transport = MemoryTransport::default();
let mut listener_transport = MemoryTransport::default().boxed();

let listener = async move {
listener_transport
.listen_on(ListenerId::new(1), listener_addr.clone())
.unwrap();
loop {
if let TransportEvent::Incoming { send_back_addr, .. } =
poll_fn(|cx| Pin::new(&mut listener_transport).poll(cx)).await
listener_transport.select_next_some().await
{
let dialer_port =
NonZeroU64::new(parse_memory_addr(&send_back_addr).unwrap()).unwrap();
Expand Down
18 changes: 7 additions & 11 deletions core/tests/transport_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

mod util;

use futures::future::poll_fn;
use futures::prelude::*;
use libp2p_core::identity;
use libp2p_core::transport::{ListenerId, MemoryTransport, Transport};
Expand Down Expand Up @@ -96,7 +95,7 @@ fn upgrade_pipeline() {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
});
}).boxed();

let dialer_keys = identity::Keypair::generate_ed25519();
let dialer_id = dialer_keys.public().to_peer_id();
Expand All @@ -114,7 +113,8 @@ fn upgrade_pipeline() {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
});
})
.boxed();

let listen_addr1 = Multiaddr::from(Protocol::Memory(random::<u64>()));
let listen_addr2 = listen_addr1.clone();
Expand All @@ -125,14 +125,10 @@ fn upgrade_pipeline() {

let server = async move {
loop {
let (upgrade, _send_back_addr) =
match poll_fn(|cx| Pin::new(&mut listener_transport).poll(cx))
.await
.into_upgrade()
{
Some(u) => u,
None => continue,
};
let (upgrade, _send_back_addr) = match listener_transport.select_next_some().await.into_upgrade() {
Some(u) => u,
None => continue,
};
let (peer, _mplex) = upgrade.await.unwrap();
assert_eq!(peer, dialer_id);
}
Expand Down
2 changes: 1 addition & 1 deletion src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ where
let this = self.project();
match this.inner.poll(cx) {
Poll::Ready(event) => {
let event = event.map_upgrade( {
let event = event.map_upgrade({
let sinks = this.sinks.clone();
|inner| BandwidthFuture { inner, sinks }
});
Expand Down
7 changes: 3 additions & 4 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2069,12 +2069,12 @@ mod tests {
let mut listen_addresses = Vec::new();
let mut transports = Vec::new();
for _ in 0..num_listen_addrs {
let mut transport = transport::MemoryTransport::default();
let mut transport = transport::MemoryTransport::default().boxed();
transport
.listen_on(ListenerId::new(1), "/memory/0".parse().unwrap())
.unwrap();

match poll_fn(|cx| Pin::new(&mut transport).poll(cx)).await {
match transport.select_next_some().await {
TransportEvent::NewAddress { listen_addr, .. } => {
listen_addresses.push(listen_addr);
}
Expand All @@ -2094,9 +2094,8 @@ mod tests {
)
.unwrap();
for mut transport in transports.into_iter() {
let poll_transport = poll_fn(|cx| Pin::new(&mut transport).poll(cx));
loop {
match futures::future::select(poll_transport, swarm.next()).await {
match futures::future::select(transport.select_next_some(), swarm.next()).await {
Either::Left((TransportEvent::Incoming { .. }, _)) => {
break;
}
Expand Down

0 comments on commit bfd5fb0

Please sign in to comment.