Skip to content

Commit

Permalink
fix: Update the http server example to conform to a newer hyper version
Browse files Browse the repository at this point in the history
Some changes were needed in the vm to prevent `Future::poll` from being called unless execution is inside a `Future::poll` call (inside a task).
  • Loading branch information
Marwes committed Apr 9, 2017
1 parent c6995f2 commit c841e4f
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ skeptic = { version = "0.6", optional = true }
collect-mac = "0.1.0"
env_logger = "0.3.4"

hyper = { git = "https://github.com/hyperium/hyper", default-features = false }
hyper = { git = "https://github.com/hyperium/hyper", default-features = false, rev = "5c1cfa2bce93a57db2f720cd04436a5f92e80634" }
curl = "0.4.1"

[features]
Expand Down
30 changes: 13 additions & 17 deletions examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ fn write_response(response: &ResponseBody,
use futures::future::poll_fn;
use futures::AsyncSink;

// Turn `bytes´ into a `Chunk` which can be sent to the http body
// Turn `bytes´ into a `Chunk` which can be sent to the http body
let mut unsent_chunk = Some(Ok(bytes.to_owned().into()));
let response = response.0.clone();
FutureResult(poll_fn(move || {
Expand Down Expand Up @@ -247,9 +247,7 @@ type HttpState = record_type!{
fn listen(port: i32, value: WithVM<OpaqueValue<RootedThread, Handler<Response>>>) -> IO<()> {
let WithVM { value: handler, vm: thread } = value;

use hyper::Server;
use hyper::server::Request as HyperRequest;
use hyper::server::Response as HyperResponse;
use hyper::server::{Http, Request as HyperRequest, Response as HyperResponse};

// Retrieve the `handle` function from the http module which we use to evaluate values of type // `Handler Response`
type ListenFn = fn(OpaqueValue<RootedThread, Handler<Response>>, HttpState) -> IO<Response>;
Expand Down Expand Up @@ -318,21 +316,19 @@ fn listen(port: i32, value: WithVM<OpaqueValue<RootedThread, Handler<Response>>>
}

let addr = format!("127.0.0.1:{}", port).parse().unwrap();
let (_listening, server) = Server::standalone(move |tokio| {
Server::http(&addr, tokio)
?
.handle(move || {
Ok(Listen {
handle: handle.clone(),
handler: handler.clone(),
})
},
tokio)
let result = Http::new()
.bind(&addr, move || {
Ok(Listen {
handle: handle.clone(),
handler: handler.clone(),
})
})
.unwrap();
server.run();
.and_then(|server| server.run());

IO::Value(())
match result {
Ok(()) => IO::Value(()),
Err(err) => IO::Exception(format!("{}", err)),
}
}

// To let the `http_types` module refer to `Body` and `ResponseBody` we register these types in a
Expand Down
4 changes: 2 additions & 2 deletions tests/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ fn return_delayed_future() {
let (ping_c, ping_p) = channel();
let (pong_c, pong_p) = channel();
spawn(move || {
ping_p.wait().unwrap();
pong_c.send(i).unwrap();
ping_p.wait().expect("wait");
pong_c.send(i).expect("send");
});
FutureResult(lazy(move || {
ping_c.send(()).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion vm/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ impl<'vm, F> AsyncPushable<'vm> for FutureResult<F>
unsafe {
context.return_future(self.0);
}
Ok(Async::NotReady)
Ok(Async::Ready(()))
}
}

Expand Down
1 change: 0 additions & 1 deletion vm/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ pub fn load(vm: &Thread) -> Result<()> {
slice => primitive!(3 prim::string_slice),
from_utf8 => primitive::<fn(Vec<u8>) -> StdResult<String, ()>>("from_utf8", prim::from_utf8),
char_at => primitive!(2 prim::char_at),
from_utf8 => primitive::<fn(Vec<u8>) -> StdResult<String, ()>>("from_utf8", prim::from_utf8),
as_bytes => primitive!(1 str::as_bytes)
))?;
vm.define_global("char",
Expand Down
21 changes: 14 additions & 7 deletions vm/src/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ impl ThreadInternal for Thread {
let mut context = self.current_context();
context.stack.push(Closure(closure));
context.borrow_mut().enter_scope(0, State::Closure(closure));
let async = try_future!(context.execute());
let async = try_future!(context.execute(false));
match async {
Async::Ready(context) => FutureValue::Value(Ok((self, context.unwrap().stack.pop()))),
Async::NotReady => FutureValue::Future(Execute::new(self)),
Expand Down Expand Up @@ -649,7 +649,7 @@ impl ThreadInternal for Thread {
args: VmIndex)
-> Result<Async<Option<OwnedContext<'b>>>> {
context.borrow_mut().do_call(args)?;
context.execute()
context.execute(false)
}

fn resume(&self) -> Result<Async<OwnedContext>> {
Expand All @@ -658,7 +658,7 @@ impl ThreadInternal for Thread {
// Only the top level frame left means that the thread has finished
return Err(Error::Dead);
}
context = try_ready!(context.execute()).unwrap();
context = try_ready!(context.execute(true)).unwrap();
Ok(Async::Ready(context))
}

Expand Down Expand Up @@ -969,7 +969,7 @@ impl<'b> OwnedContext<'b> {
if exists { Ok(self) } else { Err(()) }
}

fn execute(self) -> Result<Async<Option<OwnedContext<'b>>>> {
fn execute(self, polled: bool) -> Result<Async<Option<OwnedContext<'b>>>> {
let mut maybe_context = Some(self);
while let Some(mut context) = maybe_context {
debug!("STACK\n{:?}", context.stack.get_frames());
Expand Down Expand Up @@ -1000,7 +1000,7 @@ impl<'b> OwnedContext<'b> {
State::Extern(ext) => {
let instruction_index = context.borrow_mut().stack.frame.instruction_index;
context.borrow_mut().stack.frame.instruction_index = 1;
Some(try_ready!(context.execute_function(instruction_index == 0, &ext)))
Some(try_ready!(context.execute_function(instruction_index == 0, &ext, polled)))
}
State::Closure(closure) => {
let max_stack_size = context.max_stack_size;
Expand Down Expand Up @@ -1055,7 +1055,8 @@ impl<'b> OwnedContext<'b> {

fn execute_function(mut self,
initial_call: bool,
function: &ExternFunction)
function: &ExternFunction,
polled: bool)
-> Result<Async<OwnedContext<'b>>> {
info!("CALL EXTERN {} {:?}",
function.id,
Expand All @@ -1071,7 +1072,13 @@ impl<'b> OwnedContext<'b> {
if status == Status::Yield {
return Ok(Async::NotReady);
}
} else if let Some(mut poll_fn) = self.poll_fn.take() {
}
if let Some(mut poll_fn) = self.poll_fn.take() {
// We can only poll the future if the code is currently executing in a future
if !polled {
self.poll_fn = Some(poll_fn);
return Ok(Async::NotReady);
}
// Poll the future that was returned from the initial call to this extern function
match poll_fn(self.thread, &mut self)? {
Async::Ready(()) => (),
Expand Down

0 comments on commit c841e4f

Please sign in to comment.