Skip to content

Commit

Permalink
swarm/src/lib: Extract ListenersEvent handling into new method
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed May 2, 2022
1 parent 9f0ca95 commit 87b7cb3
Showing 1 changed file with 99 additions and 90 deletions.
189 changes: 99 additions & 90 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,101 @@ where
None
}

fn handle_listeners_event(
&mut self,
event: ListenersEvent<transport::Boxed<(PeerId, StreamMuxerBox)>>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event {
ListenersEvent::Incoming {
listener_id: _,
upgrade,
local_addr,
send_back_addr,
} => {
let handler = self.behaviour.new_handler();
match self.pool.add_incoming(
upgrade,
handler,
IncomingInfo {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
},
) {
Ok(_connection_id) => {
return Some(SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
});
}
Err((connection_limit, handler)) => {
self.behaviour
.inject_listen_failure(&local_addr, &send_back_addr, handler);
log::warn!("Incoming connection rejected: {:?}", connection_limit);
}
};
}
ListenersEvent::NewAddress {
listener_id,
listen_addr,
} => {
log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
if !self.listened_addrs.contains(&listen_addr) {
self.listened_addrs.push(listen_addr.clone())
}
self.behaviour
.inject_new_listen_addr(listener_id, &listen_addr);
return Some(SwarmEvent::NewListenAddr {
listener_id,
address: listen_addr,
});
}
ListenersEvent::AddressExpired {
listener_id,
listen_addr,
} => {
log::debug!(
"Listener {:?}; Expired address {:?}.",
listener_id,
listen_addr
);
self.listened_addrs.retain(|a| a != &listen_addr);
self.behaviour
.inject_expired_listen_addr(listener_id, &listen_addr);
return Some(SwarmEvent::ExpiredListenAddr {
listener_id,
address: listen_addr,
});
}
ListenersEvent::Closed {
listener_id,
addresses,
reason,
} => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
for addr in addresses.iter() {
self.behaviour.inject_expired_listen_addr(listener_id, addr);
}
self.behaviour.inject_listener_closed(
listener_id,
match &reason {
Ok(()) => Ok(()),
Err(err) => Err(err),
},
);
return Some(SwarmEvent::ListenerClosed {
listener_id,
addresses,
reason,
});
}
ListenersEvent::Error { listener_id, error } => {
self.behaviour.inject_listener_error(listener_id, &error);
return Some(SwarmEvent::ListenerError { listener_id, error });
}
}
None
}

/// Internal function used by everything event-related.
///
/// Polls the `Swarm` for the next event.
Expand All @@ -846,97 +941,11 @@ where

// Poll the listener(s) for new connections.
match ListenersStream::poll(Pin::new(&mut this.listeners), cx) {
Poll::Pending => {
listeners_not_ready = true;
}
Poll::Ready(ListenersEvent::Incoming {
listener_id: _,
upgrade,
local_addr,
send_back_addr,
}) => {
let handler = this.behaviour.new_handler();
match this.pool.add_incoming(
upgrade,
handler,
IncomingInfo {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
},
) {
Ok(_connection_id) => {
return Poll::Ready(SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
});
}
Err((connection_limit, handler)) => {
this.behaviour.inject_listen_failure(
&local_addr,
&send_back_addr,
handler,
);
log::warn!("Incoming connection rejected: {:?}", connection_limit);
}
};
}
Poll::Ready(ListenersEvent::NewAddress {
listener_id,
listen_addr,
}) => {
log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
if !this.listened_addrs.contains(&listen_addr) {
this.listened_addrs.push(listen_addr.clone())
}
this.behaviour
.inject_new_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::NewListenAddr {
listener_id,
address: listen_addr,
});
}
Poll::Ready(ListenersEvent::AddressExpired {
listener_id,
listen_addr,
}) => {
log::debug!(
"Listener {:?}; Expired address {:?}.",
listener_id,
listen_addr
);
this.listened_addrs.retain(|a| a != &listen_addr);
this.behaviour
.inject_expired_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::ExpiredListenAddr {
listener_id,
address: listen_addr,
});
}
Poll::Ready(ListenersEvent::Closed {
listener_id,
addresses,
reason,
}) => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
for addr in addresses.iter() {
this.behaviour.inject_expired_listen_addr(listener_id, addr);
Poll::Pending => listeners_not_ready = true,
Poll::Ready(listeners_event) => {
if let Some(swarm_event) = this.handle_listeners_event(listeners_event) {
return Poll::Ready(swarm_event);
}
this.behaviour.inject_listener_closed(
listener_id,
match &reason {
Ok(()) => Ok(()),
Err(err) => Err(err),
},
);
return Poll::Ready(SwarmEvent::ListenerClosed {
listener_id,
addresses,
reason,
});
}
Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
this.behaviour.inject_listener_error(listener_id, &error);
return Poll::Ready(SwarmEvent::ListenerError { listener_id, error });
}
}

Expand Down

0 comments on commit 87b7cb3

Please sign in to comment.