Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create ListenersEvent::Closed on Swarm::remove_listener #2261

Merged
merged 6 commits into from
Oct 11, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion core/src/connection/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ where
listeners: VecDeque<Pin<Box<Listener<TTrans>>>>,
/// The next listener ID to assign.
next_id: ListenerId,
/// Pending listeners events to return from `poll`.
elenaf9 marked this conversation as resolved.
Show resolved Hide resolved
pending_events: VecDeque<ListenersEvent<TTrans>>,
}

/// The ID of a single listener.
Expand Down Expand Up @@ -177,6 +179,7 @@ where
transport,
listeners: VecDeque::new(),
next_id: ListenerId(1),
pending_events: VecDeque::new(),
}
}

Expand All @@ -187,6 +190,7 @@ where
transport,
listeners: VecDeque::with_capacity(capacity),
next_id: ListenerId(1),
pending_events: VecDeque::new(),
}
}

Expand Down Expand Up @@ -216,7 +220,13 @@ where
/// Return `Ok(())` if a listener with this ID was in the list.
pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
if let Some(i) = self.listeners.iter().position(|l| l.id == id) {
self.listeners.remove(i);
let mut listener = self.listeners.remove(i).ok_or(())?;
let listener_project = listener.as_mut().project();
self.pending_events.push_back(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses: listener_project.addresses.drain(..).collect(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would std::mem::replace also work here? Slightly more readable IMO.

reason: Ok(()),
});
Ok(())
} else {
Err(())
Expand All @@ -235,6 +245,10 @@ where

/// Provides an API similar to `Stream`, except that it cannot end.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ListenersEvent<TTrans>> {
// Return pending events from closed listeners.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}
// We remove each element from `listeners` one by one and add them back.
let mut remaining = self.listeners.len();
while let Some(mut listener) = self.listeners.pop_back() {
Expand Down Expand Up @@ -538,4 +552,36 @@ mod tests {
}
});
}

#[test]
fn listener_closed() {
async_std::task::block_on(async move {
let mem_transport = transport::MemoryTransport::default();

let mut listeners = ListenersStream::new(mem_transport);
let id = listeners.listen_on("/memory/0".parse().unwrap()).unwrap();

let event = listeners.next().await.unwrap();
let addr;
if let ListenersEvent::NewAddress { listen_addr, .. } = event {
addr = listen_addr
} else {
panic!("Was expecting the listen address to be reported")
}

listeners.remove_listener(id).unwrap();

match listeners.next().await.unwrap() {
ListenersEvent::Closed {
listener_id,
addresses,
reason: Ok(()),
} => {
assert_eq!(listener_id, id);
assert!(addresses.contains(&addr));
}
other => panic!("Unexpected listeners event: {:?}", other),
}
});
}
}