Skip to content

Commit

Permalink
general: feature gate every platform under tokio and smol
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Oct 17, 2022
1 parent acf95c3 commit 4818613
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 70 deletions.
8 changes: 5 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.1.0] - [unreleased]
## [3.0.0] - [unreleased]

### Added
- Allow opting for Tokio instead of smol for the `AsyncSocket`. See [PR 27](https://github.com/mxinden/if-watch/pull/27).
### Changed
- Feature gate async runtime, allowing opting between Tokio or smol. For every OS each `IfWatcher` is
under the `tokio` or `smol` module. This makes it a breaking change as there
is no more a default implementation. See [PR 27](https://github.com/mxinden/if-watch/pull/27).

## [2.0.0]

Expand Down
10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "if-watch"
version = "2.0.0"
version = "3.0.0"
authors = ["David Craven <david@craven.ch>", "Parity Technologies Limited <admin@parity.io>"]
edition = "2021"
keywords = ["asynchronous", "routing"]
Expand All @@ -10,8 +10,8 @@ repository = "https://github.com/mxinden/if-watch"

[features]
default = ["smol"]
tokio = ["rtnetlink/tokio_socket"]
smol = ["rtnetlink/smol_socket"]
tokio = ["dep:tokio", "rtnetlink/tokio_socket"]
smol = ["dep:smol", "rtnetlink/smol_socket"]

[dependencies]
fnv = "1.0.7"
Expand All @@ -26,6 +26,8 @@ rtnetlink = { version = "0.10.0", default-features = false }
core-foundation = "0.9.2"
if-addrs = "0.7.0"
system-configuration = "0.5.0"
tokio = { version = "1.21.2", features = ["rt"], optional = true }
smol = { version = "1.2.5", optional = true }

[target.'cfg(target_os = "windows")'.dependencies]
if-addrs = "0.7.0"
Expand All @@ -37,3 +39,5 @@ if-addrs = "0.7.0"

[dev-dependencies]
env_logger = "0.9.0"
smol = "1.2.5"
tokio = { version = "1.21.2", features = ["rt", "macros"] }
4 changes: 2 additions & 2 deletions examples/if_watch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures::StreamExt;
use if_watch::IfWatcher;
use if_watch::smol::IfWatcher;

fn main() {
env_logger::init();
futures::executor::block_on(async {
smol::block_on(async {
let mut set = IfWatcher::new().unwrap();
loop {
let event = set.select_next_some().await;
Expand Down
93 changes: 89 additions & 4 deletions src/apple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,95 @@ use core_foundation::runloop::{kCFRunLoopCommonModes, CFRunLoop};
use core_foundation::string::CFString;
use fnv::FnvHashSet;
use futures::channel::mpsc;
use futures::stream::Stream;
use futures::stream::{FusedStream, Stream};
use futures::Future;
use if_addrs::IfAddr;
use std::collections::VecDeque;
use std::io::Result;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use system_configuration::dynamic_store::{
SCDynamicStore, SCDynamicStoreBuilder, SCDynamicStoreCallBackContext,
};

#[cfg(feature = "tokio")]
pub mod tokio {
//! An interface watcher that uses the `tokio` runtime.
use futures::Future;

#[doc(hidden)]
pub struct TokioRuntime;

impl super::Runtime for TokioRuntime {
fn spawn<F>(f: F)
where
F: Future,
F: Send + 'static,
<F as Future>::Output: Send + 'static,
{
tokio::spawn(f);
}
}

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher<TokioRuntime>;
}

#[cfg(feature = "smol")]
pub mod smol {
//! An interface watcher that uses the `smol` runtime.

use futures::Future;

#[doc(hidden)]
pub struct SmolRuntime;

impl super::Runtime for SmolRuntime {
fn spawn<F>(f: F)
where
F: Future,
F: Send + 'static,
<F as Future>::Output: Send + 'static,
{
smol::spawn(f).detach();
}
}

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher<SmolRuntime>;
}

#[derive(Debug)]
pub struct IfWatcher {
pub struct IfWatcher<T> {
addrs: FnvHashSet<IpNet>,
queue: VecDeque<IfEvent>,
rx: mpsc::Receiver<()>,
runtime: PhantomData<T>,
}

impl IfWatcher {
#[doc(hidden)]
pub trait Runtime {
fn spawn<F>(f: F)
where
F: Future,
F: Send + 'static,
<F as Future>::Output: Send + 'static;
}

impl<T> IfWatcher<T>
where
T: Runtime,
{
/// Create a watcher.
pub fn new() -> Result<Self> {
let (tx, rx) = mpsc::channel(1);
std::thread::spawn(|| background_task(tx));
T::spawn(async { background_task(tx) });
let mut watcher = Self {
addrs: Default::default(),
queue: Default::default(),
rx,
runtime: PhantomData,
};
watcher.resync()?;
Ok(watcher)
Expand All @@ -55,10 +119,12 @@ impl IfWatcher {
Ok(())
}

/// Iterate over current networks.
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}

/// Poll for an address change event.
pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
Expand All @@ -74,6 +140,25 @@ impl IfWatcher {
}
}

impl<T> Stream for IfWatcher<T>
where
T: Runtime + Unpin,
{
type Item = Result<IfEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::into_inner(self).poll_if_event(cx).map(Some)
}
}

impl<T> FusedStream for IfWatcher<T>
where
T: Runtime + Unpin,
{
fn is_terminated(&self) -> bool {
false
}
}

fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet {
match addr {
IfAddr::V4(ip) => {
Expand Down
39 changes: 37 additions & 2 deletions src/fallback.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::IfEvent;
use async_io::Timer;
use futures::stream::Stream;
use futures::stream::{FusedStream, Stream};
use if_addrs::IfAddr;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use std::collections::{HashSet, VecDeque};
Expand All @@ -10,6 +10,26 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

#[cfg(feature = "tokio")]
pub mod tokio {
//! An interface watcher.
//! **On this platform there is no difference between `tokio` and `smol` features,**
//! **this was done to maintain the api compatible with other platforms**.

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher;
}

#[cfg(feature = "smol")]
pub mod smol {
//! An interface watcher.
//! **On this platform there is no difference between `tokio` and `smol` features,**
//! **this was done to maintain the api compatible with other platforms**.

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher;
}

/// An address set/watcher
#[derive(Debug)]
pub struct IfWatcher {
Expand All @@ -19,7 +39,7 @@ pub struct IfWatcher {
}

impl IfWatcher {
/// Create a watcher
/// Create a watcher.
pub fn new() -> Result<Self> {
Ok(Self {
addrs: Default::default(),
Expand All @@ -45,10 +65,12 @@ impl IfWatcher {
Ok(())
}

/// Iterate over current networks.
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}

/// Poll for an address change event.
pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
Expand All @@ -64,6 +86,19 @@ impl IfWatcher {
}
}

impl Stream for IfWatcher {
type Item = Result<IfEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::into_inner(self).poll_if_event(cx).map(Some)
}
}

impl FusedStream for IfWatcher {
fn is_terminated(&self) -> bool {
false
}
}

fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet {
match addr {
IfAddr::V4(ip) => {
Expand Down
Loading

0 comments on commit 4818613

Please sign in to comment.