Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core/swarm] Emit events for active connection close and fix disconnect(). #1619

Merged
merged 16 commits into from
Aug 4, 2020
Merged
2 changes: 1 addition & 1 deletion core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandl
pub use listeners::{ListenerId, ListenersStream, ListenersEvent};
pub use manager::ConnectionId;
pub use substream::{Substream, SubstreamEndpoint, Close};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection, StartClose};

use crate::muxing::StreamMuxer;
use crate::{Multiaddr, PeerId};
Expand Down
62 changes: 42 additions & 20 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,19 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
handler: H
},

/// An established connection has encountered an error.
ConnectionError {
/// An established connection has been closed.
ConnectionClosed {
/// The connection ID.
///
/// As a result of the error, the connection has been removed
/// from the `Manager` and is being closed. Hence this ID will
/// no longer resolve to a valid entry in the manager.
/// > **Note**: Closed connections are removed from the `Manager`.
/// > Hence this ID will no longer resolve to a valid entry in
/// > the manager.
id: ConnectionId,
/// Information about the connection that encountered the error.
/// Information about the closed connection.
connected: Connected<C>,
/// The error that occurred.
error: ConnectionError<HE>,
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<HE>>,
},

/// A connection has been established.
Expand Down Expand Up @@ -336,11 +337,11 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
/// Polls the manager for events relating to the managed connections.
pub fn poll<'a>(&'a mut self, cx: &mut Context) -> Poll<Event<'a, I, O, H, TE, HE, C>> {
// Advance the content of `local_spawns`.
while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.local_spawns), cx) {}
while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}

// Poll for the first event for which the manager still has a registered task, if any.
let event = loop {
match Stream::poll_next(Pin::new(&mut self.events_rx), cx) {
match self.events_rx.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
if self.tasks.contains_key(event.id()) { // (1)
break event
Expand Down Expand Up @@ -369,18 +370,17 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
let _ = task.remove();
Event::PendingConnectionError { id, error, handler }
}
task::Event::Error { id, error } => {
task::Event::Closed { id, error } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) =>
Event::ConnectionError { id, connected, error },
Event::ConnectionClosed { id, connected, error },
TaskState::Pending => unreachable!(
"`Event::Error` implies (2) occurred on that task and thus (3)."
"`Event::Closed` implies (2) occurred on that task and thus (3)."
),
}
}

})
} else {
unreachable!("By (1)")
Expand Down Expand Up @@ -426,10 +426,11 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
/// > task _may not be notified_ if sending the event fails due to
/// > the connection handler not being ready at this time.
pub fn notify_handler(&mut self, event: I) -> Result<(), I> {
let cmd = task::Command::NotifyHandler(event);
let cmd = task::Command::NotifyHandler(event); // (*)
self.task.get_mut().sender.try_send(cmd)
.map_err(|e| match e.into_inner() {
task::Command::NotifyHandler(event) => event
task::Command::NotifyHandler(event) => event,
_ => unreachable!("by (*)")
romanb marked this conversation as resolved.
Show resolved Hide resolved
})
}

Expand All @@ -443,6 +444,24 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
self.task.get_mut().sender.poll_ready(cx).map_err(|_| ())
}

/// Tries to send a close command to the associated background task,
/// thus initiating a graceful active close of the connection.
///
/// When the connection is ultimately closed, [`Event::ConnectionClosed`]
/// is emitted by [`Manager::poll`].
pub fn poll_start_close(&mut self, cx: &mut Context) -> Poll<()> {
match self.task.get_mut().sender.poll_ready(cx) {
Poll::Ready(result) => {
if result.is_ok() {
// If it fails now then the task is already gone.
let _ = self.task.get_mut().sender.try_send(task::Command::Close);
}
Poll::Ready(())
}
Poll::Pending => Poll::Pending
}
}

/// Obtains information about the established connection.
pub fn connected(&self) -> &Connected<C> {
match &self.task.get().state {
Expand All @@ -451,16 +470,18 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
}
}

/// Closes the connection represented by this entry,
/// returning the connection information.
pub fn close(self) -> Connected<C> {
/// Instantly removes the entry from the manager, dropping
/// the command channel to the background task of the connection,
/// which will thus drop the connection asap without an orderly
/// close or emitting another event.
pub fn remove(self) -> Connected<C> {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()")
}
}

/// Returns the connection id.
/// Returns the connection ID.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
}
Expand All @@ -484,3 +505,4 @@ impl<'a, I, C> PendingEntry<'a, I, C> {
self.task.remove();
}
}

Loading