Skip to content

Commit

Permalink
Ok all working again
Browse files Browse the repository at this point in the history
  • Loading branch information
alshdavid committed Jun 25, 2024
1 parent 5505668 commit 092eae1
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 78 deletions.
2 changes: 1 addition & 1 deletion crates/neon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ features = ["sync"]
optional = true

[features]
default = ["napi-8", "futures"]
default = ["napi-8"]

# Enable extracting values by serializing to JSON
serde = ["dep:serde", "dep:serde_json"]
Expand Down
25 changes: 17 additions & 8 deletions crates/neon/src/async_local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,31 @@ use crate::sys;
pub fn spawn_async_local<'a>(
cx: &mut impl Context<'a>,
future: impl Future<Output = ()> + 'static,
) -> Result<(), ()> {
) {
// Add a future to the future pool to be executed
// whenever the Nodejs event loop is free to do so
LocalRuntime::queue_future(future).unwrap();
LocalRuntime::queue_future(future);

// If there are tasks in flight then the executor
// is already running and should be reused
if LocalRuntime::futures_count() > 1 {
return Ok(());
return;
}

// The futures executor runs on another thread and will
// use a threadsafe function to call schedule work
// on the JavaScript thread
// The futures executor runs on the main thread thread but
// the waker runs on another thread.
//
// The main thread executor will run the contained futures
// and as soon as they stall (e.g. waiting for a channel, timer, etc),
// the executor will immediately yield back to the JavaScript event loop.
//
// This "parks" the executer, which normally means the thread
// is block - however we cannot do that here so instead, there
// is a sacrificial "waker" thread who's only job is to sleep/wake and
// signal to Nodejs that futures need to be run.
//
// The waker thread notifies the main thread of pending work by
// running the futures executor within a threadsafe function
let env_raw = cx.env().to_raw();

LocalWaker::send(WakerEvent::Init(unsafe {
Expand All @@ -44,6 +55,4 @@ pub fn spawn_async_local<'a>(
}
})
}));

Ok(())
}
5 changes: 2 additions & 3 deletions crates/neon/src/async_local/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use super::executor::LocalPool;
use super::executor::LocalSpawner;
use super::executor::ThreadNotifyRef;
use futures::task::LocalSpawnExt;
use futures::task::SpawnError;
use once_cell::unsync::Lazy;

thread_local! {
Expand All @@ -21,12 +20,12 @@ impl LocalRuntime {
Self::count()
}

pub fn queue_future(future: impl Future<Output = ()> + 'static) -> Result<(), SpawnError> {
pub fn queue_future(future: impl Future<Output = ()> + 'static) {
Self::increment();
SPAWNER.with(move |ls| ls.spawn_local(async move {
future.await;
Self::decrement();
}))
})).expect("Unable to spawn future on local pool");
}

pub fn run_until_stalled(thread_notify: ThreadNotifyRef) -> bool {
Expand Down
19 changes: 10 additions & 9 deletions crates/neon/src/async_local/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ pub enum WakerEvent {
}

/// The futures waker that coordinates with the futures executor to notify
/// the main thread to pause and resume execution of futures.
/// the main thread to resume execution of futures.
///
/// The waker is implemented as a dedicated system thread which is parked
/// by the local futures executor while waiting for futures to resume work.
/// by the local futures executor. Futures (like channel, timers) will
/// call the wake() method Futures Waker trait.
///
/// Once woken up, the waker resumes execution of futures on the JavaScript
/// thread by triggering a napi threadsafe function to poll the futures in
/// the local pool until no more progress can be made before yielding back
/// to the Nodejs event loop.
/// This gives it some level of portability - for instance any utilities
/// from the "async_std" crate will work however most things from Tokio
/// won't work.
///
/// This allows for the execution of Rust futures to integrate with the
/// Nodejs event loop without blocking either
/// Once woken up, the waker resumes execution of futures on the JavaScript
/// thread by triggering a napi threadsafe function which executes a callback
/// that runs on the main JavaScript thread. This callback is used to poll
/// the futures in the local pool.
pub struct LocalWaker;

impl LocalWaker {
Expand All @@ -46,7 +48,6 @@ impl LocalWaker {
fn start_waker_thread() -> Sender<WakerEvent> {
let (tx, rx) = channel();

// Dedicated waker thread to use for waiting on pending futures
thread::spawn(move || {
let thread_notify = ThreadNotify::new();
let mut handle = None::<WakerInit>;
Expand Down
74 changes: 39 additions & 35 deletions crates/neon/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ pub use crate::types::buffer::lock::Lock;
// use crate::async_local::{root::RootGlobal, spawn_async_local};
#[cfg(feature = "async_local")]
use futures::Future;
#[cfg(feature = "async_local")]
use crate::handle::StaticHandle;

use crate::{
event::TaskBuilder,
Expand Down Expand Up @@ -290,22 +292,21 @@ pub trait Context<'a>: ContextInternal<'a> {
result
}

/// Execute a future on the local JavaScript thread. This does not block JavaScript execution.
///
/// Note: Avoid doing heavy computation on the main thread. The intended use cases this is
/// waiting on channels receivers, timers and handling async behaviors from JavaScript
#[cfg(feature = "async_local")]
fn execute_async_local<F, Fut>(&mut self, f: F)
fn spawn_local<F, Fut>(&mut self, f: F)
where
Fut: Future<Output = ()>,
F: FnOnce(AsyncContext) -> Fut + 'static,
{
use futures::Future;

let env = self.env();

crate::async_local::spawn_async_local(self, async move {
// let scope = unsafe { HandleScope::new(env.to_raw()) };
let future = f(AsyncContext { env });
future.await;
// drop(scope);
}).unwrap();
});
}

/// Executes a computation in a new memory management scope and computes a single result value that outlives the computation.
Expand Down Expand Up @@ -623,29 +624,31 @@ impl<'a> ModuleContext<'a> {
F: Fn(AsyncFunctionContext) -> Fut + 'static + Copy,
V: Value,
{
// let wrapper = JsFunction::new(self, move |mut cx| {
// let mut args = vec![];

// while let Some(arg) = cx.argument_opt(args.len()) {
// let arg = arg.as_value(&mut cx);
// let arg = RootGlobal::new(&mut cx, arg);
// args.push(arg);
// }

// let (deferred, promise) = cx.promise();
// cx.execute_async_local(move |mut cx| async move {
// let acx = AsyncFunctionContext {
// env: cx.env(),
// arguments: args,
// };
// deferred.resolve(&mut cx, f(acx).await.unwrap());
// ()
// });

// Ok(promise)
// })?;

// self.exports.clone().set(self, key, wrapper)?;
use crate::handle::StaticHandle;

let wrapper = JsFunction::new(self, move |mut cx| {
let mut args = vec![];

while let Some(arg) = cx.argument_opt(args.len()) {
let arg = arg.as_value(&mut cx);
let arg = StaticHandle::new(&mut cx, arg)?;
args.push(arg);
}

let (deferred, promise) = cx.promise();
cx.spawn_local(move |mut cx| async move {
let acx = AsyncFunctionContext {
env: cx.env(),
arguments: args,
};
deferred.resolve(&mut cx, f(acx).await.unwrap());
()
});

Ok(promise)
})?;

self.exports.clone().set(self, key, wrapper)?;
Ok(())
}

Expand Down Expand Up @@ -851,16 +854,17 @@ impl<'a> Context<'a> for FunctionContext<'a> {}
#[cfg(feature = "async_local")]
pub struct AsyncFunctionContext {
env: Env,
// arguments: Vec<RootGlobal>,
arguments: Vec<StaticHandle<JsValue>>,
}

#[cfg(feature = "async_local")]
impl<'a> AsyncFunctionContext {
pub fn argument<V: Value>(&mut self, i: usize) -> JsResult<'a, V> {
// let arg = self.arguments.get(i).unwrap().clone();
// let handle = arg.into_inner(self);
// Ok(handle)
todo!()
let arg = self.arguments.get(i).unwrap().clone();
let arg = arg.from_static(self)?;
let value = unsafe { V::from_local(self.env(), arg.to_local()) };
let handle = Handle::new_internal(value);
Ok(handle)
}
}

Expand Down
7 changes: 5 additions & 2 deletions crates/neon/src/handle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ impl<'a, V: Value + 'a> Handle<'a, V> {
}
}

pub fn root_global(self, cx: &mut impl Context<'a>) -> NeonResult<RootGlobal<V>> {
RootGlobal::new(cx, self)
/// Detaches the value from the Nodejs garbage collector
/// and manages the variable lifetime via reference counting.
/// Useful when interacting with a value within async closures
pub fn to_static(self, cx: &mut impl Context<'a>) -> NeonResult<StaticHandle<V>> {
StaticHandle::new(cx, self)
}
}

Expand Down
42 changes: 24 additions & 18 deletions crates/neon/src/handle/root_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,36 @@ use crate::result::JsResult;
use crate::result::NeonResult;
use crate::types::JsFunction;
use crate::types::JsObject;
use crate::types::JsSymbol;

// This creates a rooted object and stores javascript
// values on it as a way to grant any JavaScript value
// a static lifetime

thread_local! {
// Symbol("__neon_cache")
static NEON_CACHE: OnceCell<Root<JsObject>> = OnceCell::default();
}

/// Reference counted JavaScript value with a static lifetime for use in async closures
pub struct RootGlobal<T> {
pub struct StaticHandle<T> {
pub(crate) count: Rc<RefCell<u32>>,
pub(crate) inner: Rc<String>,
pub(crate) inner: Rc<Root<JsSymbol>>,
_p: PhantomData<T>,
}

impl<T: Value> RootGlobal<T> {
impl<T: Value> StaticHandle<T> {
pub(crate) fn new<'a>(
cx: &mut impl Context<'a>,
value: Handle<'a, T>,
) -> NeonResult<RootGlobal<T>> {
) -> NeonResult<StaticHandle<T>> {
Ok(Self {
count: Rc::new(RefCell::new(1)),
inner: Rc::new(set_ref(cx, value)?),
_p: Default::default(),
})
}

pub fn clone<'a>(&self) -> RootGlobal<T> {
pub fn clone(&self) -> StaticHandle<T> {
let mut count = self.count.borrow_mut();
*count += 1;
drop(count);
Expand All @@ -49,16 +53,16 @@ impl<T: Value> RootGlobal<T> {
}
}

pub fn into_inner<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> {
get_ref(cx, &*self.inner)
pub fn from_static<'a>(&self, cx: &mut impl Context<'a>) -> JsResult<'a, T> {
get_ref(cx, &self.inner)
}

pub fn drop<'a>(&self, cx: &mut impl Context<'a>) -> NeonResult<()> {
let mut count = self.count.borrow_mut();
*count -= 1;

if *count == 0 {
delete_ref(cx, &*self.inner)?
delete_ref(cx, &self.inner)?
}

Ok(())
Expand All @@ -81,41 +85,43 @@ fn get_cache<'a>(cx: &mut impl Context<'a>) -> JsResult<'a, JsObject> {
Ok(neon_cache.into_inner(cx))
}

fn set_ref<'a, V: Value>(cx: &mut impl Context<'a>, value: Handle<'a, V>) -> NeonResult<String> {
fn set_ref<'a, V: Value>(
cx: &mut impl Context<'a>,
value: Handle<'a, V>,
) -> NeonResult<Root<JsSymbol>> {
let neon_cache = get_cache(cx)?;
// Is this safe?
let key = format!("{:?}", value.to_local());
let symbol = cx.symbol(format!("{:?}", value.to_local())).root(cx);

get_cache(cx)?
.get::<JsFunction, _, _>(cx, "set")?
.call_with(cx)
.this(neon_cache)
.arg(cx.string(&key))
.arg(symbol.clone(cx).into_inner(cx))
.arg(value)
.exec(cx)?;

Ok(key)
Ok(symbol)
}

fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &str) -> JsResult<'a, V> {
fn get_ref<'a, V: Value>(cx: &mut impl Context<'a>, key: &Root<JsSymbol>) -> JsResult<'a, V> {
let neon_cache = get_cache(cx)?;

get_cache(cx)?
.get::<JsFunction, _, _>(cx, "get")?
.call_with(cx)
.this(neon_cache)
.arg(cx.string(&key))
.arg(key.clone(cx).into_inner(cx))
.apply(cx)
}

fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &str) -> NeonResult<()> {
fn delete_ref<'a>(cx: &mut impl Context<'a>, key: &Root<JsSymbol>) -> NeonResult<()> {
let neon_cache = get_cache(cx)?;

get_cache(cx)?
.get::<JsFunction, _, _>(cx, "delete")?
.call_with(cx)
.this(neon_cache)
.arg(cx.string(&key))
.arg(key.clone(cx).into_inner(cx))
.exec(cx)?;

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/neon/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#[doc(no_inline)]
pub use crate::{
context::{
CallKind, ComputeContext, Context, ExecuteContext, FunctionContext, ModuleContext,
TaskContext,
AsyncContext, AsyncFunctionContext, CallKind, ComputeContext, Context, ExecuteContext,
FunctionContext, ModuleContext, TaskContext,
},
handle::{Handle, Root},
object::Object,
Expand Down

0 comments on commit 092eae1

Please sign in to comment.