Skip to content

Commit

Permalink
WIP: some refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ktiays committed Mar 22, 2023
1 parent 411ac01 commit 443d9a6
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"tasks": [
{
"type": "npm",
"script": "build:dev",
"script": "package:dev",
"isBackground": false,
"presentation": {
"reveal": "never"
Expand Down
6 changes: 3 additions & 3 deletions crates/node-bridge/src/bindings/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ extern "C" {
pub type ClientRequest;

#[wasm_bindgen]
pub fn request(url: &str, options: &JsValue) -> ClientRequest;
pub fn request(url: &str, options: JsValue) -> ClientRequest;

#[wasm_bindgen(method)]
pub fn write(this: &ClientRequest, chunk: Buffer);
Expand All @@ -16,13 +16,13 @@ extern "C" {
pub fn end(this: &ClientRequest);

#[wasm_bindgen(method)]
pub fn on(this: &ClientRequest, event: &str, listener: &JsValue);
pub fn on(this: &ClientRequest, event: &str, listener: JsValue);
}

#[wasm_bindgen(module = "node:https")]
extern "C" {
pub type IncomingMessage;

#[wasm_bindgen(method)]
pub fn on(this: &IncomingMessage, event: &str, listener: &JsValue);
pub fn on(this: &IncomingMessage, event: &str, listener: JsValue);
}
7 changes: 3 additions & 4 deletions crates/node-bridge/src/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub struct AsyncIterSender<T> {

pin_project! {
pub struct AsyncIterInner<T> {
ready_values: VecDeque<T>,
ready_values: VecDeque<Option<T>>,
defer_next: Option<Defer>,
#[pin]
active_fut: Option<JsFuture>,
Expand Down Expand Up @@ -97,7 +97,7 @@ impl<T> Stream for AsyncIter<T> {
}

impl<T> AsyncIterSender<T> {
pub fn send(&mut self, value: T) {
pub fn send(&mut self, value: Option<T>) {
let mut inner_mut = self.inner.borrow_mut();
inner_mut.ready_values.push_back(value);

Expand All @@ -113,7 +113,6 @@ impl<T> Stream for AsyncIterInner<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
crate::bindings::console::log_str("poll once");
let mut this = self.project();

if let Some(fut) = this.active_fut.as_mut().as_pin_mut() {
Expand All @@ -126,7 +125,7 @@ impl<T> Stream for AsyncIterInner<T> {
}

if let Some(value) = this.ready_values.pop_front() {
return Poll::Ready(Some(value));
return Poll::Ready(value);
}

let defer = Defer::new();
Expand Down
98 changes: 67 additions & 31 deletions crates/node-bridge/src/http_client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::future::{select, Either};
use js_sys::{Object as JsObject, Reflect};
use wasm_bindgen::prelude::*;

use crate::bindings::{https::*, Buffer};
use crate::futures::{AsyncIter, Defer};
use crate::{closure, closure_once};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum HttpMethod {
Expand All @@ -21,11 +23,12 @@ pub enum HttpMethod {
impl ToString for HttpMethod {
fn to_string(&self) -> String {
match self {
HttpMethod::Get => "GET".to_owned(),
HttpMethod::Post => "POST".to_owned(),
HttpMethod::Put => "PUT".to_owned(),
HttpMethod::Delete => "DELETE".to_owned(),
HttpMethod::Get => "GET",
HttpMethod::Post => "POST",
HttpMethod::Put => "PUT",
HttpMethod::Delete => "DELETE",
}
.to_owned()
}
}

Expand Down Expand Up @@ -74,7 +77,28 @@ impl HttpRequest {
}
Reflect::set(&options, &"headers".into(), &headers_obj)?;

let req = request(&self.url, &options);
let req = request(&self.url, options.into());

let defer_resp = Defer::new();
let defer_resp_clone = defer_resp.clone();
req.on(
"response",
closure!(|resp: IncomingMessage| {
defer_resp_clone.resolve(resp.into());
})
.into_js_value(),
);

let defer_err = Defer::new();
let defer_err_clone = defer_err.clone();
req.on(
"error",
closure_once!(|err: JsValue| {
crate::bindings::console::error1(&err);
defer_err_clone.resolve(err);
})
.into_js_value(),
);

// Send the request with an optional body.
if let Some(body) = self.body {
Expand All @@ -83,14 +107,20 @@ impl HttpRequest {
}
req.end();

#[cfg(debug_assertions)]
crate::bindings::console::log2(&"request sent: ".into(), &req);

// Wait for the response.
let defer_resp = Defer::new();
let defer_resp_clone = defer_resp.clone();
let on_resp_closure: Closure<dyn FnMut(_)> = Closure::new(move |resp: IncomingMessage| {
defer_resp_clone.resolve(resp.into());
});
req.on("response", on_resp_closure.as_ref());
let resp: IncomingMessage = defer_resp.await?.into();
let resp: IncomingMessage =
match select(defer_resp.into_future(), defer_err.into_future()).await {
Either::Left((Ok(resp), _)) => Ok(resp),
Either::Right((Ok(err), _)) => Err(err),
_ => unreachable!("Impossible code path"),
}?
.into();

#[cfg(debug_assertions)]
crate::bindings::console::log2(&"response received: ".into(), &resp);

Ok(HttpResponse::new(resp))
}
Expand All @@ -99,39 +129,45 @@ impl HttpRequest {
pub struct HttpResponse {
fut: Pin<Box<dyn Future<Output = Result<(), JsValue>>>>,
data_stream: AsyncIter<Buffer>,

// Ensure the closure alive during receiving response data.
#[allow(dead_code)]
on_data_closure: Closure<dyn FnMut(Buffer)>,
}

impl HttpResponse {
fn new(resp: IncomingMessage) -> Self {
let data_stream = AsyncIter::new();
let mut data_stream_sender = data_stream.sender();

let on_data_closure: Closure<dyn FnMut(_)> = Closure::new(move |chunk: Buffer| {
crate::bindings::console::log_str("recv chunk");
data_stream_sender.send(chunk);
});
resp.on("data", on_data_closure.as_ref());
let mut data_stream_sender_for_close = data_stream.sender();

resp.on(
"data",
closure!(|chunk: Buffer| {
#[cfg(debug_assertions)]
crate::bindings::console::log_str("chunk received");
data_stream_sender.send(Some(chunk));
})
.into_js_value(),
);

let defer_close = Defer::new();
let defer_close_clone = defer_close.clone();
resp.on(
"close",
closure_once!(|| {
#[cfg(debug_assertions)]
crate::bindings::console::log_str("response closed");
data_stream_sender_for_close.send(None);
defer_close_clone.resolve(JsValue::UNDEFINED);
})
.into_js_value(),
);

let fut = async move {
let defer_close = Defer::new();
let defer_close_clone = defer_close.clone();
let on_close_closure: Closure<dyn FnMut()> = Closure::new(move || {
defer_close_clone.resolve(JsValue::UNDEFINED);
});
resp.on("close", on_close_closure.as_ref());
defer_close.await?;

Ok(())
};

Self {
fut: Box::pin(fut),
data_stream,
on_data_closure,
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/node-bridge/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod bindings;
pub mod futures;
pub mod http_client;
pub mod macros;
27 changes: 27 additions & 0 deletions crates/node-bridge/src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#[macro_export]
macro_rules! closure {
(|$($arg:ident : $arg_type:ty),*| { $($body:tt)* }) => {
$crate::closure!(new @ $($arg : $arg_type),* { $($body)* })
};

(|| { $($body:tt)* }) => {
$crate::closure!(new @ { $($body)* })
};

($create_fn:ident @ $($arg:ident : $arg_type:ty),* { $($body:tt)* }) => {
wasm_bindgen::closure::Closure::$create_fn(move |$($arg : $arg_type),*| {
$($body)*
}) as wasm_bindgen::closure::Closure<dyn FnMut($($arg_type),*)>
};
}

#[macro_export]
macro_rules! closure_once {
(|$($arg:ident : $arg_type:ty),*| { $($body:tt)* }) => {
$crate::closure!(once @ $($arg : $arg_type),* { $($body)* })
};

(|| { $($body:tt)* }) => {
$crate::closure!(once @ { $($body)* })
};
}
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
"scripts": {
"vscode:prepublish": "npm run package",
"compile": "webpack",
"compile:rust": "cd crates/cursor-core && wasm-pack build --target nodejs --weak-refs --dev && cp pkg/cursor_core_bg.wasm ../../dist/cursor_core_bg.wasm",
"build:dev": "npm run compile:rust && npm run compile",
"compile-rust": "cd crates/cursor-core && wasm-pack build --target nodejs --weak-refs",
"compile-rust:dev": "cd crates/cursor-core && wasm-pack build --target nodejs --weak-refs --dev",
"copy-rust": "cp crates/cursor-core/pkg/cursor_core_bg.wasm dist/cursor_core_bg.wasm",
"watch": "webpack --watch",
"package": "webpack --mode production --devtool hidden-source-map",
"package:dev": "npm run compile-rust:dev && npm run compile && npm run copy-rust",
"package": "npm run compile-rust && webpack --mode production --devtool hidden-source-map && npm run copy-rust",
"compile-tests": "tsc -p . --outDir out",
"watch-tests": "tsc -p . -w --outDir out",
"pretest": "npm run compile-tests && npm run compile && npm run lint",
Expand Down

0 comments on commit 443d9a6

Please sign in to comment.