Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial solution for windows mio problem #776 #868

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/sys/windows/from_raw_arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use std::ops::Deref;
use std::mem;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use winapi::OVERLAPPED;

pub struct FromRawArc<T> {
_inner: *mut Inner<T>,
Expand Down Expand Up @@ -86,6 +87,29 @@ impl<T> Drop for FromRawArc<T> {
}
}

unsafe impl Send for FromRawArcStore { }
unsafe impl Sync for FromRawArcStore { }

pub struct FromRawArcStore {
_ptr: *mut OVERLAPPED,
deallocator: fn(*mut OVERLAPPED),
}

impl FromRawArcStore {
pub fn new(ptr: *mut OVERLAPPED, deallocator: fn(*mut OVERLAPPED)) -> FromRawArcStore {
FromRawArcStore {
_ptr: unsafe { mem::transmute(ptr) },
deallocator: deallocator,
}
}
}

impl Drop for FromRawArcStore {
fn drop(&mut self) {
(self.deallocator)(self._ptr);
}
}

#[cfg(test)]
mod tests {
use super::FromRawArc;
Expand Down
48 changes: 48 additions & 0 deletions src/sys/windows/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::os::windows::prelude::*;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::time::Duration;
use std::mem;

use lazycell::AtomicLazyCell;

Expand All @@ -16,6 +17,7 @@ use miow::iocp::{CompletionPort, CompletionStatus};
use event_imp::{Event, Evented, Ready};
use poll::{self, Poll};
use sys::windows::buffer_pool::BufferPool;
use sys::windows::from_raw_arc::FromRawArcStore;
use {Token, PollOpt};

/// Each Selector has a globally unique(ish) ID associated with it. This ID
Expand Down Expand Up @@ -48,6 +50,8 @@ struct SelectorInner {
/// Primitives will take buffers from this pool to perform I/O operations,
/// and once complete they'll be put back in.
buffers: Mutex<BufferPool>,

incompletes: Mutex<Vec<(isize, FromRawArcStore)>>,
}

impl Selector {
Expand All @@ -61,6 +65,7 @@ impl Selector {
id: id,
port: cp,
buffers: Mutex::new(BufferPool::new(256)),
incompletes: Mutex::new(Vec::new()),
}),
}
})
Expand Down Expand Up @@ -91,6 +96,21 @@ impl Selector {
ret = true;
continue;
}
// Deadlock will occur if you don't release it first before the callback.
{
let mut incompletes = self.inner.incompletes.lock().unwrap();
let pos = incompletes.iter().position(|item| item.0 == (status.overlapped() as isize));
match pos {
Some(pos) => {
let store = incompletes.remove(pos);
mem::forget(store);
},
None => {
trace!("cannot find store, omiting...");
continue;
}
}
}

let callback = unsafe {
(*(status.overlapped() as *mut Overlapped)).callback
Expand All @@ -104,6 +124,16 @@ impl Selector {
Ok(ret)
}

pub fn store_overlapped_content(&self, ptr: *mut OVERLAPPED, deallocator: fn(*mut OVERLAPPED)) {
let mut incompletes = self.inner.incompletes.lock().unwrap();
incompletes.push((ptr as isize, FromRawArcStore::new(ptr, deallocator)));
}

pub fn clean_overlapped_content(&self, ptr: *mut OVERLAPPED) {
let mut incompletes = self.inner.incompletes.lock().unwrap();
incompletes.retain(|item| ptr as isize != item.0);
}

/// Gets a reference to the underlying `CompletionPort` structure.
pub fn port(&self) -> &CompletionPort {
&self.inner.port
Expand Down Expand Up @@ -404,6 +434,24 @@ impl ReadyBinding {
.as_ref().unwrap()
.deregister(poll)
}

pub fn store_overlapped_content(&self, ptr: *mut OVERLAPPED, deallocator: fn(*mut OVERLAPPED)) {
if let Some(i) = self.binding.selector.borrow() {
let selector = Selector {
inner: i.clone(),
};
selector.store_overlapped_content(ptr, deallocator);
}
}

pub fn clean_overlapped_content(&self, ptr: *mut OVERLAPPED) {
if let Some(i) = self.binding.selector.borrow() {
let selector = Selector {
inner: i.clone(),
};
selector.clean_overlapped_content(ptr);
}
}
}

fn other(s: &str) -> io::Error {
Expand Down
40 changes: 36 additions & 4 deletions src/sys/windows/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,14 @@ impl StreamImp {
self.inner.inner.lock().unwrap()
}

fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> {
fn schedule_connect(&self, addr: &SocketAddr, me: &mut StreamInner) -> io::Result<()> {
unsafe {
trace!("scheduling a connect");
self.inner.socket.connect_overlapped(addr, &[], self.inner.read.as_mut_ptr())?;
}
// see docs above on StreamImp.inner for rationale on forget
mem::forget(self.clone());
me.iocp.store_overlapped_content(self.inner.read.as_mut_ptr(), read_deallocate);
Ok(())
}

Expand Down Expand Up @@ -454,6 +455,7 @@ impl StreamImp {
// see docs above on StreamImp.inner for rationale on forget
me.read = State::Pending(());
mem::forget(self.clone());
me.iocp.store_overlapped_content(self.inner.read.as_mut_ptr(), read_deallocate);
}
Err(e) => {
me.read = State::Error(e);
Expand Down Expand Up @@ -499,6 +501,7 @@ impl StreamImp {
// see docs above on StreamImp.inner for rationale on forget
me.write = State::Pending((buf, pos));
mem::forget(self.clone());
me.iocp.store_overlapped_content(self.inner.write.as_mut_ptr(), write_deallocate);
break;
}
Err(e) => {
Expand Down Expand Up @@ -578,6 +581,20 @@ fn write_done(status: &OVERLAPPED_ENTRY) {
}
}

fn read_deallocate(ptr: *mut OVERLAPPED) {
let me = StreamImp {
inner: unsafe { overlapped2arc!(ptr, StreamIo, read) },
};
drop(me);
}

fn write_deallocate(ptr: *mut OVERLAPPED) {
let me = StreamImp {
inner: unsafe { overlapped2arc!(ptr, StreamIo, write) },
};
drop(me);
}

impl Evented for TcpStream {
fn register(&self, poll: &Poll, token: Token,
interest: Ready, opts: PollOpt) -> io::Result<()> {
Expand All @@ -595,7 +612,7 @@ impl Evented for TcpStream {
// successful connect will worry about generating writable/readable
// events and scheduling a new read.
if let Some(addr) = me.deferred_connect.take() {
return self.imp.schedule_connect(&addr).map(|_| ())
return self.imp.schedule_connect(&addr, &mut me).map(|_| ())
}
self.post_register(interest, &mut me);
Ok(())
Expand Down Expand Up @@ -632,11 +649,15 @@ impl Drop for TcpStream {
// Note that "Empty" here may mean that a connect is pending, so we
// cancel even if that happens as well.
unsafe {
match self.inner().read {
let inner = self.inner();
match inner.read {
State::Pending(_) | State::Empty => {
trace!("cancelling active TCP read");
drop(super::cancel(&self.imp.inner.socket,
&self.imp.inner.read));
trace!("cleaning remaining overlapped contents");
inner.iocp.clean_overlapped_content(self.imp.inner.read.as_mut_ptr());
inner.iocp.clean_overlapped_content(self.imp.inner.write.as_mut_ptr());
}
State::Ready(_) | State::Error(_) => {}
}
Expand Down Expand Up @@ -754,6 +775,7 @@ impl ListenerImp {
// see docs above on StreamImp.inner for rationale on forget
me.accept = State::Pending(socket);
mem::forget(self.clone());
me.iocp.store_overlapped_content(self.inner.accept.as_mut_ptr(), accept_dellocate);
}
Err(e) => {
me.accept = State::Error(e);
Expand Down Expand Up @@ -794,6 +816,13 @@ fn accept_done(status: &OVERLAPPED_ENTRY) {
me2.add_readiness(&mut me, Ready::readable());
}

fn accept_dellocate(ptr: *mut OVERLAPPED) {
let me = ListenerImp {
inner: unsafe { overlapped2arc!(ptr, ListenerIo, accept) },
};
drop(me);
}

impl Evented for TcpListener {
fn register(&self, poll: &Poll, token: Token,
interest: Ready, opts: PollOpt) -> io::Result<()> {
Expand Down Expand Up @@ -836,11 +865,14 @@ impl Drop for TcpListener {
fn drop(&mut self) {
// If we're still internally reading, we're no longer interested.
unsafe {
match self.inner().accept {
let inner = self.inner();
match inner.accept {
State::Pending(_) => {
trace!("cancelling active TCP accept");
drop(super::cancel(&self.imp.inner.socket,
&self.imp.inner.accept));
trace!("cleaning remaining overlapped contents");
inner.iocp.clean_overlapped_content(self.imp.inner.accept.as_mut_ptr());
}
State::Empty |
State::Ready(_) |
Expand Down
17 changes: 17 additions & 0 deletions src/sys/windows/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl UdpSocket {
}?;
me.write = State::Pending(owned_buf);
mem::forget(self.imp.clone());
me.iocp.store_overlapped_content(self.imp.inner.write.as_mut_ptr(), send_deallocate);
Ok(amt)
}

Expand Down Expand Up @@ -147,6 +148,7 @@ impl UdpSocket {
}?;
me.write = State::Pending(owned_buf);
mem::forget(self.imp.clone());
me.iocp.store_overlapped_content(self.imp.inner.write.as_mut_ptr(), send_deallocate);
Ok(amt)
}

Expand Down Expand Up @@ -313,6 +315,7 @@ impl Imp {
Ok(_) => {
me.read = State::Pending(buf);
mem::forget(self.clone());
me.iocp.store_overlapped_content(self.inner.read.as_mut_ptr(), recv_deallocate);
}
Err(e) => {
me.read = State::Error(e);
Expand Down Expand Up @@ -411,3 +414,17 @@ fn recv_done(status: &OVERLAPPED_ENTRY) {
me.read = State::Ready(buf);
me2.add_readiness(&mut me, Ready::readable());
}

fn send_deallocate(ptr: *mut OVERLAPPED) {
let me = Imp {
inner: unsafe { overlapped2arc!(ptr, Io, write) },
};
drop(me);
}

fn recv_deallocate(ptr: *mut OVERLAPPED) {
let me = Imp {
inner: unsafe { overlapped2arc!(ptr, Io, read) },
};
drop(me);
}
1 change: 1 addition & 0 deletions test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod test_tcp_level;
mod test_udp_level;
mod test_udp_socket;
mod test_write_then_drop;
mod test_drop_cancels_interest_and_shuts_down;

#[cfg(feature = "with-deprecated")]
mod test_notify;
Expand Down
60 changes: 60 additions & 0 deletions test/test_drop_cancels_interest_and_shuts_down.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#[test]
fn drop_cancels_interest_and_shuts_down() {
use mio::net::TcpStream;
use mio::*;
use std::io;
use std::io::Read;
use std::net::TcpListener;
use std::thread;
use std::time::Duration;

use env_logger;
let _ = env_logger::init();
let l = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.local_addr().unwrap();

let t = thread::spawn(move || {
let mut s = l.incoming().next().unwrap().unwrap();
s.set_read_timeout(Some(Duration::from_secs(5)))
.expect("set_read_timeout");
let r = s.read(&mut [0; 16]);
match r {
Ok(_) => (),
Err(e) => {
if e.kind() != io::ErrorKind::UnexpectedEof {
panic!(e);
}
}
}
});

let poll = Poll::new().unwrap();
let mut s = TcpStream::connect(&addr).unwrap();

poll.register(
&s,
Token(1),
Ready::readable() | Ready::writable(),
PollOpt::edge(),
).unwrap();
let mut events = Events::with_capacity(16);
'outer: loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(1) {
// connected
break 'outer;
}
}
}

let mut b = [0; 1024];
match s.read(&mut b) {
Ok(_) => panic!("unexpected ok"),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
Err(e) => panic!("unexpected error: {:?}", e),
}

drop(s);
t.join().unwrap();
}