From 824b0ad95ecd48f8799ca7a3f570d4badb831100 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 27 Sep 2022 15:45:49 +0100 Subject: [PATCH 01/10] linux: Feature gate async runtime, allowing opting between Tokio or smol --- Cargo.toml | 7 ++++++- src/linux.rs | 13 ++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0f6039a..2c19435 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,11 @@ license = "MIT OR Apache-2.0" description = "crossplatform asynchronous network watcher" repository = "https://github.com/mxinden/if-watch" +[features] +default = ["smol_socket"] +tokio = ["rtnetlink/tokio_socket"] +smol_socket = ["rtnetlink/smol_socket"] + [dependencies] fnv = "1.0.7" futures = "0.3.19" @@ -15,7 +20,7 @@ ipnet = "2.3.1" log = "0.4.14" [target.'cfg(target_os = "linux")'.dependencies] -rtnetlink = { version = "0.10.0", default-features = false, features = ["smol_socket"] } +rtnetlink = { version = "0.10.0", default-features = false } [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] core-foundation = "0.9.2" diff --git a/src/linux.rs b/src/linux.rs index 0494c59..7c67951 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -7,7 +7,14 @@ use rtnetlink::constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR}; use rtnetlink::packet::address::nlas::Nla; use rtnetlink::packet::{AddressMessage, RtnlMessage}; use rtnetlink::proto::{Connection, NetlinkPayload}; -use rtnetlink::sys::{AsyncSocket, SmolSocket, SocketAddr}; +use rtnetlink::sys::{AsyncSocket, SocketAddr}; +// If "only" tokio or smol+tokio are enabled, we use tokio, +// as smol is activated via default features. +#[cfg(feature = "tokio_socket")] +use rtnetlink::sys::TokioSocket as Socket; +// Otherwise, when only smol enabled, we use smol. +#[cfg(all(not(feature = "tokio_socket"), feature = "smol_socket"))] +use rtnetlink::sys::SmolSocket as Socket; use std::collections::VecDeque; use std::future::Future; use std::io::{Error, ErrorKind, Result}; @@ -16,7 +23,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; pub struct IfWatcher { - conn: Connection, + conn: Connection, messages: Pin> + Send>>, addrs: FnvHashSet, queue: VecDeque, @@ -32,7 +39,7 @@ impl std::fmt::Debug for IfWatcher { impl IfWatcher { pub fn new() -> Result { - let (mut conn, handle, messages) = rtnetlink::new_connection_with_socket::()?; + let (mut conn, handle, messages) = rtnetlink::new_connection_with_socket::()?; let groups = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR; let addr = SocketAddr::new(0, groups); conn.socket_mut().socket_mut().bind(&addr)?; From 82e2cc9e007bfcbd3fcd590b221d68c049dabe74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sat, 1 Oct 2022 15:36:10 +0100 Subject: [PATCH 02/10] linux: make features trully addictive, by making smol the default `IfWatcher` and introducing `TokioIfWatcher`. --- CHANGELOG.md | 5 +++++ src/lib.rs | 32 +++++++++++++++++++++--------- src/linux.rs | 56 ++++++++++++++++++++++++++++++++++++++++------------ 3 files changed, 71 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e859d58..1766f59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.1.0] - [unreleased] + +### Added +- Allow opting for Tokio instead of smol for the `AsyncSocket`. See [PR 27](https://github.com/mxinden/if-watch/pull/27). + ## [2.0.0] ### Changed diff --git a/src/lib.rs b/src/lib.rs index 2067560..60653ed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,12 +2,15 @@ #![deny(missing_docs)] #![deny(warnings)] -use futures::stream::FusedStream; -use futures::Stream; +#[cfg(not(target_os = "linux"))] +use futures::stream::{FusedStream, Stream}; pub use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use std::io::Result; -use std::pin::Pin; -use std::task::{Context, Poll}; +#[cfg(not(target_os = "linux"))] +use std::{ + io::Result, + pin::Pin, + task::{Context, Poll}, +}; #[cfg(target_os = "macos")] mod apple; @@ -36,11 +39,17 @@ use apple as platform_impl; target_os = "windows", )))] use fallback as platform_impl; -#[cfg(target_os = "linux")] -use linux as platform_impl; #[cfg(target_os = "windows")] use win as platform_impl; +#[cfg(target_os = "linux")] +#[cfg(feature = "tokio_socket")] +pub use linux::TokioIfWatch; + +#[cfg(target_os = "linux")] +#[cfg(feature = "smol_socket")] +pub use linux::SmolIfWatcher as IfWatcher; + /// An address change event. #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub enum IfEvent { @@ -51,11 +60,13 @@ pub enum IfEvent { } /// Watches for interface changes. +#[cfg(not(target_os = "linux"))] #[derive(Debug)] pub struct IfWatcher(platform_impl::IfWatcher); +#[cfg(not(target_os = "linux"))] impl IfWatcher { - /// Create a watcher + /// Create a watcher. pub fn new() -> Result { platform_impl::IfWatcher::new().map(Self) } @@ -71,6 +82,7 @@ impl IfWatcher { } } +#[cfg(not(target_os = "linux"))] impl Stream for IfWatcher { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -78,6 +90,7 @@ impl Stream for IfWatcher { } } +#[cfg(not(target_os = "linux"))] impl FusedStream for IfWatcher { fn is_terminated(&self) -> bool { false @@ -86,7 +99,8 @@ impl FusedStream for IfWatcher { #[cfg(test)] mod tests { - use super::*; + use super::IfWatcher; + use std::pin::Pin; use futures::StreamExt; #[test] diff --git a/src/linux.rs b/src/linux.rs index 7c67951..c309b2e 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -1,20 +1,17 @@ use crate::{IfEvent, IpNet, Ipv4Net, Ipv6Net}; use fnv::FnvHashSet; use futures::ready; -use futures::stream::{Stream, TryStreamExt}; +use futures::stream::{FusedStream, Stream, TryStreamExt}; use futures::StreamExt; use rtnetlink::constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR}; use rtnetlink::packet::address::nlas::Nla; use rtnetlink::packet::{AddressMessage, RtnlMessage}; use rtnetlink::proto::{Connection, NetlinkPayload}; -use rtnetlink::sys::{AsyncSocket, SocketAddr}; -// If "only" tokio or smol+tokio are enabled, we use tokio, -// as smol is activated via default features. +#[cfg(feature = "smol_socket")] +use rtnetlink::sys::SmolSocket; #[cfg(feature = "tokio_socket")] -use rtnetlink::sys::TokioSocket as Socket; -// Otherwise, when only smol enabled, we use smol. -#[cfg(all(not(feature = "tokio_socket"), feature = "smol_socket"))] -use rtnetlink::sys::SmolSocket as Socket; +use rtnetlink::sys::TokioSocket; +use rtnetlink::sys::{AsyncSocket, SocketAddr}; use std::collections::VecDeque; use std::future::Future; use std::io::{Error, ErrorKind, Result}; @@ -22,14 +19,22 @@ use std::net::{Ipv4Addr, Ipv6Addr}; use std::pin::Pin; use std::task::{Context, Poll}; -pub struct IfWatcher { - conn: Connection, +#[cfg(feature = "tokio_socket")] +/// Watches for interface changes. +pub type TokioIfWatcher = IfWatcher; + +#[cfg(feature = "smol_socket")] +/// Watches for interface changes. +pub type SmolIfWatcher = IfWatcher; + +pub struct IfWatcher { + conn: Connection, messages: Pin> + Send>>, addrs: FnvHashSet, queue: VecDeque, } -impl std::fmt::Debug for IfWatcher { +impl std::fmt::Debug for IfWatcher { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("IfWatcher") .field("addrs", &self.addrs) @@ -37,9 +42,13 @@ impl std::fmt::Debug for IfWatcher { } } -impl IfWatcher { +impl IfWatcher +where + T: AsyncSocket + Unpin, +{ + /// Create a watcher. pub fn new() -> Result { - let (mut conn, handle, messages) = rtnetlink::new_connection_with_socket::()?; + let (mut conn, handle, messages) = rtnetlink::new_connection_with_socket::()?; let groups = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR; let addr = SocketAddr::new(0, groups); conn.socket_mut().socket_mut().bind(&addr)?; @@ -67,6 +76,7 @@ impl IfWatcher { }) } + /// Iterate over current networks. pub fn iter(&self) -> impl Iterator { self.addrs.iter() } @@ -87,6 +97,7 @@ impl IfWatcher { } } + /// Poll for an address change event. pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll> { loop { if let Some(event) = self.queue.pop_front() { @@ -136,3 +147,22 @@ fn iter_nets(msg: AddressMessage) -> impl Iterator { } }) } + +impl Stream for IfWatcher +where + T: AsyncSocket + Unpin, +{ + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).poll_if_event(cx).map(Some) + } +} + +impl FusedStream for IfWatcher +where + T: AsyncSocket + AsyncSocket + Unpin, +{ + fn is_terminated(&self) -> bool { + false + } +} From acf95c325d0097543d8c2eca85c526702eedfa25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 3 Oct 2022 16:22:42 +0100 Subject: [PATCH 03/10] linux: encapsulate runtimes in their own modules --- Cargo.toml | 4 ++-- src/lib.rs | 10 +++++----- src/linux.rs | 26 ++++++++++++++++---------- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2c19435..46c3c6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,9 +9,9 @@ description = "crossplatform asynchronous network watcher" repository = "https://github.com/mxinden/if-watch" [features] -default = ["smol_socket"] +default = ["smol"] tokio = ["rtnetlink/tokio_socket"] -smol_socket = ["rtnetlink/smol_socket"] +smol = ["rtnetlink/smol_socket"] [dependencies] fnv = "1.0.7" diff --git a/src/lib.rs b/src/lib.rs index 60653ed..fd79b30 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,12 +43,12 @@ use fallback as platform_impl; use win as platform_impl; #[cfg(target_os = "linux")] -#[cfg(feature = "tokio_socket")] -pub use linux::TokioIfWatch; +#[cfg(feature = "tokio")] +pub use linux::tokio; #[cfg(target_os = "linux")] -#[cfg(feature = "smol_socket")] -pub use linux::SmolIfWatcher as IfWatcher; +#[cfg(feature = "smol")] +pub use linux::smol::IfWatcher; /// An address change event. #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] @@ -100,8 +100,8 @@ impl FusedStream for IfWatcher { #[cfg(test)] mod tests { use super::IfWatcher; - use std::pin::Pin; use futures::StreamExt; + use std::pin::Pin; #[test] fn test_ip_watch() { diff --git a/src/linux.rs b/src/linux.rs index c309b2e..6fa3ab1 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -7,10 +7,6 @@ use rtnetlink::constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR}; use rtnetlink::packet::address::nlas::Nla; use rtnetlink::packet::{AddressMessage, RtnlMessage}; use rtnetlink::proto::{Connection, NetlinkPayload}; -#[cfg(feature = "smol_socket")] -use rtnetlink::sys::SmolSocket; -#[cfg(feature = "tokio_socket")] -use rtnetlink::sys::TokioSocket; use rtnetlink::sys::{AsyncSocket, SocketAddr}; use std::collections::VecDeque; use std::future::Future; @@ -19,13 +15,23 @@ use std::net::{Ipv4Addr, Ipv6Addr}; use std::pin::Pin; use std::task::{Context, Poll}; -#[cfg(feature = "tokio_socket")] -/// Watches for interface changes. -pub type TokioIfWatcher = IfWatcher; +#[cfg(feature = "tokio")] +pub mod tokio { + //! An interface watcher that uses `rtnetlink`'s [`TokioSocket`](rtnetlink::sys::TokioSocket) + use rtnetlink::sys::TokioSocket; -#[cfg(feature = "smol_socket")] -/// Watches for interface changes. -pub type SmolIfWatcher = IfWatcher; + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + +#[cfg(feature = "smol")] +pub mod smol { + //! An interface watcher that uses `rtnetlink`'s [`SmolSocket`](rtnetlink::sys::SmolSocket) + use rtnetlink::sys::SmolSocket; + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} pub struct IfWatcher { conn: Connection, From 4818613844ad3bdfe8389407e13f3d68440f3cc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 17 Oct 2022 19:35:37 +0100 Subject: [PATCH 04/10] general: feature gate every platform under tokio and smol --- CHANGELOG.md | 8 ++-- Cargo.toml | 10 +++-- examples/if_watch.rs | 4 +- src/apple.rs | 93 +++++++++++++++++++++++++++++++++++++++-- src/fallback.rs | 39 +++++++++++++++++- src/lib.rs | 98 +++++++++++++++++++------------------------- src/win.rs | 39 +++++++++++++++++- 7 files changed, 221 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1766f59..744b70b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,10 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [2.1.0] - [unreleased] +## [3.0.0] - [unreleased] -### Added -- Allow opting for Tokio instead of smol for the `AsyncSocket`. See [PR 27](https://github.com/mxinden/if-watch/pull/27). +### Changed +- Feature gate async runtime, allowing opting between Tokio or smol. For every OS each `IfWatcher` is + under the `tokio` or `smol` module. This makes it a breaking change as there + is no more a default implementation. See [PR 27](https://github.com/mxinden/if-watch/pull/27). ## [2.0.0] diff --git a/Cargo.toml b/Cargo.toml index 46c3c6c..5369884 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "if-watch" -version = "2.0.0" +version = "3.0.0" authors = ["David Craven ", "Parity Technologies Limited "] edition = "2021" keywords = ["asynchronous", "routing"] @@ -10,8 +10,8 @@ repository = "https://github.com/mxinden/if-watch" [features] default = ["smol"] -tokio = ["rtnetlink/tokio_socket"] -smol = ["rtnetlink/smol_socket"] +tokio = ["dep:tokio", "rtnetlink/tokio_socket"] +smol = ["dep:smol", "rtnetlink/smol_socket"] [dependencies] fnv = "1.0.7" @@ -26,6 +26,8 @@ rtnetlink = { version = "0.10.0", default-features = false } core-foundation = "0.9.2" if-addrs = "0.7.0" system-configuration = "0.5.0" +tokio = { version = "1.21.2", features = ["rt"], optional = true } +smol = { version = "1.2.5", optional = true } [target.'cfg(target_os = "windows")'.dependencies] if-addrs = "0.7.0" @@ -37,3 +39,5 @@ if-addrs = "0.7.0" [dev-dependencies] env_logger = "0.9.0" +smol = "1.2.5" +tokio = { version = "1.21.2", features = ["rt", "macros"] } diff --git a/examples/if_watch.rs b/examples/if_watch.rs index 2f8721a..012eed1 100644 --- a/examples/if_watch.rs +++ b/examples/if_watch.rs @@ -1,9 +1,9 @@ use futures::StreamExt; -use if_watch::IfWatcher; +use if_watch::smol::IfWatcher; fn main() { env_logger::init(); - futures::executor::block_on(async { + smol::block_on(async { let mut set = IfWatcher::new().unwrap(); loop { let event = set.select_next_some().await; diff --git a/src/apple.rs b/src/apple.rs index f775058..b76b6c2 100644 --- a/src/apple.rs +++ b/src/apple.rs @@ -4,31 +4,95 @@ use core_foundation::runloop::{kCFRunLoopCommonModes, CFRunLoop}; use core_foundation::string::CFString; use fnv::FnvHashSet; use futures::channel::mpsc; -use futures::stream::Stream; +use futures::stream::{FusedStream, Stream}; +use futures::Future; use if_addrs::IfAddr; use std::collections::VecDeque; use std::io::Result; +use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use system_configuration::dynamic_store::{ SCDynamicStore, SCDynamicStoreBuilder, SCDynamicStoreCallBackContext, }; +#[cfg(feature = "tokio")] +pub mod tokio { + //! An interface watcher that uses the `tokio` runtime. + use futures::Future; + + #[doc(hidden)] + pub struct TokioRuntime; + + impl super::Runtime for TokioRuntime { + fn spawn(f: F) + where + F: Future, + F: Send + 'static, + ::Output: Send + 'static, + { + tokio::spawn(f); + } + } + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + +#[cfg(feature = "smol")] +pub mod smol { + //! An interface watcher that uses the `smol` runtime. + + use futures::Future; + + #[doc(hidden)] + pub struct SmolRuntime; + + impl super::Runtime for SmolRuntime { + fn spawn(f: F) + where + F: Future, + F: Send + 'static, + ::Output: Send + 'static, + { + smol::spawn(f).detach(); + } + } + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + #[derive(Debug)] -pub struct IfWatcher { +pub struct IfWatcher { addrs: FnvHashSet, queue: VecDeque, rx: mpsc::Receiver<()>, + runtime: PhantomData, } -impl IfWatcher { +#[doc(hidden)] +pub trait Runtime { + fn spawn(f: F) + where + F: Future, + F: Send + 'static, + ::Output: Send + 'static; +} + +impl IfWatcher +where + T: Runtime, +{ + /// Create a watcher. pub fn new() -> Result { let (tx, rx) = mpsc::channel(1); - std::thread::spawn(|| background_task(tx)); + T::spawn(async { background_task(tx) }); let mut watcher = Self { addrs: Default::default(), queue: Default::default(), rx, + runtime: PhantomData, }; watcher.resync()?; Ok(watcher) @@ -55,10 +119,12 @@ impl IfWatcher { Ok(()) } + /// Iterate over current networks. pub fn iter(&self) -> impl Iterator { self.addrs.iter() } + /// Poll for an address change event. pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll> { loop { if let Some(event) = self.queue.pop_front() { @@ -74,6 +140,25 @@ impl IfWatcher { } } +impl Stream for IfWatcher +where + T: Runtime + Unpin, +{ + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).poll_if_event(cx).map(Some) + } +} + +impl FusedStream for IfWatcher +where + T: Runtime + Unpin, +{ + fn is_terminated(&self) -> bool { + false + } +} + fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet { match addr { IfAddr::V4(ip) => { diff --git a/src/fallback.rs b/src/fallback.rs index c05ccdf..a5ec9e7 100644 --- a/src/fallback.rs +++ b/src/fallback.rs @@ -1,6 +1,6 @@ use crate::IfEvent; use async_io::Timer; -use futures::stream::Stream; +use futures::stream::{FusedStream, Stream}; use if_addrs::IfAddr; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use std::collections::{HashSet, VecDeque}; @@ -10,6 +10,26 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +#[cfg(feature = "tokio")] +pub mod tokio { + //! An interface watcher. + //! **On this platform there is no difference between `tokio` and `smol` features,** + //! **this was done to maintain the api compatible with other platforms**. + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + +#[cfg(feature = "smol")] +pub mod smol { + //! An interface watcher. + //! **On this platform there is no difference between `tokio` and `smol` features,** + //! **this was done to maintain the api compatible with other platforms**. + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + /// An address set/watcher #[derive(Debug)] pub struct IfWatcher { @@ -19,7 +39,7 @@ pub struct IfWatcher { } impl IfWatcher { - /// Create a watcher + /// Create a watcher. pub fn new() -> Result { Ok(Self { addrs: Default::default(), @@ -45,10 +65,12 @@ impl IfWatcher { Ok(()) } + /// Iterate over current networks. pub fn iter(&self) -> impl Iterator { self.addrs.iter() } + /// Poll for an address change event. pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll> { loop { if let Some(event) = self.queue.pop_front() { @@ -64,6 +86,19 @@ impl IfWatcher { } } +impl Stream for IfWatcher { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).poll_if_event(cx).map(Some) + } +} + +impl FusedStream for IfWatcher { + fn is_terminated(&self) -> bool { + false + } +} + fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet { match addr { IfAddr::V4(ip) => { diff --git a/src/lib.rs b/src/lib.rs index fd79b30..096f129 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,15 +2,7 @@ #![deny(missing_docs)] #![deny(warnings)] -#[cfg(not(target_os = "linux"))] -use futures::stream::{FusedStream, Stream}; pub use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -#[cfg(not(target_os = "linux"))] -use std::{ - io::Result, - pin::Pin, - task::{Context, Poll}, -}; #[cfg(target_os = "macos")] mod apple; @@ -28,8 +20,14 @@ mod linux; #[cfg(target_os = "windows")] mod win; -#[cfg(target_os = "macos")] -use apple as platform_impl; +#[cfg(any(target_os = "macos", target_os = "ios"))] +#[cfg(feature = "tokio")] +pub use apple::tokio; + +#[cfg(any(target_os = "macos", target_os = "ios"))] +#[cfg(feature = "smol")] +pub use apple::smol; + #[cfg(target_os = "ios")] use apple as platform_impl; #[cfg(not(any( @@ -39,8 +37,14 @@ use apple as platform_impl; target_os = "windows", )))] use fallback as platform_impl; + #[cfg(target_os = "windows")] -use win as platform_impl; +#[cfg(feature = "tokio")] +pub use win::tokio; + +#[cfg(target_os = "windows")] +#[cfg(feature = "smol")] +pub use win::smol; #[cfg(target_os = "linux")] #[cfg(feature = "tokio")] @@ -48,7 +52,7 @@ pub use linux::tokio; #[cfg(target_os = "linux")] #[cfg(feature = "smol")] -pub use linux::smol::IfWatcher; +pub use linux::smol; /// An address change event. #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] @@ -59,66 +63,50 @@ pub enum IfEvent { Down(IpNet), } -/// Watches for interface changes. -#[cfg(not(target_os = "linux"))] -#[derive(Debug)] -pub struct IfWatcher(platform_impl::IfWatcher); - -#[cfg(not(target_os = "linux"))] -impl IfWatcher { - /// Create a watcher. - pub fn new() -> Result { - platform_impl::IfWatcher::new().map(Self) - } - - /// Iterate over current networks. - pub fn iter(&self) -> impl Iterator { - self.0.iter() - } - - /// Poll for an address change event. - pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll> { - self.0.poll_if_event(cx) - } -} - -#[cfg(not(target_os = "linux"))] -impl Stream for IfWatcher { - type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::into_inner(self).poll_if_event(cx).map(Some) - } -} - -#[cfg(not(target_os = "linux"))] -impl FusedStream for IfWatcher { - fn is_terminated(&self) -> bool { - false - } -} - #[cfg(test)] mod tests { - use super::IfWatcher; use futures::StreamExt; use std::pin::Pin; #[test] - fn test_ip_watch() { - futures::executor::block_on(async { + fn test_smol_ip_watch() { + use super::smol::IfWatcher; + + smol::block_on(async { let mut set = IfWatcher::new().unwrap(); let event = set.select_next_some().await.unwrap(); println!("Got event {:?}", event); }); } + #[tokio::test] + async fn test_tokio_ip_watch() { + use super::tokio::IfWatcher; + + let mut set = IfWatcher::new().unwrap(); + let event = set.select_next_some().await.unwrap(); + println!("Got event {:?}", event); + } + #[test] - fn test_is_send() { - futures::executor::block_on(async { + fn test_smol_is_send() { + use super::smol::IfWatcher; + + smol::block_on(async { fn is_send(_: T) {} is_send(IfWatcher::new()); is_send(IfWatcher::new().unwrap()); is_send(Pin::new(&mut IfWatcher::new().unwrap())); }); } + + #[tokio::test] + async fn test_tokio_is_send() { + use super::tokio::IfWatcher; + + fn is_send(_: T) {} + is_send(IfWatcher::new()); + is_send(IfWatcher::new().unwrap()); + is_send(Pin::new(&mut IfWatcher::new().unwrap())); + } } diff --git a/src/win.rs b/src/win.rs index d73f32b..57e2790 100644 --- a/src/win.rs +++ b/src/win.rs @@ -1,10 +1,12 @@ use crate::{IfEvent, IpNet, Ipv4Net, Ipv6Net}; use fnv::FnvHashSet; +use futures::stream::{FusedStream, Stream}; use futures::task::AtomicWaker; use if_addrs::IfAddr; use std::collections::VecDeque; use std::ffi::c_void; use std::io::{Error, ErrorKind, Result}; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; @@ -14,6 +16,26 @@ use windows::Win32::NetworkManagement::IpHelper::{ MIB_NOTIFICATION_TYPE, }; +#[cfg(feature = "tokio")] +pub mod tokio { + //! An interface watcher. + //! **On Windows there is no difference between `tokio` and `smol` features,** + //! **this was done to maintain the api compatible with other platforms**. + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + +#[cfg(feature = "smol")] +pub mod smol { + //! An interface watcher. + //! **On Windows there is no difference between `tokio` and `smol` features,** + //! **this was done to maintain the api compatible with other platforms**. + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + /// An address set/watcher #[derive(Debug)] pub struct IfWatcher { @@ -26,7 +48,7 @@ pub struct IfWatcher { } impl IfWatcher { - /// Create a watcher + /// Create a watcher. pub fn new() -> Result { let resync = Arc::new(AtomicBool::new(true)); let waker = Arc::new(AtomicWaker::new()); @@ -63,10 +85,12 @@ impl IfWatcher { Ok(()) } + /// Iterate over current networks. pub fn iter(&self) -> impl Iterator { self.addrs.iter() } + /// Poll for an address change event. pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll> { loop { if let Some(event) = self.queue.pop_front() { @@ -83,6 +107,19 @@ impl IfWatcher { } } +impl Stream for IfWatcher { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).poll_if_event(cx).map(Some) + } +} + +impl FusedStream for IfWatcher { + fn is_terminated(&self) -> bool { + false + } +} + fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet { match addr { IfAddr::V4(ip) => { From d5979380fe227ea98347958adf1407b0f48c1e6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 18 Oct 2022 10:22:06 +0100 Subject: [PATCH 05/10] review: address suggestions --- Cargo.toml | 6 +++++- src/apple.rs | 4 ++-- src/lib.rs | 14 +++++++++++--- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5369884..61dc471 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ description = "crossplatform asynchronous network watcher" repository = "https://github.com/mxinden/if-watch" [features] -default = ["smol"] tokio = ["dep:tokio", "rtnetlink/tokio_socket"] smol = ["dep:smol", "rtnetlink/smol_socket"] @@ -41,3 +40,8 @@ if-addrs = "0.7.0" env_logger = "0.9.0" smol = "1.2.5" tokio = { version = "1.21.2", features = ["rt", "macros"] } + +[[example]] +name = "if_watch" +required-features = ["smol"] + diff --git a/src/apple.rs b/src/apple.rs index b76b6c2..727d6f1 100644 --- a/src/apple.rs +++ b/src/apple.rs @@ -87,7 +87,7 @@ where /// Create a watcher. pub fn new() -> Result { let (tx, rx) = mpsc::channel(1); - T::spawn(async { background_task(tx) }); + T::spawn(background_task(tx)); let mut watcher = Self { addrs: Default::default(), queue: Default::default(), @@ -183,7 +183,7 @@ fn callback(_store: SCDynamicStore, _changed_keys: CFArray, info: &mut } } -fn background_task(tx: mpsc::Sender<()>) { +async fn background_task(tx: mpsc::Sender<()>) { let store = SCDynamicStoreBuilder::new("global-network-watcher") .callback_context(SCDynamicStoreCallBackContext { callout: callback, diff --git a/src/lib.rs b/src/lib.rs index 096f129..6f9ac7b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,15 +28,23 @@ pub use apple::tokio; #[cfg(feature = "smol")] pub use apple::smol; -#[cfg(target_os = "ios")] -use apple as platform_impl; +#[cfg(feature = "smol")] +#[cfg(not(any( + target_os = "ios", + target_os = "linux", + target_os = "macos", + target_os = "windows", +)))] +pub use fallback::smol; + +#[cfg(feature = "tokio")] #[cfg(not(any( target_os = "ios", target_os = "linux", target_os = "macos", target_os = "windows", )))] -use fallback as platform_impl; +pub use fallback::tokio; #[cfg(target_os = "windows")] #[cfg(feature = "tokio")] From bd399c264f704e8c88a307a93a984365fc1035dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 18 Oct 2022 10:22:19 +0100 Subject: [PATCH 06/10] ci: fix android build step --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 48e1277..d8d5f74 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,7 +78,7 @@ jobs: - name: Build android if: contains(matrix.platform.target, 'android') - run: cargo apk --target ${{ matrix.platform.target }} build --workspace --all-features + run: cargo apk build --target ${{ matrix.platform.target }} --all-features - name: Rust tests if: matrix.platform.cross == false From ba8e1d4c51bcee6f7c58692c23f0692d4ceabf64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 18 Oct 2022 16:38:42 +0100 Subject: [PATCH 07/10] fallback: remove unused Future import --- src/fallback.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/fallback.rs b/src/fallback.rs index a5ec9e7..c4d5e52 100644 --- a/src/fallback.rs +++ b/src/fallback.rs @@ -4,7 +4,6 @@ use futures::stream::{FusedStream, Stream}; use if_addrs::IfAddr; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use std::collections::{HashSet, VecDeque}; -use std::future::Future; use std::io::Result; use std::pin::Pin; use std::task::{Context, Poll}; From 66218b6891b6fd089bd8b52fd38ee9f6073e9fc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Thu, 20 Oct 2022 16:54:51 +0100 Subject: [PATCH 08/10] ci: cdylib to crate-type on Cargo.toml, to fix android ci build. It was failing with: `libif_watch.so` doesn't exist. --- Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 61dc471..abf8224 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,9 @@ license = "MIT OR Apache-2.0" description = "crossplatform asynchronous network watcher" repository = "https://github.com/mxinden/if-watch" +[lib] +crate-type = ["cdylib", "lib"] + [features] tokio = ["dep:tokio", "rtnetlink/tokio_socket"] smol = ["dep:smol", "rtnetlink/smol_socket"] From 9e7146ebfbf8a1ad7522db97b82b97479ed8c1ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Sat, 22 Oct 2022 14:45:18 +0100 Subject: [PATCH 09/10] apple: revert to previous implementation --- src/apple.rs | 78 ++++++++++------------------------------------------ 1 file changed, 14 insertions(+), 64 deletions(-) diff --git a/src/apple.rs b/src/apple.rs index 727d6f1..c3099a4 100644 --- a/src/apple.rs +++ b/src/apple.rs @@ -5,11 +5,9 @@ use core_foundation::string::CFString; use fnv::FnvHashSet; use futures::channel::mpsc; use futures::stream::{FusedStream, Stream}; -use futures::Future; use if_addrs::IfAddr; use std::collections::VecDeque; use std::io::Result; -use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use system_configuration::dynamic_store::{ @@ -18,81 +16,39 @@ use system_configuration::dynamic_store::{ #[cfg(feature = "tokio")] pub mod tokio { - //! An interface watcher that uses the `tokio` runtime. - use futures::Future; - - #[doc(hidden)] - pub struct TokioRuntime; - - impl super::Runtime for TokioRuntime { - fn spawn(f: F) - where - F: Future, - F: Send + 'static, - ::Output: Send + 'static, - { - tokio::spawn(f); - } - } + //! An interface watcher. + //! **On Apple Platforms there is no difference between `tokio` and `smol` features,** + //! **this was done to maintain the api compatible with other platforms**. /// Watches for interface changes. - pub type IfWatcher = super::IfWatcher; + pub type IfWatcher = super::IfWatcher; } #[cfg(feature = "smol")] pub mod smol { - //! An interface watcher that uses the `smol` runtime. - - use futures::Future; - - #[doc(hidden)] - pub struct SmolRuntime; - - impl super::Runtime for SmolRuntime { - fn spawn(f: F) - where - F: Future, - F: Send + 'static, - ::Output: Send + 'static, - { - smol::spawn(f).detach(); - } - } + //! An interface watcher. + //! **On Apple platforms there is no difference between `tokio` and `smol` features,** + //! **this was done to maintain the api compatible with other platforms**. /// Watches for interface changes. - pub type IfWatcher = super::IfWatcher; + pub type IfWatcher = super::IfWatcher; } #[derive(Debug)] -pub struct IfWatcher { +pub struct IfWatcher { addrs: FnvHashSet, queue: VecDeque, rx: mpsc::Receiver<()>, - runtime: PhantomData, -} - -#[doc(hidden)] -pub trait Runtime { - fn spawn(f: F) - where - F: Future, - F: Send + 'static, - ::Output: Send + 'static; } -impl IfWatcher -where - T: Runtime, -{ - /// Create a watcher. +impl IfWatcher { pub fn new() -> Result { let (tx, rx) = mpsc::channel(1); - T::spawn(background_task(tx)); + std::thread::spawn(|| background_task(tx)); let mut watcher = Self { addrs: Default::default(), queue: Default::default(), rx, - runtime: PhantomData, }; watcher.resync()?; Ok(watcher) @@ -140,20 +96,14 @@ where } } -impl Stream for IfWatcher -where - T: Runtime + Unpin, -{ +impl Stream for IfWatcher { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::into_inner(self).poll_if_event(cx).map(Some) } } -impl FusedStream for IfWatcher -where - T: Runtime + Unpin, -{ +impl FusedStream for IfWatcher { fn is_terminated(&self) -> bool { false } @@ -183,7 +133,7 @@ fn callback(_store: SCDynamicStore, _changed_keys: CFArray, info: &mut } } -async fn background_task(tx: mpsc::Sender<()>) { +fn background_task(tx: mpsc::Sender<()>) { let store = SCDynamicStoreBuilder::new("global-network-watcher") .callback_context(SCDynamicStoreCallBackContext { callout: callback, From ba8cd7a137966c9121af3e7b51147ddbf0ec1a69 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 4 Nov 2022 16:05:17 +0100 Subject: [PATCH 10/10] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 744b70b..3c86b11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [3.0.0] - [unreleased] +## [3.0.0] ### Changed - Feature gate async runtime, allowing opting between Tokio or smol. For every OS each `IfWatcher` is