-
Notifications
You must be signed in to change notification settings - Fork 47
/
sync.rs
85 lines (78 loc) · 1.97 KB
/
sync.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use crate::{
os::windows::{
limbo_pool::{LimboPool, MaybeReject},
winprelude::*,
FileHandle,
},
DebugExpectExt, OrErrno, LOCK_POISON,
};
use std::{
io,
sync::{
mpsc::{sync_channel, SyncSender, TrySendError},
Mutex, OnceLock,
},
thread,
};
use windows_sys::Win32::System::Pipes::DisconnectNamedPipe;
/// A named pipe handle that is no longer in use, bundled with the information needed to clean
/// it up.
///
/// Dropping a `Corpse` whose `is_server` flag is `true` calls `disconnect` on it.
pub(crate) struct Corpse {
/// The handle that `disconnect` passes to `DisconnectNamedPipe` and that `send_off` flushes.
pub handle: FileHandle,
/// When `true`, the `Drop` implementation disconnects the pipe; when `false`, it does nothing.
pub is_server: bool,
}
impl Corpse {
    /// Calls `DisconnectNamedPipe` on the handle held by this corpse.
    ///
    /// # Errors
    /// Returns whatever error `true_val_or_errno` derives from a failed call
    /// (presumably the last OS error — confirm against `OrErrno`).
    #[inline]
    pub fn disconnect(&self) -> io::Result<()> {
        // SAFETY: the handle is borrowed from `self.handle` for the duration of the call.
        // NOTE(review): assumes `FileHandle` always wraps an open handle — confirm.
        let status = unsafe { DisconnectNamedPipe(self.handle.as_int_handle()) };
        status.true_val_or_errno(())
    }
}
/// Disconnects the pipe on drop, but only when `is_server` is set.
impl Drop for Corpse {
    fn drop(&mut self) {
        // Corpses not flagged as servers have nothing to disconnect.
        if !self.is_server {
            return;
        }
        let outcome = self.disconnect();
        outcome.debug_expect("named pipe server disconnect failed");
    }
}
/// The limbo pool type: a `LimboPool` whose entries are the sending halves of channels that
/// carry `Corpse`s.
type Limbo = LimboPool<SyncSender<Corpse>>;
/// The global limbo pool, created on first use by `send_off` and guarded by a mutex.
static LIMBO: OnceLock<Mutex<Limbo>> = OnceLock::new();
/// Builds the thread name for the limbo keeper at pool index `idx`.
///
/// Keepers are numbered from one, so index `0` yields `"limbo keeper 1"`. The index
/// `usize::MAX` has no successor and yields the bare name `"limbo keeper"` (presumably the
/// sentinel used for threads that live outside the pool — confirm against `LimboPool`).
fn limbo_keeper_name(idx: usize) -> String {
    // `checked_add` is `None` exactly when `idx == usize::MAX`.
    match idx.checked_add(1) {
        None => String::from("limbo keeper"),
        Some(ordinal) => format!("limbo keeper {}", ordinal),
    }
}
/// Hands `c` over to the global limbo pool so that it is flushed and then dropped on a
/// background thread instead of on the calling thread.
///
/// The pool is created on first use. The pool decides, via `linear_try_or_create`, which of
/// the three strategies below is applied to the corpse.
///
/// # Panics
/// Panics if the pool's mutex is poisoned.
pub(crate) fn send_off(c: Corpse) {
    // Flushes the corpse's handle; the corpse itself is dropped when this returns.
    fn lay_to_rest(corpse: Corpse) {
        corpse.handle.flush().debug_expect("limbo flush failed");
    }

    // Offers the corpse to an existing keeper without blocking. The corpse is handed back
    // if that keeper's channel is full or its receiving end is gone.
    fn offer(sender: &mut SyncSender<Corpse>, corpse: Corpse) -> MaybeReject<Corpse> {
        sender.try_send(corpse).map_err(|refusal| {
            let (TrySendError::Full(returned) | TrySendError::Disconnected(returned)) = refusal;
            returned
        })
    }

    // Starts a new keeper thread fed by a channel of capacity one, queues the corpse on it
    // and returns the sending half for the pool to store.
    fn recruit(idx: usize, corpse: Corpse) -> SyncSender<Corpse> {
        let (tx, rx) = sync_channel::<Corpse>(1);
        // The keeper buries everything it receives until every sender has been dropped.
        let keeper = move || rx.into_iter().for_each(lay_to_rest);
        thread::Builder::new()
            .name(limbo_keeper_name(idx))
            .spawn(keeper)
            .debug_expect("failed to spawn newcomer to limbo pool");
        tx.try_send(corpse)
            .debug_expect("newcomer to limbo pool already failed");
        tx
    }

    // Buries a single corpse on a dedicated one-shot thread (presumably used when the pool
    // cannot take another keeper — confirm against `LimboPool::linear_try_or_create`).
    fn overflow(idx: usize, corpse: Corpse) {
        thread::Builder::new()
            .name(limbo_keeper_name(idx))
            .spawn(move || lay_to_rest(corpse))
            .debug_expect("failed to spawn newcomer to limbo pool");
    }

    let mut pool = LIMBO
        .get_or_init(Default::default)
        .lock()
        .expect(LOCK_POISON);
    pool.linear_try_or_create(c, offer, recruit, overflow);
}