diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 25cc8af2395..d41c7124c07 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,8 +1,12 @@ ## 0.15.2 - unreleased +- Send correct `PeerId` in outbound STOP message to client. + See [PR 3767]. + - As a relay, when forwarding data between relay-connection-source and -destination and vice versa, flush write side when read currently has no more data available. See [PR 3765]. +[PR 3767]: https://github.com/libp2p/rust-libp2p/pull/3767 [PR 3765]: https://github.com/libp2p/rust-libp2p/pull/3765 ## 0.15.1 diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index c301a05068b..f21b699114b 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -520,7 +520,6 @@ impl NetworkBehaviour for Behaviour { event: Either::Left(handler::In::NegotiateOutboundConnect { circuit_id, inbound_circuit_req, - relay_peer_id: self.local_peer_id, src_peer_id: event_source, src_connection_id: connection, }), diff --git a/protocols/relay/src/behaviour/handler.rs b/protocols/relay/src/behaviour/handler.rs index 3acbda0eff5..88147a307e7 100644 --- a/protocols/relay/src/behaviour/handler.rs +++ b/protocols/relay/src/behaviour/handler.rs @@ -69,7 +69,6 @@ pub enum In { NegotiateOutboundConnect { circuit_id: CircuitId, inbound_circuit_req: inbound_hop::CircuitReq, - relay_peer_id: PeerId, src_peer_id: PeerId, src_connection_id: ConnectionId, }, @@ -112,13 +111,11 @@ impl fmt::Debug for In { In::NegotiateOutboundConnect { circuit_id, inbound_circuit_req: _, - relay_peer_id, src_peer_id, src_connection_id, } => f .debug_struct("In::NegotiateOutboundConnect") .field("circuit_id", circuit_id) - .field("relay_peer_id", relay_peer_id) .field("src_peer_id", src_peer_id) .field("src_connection_id", src_connection_id) .finish(), @@ -655,7 +652,6 @@ impl ConnectionHandler for Handler { In::NegotiateOutboundConnect { circuit_id, inbound_circuit_req, - relay_peer_id, src_peer_id, src_connection_id, } => { @@ -663,7 +659,7 @@ impl ConnectionHandler for Handler { .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new( outbound_stop::Upgrade { - relay_peer_id, + src_peer_id, max_circuit_duration: self.config.max_circuit_duration, max_circuit_bytes: self.config.max_circuit_bytes, }, diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 3d7bc6c4d1e..504fab590f8 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -199,10 +199,7 @@ impl Handler { }); self.queued_events.push_back(ConnectionHandlerEvent::Custom( - Event::InboundCircuitEstablished { - src_peer_id: self.remote_peer_id, - limit, - }, + Event::InboundCircuitEstablished { src_peer_id, limit }, )); } Reservation::None => { diff --git a/protocols/relay/src/protocol/outbound_stop.rs b/protocols/relay/src/protocol/outbound_stop.rs index 4ba9c6909e1..9b028bc5b66 100644 --- a/protocols/relay/src/protocol/outbound_stop.rs +++ b/protocols/relay/src/protocol/outbound_stop.rs @@ -32,7 +32,7 @@ use std::time::Duration; use thiserror::Error; pub struct Upgrade { - pub relay_peer_id: PeerId, + pub src_peer_id: PeerId, pub max_circuit_duration: Duration, pub max_circuit_bytes: u64, } @@ -55,7 +55,7 @@ impl upgrade::OutboundUpgrade for Upgrade { let msg = proto::StopMessage { type_pb: proto::StopMessageType::CONNECT, peer: Some(proto::Peer { - id: self.relay_peer_id.to_bytes(), + id: self.src_peer_id.to_bytes(), addrs: vec![], }), limit: Some(proto::Limit { diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index 880dae0c6aa..da4857488cb 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -203,34 +203,46 @@ fn connect() { relay_peer_id, false, // No renewal. )); - spawn_swarm_on_pool(&pool, dst); let mut src = build_client(); + let src_peer_id = *src.local_peer_id(); src.dial(dst_addr).unwrap(); - pool.run_until(async { - loop { - match src.select_next_some().await { - SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} - SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} - SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. })) - if peer == dst_peer_id => - { - break - } - SwarmEvent::Behaviour(ClientEvent::Relay( - relay::client::Event::OutboundCircuitEstablished { .. }, - )) => {} - SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. })) - if peer == relay_peer_id => {} - SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => { - break - } - e => panic!("{e:?}"), + pool.run_until(futures::future::join( + connection_established_to(src, relay_peer_id, dst_peer_id), + connection_established_to(dst, relay_peer_id, src_peer_id), + )); +} + +async fn connection_established_to(mut swarm: Swarm, relay_peer_id: PeerId, other: PeerId) { + loop { + match swarm.select_next_some().await { + SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} + SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} + SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. })) if peer == other => { + break + } + SwarmEvent::Behaviour(ClientEvent::Relay( + relay::client::Event::OutboundCircuitEstablished { .. }, + )) => {} + SwarmEvent::Behaviour(ClientEvent::Relay( + relay::client::Event::InboundCircuitEstablished { src_peer_id, .. }, + )) => { + assert_eq!(src_peer_id, other); + } + SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. })) + if peer == relay_peer_id => {} + SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == other => break, + SwarmEvent::IncomingConnection { send_back_addr, .. } => { + let peer_id_from_addr = + PeerId::try_from_multiaddr(&send_back_addr).expect("to have /p2p"); + + assert_eq!(peer_id_from_addr, other) } + e => panic!("{e:?}"), } - }) + } } #[test]