Skip to content

Commit

Permalink
try to fix test code and did the cargo +nightly fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
PerfectLaugh committed Aug 20, 2018
1 parent e422e6f commit fe3a06e
Show file tree
Hide file tree
Showing 71 changed files with 3,289 additions and 1,977 deletions.
8 changes: 2 additions & 6 deletions benches/bench_poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ extern crate mio;
extern crate test;

use mio::*;
use test::Bencher;
use std::sync::Arc;
use std::thread;
use test::Bencher;

#[bench]
fn bench_poll(bench: &mut Bencher) {
Expand All @@ -20,11 +20,7 @@ fn bench_poll(bench: &mut Bencher) {
let mut set_readiness = vec![];

for i in 0..NUM {
let (r, s) = Registration::new(
&poll,
Token(i),
Ready::readable(),
PollOpt::edge());
let (r, s) = Registration::new(&poll, Token(i), Ready::readable(), PollOpt::edge());

registrations.push(r);
set_readiness.push(s);
Expand Down
101 changes: 68 additions & 33 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

#![allow(unused_imports, deprecated, missing_debug_implementations)]

use {io, Ready, Poll, PollOpt, Registration, SetReadiness, Token};
use event::Evented;
use lazycell::{LazyCell, AtomicLazyCell};
use lazycell::{AtomicLazyCell, LazyCell};
use std::any::Any;
use std::fmt;
use std::error;
use std::sync::{mpsc, Arc};
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use {io, Poll, PollOpt, Ready, Registration, SetReadiness, Token};

/// Creates a new asynchronous channel, where the `Receiver` can be registered
/// with `Poll`.
Expand Down Expand Up @@ -116,12 +116,10 @@ struct Inner {

impl<T> Sender<T> {
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
self.tx.send(t)
.map_err(SendError::from)
.and_then(|_| {
self.ctl.inc()?;
Ok(())
})
self.tx.send(t).map_err(SendError::from).and_then(|_| {
self.ctl.inc()?;
Ok(())
})
}
}

Expand All @@ -136,21 +134,17 @@ impl<T> Clone for Sender<T> {

impl<T> SyncSender<T> {
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
self.tx.send(t)
.map_err(From::from)
.and_then(|_| {
self.ctl.inc()?;
Ok(())
})
self.tx.send(t).map_err(From::from).and_then(|_| {
self.ctl.inc()?;
Ok(())
})
}

pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
self.tx.try_send(t)
.map_err(From::from)
.and_then(|_| {
self.ctl.inc()?;
Ok(())
})
self.tx.try_send(t).map_err(From::from).and_then(|_| {
self.ctl.inc()?;
Ok(())
})
}
}

Expand All @@ -173,11 +167,23 @@ impl<T> Receiver<T> {
}

impl<T> Evented for Receiver<T> {
fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
fn register(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
self.ctl.register(poll, token, interest, opts)
}

fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
fn reregister(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
self.ctl.reregister(poll, token, interest, opts)
}

Expand Down Expand Up @@ -211,7 +217,9 @@ impl SenderCtl {
impl Clone for SenderCtl {
fn clone(&self) -> SenderCtl {
self.inner.senders.fetch_add(1, Ordering::Relaxed);
SenderCtl { inner: self.inner.clone() }
SenderCtl {
inner: self.inner.clone(),
}
}
}

Expand Down Expand Up @@ -250,36 +258,63 @@ impl ReceiverCtl {
}

impl Evented for ReceiverCtl {
fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
fn register(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
if self.registration.borrow().is_some() {
return Err(io::Error::new(io::ErrorKind::Other, "receiver already registered"));
return Err(io::Error::new(
io::ErrorKind::Other,
"receiver already registered",
));
}

let (registration, set_readiness) = Registration::new(poll, token, interest, opts);


if self.inner.pending.load(Ordering::Relaxed) > 0 {
// TODO: Don't drop readiness
let _ = set_readiness.set_readiness(Ready::readable());
}

self.registration.fill(registration).ok().expect("unexpected state encountered");
self.inner.set_readiness.fill(set_readiness).ok().expect("unexpected state encountered");
self.registration
.fill(registration)
.ok()
.expect("unexpected state encountered");
self.inner
.set_readiness
.fill(set_readiness)
.ok()
.expect("unexpected state encountered");

Ok(())
}

fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
fn reregister(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
match self.registration.borrow() {
Some(registration) => registration.update(poll, token, interest, opts),
None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
None => Err(io::Error::new(
io::ErrorKind::Other,
"receiver not registered",
)),
}
}

fn deregister(&self, poll: &Poll) -> io::Result<()> {
match self.registration.borrow() {
Some(registration) => registration.deregister(poll),
None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
None => Err(io::Error::new(
io::ErrorKind::Other,
"receiver not registered",
)),
}
}
}
Expand Down
62 changes: 45 additions & 17 deletions src/deprecated/event_loop.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {channel, Poll, Events, Token};
use event::Evented;
use deprecated::{Handler, NotifyError};
use event_imp::{Event, Ready, PollOpt};
use timer::{self, Timer, Timeout};
use std::{io, fmt, usize};
use event::Evented;
use event_imp::{Event, PollOpt, Ready};
use std::default::Default;
use std::time::Duration;
use std::{fmt, io, usize};
use timer::{self, Timeout, Timer};
use {channel, Events, Poll, Token};

#[derive(Debug, Default, Clone)]
pub struct EventLoopBuilder {
Expand Down Expand Up @@ -101,7 +101,6 @@ const NOTIFY: Token = Token(usize::MAX - 1);
const TIMER: Token = Token(usize::MAX - 2);

impl<H: Handler> EventLoop<H> {

/// Constructs a new `EventLoop` using the default configuration values.
/// The `EventLoop` will not be running.
pub fn new() -> io::Result<EventLoop<H>> {
Expand All @@ -122,7 +121,12 @@ impl<H: Handler> EventLoop<H> {
let (tx, rx) = channel::sync_channel(config.notify_capacity);

// Register the notification wakeup FD with the IO poller
poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?;
poll.register(
&rx,
NOTIFY,
Ready::readable(),
PollOpt::edge() | PollOpt::oneshot(),
)?;
poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?;

Ok(EventLoop {
Expand Down Expand Up @@ -241,15 +245,29 @@ impl<H: Handler> EventLoop<H> {
}

/// Registers an IO handle with the event loop.
pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
where E: Evented
pub fn register<E: ?Sized>(
&mut self,
io: &E,
token: Token,
interest: Ready,
opt: PollOpt,
) -> io::Result<()>
where
E: Evented,
{
self.poll.register(io, token, interest, opt)
}

/// Re-Registers an IO handle with the event loop.
pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
where E: Evented
pub fn reregister<E: ?Sized>(
&mut self,
io: &E,
token: Token,
interest: Ready,
opt: PollOpt,
) -> io::Result<()>
where
E: Evented,
{
self.poll.reregister(io, token, interest, opt)
}
Expand All @@ -275,7 +293,10 @@ impl<H: Handler> EventLoop<H> {
///
/// Warning: kqueue effectively builds in deregister when using edge-triggered mode with
/// oneshot. Calling `deregister()` on the socket will cause a TcpStream error.
pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {
pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()>
where
E: Evented,
{
self.poll.deregister(io)
}

Expand Down Expand Up @@ -328,7 +349,7 @@ impl<H: Handler> EventLoop<H> {
match evt.token() {
NOTIFY => self.notify(handler),
TIMER => self.timer_process(handler),
_ => self.io_event(handler, evt)
_ => self.io_event(handler, evt),
}

i += 1;
Expand All @@ -348,7 +369,12 @@ impl<H: Handler> EventLoop<H> {
}

// Re-register
let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot());
let _ = self.poll.reregister(
&self.notify_rx,
NOTIFY,
Ready::readable(),
PollOpt::edge() | PollOpt::oneshot(),
);
}

fn timer_process(&mut self, handler: &mut H) {
Expand All @@ -370,7 +396,7 @@ impl<H: Handler> fmt::Debug for EventLoop<H> {

/// Sends messages to the EventLoop from other threads.
pub struct Sender<M> {
tx: channel::SyncSender<M>
tx: channel::SyncSender<M>,
}

impl<M> fmt::Debug for Sender<M> {
Expand All @@ -379,9 +405,11 @@ impl<M> fmt::Debug for Sender<M> {
}
}

impl<M> Clone for Sender <M> {
impl<M> Clone for Sender<M> {
fn clone(&self) -> Sender<M> {
Sender { tx: self.tx.clone() }
Sender {
tx: self.tx.clone(),
}
}
}

Expand Down
17 changes: 6 additions & 11 deletions src/deprecated/handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use deprecated::EventLoop;
use {Ready, Token};
use deprecated::{EventLoop};

#[allow(unused_variables)]
pub trait Handler: Sized {
Expand All @@ -16,22 +16,17 @@ pub trait Handler: Sized {
///
/// This function will only be invoked a single time per socket per event
/// loop tick.
fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {
}
fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {}

/// Invoked when a message has been received via the event loop's channel.
fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
}
fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {}

/// Invoked when a timeout has completed.
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
}
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {}

/// Invoked when `EventLoop` has been interrupted by a signal interrupt.
fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {
}
fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {}

/// Invoked at the end of an event loop tick.
fn tick(&mut self, event_loop: &mut EventLoop<Self>) {
}
fn tick(&mut self, event_loop: &mut EventLoop<Self>) {}
}
2 changes: 1 addition & 1 deletion src/deprecated/io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ::io::MapNonBlock;
use io::MapNonBlock;
use std::io::{self, Read, Write};

pub trait TryRead {
Expand Down
Loading

0 comments on commit fe3a06e

Please sign in to comment.