diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 6d81b82bda6..7ddc158bc60 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -829,6 +829,101 @@ where None } + fn handle_listeners_event( + &mut self, + event: ListenersEvent>, + ) -> Option>> { + match event { + ListenersEvent::Incoming { + listener_id: _, + upgrade, + local_addr, + send_back_addr, + } => { + let handler = self.behaviour.new_handler(); + match self.pool.add_incoming( + upgrade, + handler, + IncomingInfo { + local_addr: &local_addr, + send_back_addr: &send_back_addr, + }, + ) { + Ok(_connection_id) => { + return Some(SwarmEvent::IncomingConnection { + local_addr, + send_back_addr, + }); + } + Err((connection_limit, handler)) => { + self.behaviour + .inject_listen_failure(&local_addr, &send_back_addr, handler); + log::warn!("Incoming connection rejected: {:?}", connection_limit); + } + }; + } + ListenersEvent::NewAddress { + listener_id, + listen_addr, + } => { + log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr); + if !self.listened_addrs.contains(&listen_addr) { + self.listened_addrs.push(listen_addr.clone()) + } + self.behaviour + .inject_new_listen_addr(listener_id, &listen_addr); + return Some(SwarmEvent::NewListenAddr { + listener_id, + address: listen_addr, + }); + } + ListenersEvent::AddressExpired { + listener_id, + listen_addr, + } => { + log::debug!( + "Listener {:?}; Expired address {:?}.", + listener_id, + listen_addr + ); + self.listened_addrs.retain(|a| a != &listen_addr); + self.behaviour + .inject_expired_listen_addr(listener_id, &listen_addr); + return Some(SwarmEvent::ExpiredListenAddr { + listener_id, + address: listen_addr, + }); + } + ListenersEvent::Closed { + listener_id, + addresses, + reason, + } => { + log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); + for addr in addresses.iter() { + self.behaviour.inject_expired_listen_addr(listener_id, addr); + } + self.behaviour.inject_listener_closed( + listener_id, + match &reason { + Ok(()) => Ok(()), + Err(err) => Err(err), + }, + ); + return Some(SwarmEvent::ListenerClosed { + listener_id, + addresses, + reason, + }); + } + ListenersEvent::Error { listener_id, error } => { + self.behaviour.inject_listener_error(listener_id, &error); + return Some(SwarmEvent::ListenerError { listener_id, error }); + } + } + None + } + /// Internal function used by everything event-related. /// /// Polls the `Swarm` for the next event. @@ -846,97 +941,11 @@ where // Poll the listener(s) for new connections. match ListenersStream::poll(Pin::new(&mut this.listeners), cx) { - Poll::Pending => { - listeners_not_ready = true; - } - Poll::Ready(ListenersEvent::Incoming { - listener_id: _, - upgrade, - local_addr, - send_back_addr, - }) => { - let handler = this.behaviour.new_handler(); - match this.pool.add_incoming( - upgrade, - handler, - IncomingInfo { - local_addr: &local_addr, - send_back_addr: &send_back_addr, - }, - ) { - Ok(_connection_id) => { - return Poll::Ready(SwarmEvent::IncomingConnection { - local_addr, - send_back_addr, - }); - } - Err((connection_limit, handler)) => { - this.behaviour.inject_listen_failure( - &local_addr, - &send_back_addr, - handler, - ); - log::warn!("Incoming connection rejected: {:?}", connection_limit); - } - }; - } - Poll::Ready(ListenersEvent::NewAddress { - listener_id, - listen_addr, - }) => { - log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr); - if !this.listened_addrs.contains(&listen_addr) { - this.listened_addrs.push(listen_addr.clone()) - } - this.behaviour - .inject_new_listen_addr(listener_id, &listen_addr); - return Poll::Ready(SwarmEvent::NewListenAddr { - listener_id, - address: listen_addr, - }); - } - Poll::Ready(ListenersEvent::AddressExpired { - listener_id, - listen_addr, - }) => { - log::debug!( - "Listener {:?}; Expired address {:?}.", - listener_id, - listen_addr - ); - this.listened_addrs.retain(|a| a != &listen_addr); - this.behaviour - .inject_expired_listen_addr(listener_id, &listen_addr); - return Poll::Ready(SwarmEvent::ExpiredListenAddr { - listener_id, - address: listen_addr, - }); - } - Poll::Ready(ListenersEvent::Closed { - listener_id, - addresses, - reason, - }) => { - log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); - for addr in addresses.iter() { - this.behaviour.inject_expired_listen_addr(listener_id, addr); + Poll::Pending => listeners_not_ready = true, + Poll::Ready(listeners_event) => { + if let Some(swarm_event) = this.handle_listeners_event(listeners_event) { + return Poll::Ready(swarm_event); } - this.behaviour.inject_listener_closed( - listener_id, - match &reason { - Ok(()) => Ok(()), - Err(err) => Err(err), - }, - ); - return Poll::Ready(SwarmEvent::ListenerClosed { - listener_id, - addresses, - reason, - }); - } - Poll::Ready(ListenersEvent::Error { listener_id, error }) => { - this.behaviour.inject_listener_error(listener_id, &error); - return Poll::Ready(SwarmEvent::ListenerError { listener_id, error }); } }