Skip to content

Commit

Permalink
rewrite our workaround same_channel thing
Browse files Browse the repository at this point in the history
also add comment about drop
  • Loading branch information
rklaehn committed Jul 24, 2024
1 parent f0ea6b5 commit 28e240b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
29 changes: 16 additions & 13 deletions iroh-blobs/src/util/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,19 +471,6 @@ impl<T> Clone for FlumeProgressSender<T> {
}
}

fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
assert!(std::mem::size_of::<async_channel::Sender<T>>() == std::mem::size_of::<usize>());
fn get_arc_reference<T>(x: &async_channel::Sender<T>) -> &Arc<()> {
unsafe {
// Transmute the reference to MyNewType to a reference to Arc<()>
std::mem::transmute::<_, &Arc<()>>(x)
}
}
let a = get_arc_reference(a);
let b = get_arc_reference(b);
Arc::ptr_eq(a, b)
}

impl<T> FlumeProgressSender<T> {
/// Create a new progress sender from a flume sender.
pub fn new(sender: async_channel::Sender<T>) -> Self {
Expand All @@ -499,6 +486,22 @@ impl<T> FlumeProgressSender<T> {
}
}

fn get_as_ptr<T>(value: &T) -> Option<usize> {
use std::mem;
if mem::size_of::<T>() == std::mem::size_of::<usize>() && mem::align_of::<T>() == mem::align_of::<usize>() {
// Safe only if size and alignment requirements are met
unsafe {
Some(mem::transmute_copy(value))
}
} else {
None
}
}

fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
}

impl<T> IdGenerator for FlumeProgressSender<T> {
fn new_id(&self) -> u64 {
self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
Expand Down
3 changes: 3 additions & 0 deletions iroh-docs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,9 @@ impl Drop for SyncHandle {
fn drop(&mut self) {
// this means we're dropping the last reference
if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
// this call is the reason tx can not be a tokio mpsc channel.
// we have no control about where drop is called, yet tokio send_blocking panics
// when called from inside a tokio runtime.
self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
let handle = handle.take().expect("this can only run once");
if let Err(err) = handle.join() {
Expand Down
20 changes: 11 additions & 9 deletions iroh-docs/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,20 @@ pub struct SyncOutcome {
/// Number of entries we sent.
pub num_sent: usize,
}

fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
assert!(std::mem::size_of::<async_channel::Sender<T>>() == std::mem::size_of::<usize>());
fn get_arc_reference<T>(x: &async_channel::Sender<T>) -> &Arc<()> {
fn get_as_ptr<T>(value: &T) -> Option<usize> {
use std::mem;
if mem::size_of::<T>() == std::mem::size_of::<usize>() && mem::align_of::<T>() == mem::align_of::<usize>() {
// Safe only if size and alignment requirements are met
unsafe {
// Transmute the reference to MyNewType to a reference to Arc<()>
std::mem::transmute::<_, &Arc<()>>(x)
Some(mem::transmute_copy(value))
}
} else {
None
}
let a = get_arc_reference(a);
let b = get_arc_reference(b);
Arc::ptr_eq(a, b)
}

fn same_channel<T>(a: &async_channel::Sender<T>, b: &async_channel::Sender<T>) -> bool {
get_as_ptr(a).unwrap() == get_as_ptr(b).unwrap()
}

#[derive(Debug, Default)]
Expand Down

0 comments on commit 28e240b

Please sign in to comment.