Skip to content

Commit

Permalink
general: feature gate every platform under tokio and smol
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Oct 17, 2022
1 parent acf95c3 commit 7d6efc4
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 68 deletions.
8 changes: 5 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.1.0] - [unreleased]
## [3.0.0] - [unreleased]

### Added
- Allow opting for Tokio instead of smol for the `AsyncSocket`. See [PR 27](https://github.com/mxinden/if-watch/pull/27).
### Changed
- Feature gate async runtime, allowing opting between Tokio or smol. For every OS each `IfWatcher` is
under the `tokio` or `smol` module. This makes it a breaking change as there
is no more a default implementation. See [PR 27](https://github.com/mxinden/if-watch/pull/27).

## [2.0.0]

Expand Down
10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "if-watch"
version = "2.0.0"
version = "3.0.0"
authors = ["David Craven <david@craven.ch>", "Parity Technologies Limited <admin@parity.io>"]
edition = "2021"
keywords = ["asynchronous", "routing"]
Expand All @@ -10,8 +10,8 @@ repository = "https://github.com/mxinden/if-watch"

[features]
default = ["smol"]
tokio = ["rtnetlink/tokio_socket"]
smol = ["rtnetlink/smol_socket"]
tokio = ["dep:tokio", "rtnetlink/tokio_socket"]
smol = ["dep:smol", "rtnetlink/smol_socket"]

[dependencies]
fnv = "1.0.7"
Expand All @@ -26,6 +26,8 @@ rtnetlink = { version = "0.10.0", default-features = false }
core-foundation = "0.9.2"
if-addrs = "0.7.0"
system-configuration = "0.5.0"
tokio = { version = "1.21.2", features = ["rt"], optional = true }
smol = { version = "1.2.5", optional = true }

[target.'cfg(target_os = "windows")'.dependencies]
if-addrs = "0.7.0"
Expand All @@ -37,3 +39,5 @@ if-addrs = "0.7.0"

[dev-dependencies]
env_logger = "0.9.0"
smol = "1.2.5"
tokio = { version = "1.21.2", features = ["rt", "macros"] }
4 changes: 2 additions & 2 deletions examples/if_watch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures::StreamExt;
use if_watch::IfWatcher;
use if_watch::smol::IfWatcher;

fn main() {
env_logger::init();
futures::executor::block_on(async {
smol::block_on(async {
let mut set = IfWatcher::new().unwrap();
loop {
let event = set.select_next_some().await;
Expand Down
90 changes: 86 additions & 4 deletions src/apple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,94 @@ use core_foundation::runloop::{kCFRunLoopCommonModes, CFRunLoop};
use core_foundation::string::CFString;
use fnv::FnvHashSet;
use futures::channel::mpsc;
use futures::stream::Stream;
use futures::stream::{FusedStream, Stream};
use futures::Future;
use if_addrs::IfAddr;
use std::collections::VecDeque;
use std::io::Result;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use system_configuration::dynamic_store::{
SCDynamicStore, SCDynamicStoreBuilder, SCDynamicStoreCallBackContext,
};

#[cfg(feature = "tokio")]
pub mod tokio {
//! An interface watcher that uses the `tokio` runtime.
use futures::Future;

#[doc(hidden)]
pub struct TokioRuntime;

impl super::Runtime for TokioRuntime {
fn spawn<F>(f: F)
where
F: Future,
F: Send + 'static,
<F as Future>::Output: Send + 'static,
{
tokio::spawn(f);
}
}

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher<TokioRuntime>;
}

#[cfg(feature = "smol")]
pub mod smol {
//! An interface watcher that uses the `smol` runtime.

use futures::Future;

#[doc(hidden)]
pub struct SmolRuntime;

impl super::Runtime for SmolRuntime {
fn spawn<F>(f: F)
where
F: Future,
F: Send + 'static,
<F as Future>::Output: Send + 'static,
{
smol::spawn(f).detach();
}
}

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher<SmolRuntime>;
}

#[derive(Debug)]
pub struct IfWatcher {
pub struct IfWatcher<T> {
addrs: FnvHashSet<IpNet>,
queue: VecDeque<IfEvent>,
rx: mpsc::Receiver<()>,
runtime: PhantomData<T>,
}

impl IfWatcher {
#[doc(hidden)]
pub trait Runtime {
fn spawn<F>(f: F)
where
F: Future,
F: Send + 'static,
<F as Future>::Output: Send + 'static;
}

impl<T> IfWatcher<T>
where
T: Runtime,
{
pub fn new() -> Result<Self> {
let (tx, rx) = mpsc::channel(1);
std::thread::spawn(|| background_task(tx));
T::spawn(async { background_task(tx) });
let mut watcher = Self {
addrs: Default::default(),
queue: Default::default(),
rx,
runtime: PhantomData,
};
watcher.resync()?;
Ok(watcher)
Expand Down Expand Up @@ -74,6 +137,25 @@ impl IfWatcher {
}
}

impl<T> Stream for IfWatcher<T>
where
T: Runtime + Unpin,
{
type Item = Result<IfEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::into_inner(self).poll_if_event(cx).map(Some)
}
}

impl<T> FusedStream for IfWatcher<T>
where
T: Runtime + Unpin,
{
fn is_terminated(&self) -> bool {
false
}
}

fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet {
match addr {
IfAddr::V4(ip) => {
Expand Down
35 changes: 34 additions & 1 deletion src/fallback.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::IfEvent;
use async_io::Timer;
use futures::stream::Stream;
use futures::stream::{FusedStream, Stream};
use if_addrs::IfAddr;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use std::collections::{HashSet, VecDeque};
Expand All @@ -10,6 +10,26 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

#[cfg(feature = "tokio")]
pub mod tokio {
//! An interface watcher.
//! **On this platform there is no difference between `tokio` and `smol` features,**
//! **this was done to maintain the api compatible with other platforms**.

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher;
}

#[cfg(feature = "smol")]
pub mod smol {
//! An interface watcher.
//! **On this platform there is no difference between `tokio` and `smol` features,**
//! **this was done to maintain the api compatible with other platforms**.

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher;
}

/// An address set/watcher
#[derive(Debug)]
pub struct IfWatcher {
Expand Down Expand Up @@ -64,6 +84,19 @@ impl IfWatcher {
}
}

impl Stream for IfWatcher {
type Item = Result<IfEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::into_inner(self).poll_if_event(cx).map(Some)
}
}

impl FusedStream for IfWatcher {
fn is_terminated(&self) -> bool {
false
}
}

fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet {
match addr {
IfAddr::V4(ip) => {
Expand Down
98 changes: 43 additions & 55 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,7 @@
#![deny(missing_docs)]
#![deny(warnings)]

#[cfg(not(target_os = "linux"))]
use futures::stream::{FusedStream, Stream};
pub use ipnet::{IpNet, Ipv4Net, Ipv6Net};
#[cfg(not(target_os = "linux"))]
use std::{
io::Result,
pin::Pin,
task::{Context, Poll},
};

#[cfg(target_os = "macos")]
mod apple;
Expand All @@ -28,8 +20,14 @@ mod linux;
#[cfg(target_os = "windows")]
mod win;

#[cfg(target_os = "macos")]
use apple as platform_impl;
#[cfg(any(target_os = "macos", target_os = "ios"))]
#[cfg(feature = "tokio")]
pub use apple::tokio;

#[cfg(any(target_os = "macos", target_os = "ios"))]
#[cfg(feature = "smol")]
pub use apple::smol;

#[cfg(target_os = "ios")]
use apple as platform_impl;
#[cfg(not(any(
Expand All @@ -39,16 +37,22 @@ use apple as platform_impl;
target_os = "windows",
)))]
use fallback as platform_impl;

#[cfg(target_os = "windows")]
use win as platform_impl;
#[cfg(feature = "tokio")]
pub use win::tokio;

#[cfg(target_os = "windows")]
#[cfg(feature = "smol")]
pub use win::smol;

#[cfg(target_os = "linux")]
#[cfg(feature = "tokio")]
pub use linux::tokio;

#[cfg(target_os = "linux")]
#[cfg(feature = "smol")]
pub use linux::smol::IfWatcher;
pub use linux::smol;

/// An address change event.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
Expand All @@ -59,66 +63,50 @@ pub enum IfEvent {
Down(IpNet),
}

/// Watches for interface changes.
#[cfg(not(target_os = "linux"))]
#[derive(Debug)]
pub struct IfWatcher(platform_impl::IfWatcher);

#[cfg(not(target_os = "linux"))]
impl IfWatcher {
/// Create a watcher.
pub fn new() -> Result<Self> {
platform_impl::IfWatcher::new().map(Self)
}

/// Iterate over current networks.
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.0.iter()
}

/// Poll for an address change event.
pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll<Result<IfEvent>> {
self.0.poll_if_event(cx)
}
}

#[cfg(not(target_os = "linux"))]
impl Stream for IfWatcher {
type Item = Result<IfEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::into_inner(self).poll_if_event(cx).map(Some)
}
}

#[cfg(not(target_os = "linux"))]
impl FusedStream for IfWatcher {
fn is_terminated(&self) -> bool {
false
}
}

#[cfg(test)]
mod tests {
use super::IfWatcher;
use futures::StreamExt;
use std::pin::Pin;

#[test]
fn test_ip_watch() {
futures::executor::block_on(async {
fn test_smol_ip_watch() {
use super::smol::IfWatcher;

smol::block_on(async {
let mut set = IfWatcher::new().unwrap();
let event = set.select_next_some().await.unwrap();
println!("Got event {:?}", event);
});
}

#[tokio::test]
async fn test_tokio_ip_watch() {
use super::tokio::IfWatcher;

let mut set = IfWatcher::new().unwrap();
let event = set.select_next_some().await.unwrap();
println!("Got event {:?}", event);
}

#[test]
fn test_is_send() {
futures::executor::block_on(async {
fn test_smol_is_send() {
use super::smol::IfWatcher;

smol::block_on(async {
fn is_send<T: Send>(_: T) {}
is_send(IfWatcher::new());
is_send(IfWatcher::new().unwrap());
is_send(Pin::new(&mut IfWatcher::new().unwrap()));
});
}

#[tokio::test]
async fn test_tokio_is_send() {
use super::tokio::IfWatcher;

fn is_send<T: Send>(_: T) {}
is_send(IfWatcher::new());
is_send(IfWatcher::new().unwrap());
is_send(Pin::new(&mut IfWatcher::new().unwrap()));
}
}
Loading

0 comments on commit 7d6efc4

Please sign in to comment.