From e6f611a00695a1d9069a7e85fd06bb064d9e70a2 Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Fri, 8 Dec 2017 21:47:38 +0100 Subject: [PATCH] fix(vm): Allow async functions to run within the io primitives --- repl/Cargo.toml | 1 + repl/src/main.rs | 13 ++-- repl/src/repl.glu | 2 +- repl/src/repl.rs | 8 +- src/compiler_pipeline.rs | 140 ++++++++++++++++++++++++----------- src/io.rs | 154 +++++++++++++++++++++++++++------------ tests/api.rs | 78 ++++++++++++++------ tests/serialization.rs | 4 +- vm/src/api/de.rs | 21 +++--- vm/src/api/mac.rs | 1 - vm/src/api/mod.rs | 54 ++++++++++++-- vm/src/compiler.rs | 17 +++-- vm/src/future.rs | 2 +- vm/src/gc.rs | 8 +- vm/src/interner.rs | 6 +- vm/src/lib.rs | 2 +- vm/src/serialization.rs | 12 +-- vm/src/stack.rs | 10 ++- vm/src/thread.rs | 62 ++++++++++++---- 19 files changed, 419 insertions(+), 176 deletions(-) diff --git a/repl/Cargo.toml b/repl/Cargo.toml index af9b733b4e..a60b9b4433 100644 --- a/repl/Cargo.toml +++ b/repl/Cargo.toml @@ -22,6 +22,7 @@ gluon_completion = { path = "../completion", version = "0.6.2" } # GLUON gluon_format = { version = "0.6.2", path = "../format" } # GLUON futures = "0.1.11" +tokio-core = "0.1" clap = "2.22.0" log = "0.3.6" env_logger = { version = "0.3.4", optional = true } diff --git a/repl/src/main.rs b/repl/src/main.rs index 24db825260..e141a68049 100644 --- a/repl/src/main.rs +++ b/repl/src/main.rs @@ -8,6 +8,7 @@ extern crate env_logger; extern crate futures; #[macro_use] extern crate log; +extern crate tokio_core; extern crate walkdir; extern crate gluon; @@ -157,13 +158,15 @@ fn main() { // Need the extra stack size when compiling the program using the msvc compiler ::std::thread::Builder::new() .stack_size(2 * 1024 * 1024) - .spawn(|| if let Err(err) = run() { - let stderr = &mut io::stderr(); - let errmsg = "Error writing to stderr"; + .spawn(|| { + if let Err(err) = run() { + let stderr = &mut io::stderr(); + let errmsg = "Error writing to stderr"; - write!(stderr, "error: {}", err).expect(errmsg); + write!(stderr, "error: {}", err).expect(errmsg); - ::std::process::exit(1); + ::std::process::exit(1); + } }) .unwrap() .join() diff --git a/repl/src/repl.glu b/repl/src/repl.glu index b7f6194332..64bb88f0bd 100644 --- a/repl/src/repl.glu +++ b/repl/src/repl.glu @@ -25,7 +25,7 @@ let load_file filename : String -> IO String = let modulename = string.slice filename last_slash (string.len filename - 3) let action = do expr = io.read_file_to_string filename - do result = io.load_script modulename expr + do result = io.load_script modulename expr wrap result io.catch action (\err -> wrap err) diff --git a/repl/src/repl.rs b/repl/src/repl.rs index b510935fe9..6cb2ad7de0 100644 --- a/repl/src/repl.rs +++ b/repl/src/repl.rs @@ -19,7 +19,7 @@ use base::symbol::{Symbol, SymbolModule}; use base::types::ArcType; use parser::parse_partial_let_or_expr; use vm::{self, Error as VMError}; -use vm::api::{Function, Userdata, VmType, WithVM, IO}; +use vm::api::{OwnedFunction, Userdata, VmType, WithVM, IO}; use vm::gc::{Gc, Traverseable}; use vm::internal::ValuePrinter; use vm::thread::{RootStr, RootedValue, Thread, ThreadInternal}; @@ -311,9 +311,11 @@ fn compile_repl(vm: &Thread) -> Result<(), Box> { pub fn run() -> Result<(), Box> { let vm = new_vm(); compile_repl(&vm)?; - let mut repl: Function<&Thread, fn(()) -> IO<()>> = vm.get_global("repl")?; + let mut repl: OwnedFunction IO<()>> = vm.get_global("repl")?; debug!("Starting repl"); - repl.call(())?; + + let mut core = ::tokio_core::reactor::Core::new()?; + core.run(repl.call_async(()))?; Ok(()) } diff --git a/src/compiler_pipeline.rs b/src/compiler_pipeline.rs index ee0969961a..3cea65372c 100644 --- a/src/compiler_pipeline.rs +++ b/src/compiler_pipeline.rs @@ -8,6 +8,7 @@ //! difficult to forget a stage. use std::borrow::{Borrow, BorrowMut}; +use std::ops::Deref; use std::result::Result as StdResult; #[cfg(feature = "serde")] @@ -25,10 +26,27 @@ use vm::core; use vm::compiler::CompiledModule; use vm::future::{BoxFutureValue, FutureValue}; use vm::macros::MacroExpander; -use vm::thread::{RootedValue, Thread, ThreadInternal}; +use vm::thread::{Execute, RootedValue, Thread, ThreadInternal, VmRoot}; use {Compiler, Error, Result}; + +fn execute(vm: T, f: F) -> FutureValue> +where + T: Deref, + F: for<'vm> FnOnce(&'vm Thread) -> FutureValue>, +{ + let opt = match f(&vm) { + FutureValue::Value(result) => Some(result.map(|v| v.1)), + FutureValue::Future(_) => None, + FutureValue::Polled => return FutureValue::Polled, + }; + match opt { + Some(result) => FutureValue::Value(result.map(|v| (vm, v))), + None => FutureValue::Future(Execute::new(vm)), + } +} + pub type SalvageResult = StdResult, Error)>; /// Result type of successful macro expansion @@ -298,33 +316,40 @@ where } /// Result of successful execution -pub struct ExecuteValue<'vm, E> { +pub struct ExecuteValue +where + T: Deref, +{ pub id: Symbol, pub expr: E, pub typ: ArcType, - pub value: RootedValue<&'vm Thread>, + pub value: RootedValue, } pub trait Executable<'vm, Extra> { type Expr; - fn run_expr( + fn run_expr( self, compiler: &mut Compiler, - vm: &'vm Thread, + vm: T, name: &str, expr_str: &str, arg: Extra, - ) -> BoxFutureValue<'vm, ExecuteValue<'vm, Self::Expr>, Error>; + ) -> BoxFutureValue<'vm, ExecuteValue, Error> + where + T: Send + VmRoot<'vm>; - fn load_script( + fn load_script( self, compiler: &mut Compiler, - vm: &'vm Thread, + vm: T, filename: &str, expr_str: &str, arg: Extra, - ) -> BoxFutureValue<'vm, (), Error>; + ) -> BoxFutureValue<'vm, (), Error> + where + T: Send + VmRoot<'vm>; } impl<'vm, C, Extra> Executable<'vm, Extra> for C where @@ -333,28 +358,34 @@ where { type Expr = C::Expr; - fn run_expr( + fn run_expr( self, compiler: &mut Compiler, - vm: &'vm Thread, + vm: T, name: &str, expr_str: &str, arg: Extra, - ) -> BoxFutureValue<'vm, ExecuteValue<'vm, Self::Expr>, Error> { - match self.compile(compiler, vm, name, expr_str, arg) { + ) -> BoxFutureValue<'vm, ExecuteValue, Error> + where + T: Send + VmRoot<'vm>, + { + match self.compile(compiler, &vm, name, expr_str, arg) { Ok(v) => v.run_expr(compiler, vm, name, expr_str, ()), Err(err) => FutureValue::Value(Err(err)), } } - fn load_script( + fn load_script( self, compiler: &mut Compiler, - vm: &'vm Thread, + vm: T, filename: &str, expr_str: &str, arg: Extra, - ) -> BoxFutureValue<'vm, (), Error> { - match self.compile(compiler, vm, filename, expr_str, arg) { + ) -> BoxFutureValue<'vm, (), Error> + where + T: Send + VmRoot<'vm>, + { + match self.compile(compiler, &vm, filename, expr_str, arg) { Ok(v) => v.load_script(compiler, vm, filename, expr_str, ()), Err(err) => FutureValue::Value(Err(err)), } @@ -366,14 +397,17 @@ where { type Expr = E; - fn run_expr( + fn run_expr( self, compiler: &mut Compiler, - vm: &'vm Thread, + vm: T, name: &str, _expr_str: &str, _: (), - ) -> BoxFutureValue<'vm, ExecuteValue<'vm, Self::Expr>, Error> { + ) -> BoxFutureValue<'vm, ExecuteValue, Error> + where + T: Send + VmRoot<'vm>, + { let CompileValue { expr, typ, @@ -383,13 +417,15 @@ where let module_id = Symbol::from(format!("@{}", name)); module.function.id = module_id.clone(); let closure = try_future!(vm.global_env().new_global_thunk(module)); - vm.call_thunk(closure) + + let vm1 = vm.clone(); + execute(vm1, |vm| vm.call_thunk(closure)) .map(|(vm, value)| { ExecuteValue { id: module_id, expr: expr, typ: typ, - value: vm.root_value(value), + value: vm.root_value_with_self(value), } }) .map_err(Error::from) @@ -402,23 +438,28 @@ where }) .boxed() } - fn load_script( + fn load_script( self, compiler: &mut Compiler, - vm: &'vm Thread, + vm: T, filename: &str, expr_str: &str, _: (), - ) -> BoxFutureValue<'vm, (), Error> { + ) -> BoxFutureValue<'vm, (), Error> + where + T: Send + VmRoot<'vm>, + { use check::metadata; let run_io = compiler.run_io; let filename = filename.to_string(); - self.run_expr(compiler, vm, &filename, expr_str, ()) + let vm1 = vm.clone(); + let vm2 = vm.clone(); + self.run_expr(compiler, vm1, &filename, expr_str, ()) .and_then(move |v| { if run_io { - ::compiler_pipeline::run_io(vm, v) + ::compiler_pipeline::run_io(vm2, v) } else { FutureValue::sync(Ok(v)).boxed() } @@ -456,18 +497,21 @@ where { type Expr = (); - fn run_expr( + fn run_expr( self, _compiler: &mut Compiler, - vm: &'vm Thread, + vm: T, filename: &str, _expr_str: &str, _: (), - ) -> BoxFutureValue<'vm, ExecuteValue<'vm, Self::Expr>, Error> { + ) -> BoxFutureValue<'vm, ExecuteValue, Error> + where + T: Send + VmRoot<'vm>, + { use vm::serialization::DeSeed; let module: Module = try_future!( - DeSeed::new(vm) + DeSeed::new(&vm) .deserialize(self.0) .map_err(|err| err.to_string()) ); @@ -481,27 +525,31 @@ where .boxed(); } let typ = module.typ; + let vm1 = vm.clone(); let closure = try_future!(vm.global_env().new_global_thunk(module.module)); - vm.call_thunk(closure) + execute(vm1, |vm| vm.call_thunk(closure)) .map(|(vm, value)| { ExecuteValue { id: module_id, expr: (), typ: typ, - value: vm.root_value(value), + value: vm.root_value_with_self(value), } }) .map_err(Error::from) .boxed() } - fn load_script( + fn load_script( self, compiler: &mut Compiler, - vm: &'vm Thread, + vm: T, name: &str, _expr_str: &str, _: (), - ) -> BoxFutureValue<'vm, (), Error> { + ) -> BoxFutureValue<'vm, (), Error> + where + T: Send + VmRoot<'vm>, + { use vm::serialization::DeSeed; use vm::internal::Global; @@ -511,7 +559,7 @@ where value, id: _, } = try_future!( - DeSeed::new(vm) + DeSeed::new(&vm) .deserialize(self.0) .map_err(|err| err.to_string()) ); @@ -557,25 +605,29 @@ where .map_err(Either::Right) } -pub fn run_io<'vm, E>( - vm: &'vm Thread, - v: ExecuteValue<'vm, E>, -) -> BoxFutureValue<'vm, ExecuteValue<'vm, E>, Error> +pub fn run_io<'vm, T, E>( + vm: T, + v: ExecuteValue, +) -> BoxFutureValue<'vm, ExecuteValue, Error> where E: Send + 'vm, + T: Send + VmRoot<'vm>, { use check::check_signature; use vm::api::{VmType, IO}; use vm::api::generic::A; - if check_signature(&*vm.get_env(), &v.typ, &IO::::make_forall_type(vm)) { + if check_signature(&*vm.get_env(), &v.typ, &IO::::make_forall_type(&vm)) { let ExecuteValue { id, expr, typ, value, } = v; - vm.execute_io(*value) + + + let vm1 = vm.clone(); + execute(vm1, |vm| vm.execute_io(*value)) .map(move |(_, value)| { // The type of the new value will be `a` instead of `IO a` let actual = resolve::remove_aliases_cow(&*vm.get_env(), &typ); @@ -586,7 +638,7 @@ where ExecuteValue { id, expr, - value: vm.root_value(value), + value: vm.root_value_with_self(value), typ: actual, } }) diff --git a/src/io.rs b/src/io.rs index 695add46f2..dfcb4131f4 100644 --- a/src/io.rs +++ b/src/io.rs @@ -3,19 +3,24 @@ use std::fmt; use std::fs::File; use std::sync::Mutex; +use futures::{Future, IntoFuture}; +use futures::future::Either; + use vm::{self, ExternModule, Result, Variants}; +use vm::future::FutureValue; use vm::gc::{Gc, Traverseable}; use vm::types::*; -use vm::thread::ThreadInternal; -use vm::thread::Thread; -use vm::api::{Array, FunctionRef, Generic, Getable, Hole, OpaqueValue, TypedBytecode, Userdata, - VmType, WithVM, IO}; +use vm::thread::{RootedThread, Thread, ThreadInternal}; +use vm::api::{Array, FutureResult, Generic, Getable, OpaqueValue, OwnedFunction, TypedBytecode, + Userdata, VmType, WithVM, IO}; use vm::api::generic::{A, B}; use vm::stack::StackFrame; use vm::internal::ValuePrinter; use vm::internal::Value; +use compiler_pipeline::*; + use super::{Compiler, Error}; fn print(s: &str) -> IO<()> { @@ -118,31 +123,35 @@ fn read_line() -> IO { /// IO a -> (String -> IO a) -> IO a fn catch<'vm>( action: OpaqueValue<&'vm Thread, IO>, - mut catch: FunctionRef IO>>, -) -> IO> { - let vm = action.vm(); + mut catch: OwnedFunction IO>>, +) -> FutureResult>, Error = vm::Error> + Send>> { + let vm = action.vm().root_thread(); let frame_level = vm.context().stack.get_frames().len(); - let mut action: FunctionRef Generic> = - unsafe { Getable::from_value(vm, Variants::new(&action.get_value())).unwrap() }; - let result = action.call(()); - match result { - Ok(value) => IO::Value(value), + let mut action: OwnedFunction Generic> = + unsafe { Getable::from_value(&vm, Variants::new(&action.get_value())).unwrap() }; + + let future = action.call_async(()).then(move |result| match result { + Ok(value) => Either::A(Ok(IO::Value(value)).into_future()), Err(err) => { { let mut context = vm.context(); let mut stack = StackFrame::current(&mut context.stack); while stack.stack.get_frames().len() > frame_level { if stack.exit_scope().is_err() { - return IO::Exception("Unknown error".into()); + return Either::A(Ok(IO::Exception("Unknown error".into())).into_future()); } } } - match catch.call(format!("{}", err)) { - Ok(value) => value, - Err(err) => IO::Exception(format!("{}", err)), - } + Either::B(catch.call_async(format!("{}", err)).then(|result| { + Ok(match result { + Ok(value) => value, + Err(err) => IO::Exception(format!("{}", err)), + }) + })) } - } + }); + + FutureResult(Box::new(future)) } fn clear_frames(err: Error, frame_level: usize, mut stack: StackFrame) -> IO { @@ -161,39 +170,92 @@ fn clear_frames(err: Error, frame_level: usize, mut stack: StackFrame) -> IO) -> IO { +fn run_expr( + WithVM { vm, value: expr }: WithVM<&str>, +) -> FutureValue, Error = vm::Error> + Send>> { + let vm = vm.root_thread(); let frame_level = vm.context().stack.get_frames().len(); - let run_result = Compiler::new().run_expr::>(vm, "", expr); - let mut context = vm.context(); - let stack = StackFrame::current(&mut context.stack); - match run_result { - Ok((value, typ)) => { - let env = vm.global_env().get_env(); - unsafe { - IO::Value(format!( - "{} : {}", - ValuePrinter::new(&*env, &typ, value.get_value()).width(80), - typ - )) - } - } - Err(err) => clear_frames(err, frame_level, stack), - } + + let vm1 = vm.clone(); + let future = expr.run_expr(&mut Compiler::new(), vm1, "", expr, None) + .then(move |run_result| { + let mut context = vm.context(); + let stack = StackFrame::current(&mut context.stack); + FutureValue::sync(Ok(match run_result { + Ok(execute_value) => { + let env = vm.global_env().get_env(); + let typ = execute_value.typ; + IO::Value(format!( + "{} : {}", + ValuePrinter::new(&*env, &typ, *execute_value.value).width(80), + typ + )) + } + Err(err) => clear_frames(err, frame_level, stack), + })) + }); + + future.boxed() } -fn load_script(WithVM { vm, value: name }: WithVM<&str>, expr: &str) -> IO { +fn load_script( + WithVM { vm, value: name }: WithVM<&str>, + expr: &str, +) -> FutureValue, Error = vm::Error> + Send>> { let frame_level = vm.context().stack.get_frames().len(); - let run_result = Compiler::new() - .load_script_async(vm, name, expr) - .sync_or_error(); - let mut context = vm.context(); - let stack = StackFrame::current(&mut context.stack); - match run_result { - Ok(()) => IO::Value(format!("Loaded {}", name)), - Err(err) => clear_frames(err, frame_level, stack), + + let vm1 = vm.root_thread(); + let vm = vm.root_thread(); + let name = name.to_string(); + let future = expr.load_script(&mut Compiler::new(), vm1, &name, expr, None) + .then(move |run_result| { + let mut context = vm.context(); + let stack = StackFrame::current(&mut context.stack); + let io = match run_result { + Ok(()) => IO::Value(format!("Loaded {}", name)), + Err(err) => clear_frames(err, frame_level, stack), + }; + Ok(io).into() + }); + future.boxed() +} + +fn new_thread(WithVM { vm, .. }: WithVM<()>) -> IO { + match vm.new_thread() { + Ok(thread) => IO::Value(thread), + Err(err) => IO::Exception(err.to_string()), } } +fn new_interruptible_thread(WithVM { vm, .. }: WithVM<()>) -> IO { + use vm::thread::HookFlags; + use futures::Async; + + match vm.new_thread() { + Ok(thread) => { + { + let mut context = thread.context(); + + let mut i = 0; + context.set_hook(Some(Box::new(move |_, _| { + i += 1; + if i == 100 { + i = 0; + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + }))); + context.set_hook_mask(HookFlags::CALL_FLAG); + } + + IO::Value(thread) + } + Err(err) => IO::Exception(err.to_string()), + } +} + + mod std { pub mod io { pub use io as prim; @@ -234,7 +296,9 @@ pub fn load(vm: &Thread) -> Result { println => primitive!(1 std::io::prim::println), catch => primitive!(2 std::io::prim::catch), run_expr => primitive!(1 std::io::prim::run_expr), - load_script => primitive!(2 std::io::prim::load_script) + load_script => primitive!(2 std::io::prim::load_script), + new_thread => primitive!(1 std::io::prim::new_thread), + new_interruptible_thread => primitive!(1 std::io::prim::new_interruptible_thread) }, ) } diff --git a/tests/api.rs b/tests/api.rs index 398c7e4a34..e947a2c109 100644 --- a/tests/api.rs +++ b/tests/api.rs @@ -10,7 +10,7 @@ use futures::future::lazy; use gluon::base::types::Type; use gluon::vm::{Error, ExternModule}; -use gluon::vm::api::{FunctionRef, FutureResult, Userdata, VmType}; +use gluon::vm::api::{FunctionRef, FutureResult, Userdata, VmType, IO}; use gluon::vm::thread::{Root, RootStr, RootedThread, Thread, Traverseable}; use gluon::vm::types::VmInt; use gluon::Compiler; @@ -169,31 +169,62 @@ fn return_finished_future() { assert_eq!(result, expected); } + +fn poll_n( + s: String, +) -> FutureResult, Error = Error> + Send + 'static>> { + use std::thread::spawn; + use futures::sync::oneshot::channel; + + let (ping_c, ping_p) = channel(); + let (pong_c, pong_p) = channel(); + spawn(move || { + ping_p.wait().expect("wait"); + pong_c.send(s).expect("send"); + }); + FutureResult(Box::new( + lazy(move || { + ping_c.send(()).unwrap(); + Ok(()) + }).and_then(|_| { + pong_p + .map(IO::Value) + .map_err(|err| Error::Message(format!("{}", err))) + }), + )) +} + #[test] -fn return_delayed_future() { +fn return_delayed_future_simple() { let _ = ::env_logger::init(); - fn poll_n(i: i32) -> FutureResult + Send + 'static>> { - use std::thread::spawn; - use futures::sync::oneshot::channel; - - let (ping_c, ping_p) = channel(); - let (pong_c, pong_p) = channel(); - spawn(move || { - ping_p.wait().expect("wait"); - pong_c.send(i).expect("send"); - }); - FutureResult(Box::new( - lazy(move || { - ping_c.send(()).unwrap(); - Ok(()) - }).and_then(|_| pong_p.map_err(|err| Error::Message(format!("{}", err)))), - )) - } + let expr = r#" + let poll_n = import! poll_n + poll_n "test" + "#; + + let vm = make_vm(); + add_extern_module(&vm, "poll_n", |thread| { + ExternModule::new(thread, primitive!(1 poll_n)) + }); + + let (result, _) = Compiler::new() + .run_io(true) + .run_expr::>(&vm, "", expr) + .unwrap_or_else(|err| panic!("{}", err)); + let expected = IO::Value("test".to_string()); + + assert_eq!(result, expected); +} + +#[test] +fn return_delayed_future_in_catch() { + let _ = ::env_logger::init(); let expr = r#" + let io = import! std.io let poll_n = import! poll_n - poll_n 3 + io.catch (poll_n "test") (\_ -> io.applicative.wrap "error") "#; let vm = make_vm(); @@ -201,10 +232,11 @@ fn return_delayed_future() { ExternModule::new(thread, primitive!(1 poll_n)) }); - let result = Compiler::new() - .run_expr::(&vm, "", expr) + let (result, _) = Compiler::new() + .run_io(true) + .run_expr::>(&vm, "", expr) .unwrap_or_else(|err| panic!("{}", err)); - let expected = (3, Type::int()); + let expected = IO::Value("test".to_string()); assert_eq!(result, expected); } diff --git a/tests/serialization.rs b/tests/serialization.rs index 75e36db543..42915bd340 100644 --- a/tests/serialization.rs +++ b/tests/serialization.rs @@ -152,14 +152,14 @@ fn precompile() { let precompiled_result = { let mut deserializer = serde_json::Deserializer::from_slice(&buffer); Precompiled(&mut deserializer) - .run_expr(&mut Compiler::new(), &thread, "test", "", ()) + .run_expr(&mut Compiler::new(), &*thread, "test", "", ()) .wait() .unwrap() }; let thread2 = new_vm(); assert_eq!( serialize_value( - &text.run_expr(&mut Compiler::new(), &thread2, "test", &text, None) + &text.run_expr(&mut Compiler::new(), &*thread2, "test", &text, None) .wait() .unwrap() .value diff --git a/vm/src/api/de.rs b/vm/src/api/de.rs index 65ab8f6d36..b050cf8890 100644 --- a/vm/src/api/de.rs +++ b/vm/src/api/de.rs @@ -233,13 +233,15 @@ impl<'de, 't> Deserializer<'de, 't> { Some(c) => if expected(&typ) { visit(c) } else { - Err(VmError::Message( - format!("Unable to deserialize `{}`", self.typ), - )) + Err(VmError::Message(format!( + "Unable to deserialize `{}`", + self.typ + ))) }, - _ => Err(VmError::Message( - format!("Unable to deserialize `{}`", self.typ), - )), + _ => Err(VmError::Message(format!( + "Unable to deserialize `{}`", + self.typ + ))), } } } @@ -272,9 +274,10 @@ impl<'de, 't, 'a> de::Deserializer<'de> for &'a mut Deserializer<'de, 't> { ValueRef::Float(_) => self.deserialize_f64(visitor), ValueRef::Int(_) => self.deserialize_i64(visitor), ValueRef::String(ref s) => visitor.visit_borrowed_str(s), - ValueRef::Userdata(_) | ValueRef::Internal => Err(VmError::Message( - format!("Unable to deserialize `{}`", self.typ), - )), + ValueRef::Userdata(_) | ValueRef::Internal => Err(VmError::Message(format!( + "Unable to deserialize `{}`", + self.typ + ))), } } diff --git a/vm/src/api/mac.rs b/vm/src/api/mac.rs index 97209c6731..a30d4a3146 100644 --- a/vm/src/api/mac.rs +++ b/vm/src/api/mac.rs @@ -1,4 +1,3 @@ - #[doc(hidden)] #[macro_export] macro_rules! primitive_cast { diff --git a/vm/src/api/mod.rs b/vm/src/api/mod.rs index 14cef6f575..0b6363209c 100644 --- a/vm/src/api/mod.rs +++ b/vm/src/api/mod.rs @@ -1,5 +1,6 @@ //! The marshalling api use {forget_lifetime, Error, Result, Variants}; +use future::FutureValue; use gc::{DataDef, Gc, GcPtr, Move, Traverseable}; use base::symbol::{Symbol, Symbols}; use base::scoped_map::ScopedMap; @@ -88,10 +89,10 @@ impl<'a> ValueRef<'a> { Value::Tag(tag) => ValueRef::Data(Data(DataInner::Tag(tag))), Value::Array(array) => ValueRef::Array(ArrayRef(forget_lifetime(&*array))), Value::Userdata(data) => ValueRef::Userdata(forget_lifetime(&**data)), - Value::Thread(_) | - Value::Function(_) | - Value::Closure(_) | - Value::PartialApplication(_) => ValueRef::Internal, + Value::Thread(_) + | Value::Function(_) + | Value::Closure(_) + | Value::PartialApplication(_) => ValueRef::Internal, } } } @@ -130,7 +131,9 @@ impl<'a> Data<'a> { pub fn get_variants(&self, index: usize) -> Option> { match self.0 { DataInner::Tag(_) => None, - DataInner::Data(data) => unsafe { data.fields.get(index).map(|v| Variants::new(v)) }, + DataInner::Data(data) => unsafe { + data.fields.get(index).map(|v| Variants::new(v)) + }, } } @@ -653,7 +656,9 @@ impl<'vm, 's> Pushable<'vm> for &'s String { } impl<'vm, 's> Pushable<'vm> for &'s str { fn push(self, thread: &'vm Thread, context: &mut Context) -> Result<()> { - let s = unsafe { GcStr::from_utf8_unchecked(context.alloc_with(thread, self.as_bytes())?) }; + let s = unsafe { + GcStr::from_utf8_unchecked(context.alloc_with(thread, self.as_bytes())?) + }; context.stack.push(Value::String(s)); Ok(()) } @@ -924,6 +929,42 @@ where } } +impl VmType for FutureValue +where + F: Future, + F::Item: VmType, +{ + type Type = ::Type; + fn make_type(vm: &Thread) -> ArcType { + ::make_type(vm) + } + fn extra_args() -> VmIndex { + ::extra_args() + } +} + +impl<'vm, F> AsyncPushable<'vm> for FutureValue +where + F: Future + Send + 'static, + F::Item: Pushable<'vm>, +{ + fn async_push(self, thread: &'vm Thread, context: &mut Context) -> Result> { + match self { + FutureValue::Value(result) => { + let value = result?; + value.push(thread, context).map(Async::Ready) + } + FutureValue::Future(future) => { + unsafe { + context.return_future(future); + } + Ok(Async::Ready(())) + } + FutureValue::Polled => ice!("Tried to push a polled future to gluon"), + } + } +} + pub enum RuntimeResult { Return(T), Panic(E), @@ -1542,6 +1583,7 @@ fn make_type(vm: &Thread) -> ArcType { /// Type which represents a function reference in gluon pub type FunctionRef<'vm, F> = Function<&'vm Thread, F>; +pub type OwnedFunction = Function; /// Type which represents an function in gluon pub struct Function diff --git a/vm/src/compiler.rs b/vm/src/compiler.rs index 296c48340c..975b1b1963 100644 --- a/vm/src/compiler.rs +++ b/vm/src/compiler.rs @@ -183,14 +183,16 @@ impl FunctionEnvs { let upvars_are_globals = self.envs.len() == 1; if !upvars_are_globals { let function = &mut **self; - function.function.debug_info.upvars.extend( - function.free_vars.iter().map(|&(ref name, ref typ)| { + function + .function + .debug_info + .upvars + .extend(function.free_vars.iter().map(|&(ref name, ref typ)| { UpvarInfo { name: name.declared_name().to_string(), typ: typ.clone(), } - }), - ); + })); } } @@ -553,9 +555,10 @@ impl<'a> Compiler<'a> { // Zero argument constructors can be compiled as integers Constructor(tag, 0) => function.emit(Construct { tag: tag, args: 0 }), Constructor(..) => { - return Err(Error::Message( - format!("Constructor `{}` is not fully applied", id), - )) + return Err(Error::Message(format!( + "Constructor `{}` is not fully applied", + id + ))) } } Ok(()) diff --git a/vm/src/future.rs b/vm/src/future.rs index e7ffef31be..a109b6e9cf 100644 --- a/vm/src/future.rs +++ b/vm/src/future.rs @@ -107,7 +107,7 @@ where pub fn then(self, g: G) -> FutureValue, G>>> where G: FnOnce(Result) -> FutureValue, - B: Future, + B: Future, { match self { FutureValue::Value(v) => match g(v) { diff --git a/vm/src/gc.rs b/vm/src/gc.rs index 371ad879d9..ce24953a3a 100644 --- a/vm/src/gc.rs +++ b/vm/src/gc.rs @@ -696,8 +696,9 @@ impl Gc { .map(|info| &**info as *const _) { Some(info) => info, - None => &**self.record_infos.entry(fields.to_owned()).or_insert( - Box::new(TypeInfo { + None => &**self.record_infos + .entry(fields.to_owned()) + .or_insert(Box::new(TypeInfo { drop: drop::, generation: self.generation, fields: fields @@ -706,8 +707,7 @@ impl Gc { .map(|(i, s)| (*s, i as VmIndex)) .collect(), fields_key: Arc::new(fields.to_owned()), - }), - ), + })), }, None => match self.type_infos.entry(TypeId::of::()) { Entry::Occupied(entry) => &**entry.get(), diff --git a/vm/src/interner.rs b/vm/src/interner.rs index 2f74fe07d1..88e97f5ead 100644 --- a/vm/src/interner.rs +++ b/vm/src/interner.rs @@ -99,10 +99,12 @@ impl Interner { Some(interned_str) => return Ok(*interned_str), None => (), } - let gc_str = unsafe { InternedStr(GcStr::from_utf8_unchecked(gc.alloc(s.as_bytes())?)) }; + let gc_str = + unsafe { InternedStr(GcStr::from_utf8_unchecked(gc.alloc(s.as_bytes())?)) }; // The key will live as long as the value it refers to and the static str never escapes // outside interner so this is safe - let key: &'static str = unsafe { ::std::mem::transmute::<&str, &'static str>(&gc_str) }; + let key: &'static str = + unsafe { ::std::mem::transmute::<&str, &'static str>(&gc_str) }; self.indexes.insert(key, gc_str); Ok(gc_str) } diff --git a/vm/src/lib.rs b/vm/src/lib.rs index b0342bf7f2..810d39ee5d 100644 --- a/vm/src/lib.rs +++ b/vm/src/lib.rs @@ -124,7 +124,7 @@ quick_error! { display("No metadata exists for `{}`", symbol) } WrongType(expected: ArcType, actual: ArcType) { - display("Expected a value of type `{}` but the inferred type was `{}`", + display("Expected a value of type `{}` but the returned type was `{}`", expected, actual) } OutOfMemory { limit: usize, needed: usize } { diff --git a/vm/src/serialization.rs b/vm/src/serialization.rs index 0d8c4bd6c5..f443b6538f 100644 --- a/vm/src/serialization.rs +++ b/vm/src/serialization.rs @@ -518,9 +518,9 @@ pub mod closure { unsafe { match variant { GraphVariant::Marked(id) => { - let function = seq.next_element_seed( - ::serde::de::Seed::new(&mut self.state), - )? + let function = seq.next_element_seed(::serde::de::Seed::new( + &mut self.state, + ))? .ok_or_else(|| V::Error::invalid_length(1, &self))?; let upvars = seq.next_element()? .ok_or_else(|| V::Error::invalid_length(2, &self))?; @@ -539,9 +539,9 @@ pub mod closure { self.state.gc_map.insert(id, closure); for i in 0..upvars { - let value = seq.next_element_seed( - ::serde::de::Seed::new(&mut self.state), - )? + let value = seq.next_element_seed(::serde::de::Seed::new( + &mut self.state, + ))? .ok_or_else(|| V::Error::invalid_length(i + 2, &self))?; closure.as_mut().upvars[i] = value; } diff --git a/vm/src/stack.rs b/vm/src/stack.rs index defcfaf3bd..3138ce0c6e 100644 --- a/vm/src/stack.rs +++ b/vm/src/stack.rs @@ -126,7 +126,11 @@ impl Stack { /// /// Panics if the lock is not the top-most lock pub fn release_lock(&mut self, lock: Lock) { - assert!(self.frames.pop().map(|frame| frame.offset) == Some(lock.0)); + let i = self.frames + .iter() + .rposition(|frame| frame.state == State::Lock) + .unwrap(); + assert!(self.frames.remove(i).offset == lock.0); } /// Creates a stackrace starting from `frame_level` @@ -319,6 +323,10 @@ impl<'a: 'b, 'b> StackFrame<'b> { /// Lock the stack below the current offset pub fn into_lock(mut self) -> Lock { + self.insert_lock() + } + + pub fn insert_lock(&mut self) -> Lock { let offset = self.stack.len(); self.frame = StackFrame::add_new_frame(&mut self.stack, 0, State::Lock); Lock(offset) diff --git a/vm/src/thread.rs b/vm/src/thread.rs index b295e468b4..17e3ea02b1 100644 --- a/vm/src/thread.rs +++ b/vm/src/thread.rs @@ -631,8 +631,17 @@ impl Thread { } } -pub trait VmRoot<'a>: Deref { +pub trait VmRoot<'a>: Deref + Clone + 'a { fn root(thread: &'a Thread) -> Self; + + /// Roots a value + fn root_value_with_self(self, value: Value) -> RootedValue { + self.rooted_values.write().unwrap().push(value); + RootedValue { + vm: self, + value: value, + } + } } impl<'a> VmRoot<'a> for &'a Thread { @@ -992,8 +1001,9 @@ pub struct Context { #[cfg_attr(feature = "serde_derive", serde(skip))] hook: Hook, max_stack_size: VmIndex, + /// Stack of polling functions used for extern functions returning futures #[cfg_attr(feature = "serde_derive", serde(skip))] - poll_fn: Option Result> + Send>>, + poll_fns: Vec FnMut(&'vm Thread) -> Result>> + Send>>, } impl Context { @@ -1007,7 +1017,7 @@ impl Context { previous_instruction_index: usize::max_value(), }, max_stack_size: VmIndex::max_value(), - poll_fn: None, + poll_fns: Vec::new(), } } @@ -1062,10 +1072,12 @@ impl Context { F::Item: Pushable<'vm>, { use std::mem::transmute; - self.poll_fn = Some(Box::new(move |vm, context| { - let vm = transmute::<&Thread, &'vm Thread>(vm); + self.poll_fns.push(Box::new(move |vm| { let value = try_ready!(future.poll()); - value.push(vm, context).map(Async::Ready) + + let mut context = vm.current_context(); + let vm = transmute::<&Thread, &'vm Thread>(vm); + value.push(vm, &mut context).map(|()| Async::Ready(context)) })); } } @@ -1125,6 +1137,10 @@ impl<'b> DerefMut for OwnedContext<'b> { } } +const INITIAL_CALL: usize = 0; +const POLL_CALL: usize = 1; +const IN_POLL: usize = 2; + impl<'b> OwnedContext<'b> { fn exit_scope(mut self) -> StdResult, ()> { let exists = StackFrame::current(&mut self.stack).exit_scope().is_ok(); @@ -1164,9 +1180,12 @@ impl<'b> OwnedContext<'b> { State::Excess => context.exit_scope().ok(), State::Extern(ext) => { let instruction_index = context.borrow_mut().stack.frame.instruction_index; - context.borrow_mut().stack.frame.instruction_index = 1; + if instruction_index == IN_POLL { + return Ok(Async::Ready(Some(context))); + } + context.borrow_mut().stack.frame.instruction_index = POLL_CALL; Some(try_ready!(context.execute_function( - instruction_index == 0, + instruction_index == INITIAL_CALL, &ext, polled, ))) @@ -1237,6 +1256,7 @@ impl<'b> OwnedContext<'b> { function.id, &self.stack.current_frame()[..] ); + let mut status = Status::Ok; if initial_call { // Make sure that the stack is not borrowed during the external function call @@ -1244,27 +1264,39 @@ impl<'b> OwnedContext<'b> { let thread = self.thread; drop(self); status = (function.function)(thread); - self = thread.current_context(); + if status == Status::Yield { return Ok(Async::NotReady); } + + self = thread.current_context(); } - if let Some(mut poll_fn) = self.poll_fn.take() { + if let Some(mut poll_fn) = self.poll_fns.pop() { // We can only poll the future if the code is currently executing in a future if !polled { - self.poll_fn = Some(poll_fn); + self.poll_fns.push(poll_fn); return Ok(Async::NotReady); } + + self.borrow_mut().stack.frame.instruction_index = IN_POLL; + let thread = self.thread; + drop(self); // Poll the future that was returned from the initial call to this extern function - match poll_fn(self.thread, &mut self)? { - Async::Ready(()) => (), + match poll_fn(thread)? { + Async::Ready(context) => { + self = context; + self.borrow_mut().stack.frame.instruction_index = POLL_CALL; + } Async::NotReady => { + self = thread.current_context(); + self.borrow_mut().stack.frame.instruction_index = POLL_CALL; // Restore `poll_fn` so it can be polled again - self.poll_fn = Some(poll_fn); + self.poll_fns.push(poll_fn); return Ok(Async::NotReady); } } } + // The function call is done at this point so remove any extra values from the frame and // return the value at the top of the stack let result = self.stack.pop(); @@ -1282,7 +1314,7 @@ impl<'b> OwnedContext<'b> { "Attempted to pop {:?} but {} was expected", stack.frame, function.id - ); + ) } self = self.exit_scope().map_err(|_| { Error::Message(StdString::from("Poped the last frame in execute_function"))