Skip to content

Commit

Permalink
Initial solution for windows mio problem? Idk...
Browse files Browse the repository at this point in the history
  • Loading branch information
PerfectLaugh committed Aug 19, 2018
1 parent 82066e5 commit e422e6f
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 4 deletions.
25 changes: 25 additions & 0 deletions src/sys/windows/from_raw_arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use std::ops::Deref;
use std::mem;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use winapi::OVERLAPPED;

pub struct FromRawArc<T> {
_inner: *mut Inner<T>,
Expand Down Expand Up @@ -86,6 +87,30 @@ impl<T> Drop for FromRawArc<T> {
}
}

unsafe impl Send for FromRawArcStore { }
unsafe impl Sync for FromRawArcStore { }

pub struct FromRawArcStore {
_ptr: *mut OVERLAPPED,
deallocator: fn(*mut OVERLAPPED),
}

impl FromRawArcStore {
pub fn new(ptr: *mut OVERLAPPED, deallocator: fn(*mut OVERLAPPED)) -> FromRawArcStore {
let store = FromRawArcStore {
_ptr: unsafe { mem::transmute(ptr) },
deallocator: deallocator,
};
store
}
}

impl Drop for FromRawArcStore {
fn drop(&mut self) {
(self.deallocator)(self._ptr);
}
}

#[cfg(test)]
mod tests {
use super::FromRawArc;
Expand Down
47 changes: 47 additions & 0 deletions src/sys/windows/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::os::windows::prelude::*;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::time::Duration;
use std::mem;

use lazycell::AtomicLazyCell;

Expand All @@ -16,6 +17,7 @@ use miow::iocp::{CompletionPort, CompletionStatus};
use event_imp::{Event, Evented, Ready};
use poll::{self, Poll};
use sys::windows::buffer_pool::BufferPool;
use sys::windows::from_raw_arc::FromRawArcStore;
use {Token, PollOpt};

/// Each Selector has a globally unique(ish) ID associated with it. This ID
Expand Down Expand Up @@ -48,6 +50,8 @@ struct SelectorInner {
/// Primitives will take buffers from this pool to perform I/O operations,
/// and once complete they'll be put back in.
buffers: Mutex<BufferPool>,

incompletes: Mutex<Vec<(isize, FromRawArcStore)>>,
}

impl Selector {
Expand All @@ -61,6 +65,7 @@ impl Selector {
id: id,
port: cp,
buffers: Mutex::new(BufferPool::new(256)),
incompletes: Mutex::new(Vec::new()),
}),
}
})
Expand Down Expand Up @@ -91,6 +96,20 @@ impl Selector {
ret = true;
continue;
}
// Deadlock will occur if you don't release it first before the callback.
{
let mut incompletes = self.inner.incompletes.lock().unwrap();
let pos = incompletes.iter().position(|item| item.0 == (status.overlapped() as isize));
match pos {
Some(pos) => {
let store = incompletes.remove(pos);
mem::forget(store);
},
None => {
trace!("cannot find store, omiting...");
}
}
}

let callback = unsafe {
(*(status.overlapped() as *mut Overlapped)).callback
Expand All @@ -104,6 +123,16 @@ impl Selector {
Ok(ret)
}

pub fn store_overlapped_content(&self, ptr: *mut OVERLAPPED, deallocator: fn(*mut OVERLAPPED)) {
let mut incompletes = self.inner.incompletes.lock().unwrap();
incompletes.push((ptr as isize, FromRawArcStore::new(ptr, deallocator)));
}

pub fn clean_overlapped_content(&self, ptr: *mut OVERLAPPED) {
let mut incompletes = self.inner.incompletes.lock().unwrap();
incompletes.retain(|item| ptr as isize != item.0);
}

/// Gets a reference to the underlying `CompletionPort` structure.
pub fn port(&self) -> &CompletionPort {
&self.inner.port
Expand Down Expand Up @@ -404,6 +433,24 @@ impl ReadyBinding {
.as_ref().unwrap()
.deregister(poll)
}

pub fn store_overlapped_content(&self, ptr: *mut OVERLAPPED, deallocator: fn(*mut OVERLAPPED)) {
if let Some(i) = self.binding.selector.borrow() {
let selector = Selector {
inner: i.clone(),
};
selector.store_overlapped_content(ptr, deallocator);
}
}

pub fn clean_overlapped_content(&self, ptr: *mut OVERLAPPED) {
if let Some(i) = self.binding.selector.borrow() {
let selector = Selector {
inner: i.clone(),
};
selector.clean_overlapped_content(ptr);
}
}
}

fn other(s: &str) -> io::Error {
Expand Down
40 changes: 36 additions & 4 deletions src/sys/windows/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,14 @@ impl StreamImp {
self.inner.inner.lock().unwrap()
}

fn schedule_connect(&self, addr: &SocketAddr) -> io::Result<()> {
fn schedule_connect(&self, addr: &SocketAddr, me: &mut StreamInner) -> io::Result<()> {
unsafe {
trace!("scheduling a connect");
self.inner.socket.connect_overlapped(addr, &[], self.inner.read.as_mut_ptr())?;
}
// see docs above on StreamImp.inner for rationale on forget
mem::forget(self.clone());
me.iocp.store_overlapped_content(self.inner.read.as_mut_ptr(), read_deallocate);
Ok(())
}

Expand Down Expand Up @@ -454,6 +455,7 @@ impl StreamImp {
// see docs above on StreamImp.inner for rationale on forget
me.read = State::Pending(());
mem::forget(self.clone());
me.iocp.store_overlapped_content(self.inner.read.as_mut_ptr(), read_deallocate);
}
Err(e) => {
me.read = State::Error(e);
Expand Down Expand Up @@ -499,6 +501,7 @@ impl StreamImp {
// see docs above on StreamImp.inner for rationale on forget
me.write = State::Pending((buf, pos));
mem::forget(self.clone());
me.iocp.store_overlapped_content(self.inner.write.as_mut_ptr(), write_deallocate);
break;
}
Err(e) => {
Expand Down Expand Up @@ -578,6 +581,20 @@ fn write_done(status: &OVERLAPPED_ENTRY) {
}
}

fn read_deallocate(ptr: *mut OVERLAPPED) {
let me = StreamImp {
inner: unsafe { overlapped2arc!(ptr, StreamIo, read) },
};
drop(me);
}

fn write_deallocate(ptr: *mut OVERLAPPED) {
let me = StreamImp {
inner: unsafe { overlapped2arc!(ptr, StreamIo, write) },
};
drop(me);
}

impl Evented for TcpStream {
fn register(&self, poll: &Poll, token: Token,
interest: Ready, opts: PollOpt) -> io::Result<()> {
Expand All @@ -595,7 +612,7 @@ impl Evented for TcpStream {
// successful connect will worry about generating writable/readable
// events and scheduling a new read.
if let Some(addr) = me.deferred_connect.take() {
return self.imp.schedule_connect(&addr).map(|_| ())
return self.imp.schedule_connect(&addr, &mut me).map(|_| ())
}
self.post_register(interest, &mut me);
Ok(())
Expand Down Expand Up @@ -632,11 +649,15 @@ impl Drop for TcpStream {
// Note that "Empty" here may mean that a connect is pending, so we
// cancel even if that happens as well.
unsafe {
match self.inner().read {
let inner = self.inner();
match inner.read {
State::Pending(_) | State::Empty => {
trace!("cancelling active TCP read");
drop(super::cancel(&self.imp.inner.socket,
&self.imp.inner.read));
trace!("cleaning remaining overlapped contents");
inner.iocp.clean_overlapped_content(self.imp.inner.read.as_mut_ptr());
inner.iocp.clean_overlapped_content(self.imp.inner.write.as_mut_ptr());
}
State::Ready(_) | State::Error(_) => {}
}
Expand Down Expand Up @@ -754,6 +775,7 @@ impl ListenerImp {
// see docs above on StreamImp.inner for rationale on forget
me.accept = State::Pending(socket);
mem::forget(self.clone());
me.iocp.store_overlapped_content(self.inner.accept.as_mut_ptr(), accept_dellocate);
}
Err(e) => {
me.accept = State::Error(e);
Expand Down Expand Up @@ -794,6 +816,13 @@ fn accept_done(status: &OVERLAPPED_ENTRY) {
me2.add_readiness(&mut me, Ready::readable());
}

fn accept_dellocate(ptr: *mut OVERLAPPED) {
let me = ListenerImp {
inner: unsafe { overlapped2arc!(ptr, ListenerIo, accept) },
};
drop(me);
}

impl Evented for TcpListener {
fn register(&self, poll: &Poll, token: Token,
interest: Ready, opts: PollOpt) -> io::Result<()> {
Expand Down Expand Up @@ -836,11 +865,14 @@ impl Drop for TcpListener {
fn drop(&mut self) {
// If we're still internally reading, we're no longer interested.
unsafe {
match self.inner().accept {
let inner = self.inner();
match inner.accept {
State::Pending(_) => {
trace!("cancelling active TCP accept");
drop(super::cancel(&self.imp.inner.socket,
&self.imp.inner.accept));
trace!("cleaning remaining overlapped contents");
inner.iocp.clean_overlapped_content(self.imp.inner.accept.as_mut_ptr());
}
State::Empty |
State::Ready(_) |
Expand Down
17 changes: 17 additions & 0 deletions src/sys/windows/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl UdpSocket {
}?;
me.write = State::Pending(owned_buf);
mem::forget(self.imp.clone());
me.iocp.store_overlapped_content(self.imp.inner.write.as_mut_ptr(), send_deallocate);
Ok(amt)
}

Expand Down Expand Up @@ -147,6 +148,7 @@ impl UdpSocket {
}?;
me.write = State::Pending(owned_buf);
mem::forget(self.imp.clone());
me.iocp.store_overlapped_content(self.imp.inner.write.as_mut_ptr(), send_deallocate);
Ok(amt)
}

Expand Down Expand Up @@ -313,6 +315,7 @@ impl Imp {
Ok(_) => {
me.read = State::Pending(buf);
mem::forget(self.clone());
me.iocp.store_overlapped_content(self.inner.read.as_mut_ptr(), recv_deallocate);
}
Err(e) => {
me.read = State::Error(e);
Expand Down Expand Up @@ -411,3 +414,17 @@ fn recv_done(status: &OVERLAPPED_ENTRY) {
me.read = State::Ready(buf);
me2.add_readiness(&mut me, Ready::readable());
}

fn send_deallocate(ptr: *mut OVERLAPPED) {
let me = Imp {
inner: unsafe { overlapped2arc!(ptr, Io, write) },
};
drop(me);
}

fn recv_deallocate(ptr: *mut OVERLAPPED) {
let me = Imp {
inner: unsafe { overlapped2arc!(ptr, Io, read) },
};
drop(me);
}
1 change: 1 addition & 0 deletions test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod test_tcp_level;
mod test_udp_level;
mod test_udp_socket;
mod test_write_then_drop;
mod test_drop_cancels_interest_and_shuts_down;

#[cfg(feature = "with-deprecated")]
mod test_notify;
Expand Down
62 changes: 62 additions & 0 deletions test/test_drop_cancels_interest_and_shuts_down.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#[test]
fn drop_cancels_interest_and_shuts_down() {
use mio::net::TcpStream;
use mio::*;
use std::io;
use std::io::Read;
use std::net::TcpListener;
use std::thread;
use std::time::Duration;

use env_logger;
let _ = env_logger::init();
let l = TcpListener::bind("127.0.0.1:18000").unwrap();
let addr = l.local_addr().unwrap();

let t = thread::spawn(move || {
let mut s = l.incoming().next().unwrap().unwrap();
s.set_read_timeout(Some(Duration::from_secs(5)))
.expect("set_read_timeout");
let r = s.read(&mut [0; 16]);
match r {
Ok(_) => (),
Err(e) => {
if e.kind() != io::ErrorKind::UnexpectedEof {
eprintln!("{}", e);
panic!(e);
}
}
}
});

let poll = Poll::new().unwrap();
let mut s = TcpStream::connect(&addr).unwrap();

poll.register(
&s,
Token(1),
Ready::readable() | Ready::writable(),
PollOpt::edge(),
).unwrap();
let mut events = Events::with_capacity(16);
'outer: loop {
poll.poll(&mut events, None).unwrap();
for event in &events {
if event.token() == Token(1) {
// connected
break 'outer;
}
}
}

let mut b = [0; 1024];
match s.read(&mut b) {
Ok(_) => panic!("unexpected ok"),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
Err(e) => panic!("unexpected error: {:?}", e),
}

drop(poll);
drop(s);
t.join().unwrap();
}

0 comments on commit e422e6f

Please sign in to comment.