From 28e240b8df3c908a4a1c4b184748efc597a89ac9 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 24 Jul 2024 12:21:03 +0300 Subject: [PATCH] rewrite our workaround same_channel thing also add comment about drop --- iroh-blobs/src/util/progress.rs | 29 ++++++++++++++++------------- iroh-docs/src/actor.rs | 3 +++ iroh-docs/src/sync.rs | 20 +++++++++++--------- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/iroh-blobs/src/util/progress.rs b/iroh-blobs/src/util/progress.rs index 8b6807a7f3..9bfb98d39e 100644 --- a/iroh-blobs/src/util/progress.rs +++ b/iroh-blobs/src/util/progress.rs @@ -471,19 +471,6 @@ impl Clone for FlumeProgressSender { } } -fn same_channel(a: &async_channel::Sender, b: &async_channel::Sender) -> bool { - assert!(std::mem::size_of::>() == std::mem::size_of::()); - fn get_arc_reference(x: &async_channel::Sender) -> &Arc<()> { - unsafe { - // Transmute the reference to MyNewType to a reference to Arc<()> - std::mem::transmute::<_, &Arc<()>>(x) - } - } - let a = get_arc_reference(a); - let b = get_arc_reference(b); - Arc::ptr_eq(a, b) -} - impl FlumeProgressSender { /// Create a new progress sender from a flume sender. pub fn new(sender: async_channel::Sender) -> Self { @@ -499,6 +486,22 @@ impl FlumeProgressSender { } } +fn get_as_ptr(value: &T) -> Option { + use std::mem; + if mem::size_of::() == std::mem::size_of::() && mem::align_of::() == mem::align_of::() { + // Safe only if size and alignment requirements are met + unsafe { + Some(mem::transmute_copy(value)) + } + } else { + None + } +} + +fn same_channel(a: &async_channel::Sender, b: &async_channel::Sender) -> bool { + get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap() +} + impl IdGenerator for FlumeProgressSender { fn new_id(&self) -> u64 { self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst) diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index 5a04fb8599..79bd34ad28 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -581,6 +581,9 @@ impl Drop for SyncHandle { fn drop(&mut self) { // this means we're dropping the last reference if let Some(handle) = Arc::get_mut(&mut self.join_handle) { + // this call is the reason tx can not be a tokio mpsc channel. + // we have no control about where drop is called, yet tokio send_blocking panics + // when called from inside a tokio runtime. self.tx.send_blocking(Action::Shutdown { reply: None }).ok(); let handle = handle.take().expect("this can only run once"); if let Err(err) = handle.join() { diff --git a/iroh-docs/src/sync.rs b/iroh-docs/src/sync.rs index 4627026c6a..4e0795c3dd 100644 --- a/iroh-docs/src/sync.rs +++ b/iroh-docs/src/sync.rs @@ -107,18 +107,20 @@ pub struct SyncOutcome { /// Number of entries we sent. pub num_sent: usize, } - -fn same_channel(a: &async_channel::Sender, b: &async_channel::Sender) -> bool { - assert!(std::mem::size_of::>() == std::mem::size_of::()); - fn get_arc_reference(x: &async_channel::Sender) -> &Arc<()> { +fn get_as_ptr(value: &T) -> Option { + use std::mem; + if mem::size_of::() == std::mem::size_of::() && mem::align_of::() == mem::align_of::() { + // Safe only if size and alignment requirements are met unsafe { - // Transmute the reference to MyNewType to a reference to Arc<()> - std::mem::transmute::<_, &Arc<()>>(x) + Some(mem::transmute_copy(value)) } + } else { + None } - let a = get_arc_reference(a); - let b = get_arc_reference(b); - Arc::ptr_eq(a, b) +} + +fn same_channel(a: &async_channel::Sender, b: &async_channel::Sender) -> bool { + get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap() } #[derive(Debug, Default)]