From bf258f27bd444576ead7f20010284a554995ca82 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 15 Aug 2019 16:34:33 +0200 Subject: [PATCH 1/7] Update to stables futures --- Cargo.toml | 5 +- src/bridge_state.rs | 23 ++-- src/testing.rs | 71 +++++------ src/voter/mod.rs | 253 ++++++++++++++++++-------------------- src/voter/past_rounds.rs | 97 +++++++++------ src/voter/voting_round.rs | 56 +++++---- 6 files changed, 258 insertions(+), 247 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6fa84441..d666df67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,16 +8,15 @@ repository = "https://github.com/paritytech/finality-grandpa" edition = "2018" [dependencies] -futures = "0.1" +futures-preview = "0.3.0-alpha.17" +futures-timer = "0.3.0" log = "0.4" parking_lot = { version = "0.9", optional = true } parity-scale-codec = { version = "1.0.3", optional = true, default-features = false, features = ["derive"] } num = { package = "num-traits", version = "0.2", default-features = false } [dev-dependencies] -exit-future = "0.1.2" rand = "0.6.0" -tokio = "0.1.8" [features] default = ["std"] diff --git a/src/bridge_state.rs b/src/bridge_state.rs index fd9bab3a..3388e784 100644 --- a/src/bridge_state.rs +++ b/src/bridge_state.rs @@ -18,18 +18,19 @@ use crate::round::State as RoundState; use futures::task; use parking_lot::{RwLock, RwLockReadGuard}; use std::sync::Arc; +use std::task::Context; // round state bridged across rounds. struct Bridged { inner: RwLock>, - task: task::AtomicTask, + waker: task::AtomicWaker, } impl Bridged { fn new(inner: RwLock>) -> Self { Bridged { inner, - task: task::AtomicTask::new(), + waker: task::AtomicWaker::new(), } } } @@ -41,7 +42,7 @@ impl PriorView { /// Push an update to the latter view. pub(crate) fn update(&self, new: RoundState) { *self.0.inner.write() = new; - self.0.task.notify(); + self.0.waker.wake(); } } @@ -50,8 +51,8 @@ pub(crate) struct LatterView(Arc>); impl LatterView { /// Fetch a handle to the last round-state. - pub(crate) fn get(&self) -> RwLockReadGuard> { - self.0.task.register(); + pub(crate) fn get(&self, cx: &mut Context) -> RwLockReadGuard> { + self.0.waker.register(cx.waker()); self.0.inner.read() } } @@ -73,7 +74,7 @@ pub(crate) fn bridge_state(initial: RoundState) -> (PriorView, #[cfg(test)] mod tests { use futures::prelude::*; - use std::sync::Barrier; + use std::{sync::Barrier, task::Poll}; use super::*; #[test] @@ -86,11 +87,11 @@ mod tests { }; let (prior, latter) = bridge_state(initial); - let waits_for_finality = ::futures::future::poll_fn(move || -> Poll<(), ()> { - if latter.get().finalized.is_some() { - Ok(Async::Ready(())) + let waits_for_finality = ::futures::future::poll_fn(move |cx| -> Poll<()> { + if latter.get(cx).finalized.is_some() { + Poll::Ready(()) } else { - Ok(Async::NotReady) + Poll::Pending } }); @@ -107,6 +108,6 @@ mod tests { }); barrier.wait(); - waits_for_finality.wait().unwrap(); + futures::executor::block_on(waits_for_finality); } } diff --git a/src/testing.rs b/src/testing.rs index fe29971d..ce47dc38 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -133,12 +133,14 @@ pub mod environment { use crate::voter::{RoundData, CommunicationIn, CommunicationOut, Callback}; use crate::{Chain, Commit, Error, Equivocation, Message, Prevote, Precommit, PrimaryPropose, SignedMessage, HistoricalVotes}; use futures::prelude::*; - use futures::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; + use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; + use futures_timer::Delay; use parking_lot::Mutex; use std::collections::HashMap; + use std::pin::Pin; use std::sync::Arc; - use std::time::{Instant, Duration}; - use tokio::timer::Delay; + use std::task::{Context, Poll}; + use std::time::Duration; #[derive(Hash, Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd)] pub struct Id(pub u32); @@ -194,26 +196,25 @@ pub mod environment { } impl crate::voter::Environment<&'static str, u32> for Environment { - type Timer = Box + Send + 'static>; + type Timer = Pin> + Send + 'static>>; type Id = Id; type Signature = Signature; - type In = Box,Error=Error> + Send + 'static>; - type Out = Box,SinkError=Error> + Send + 'static>; + type In = Pin,Error>> + Send + 'static>>; + type Out = Pin,Error=Error> + Send + 'static>>; type Error = Error; fn round_data(&self, round: u64) -> RoundData { const GOSSIP_DURATION: Duration = Duration::from_millis(500); - let now = Instant::now(); let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id); RoundData { voter_id: Some(self.local_id), - prevote_timer: Box::new(Delay::new(now + GOSSIP_DURATION) + prevote_timer: Box::pin(Delay::new(GOSSIP_DURATION) .map_err(|_| panic!("Timer failed"))), - precommit_timer: Box::new(Delay::new(now + GOSSIP_DURATION + GOSSIP_DURATION) + precommit_timer: Box::pin(Delay::new(GOSSIP_DURATION + GOSSIP_DURATION) .map_err(|_| panic!("Timer failed"))), - incoming: Box::new(incoming), - outgoing: Box::new(outgoing), + incoming: Box::pin(incoming), + outgoing: Box::pin(outgoing), } } @@ -225,8 +226,7 @@ pub mod environment { let delay = Duration::from_millis( rand::thread_rng().gen_range(0, COMMIT_DELAY_MILLIS)); - let now = Instant::now(); - Box::new(Delay::new(now + delay).map_err(|_| panic!("Timer failed"))) + Box::pin(Delay::new(delay).map_err(|_| panic!("Timer failed"))) } fn completed( @@ -316,13 +316,13 @@ pub mod environment { // add a node to the network for a round. fn add_node M>(&mut self, f: F) -> ( - impl Stream, - impl Sink + impl Stream>, + impl Sink ) { let (tx, rx) = mpsc::unbounded(); let messages_out = self.raw_sender.clone() .sink_map_err(|e| panic!("Error sending messages: {:?}", e)) - .with(move |message| Ok(f(message))); + .with(move |message| future::ready(Ok(f(message)))); // get history to the node. for prior_message in self.history.iter().cloned() { @@ -330,18 +330,17 @@ pub mod environment { } self.senders.push(tx); - let rx = rx.map_err(|e| panic!("Error receiving messages: {:?}", e)); - (rx, messages_out) + (rx.map(Ok), messages_out) } // do routing work - fn route(&mut self) -> Poll<(), ()> { + fn route(&mut self, cx: &mut Context) -> Poll<()> { loop { - match self.receiver.poll().map_err(|e| panic!("Error routing messages: {:?}", e))? { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(None) => return Ok(Async::Ready(())), - Async::Ready(Some(item)) => { + match Stream::poll_next(Pin::new(&mut self.receiver), cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(item)) => { self.history.push(item.clone()); for sender in &self.senders { let _ = sender.unbounded_send(item.clone()); @@ -376,8 +375,8 @@ pub mod environment { impl Network { pub fn make_round_comms(&self, round_number: u64, node_id: Id) -> ( - impl Stream,Error=Error>, - impl Sink,SinkError=Error> + impl Stream,Error>>, + impl Sink,Error=Error> ) { let mut rounds = self.rounds.lock(); rounds.entry(round_number) @@ -390,8 +389,8 @@ pub mod environment { } pub fn make_global_comms(&self) -> ( - impl Stream,Error=Error>, - impl Sink,SinkError=Error> + impl Stream,Error>>, + impl Sink,Error=Error> ) { let mut global_messages = self.global_messages.lock(); global_messages.add_node(|message| match message { @@ -412,20 +411,22 @@ pub mod environment { } impl Future for NetworkRouting { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll<(), ()> { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { let mut rounds = self.rounds.lock(); - rounds.retain(|_, round| match round.route() { - Ok(Async::Ready(())) | Err(()) => false, - Ok(Async::NotReady) => true, + rounds.retain(|_, round| match round.route(cx) { + Poll::Ready(()) => false, + Poll::Pending => true, }); let mut global_messages = self.global_messages.lock(); - let _ = global_messages.route(); + let _ = global_messages.route(cx); - Ok(Async::NotReady) + Poll::Pending } } + + impl Unpin for NetworkRouting { + } } diff --git a/src/voter/mod.rs b/src/voter/mod.rs index 0cb825a0..3c6256af 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -25,13 +25,15 @@ //! votes will not be pushed to the sink. The protocol state machine still //! transitions state as if the votes had been pushed out. -use futures::prelude::*; -use futures::sync::mpsc::{self, UnboundedReceiver}; +use futures::{prelude::*, ready}; +use futures::channel::mpsc::{self, UnboundedReceiver}; #[cfg(feature = "std")] use log::trace; use std::collections::VecDeque; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use crate::round::State as RoundState; use crate::{ @@ -50,11 +52,11 @@ mod voting_round; /// /// This encapsulates the database and networking layers of the chain. pub trait Environment: Chain { - type Timer: Future; + type Timer: Future> + Unpin; type Id: Ord + Clone + Eq + ::std::fmt::Debug; type Signature: Eq + Clone; - type In: Stream, Error=Self::Error>; - type Out: Sink, SinkError=Self::Error>; + type In: Stream, Self::Error>> + Unpin; + type Out: Sink, Error=Self::Error> + Unpin; type Error: From + ::std::error::Error; /// Produce data necessary to start a round of voting. This may also be called @@ -319,13 +321,13 @@ pub struct RoundData { pub outgoing: Output, } -struct Buffered { +struct Buffered { inner: S, - buffer: VecDeque, + buffer: VecDeque, } -impl Buffered { - fn new(inner: S) -> Buffered { +impl + Unpin, I> Buffered { + fn new(inner: S) -> Buffered { Buffered { buffer: VecDeque::new(), inner @@ -334,40 +336,33 @@ impl Buffered { // push an item into the buffered sink. // the sink _must_ be driven to completion with `poll` afterwards. - fn push(&mut self, item: S::SinkItem) { + fn push(&mut self, item: I) { self.buffer.push_back(item); } // returns ready when the sink and the buffer are completely flushed. - fn poll(&mut self) -> Poll<(), S::SinkError> { - let polled = self.schedule_all()?; + fn poll(&mut self, cx: &mut Context) -> Poll> { + let polled = self.schedule_all(cx)?; match polled { - Async::Ready(()) => self.inner.poll_complete(), - Async::NotReady => { - self.inner.poll_complete()?; - Ok(Async::NotReady) + Poll::Ready(()) => Sink::poll_flush(Pin::new(&mut self.inner), cx), + Poll::Pending => { + ready!(Sink::poll_flush(Pin::new(&mut self.inner), cx))?; + Poll::Pending } } } - fn schedule_all(&mut self) -> Poll<(), S::SinkError> { - while let Some(front) = self.buffer.pop_front() { - match self.inner.start_send(front) { - Ok(AsyncSink::Ready) => continue, - Ok(AsyncSink::NotReady(front)) => { - self.buffer.push_front(front); - break; - } - Err(e) => return Err(e), - } - } + fn schedule_all(&mut self, cx: &mut Context) -> Poll> { + while !self.buffer.is_empty() { + ready!(Sink::poll_ready(Pin::new(&mut self.inner), cx))?; - if self.buffer.is_empty() { - Ok(Async::Ready(())) - } else { - Ok(Async::NotReady) + let item = self.buffer.pop_front() + .expect("we checked self.buffer.is_empty() just above; qed"); + Sink::start_send(Pin::new(&mut self.inner), item)?; } + + Poll::Ready(Ok(())) } } @@ -443,8 +438,8 @@ fn instantiate_last_round>( pub struct Voter, GlobalIn, GlobalOut> where H: Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, - GlobalIn: Stream, Error=E::Error>, - GlobalOut: Sink, SinkError=E::Error>, + GlobalIn: Stream, E::Error>> + Unpin, + GlobalOut: Sink, Error=E::Error> + Unpin, { env: Arc, voters: VoterSet, @@ -453,7 +448,7 @@ pub struct Voter, GlobalIn, GlobalOut> where finalized_notifications: UnboundedReceiver>, last_finalized_number: N, global_in: GlobalIn, - global_out: Buffered, + global_out: Buffered>, // the commit protocol might finalize further than the current round (if we're // behind), we keep track of last finalized in round so we don't violate any // assumptions from round-to-round. @@ -463,8 +458,8 @@ pub struct Voter, GlobalIn, GlobalOut> where impl, GlobalIn, GlobalOut> Voter where H: Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, - GlobalIn: Stream, Error=E::Error>, - GlobalOut: Sink, SinkError=E::Error>, + GlobalIn: Stream, E::Error>> + Unpin, + GlobalOut: Sink, Error=E::Error> + Unpin, { /// Create new `Voter` tracker with given round number and base block. /// @@ -540,15 +535,14 @@ impl, GlobalIn, GlobalOut> Voter Result<(), E::Error> { + fn prune_background_rounds(&mut self, cx: &mut Context) -> Result<(), E::Error> { // Do work on all background rounds, broadcasting any commits generated. - while let Async::Ready(Some((number, commit))) = self.past_rounds.poll()? { + while let Poll::Ready(Some(item)) = Stream::poll_next(Pin::new(&mut self.past_rounds), cx) { + let (number, commit) = item?; self.global_out.push(CommunicationOut::Commit(number, commit)); } - while let Async::Ready(res) = self.finalized_notifications.poll() - .expect("unbounded receivers do not have spurious errors; qed") - { + while let Poll::Ready(res) = Stream::poll_next(Pin::new(&mut self.finalized_notifications), cx) { let (f_hash, f_num, round, commit) = res.expect("one sender always kept alive in self.best_round; qed"); @@ -575,9 +569,9 @@ impl, GlobalIn, GlobalOut> Voter Result<(), E::Error> { - while let Async::Ready(Some(item)) = self.global_in.poll()? { - match item { + fn process_incoming(&mut self, cx: &mut Context) -> Result<(), E::Error> { + while let Poll::Ready(Some(item)) = Stream::poll_next(Pin::new(&mut self.global_in), cx) { + match item? { CommunicationIn::Commit(round_number, commit, mut process_commit_outcome) => { trace!(target: "afg", "Got commit for round_number {:?}: target_number: {:?}, target_hash: {:?}", round_number, @@ -682,13 +676,13 @@ impl, GlobalIn, GlobalOut> Voter Poll<(), E::Error> { + fn process_best_round(&mut self, cx: &mut Context) -> Poll> { // If the current `best_round` is completable and we've already precommitted, // we start a new round at `best_round + 1`. let should_start_next = { - let completable = match self.best_round.poll()? { - Async::Ready(()) => true, - Async::NotReady => false, + let completable = match self.best_round.poll(cx)? { + Poll::Ready(()) => true, + Poll::Pending => false, }; let precommitted = match self.best_round.state() { @@ -699,7 +693,7 @@ impl, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter Result<(), E::Error> { @@ -749,21 +743,28 @@ impl, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Future for Voter where H: Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, - GlobalIn: Stream, Error=E::Error>, - GlobalOut: Sink, SinkError=E::Error>, + GlobalIn: Stream, E::Error>> + Unpin, + GlobalOut: Sink, Error=E::Error> + Unpin, { - type Item = (); - type Error = E::Error; + type Output = Result<(), E::Error>; - fn poll(&mut self) -> Poll<(), E::Error> { - self.process_incoming()?; - self.prune_background_rounds()?; - self.global_out.poll()?; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.process_incoming(cx)?; + self.prune_background_rounds(cx)?; + self.global_out.poll(cx)?; - self.process_best_round() + self.process_best_round(cx) } } +impl, GlobalIn, GlobalOut> Unpin for Voter where + H: Clone + Eq + Ord + ::std::fmt::Debug, + N: Copy + BlockNumberOps + ::std::fmt::Debug, + GlobalIn: Stream, E::Error>> + Unpin, + GlobalOut: Sink, Error=E::Error> + Unpin, +{ +} + /// Validate the given catch up and return a completed round with all prevotes /// and precommits from the catch up imported. If the catch up is invalid `None` /// is returned instead. @@ -896,9 +897,8 @@ mod tests { chain::GENESIS_HASH, environment::{Environment, Id, Signature}, }; + use futures_timer::TryFutureExt as _; use std::time::Duration; - use tokio::prelude::FutureExt; - use tokio::runtime::current_thread; #[test] fn talking_to_myself() { @@ -906,11 +906,11 @@ mod tests { let voters = std::iter::once((local_id, 100)).collect(); let (network, routing_task) = testing::environment::make_network(); - let (signal, exit) = ::exit_future::signal(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - current_thread::block_on_all(::futures::future::lazy(move || { + futures::executor::block_on(::futures::future::lazy(move |_| { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -928,17 +928,15 @@ mod tests { last_finalized, last_finalized, ); - tokio::spawn(exit.clone() - .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); - tokio::spawn(exit.until(routing_task).map(|_| ())); + threads_pool.spawn_ok(routing_task); // wait for the best block to finalize. finalized - .take_while(|&(_, n, _)| Ok(n < 6)) - .for_each(|_| Ok(())) - .map(|_| signal.fire()) - })).unwrap(); + .take_while(|&(_, n, _)| future::ready(n < 6)) + .for_each(|_| future::ready(())) + }).flatten()); } #[test] @@ -947,13 +945,11 @@ mod tests { let voters: VoterSet<_> = (0..10).map(|i| (Id(i), 1)).collect(); let (network, routing_task) = testing::environment::make_network(); - let (signal, exit) = ::exit_future::signal(); - - current_thread::block_on_all(::futures::future::lazy(move || { - tokio::spawn(exit.clone().until(routing_task).map(|_| ())); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); + futures::executor::block_on(::futures::future::lazy(move |_| { // 3 voters offline. - let finalized_streams = (0..7).map(move |i| { + let finalized_streams = (0..7).map(|i| { let local_id = Id(i); // initialize chain let env = Arc::new(Environment::new(network.clone(), local_id)); @@ -973,17 +969,17 @@ mod tests { last_finalized, last_finalized, ); - tokio::spawn(exit.clone() - .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); // wait for the best block to be finalized by all honest voters finalized - .take_while(|&(_, n, _)| Ok(n < 6)) - .for_each(|_| Ok(())) - }); + .take_while(|&(_, n, _)| future::ready(n < 6)) + .for_each(|_| future::ready(())) + }).collect::>(); - ::futures::future::join_all(finalized_streams).map(|_| signal.fire()) - })).unwrap(); + threads_pool.spawn_ok(routing_task.map(|_| ())); + ::futures::future::join_all(finalized_streams.into_iter()) + }).flatten()); } #[test] @@ -994,11 +990,11 @@ mod tests { let (network, routing_task) = testing::environment::make_network(); let (commits, _) = network.make_global_comms(); - let (signal, exit) = ::exit_future::signal(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - current_thread::block_on_all(::futures::future::lazy(move || { + futures::executor::block_on(::futures::future::lazy(move |_| { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -1016,14 +1012,13 @@ mod tests { last_finalized, last_finalized, ); - tokio::spawn(exit.clone() - .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); - tokio::spawn(exit.until(routing_task).map(|_| ())); + threads_pool.spawn_ok(routing_task); // wait for the node to broadcast a commit message - commits.take(1).for_each(|_| Ok(())).map(|_| signal.fire()) - })).unwrap(); + commits.take(1).for_each(|_| future::ready(())) + }).flatten()); } #[test] @@ -1059,11 +1054,11 @@ mod tests { }], }); - let (signal, exit) = ::exit_future::signal(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - current_thread::block_on_all(::futures::future::lazy(move || { + futures::executor::block_on(::futures::future::lazy(move |_| { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -1080,46 +1075,45 @@ mod tests { last_finalized, last_finalized, ); - tokio::spawn(exit.clone() - .until(voter.map_err(|e| panic!("Error voting: {:?}", e))).map(|_| ())); + threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting: {:?}"))); - tokio::spawn(exit.clone().until(routing_task).map(|_| ())); + threads_pool.spawn_ok(routing_task.map(|_| ())); - tokio::spawn(exit.until(::futures::future::lazy(|| { - round_stream.into_future().map_err(|(e, _)| e) - .and_then(|(value, stream)| { // wait for a prevote + threads_pool.spawn_ok(::futures::future::lazy(move |_| { + round_stream.into_future() + .then(|(value, stream)| { // wait for a prevote assert!(match value { - Some(SignedMessage { message: Message::Prevote(_), id: Id(5), .. }) => true, + Some(Ok(SignedMessage { message: Message::Prevote(_), id: Id(5), .. })) => true, _ => false, }); let votes = vec![prevote, precommit].into_iter().map(Result::Ok); - round_sink.send_all(futures::stream::iter_result(votes)).map(|_| stream) // send our prevote + futures::stream::iter(votes).forward(round_sink).map(|_| stream) // send our prevote }) - .and_then(|stream| { + .then(|stream| { stream.take_while(|value| match value { // wait for a precommit - SignedMessage { message: Message::Precommit(_), id: Id(5), .. } => Ok(false), - _ => Ok(true), - }).for_each(|_| Ok(())) + Ok(SignedMessage { message: Message::Precommit(_), id: Id(5), .. }) => future::ready(false), + _ => future::ready(true), + }).for_each(|_| future::ready(())) }) - .and_then(|_| { + .then(|_| { // send our commit - commits_sink.send(CommunicationOut::Commit(commit.0, commit.1)) + stream::iter(std::iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1)))).forward(commits_sink) }) .map_err(|_| ()) - })).map(|_| ())); + }).flatten().map(|_| ())); // wait for the first commit (ours) - commits_stream.into_future().map_err(|_| ()) - .and_then(|(_, stream)| { - stream.take(1).for_each(|_| Ok(())) // the second commit should never arrive + commits_stream.into_future() + .then(|(_, stream)| { + stream.take(1).for_each(|_| future::ready(())) // the second commit should never arrive + .map(|()| Ok::<(), std::io::Error>(())) .timeout(Duration::from_millis(500)).map_err(|_| ()) }) .then(|res| { assert!(res.is_err()); // so the previous future times out - signal.fire(); futures::future::ok::<(), ()>(()) }) - })).unwrap(); + }).flatten()).unwrap(); } #[test] @@ -1134,7 +1128,7 @@ mod tests { let (network, routing_task) = testing::environment::make_network(); let (_, commits_sink) = network.make_global_comms(); - let (signal, exit) = ::exit_future::signal(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); // this is a commit for a previous round let commit = (0, Commit { @@ -1149,7 +1143,7 @@ mod tests { let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - current_thread::block_on_all(::futures::future::lazy(move || { + futures::executor::block_on(::futures::future::lazy(move |_| { // initialize chain let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); @@ -1166,20 +1160,18 @@ mod tests { last_finalized, last_finalized, ); - tokio::spawn(exit.clone() - .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); - tokio::spawn(exit.until(routing_task).map(|_| ())); + threads_pool.spawn_ok(routing_task.map(|_| ())); - tokio::spawn(commits_sink.send(CommunicationOut::Commit(commit.0, commit.1)) + threads_pool.spawn_ok(stream::iter(std::iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1)))).forward(commits_sink) .map_err(|_| ()).map(|_| ())); // wait for the commit message to be processed which finalized block 6 env.finalized_stream() - .take_while(|&(_, n, _)| Ok(n < 6)) - .for_each(|_| Ok(())) - .map(|_| signal.fire()) - })).unwrap(); + .take_while(|&(_, n, _)| future::ready(n < 6)) + .for_each(|_| future::ready(())) + }).flatten()); } #[test] @@ -1188,10 +1180,10 @@ mod tests { let voters: VoterSet<_> = (0..3).map(|i| (Id(i), 1)).collect(); let (network, routing_task) = testing::environment::make_network(); - let (signal, exit) = ::exit_future::signal(); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); - current_thread::block_on_all(::futures::future::lazy(move || { - tokio::spawn(exit.clone().until(routing_task).map(|_| ())); + futures::executor::block_on(::futures::future::lazy(move |_| { + threads_pool.spawn_ok(routing_task.map(|_| ())); // initialize unsynced voter at round 0 let mut unsynced_voter = { @@ -1240,15 +1232,16 @@ mod tests { // poll until it's caught up. // should skip to round 6 - ::futures::future::poll_fn(move || -> Poll<(), ()> { - let poll = unsynced_voter.poll().map_err(|_| ())?; + ::futures::future::poll_fn(move |cx| -> Poll> { + let poll = Future::poll(Pin::new(&mut unsynced_voter), cx); if unsynced_voter.best_round.round_number() == 6 { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } else { - Ok(poll) + futures::ready!(poll).map_err(|_| ())?; + Poll::Ready(Ok(())) } - }).map(move |_| signal.fire()) - })).unwrap(); + }) + }).flatten()).unwrap(); } #[test] diff --git a/src/voter/past_rounds.rs b/src/voter/past_rounds.rs index ad7eeb19..77480f75 100644 --- a/src/voter/past_rounds.rs +++ b/src/voter/past_rounds.rs @@ -22,16 +22,18 @@ //! - Passing it any validated commits (so backgrounded rounds don't produce conflicting ones) #[cfg(feature = "std")] -use futures::try_ready; +use futures::ready; use futures::prelude::*; use futures::stream::{self, futures_unordered::FuturesUnordered}; use futures::task; -use futures::sync::mpsc; +use futures::channel::mpsc; #[cfg(feature = "std")] use log::{trace, debug}; use std::cmp; use std::collections::HashMap; +use std::pin::Pin; +use std::task::{Context, Poll}; use crate::{Commit, BlockNumberOps}; use super::Environment; @@ -46,7 +48,7 @@ struct BackgroundRound> where N: Copy + BlockNumberOps + ::std::fmt::Debug, { inner: VotingRound, - task: Option, + waker: Option, finalized_number: N, round_committer: Option>, } @@ -75,8 +77,8 @@ impl> BackgroundRound where // wake up the future to be polled if done. if self.is_done() { - if let Some(ref task) = self.task { - task.notify(); + if let Some(ref waker) = self.waker { + waker.wake_by_ref(); } } } @@ -97,35 +99,40 @@ impl> Future for BackgroundRound where H: Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, { - type Item = BackgroundRoundChange; - type Error = E::Error; + type Output = Result, E::Error>; - fn poll(&mut self) -> Poll { - self.task = Some(::futures::task::current()); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.waker = Some(cx.waker().clone()); - self.inner.poll()?; + self.inner.poll(cx)?; self.round_committer = match self.round_committer.take() { None => None, - Some(mut committer) => match committer.commit(&mut self.inner)? { - Async::Ready(None) => None, - Async::Ready(Some(commit)) => return Ok(Async::Ready( + Some(mut committer) => match committer.commit(cx, &mut self.inner)? { + Poll::Ready(None) => None, + Poll::Ready(Some(commit)) => return Poll::Ready(Ok( BackgroundRoundChange::Committed(commit) )), - Async::NotReady => Some(committer), + Poll::Pending => Some(committer), } }; if self.is_done() { // if this is fully concluded (has committed _and_ estimate finalized) // we bail for real. - Ok(Async::Ready(BackgroundRoundChange::Concluded(self.round_number()))) + Poll::Ready(Ok(BackgroundRoundChange::Concluded(self.round_number()))) } else { - Ok(Async::NotReady) + Poll::Pending } } } +impl> Unpin for BackgroundRound where + H: Clone + Eq + Ord + ::std::fmt::Debug, + N: Copy + BlockNumberOps + ::std::fmt::Debug, +{ +} + struct RoundCommitter> where H: Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, @@ -169,26 +176,26 @@ impl> RoundCommitter where Ok(true) } - fn commit(&mut self, voting_round: &mut VotingRound) - -> Poll>, E::Error> + fn commit(&mut self, cx: &mut Context, voting_round: &mut VotingRound) + -> Poll>, E::Error>> { - while let Ok(Async::Ready(Some(commit))) = self.import_commits.poll() { + while let Poll::Ready(Some(commit)) = Stream::poll_next(Pin::new(&mut self.import_commits), cx) { if !self.import_commit(voting_round, commit)? { trace!(target: "afg", "Ignoring invalid commit"); } } - try_ready!(self.commit_timer.poll()); + ready!(Future::poll(Pin::new(&mut self.commit_timer), cx))?; match (self.last_commit.take(), voting_round.finalized()) { (None, Some(_)) => { - Ok(Async::Ready(voting_round.finalizing_commit().cloned())) + Poll::Ready(Ok(voting_round.finalizing_commit().cloned())) }, (Some(Commit { target_number, .. }), Some((_, finalized_number))) if target_number < *finalized_number => { - Ok(Async::Ready(voting_round.finalizing_commit().cloned())) + Poll::Ready(Ok(voting_round.finalizing_commit().cloned())) }, _ => { - Ok(Async::Ready(None)) + Poll::Ready(Ok(None)) }, } } @@ -212,24 +219,26 @@ impl SelfReturningFuture { } } -impl Future for SelfReturningFuture { - type Item = (F::Item, F); - type Error = F::Error; +impl Future for SelfReturningFuture { + type Output = (F::Output, F); - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.inner.take() { None => panic!("poll after return is not done in this module; qed"), - Some(mut f) => match f.poll()? { - Async::Ready(item) => Ok(Async::Ready((item, f))), - Async::NotReady => { + Some(mut f) => match Future::poll(Pin::new(&mut f), cx) { + Poll::Ready(item) => Poll::Ready((item, f)), + Poll::Pending => { self.inner = Some(f); - Ok(Async::NotReady) + Poll::Pending } } } } } +impl Unpin for SelfReturningFuture { +} + /// A stream for past rounds, which produces any commit messages from those /// rounds and drives them to completion. pub(super) struct PastRounds> where @@ -258,7 +267,7 @@ impl> PastRounds where let (tx, rx) = mpsc::unbounded(); let background = BackgroundRound { inner: round, - task: None, + waker: None, // https://github.com/paritytech/finality-grandpa/issues/50 finalized_number: N::zero(), round_committer: Some(RoundCommitter::new( @@ -297,13 +306,12 @@ impl> Stream for PastRounds where H: Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, { - type Item = (u64, Commit); - type Error = E::Error; + type Item = Result<(u64, Commit), E::Error>; - fn poll(&mut self) -> Poll, E::Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { - match self.past_rounds.poll()? { - Async::Ready(Some((BackgroundRoundChange::Concluded(number), round))) => { + match Stream::poll_next(Pin::new(&mut self.past_rounds), cx) { + Poll::Ready(Some((Ok(BackgroundRoundChange::Concluded(number)), round))) => { let round = &round.inner; round.env().concluded( round.round_number(), @@ -314,7 +322,7 @@ impl> Stream for PastRounds where self.commit_senders.remove(&number); } - Async::Ready(Some((BackgroundRoundChange::Committed(commit), round))) => { + Poll::Ready(Some((Ok(BackgroundRoundChange::Committed(commit)), round))) => { let number = round.round_number(); // reschedule until irrelevant. @@ -328,11 +336,18 @@ impl> Stream for PastRounds where commit.target_hash, ); - return Ok(Async::Ready(Some((number, commit)))); + return Poll::Ready(Some(Ok((number, commit)))); } - Async::Ready(None) => return Ok(Async::Ready(None)), - Async::NotReady => return Ok(Async::NotReady), + Poll::Ready(Some((Err(err), _))) => return Poll::Ready(Some(Err(err))), + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, } } } } + +impl> Unpin for PastRounds where + H: Clone + Eq + Ord + ::std::fmt::Debug, + N: Copy + BlockNumberOps + ::std::fmt::Debug, +{ +} diff --git a/src/voter/voting_round.rs b/src/voter/voting_round.rs index 1c760a51..2b212e46 100644 --- a/src/voter/voting_round.rs +++ b/src/voter/voting_round.rs @@ -15,13 +15,15 @@ //! Logic for voting and handling messages within a single round. #[cfg(feature = "std")] -use futures::try_ready; +use futures::ready; use futures::prelude::*; -use futures::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; #[cfg(feature = "std")] use log::{trace, warn, debug}; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use crate::round::{Round, State as RoundState}; use crate::{ @@ -60,7 +62,7 @@ pub(super) struct VotingRound> where voting: Voting, votes: Round, incoming: E::In, - outgoing: Buffered, + outgoing: Buffered>, state: Option>, // state machine driving votes. bridged_round_state: Option>, // updates to later round last_round_state: Option>, // updates from prior round @@ -174,26 +176,26 @@ impl> VotingRound where } } - /// Poll the round. When the round is completable and messages have been flushed, it will return `Async::Ready` but + /// Poll the round. When the round is completable and messages have been flushed, it will return `Poll::Ready` but /// can continue to be polled. - pub(super) fn poll(&mut self) -> Poll<(), E::Error> { + pub(super) fn poll(&mut self, cx: &mut Context) -> Poll> { trace!(target: "afg", "Polling round {}, state = {:?}, step = {:?}", self.votes.number(), self.votes.state(), self.state); let pre_state = self.votes.state(); - self.process_incoming()?; + self.process_incoming(cx)?; // we only cast votes when we have access to the previous round state. // we might have started this round as a prospect "future" round to // check whether the voter is lagging behind the current round. - let last_round_state = self.last_round_state.as_ref().map(|s| s.get().clone()); + let last_round_state = self.last_round_state.as_ref().map(|s| s.get(cx).clone()); if let Some(ref last_round_state) = last_round_state { self.primary_propose(last_round_state)?; - self.prevote(last_round_state)?; - self.precommit(last_round_state)?; + self.prevote(cx, last_round_state)?; + self.precommit(cx, last_round_state)?; } - try_ready!(self.outgoing.poll()); - self.process_incoming()?; // in case we got a new message signed locally. + ready!(self.outgoing.poll(cx))?; + self.process_incoming(cx)?; // in case we got a new message signed locally. // broadcast finality notifications after attempting to cast votes let post_state = self.votes.state(); @@ -201,7 +203,7 @@ impl> VotingRound where // early exit if the current round is not completable if !self.votes.completable() { - return Ok(Async::NotReady); + return Poll::Pending; } // make sure that the previous round estimate has been finalized @@ -236,7 +238,7 @@ impl> VotingRound where if !last_round_estimate_finalized { trace!(target: "afg", "Round {} completable but estimate not finalized.", self.round_number()); self.log_participation(log::Level::Trace); - return Ok(Async::NotReady); + return Poll::Pending; } debug!(target: "afg", "Completed round {}, state = {:?}, step = {:?}", @@ -245,7 +247,7 @@ impl> VotingRound where self.log_participation(log::Level::Debug); // both exit conditions verified, we can complete this round - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } /// Inspect the state of this round. @@ -384,10 +386,10 @@ impl> VotingRound where number, precommit_weight, threshold, total_weight, n_precommits, n_voters); } - fn process_incoming(&mut self) -> Result<(), E::Error> { - while let Async::Ready(Some(incoming)) = self.incoming.poll()? { + fn process_incoming(&mut self, cx: &mut Context) -> Result<(), E::Error> { + while let Poll::Ready(Some(incoming)) = Stream::poll_next(Pin::new(&mut self.incoming), cx) { trace!(target: "afg", "Got incoming message"); - self.handle_vote(incoming)?; + self.handle_vote(incoming?)?; } Ok(()) @@ -435,14 +437,14 @@ impl> VotingRound where Ok(()) } - fn prevote(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { + fn prevote(&mut self, cx: &mut Context, last_round_state: &RoundState) -> Result<(), E::Error> { let state = self.state.take(); let mut handle_prevote = |mut prevote_timer: E::Timer, precommit_timer: E::Timer, proposed| { - let should_prevote = match prevote_timer.poll() { - Err(e) => return Err(e), - Ok(Async::Ready(())) => true, - Ok(Async::NotReady) => self.votes.completable(), + let should_prevote = match Future::poll(Pin::new(&mut prevote_timer), cx) { + Poll::Ready(Err(e)) => return Err(e), + Poll::Ready(Ok(())) => true, + Poll::Pending => self.votes.completable(), }; if should_prevote { @@ -484,7 +486,7 @@ impl> VotingRound where Ok(()) } - fn precommit(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { + fn precommit(&mut self, cx: &mut Context, last_round_state: &RoundState) -> Result<(), E::Error> { match self.state.take() { Some(State::Prevoted(mut precommit_timer)) => { let last_round_estimate = last_round_state.estimate.clone() @@ -497,10 +499,10 @@ impl> VotingRound where p_g == &last_round_estimate || self.env.is_equal_or_descendent_of(last_round_estimate.0, p_g.0.clone()) }) - } && match precommit_timer.poll() { - Err(e) => return Err(e), - Ok(Async::Ready(())) => true, - Ok(Async::NotReady) => self.votes.completable(), + } && match Future::poll(Pin::new(&mut precommit_timer), cx) { + Poll::Ready(Err(e)) => return Err(e), + Poll::Ready(Ok(())) => true, + Poll::Pending => self.votes.completable(), }; if should_precommit { From 9e7fbc81f31f4032954083fbbb481434f32b80c6 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Mon, 25 Nov 2019 22:41:11 +0100 Subject: [PATCH 2/7] Fix a race in a voter test by using a LocalPool. Using a LocalPool is also much closer to the previous tokio-based code which used the current-thread executor for all tasks. --- src/bridge_state.rs | 1 - src/voter/mod.rs | 66 ++++++++++++++++++++++------------------ src/voter/past_rounds.rs | 2 +- 3 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/bridge_state.rs b/src/bridge_state.rs index 3388e784..ad3bf416 100644 --- a/src/bridge_state.rs +++ b/src/bridge_state.rs @@ -73,7 +73,6 @@ pub(crate) fn bridge_state(initial: RoundState) -> (PriorView, #[cfg(test)] mod tests { - use futures::prelude::*; use std::{sync::Barrier, task::Poll}; use super::*; diff --git a/src/voter/mod.rs b/src/voter/mod.rs index 3c6256af..7ca0b660 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -751,7 +751,7 @@ impl, GlobalIn, GlobalOut> Future for Voter, cx: &mut Context) -> Poll> { self.process_incoming(cx)?; self.prune_background_rounds(cx)?; - self.global_out.poll(cx)?; + let _ = self.global_out.poll(cx)?; self.process_best_round(cx) } @@ -897,7 +897,9 @@ mod tests { chain::GENESIS_HASH, environment::{Environment, Id, Signature}, }; + use futures::task::SpawnExt; use futures_timer::TryFutureExt as _; + use std::iter; use std::time::Duration; #[test] @@ -1128,10 +1130,8 @@ mod tests { let (network, routing_task) = testing::environment::make_network(); let (_, commits_sink) = network.make_global_comms(); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); - // this is a commit for a previous round - let commit = (0, Commit { + let commit = Commit { target_hash: "E", target_number: 6, precommits: vec![SignedPrecommit { @@ -1139,39 +1139,45 @@ mod tests { signature: Signature(test_id.0), id: test_id }], - }); + }; let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - futures::executor::block_on(::futures::future::lazy(move |_| { - // initialize chain - let last_finalized = env.with_chain(|chain| { - chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); - chain.last_finalized() - }); - // run voter in background. scheduling it to shut down at the end. - let voter = Voter::new( - env.clone(), - voters.clone(), - global_comms, - 1, - Vec::new(), - last_finalized, - last_finalized, - ); - threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); + // initialize chain + let last_finalized = env.with_chain(|chain| { + chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); + chain.last_finalized() + }); - threads_pool.spawn_ok(routing_task.map(|_| ())); + let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); + + // run voter in background. + let voter = Voter::new( + env.clone(), + voters.clone(), + global_comms, + 1, + last_round_state, + last_finalized, + ); - threads_pool.spawn_ok(stream::iter(std::iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1)))).forward(commits_sink) - .map_err(|_| ()).map(|_| ())); + let mut pool = futures::executor::LocalPool::new(); + pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap(); + pool.spawner().spawn(routing_task.map(|_| ())).unwrap(); - // wait for the commit message to be processed which finalized block 6 - env.finalized_stream() - .take_while(|&(_, n, _)| future::ready(n < 6)) - .for_each(|_| future::ready(())) - }).flatten()); + // Send the commit message. + pool.spawner().spawn( + stream::iter(iter::once(Ok(CommunicationOut::Commit(0, commit.clone())))) + .forward(commits_sink) + .map(|_| ())).unwrap(); + + // Wait for the commit message to be processed which finalized block 6 + let finalized = pool.run_until(env.finalized_stream() + .into_future() + .map(move |(msg, _)| msg.unwrap().2)); + + assert_eq!(finalized, commit); } #[test] diff --git a/src/voter/past_rounds.rs b/src/voter/past_rounds.rs index 77480f75..51dbad28 100644 --- a/src/voter/past_rounds.rs +++ b/src/voter/past_rounds.rs @@ -104,7 +104,7 @@ impl> Future for BackgroundRound where fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { self.waker = Some(cx.waker().clone()); - self.inner.poll(cx)?; + let _ = self.inner.poll(cx)?; self.round_committer = match self.round_committer.take() { None => None, From 572ad769b5252d098dd96733e0124109034105f4 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Tue, 26 Nov 2019 17:30:48 +0100 Subject: [PATCH 3/7] Convert remaining tests to use LocalPool. --- src/voter/mod.rs | 488 +++++++++++++++++------------------------------ 1 file changed, 172 insertions(+), 316 deletions(-) diff --git a/src/voter/mod.rs b/src/voter/mod.rs index 7ca0b660..c7b594a6 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -897,6 +897,7 @@ mod tests { chain::GENESIS_HASH, environment::{Environment, Id, Signature}, }; + use futures::executor::LocalPool; use futures::task::SpawnExt; use futures_timer::TryFutureExt as _; use std::iter; @@ -908,12 +909,52 @@ mod tests { let voters = std::iter::once((local_id, 100)).collect(); let (network, routing_task) = testing::environment::make_network(); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - futures::executor::block_on(::futures::future::lazy(move |_| { + + // initialize chain + let last_finalized = env.with_chain(|chain| { + chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); + chain.last_finalized() + }); + + let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); + + // run voter in background. scheduling it to shut down at the end. + let finalized = env.finalized_stream(); + let voter = Voter::new( + env.clone(), + voters, + global_comms, + 0, + last_round_state, + last_finalized, + ); + + let mut pool = LocalPool::new(); + pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap(); + pool.spawner().spawn(routing_task).unwrap(); + + // wait for the best block to finalize. + pool.run_until(finalized + .take_while(|&(_, n, _)| future::ready(n < 6)) + .for_each(|_| future::ready(()))) + } + + #[test] + fn finalizing_at_fault_threshold() { + // 10 voters + let voters: VoterSet<_> = (0..10).map(|i| (Id(i), 1)).collect(); + + let (network, routing_task) = testing::environment::make_network(); + let mut pool = LocalPool::new(); + + // 3 voters offline. + let finalized_streams = (0..7).map(|i| { + let local_id = Id(i); // initialize chain + let env = Arc::new(Environment::new(network.clone(), local_id)); let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); chain.last_finalized() @@ -923,65 +964,25 @@ mod tests { let finalized = env.finalized_stream(); let voter = Voter::new( env.clone(), - voters, - global_comms, + voters.clone(), + network.make_global_comms(), 0, Vec::new(), last_finalized, last_finalized, ); - threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); - threads_pool.spawn_ok(routing_task); + pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap(); - // wait for the best block to finalize. + // wait for the best block to be finalized by all honest voters finalized .take_while(|&(_, n, _)| future::ready(n < 6)) .for_each(|_| future::ready(())) - }).flatten()); - } - - #[test] - fn finalizing_at_fault_threshold() { - // 10 voters - let voters: VoterSet<_> = (0..10).map(|i| (Id(i), 1)).collect(); - - let (network, routing_task) = testing::environment::make_network(); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); - - futures::executor::block_on(::futures::future::lazy(move |_| { - // 3 voters offline. - let finalized_streams = (0..7).map(|i| { - let local_id = Id(i); - // initialize chain - let env = Arc::new(Environment::new(network.clone(), local_id)); - let last_finalized = env.with_chain(|chain| { - chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); - chain.last_finalized() - }); - - // run voter in background. scheduling it to shut down at the end. - let finalized = env.finalized_stream(); - let voter = Voter::new( - env.clone(), - voters.clone(), - network.make_global_comms(), - 0, - Vec::new(), - last_finalized, - last_finalized, - ); - threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); + }).collect::>(); - // wait for the best block to be finalized by all honest voters - finalized - .take_while(|&(_, n, _)| future::ready(n < 6)) - .for_each(|_| future::ready(())) - }).collect::>(); + pool.spawner().spawn(routing_task.map(|_| ())).unwrap(); - threads_pool.spawn_ok(routing_task.map(|_| ())); - ::futures::future::join_all(finalized_streams.into_iter()) - }).flatten()); + pool.run_until(future::join_all(finalized_streams.into_iter())); } #[test] @@ -992,35 +993,33 @@ mod tests { let (network, routing_task) = testing::environment::make_network(); let (commits, _) = network.make_global_comms(); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); - let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - futures::executor::block_on(::futures::future::lazy(move |_| { - // initialize chain - let last_finalized = env.with_chain(|chain| { - chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); - chain.last_finalized() - }); + // initialize chain + let last_finalized = env.with_chain(|chain| { + chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); + chain.last_finalized() + }); - // run voter in background. scheduling it to shut down at the end. - let voter = Voter::new( - env.clone(), - voters.clone(), - global_comms, - 0, - Vec::new(), - last_finalized, - last_finalized, - ); - threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting"))); + let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); - threads_pool.spawn_ok(routing_task); + // run voter in background. scheduling it to shut down at the end. + let voter = Voter::new( + env.clone(), + voters.clone(), + global_comms, + 0, + last_round_state, + last_finalized, + ); + + let mut pool = LocalPool::new(); + pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap(); + pool.spawner().spawn(routing_task).unwrap(); - // wait for the node to broadcast a commit message - commits.take(1).for_each(|_| future::ready(())) - }).flatten()); + // wait for the node to broadcast a commit message + pool.run_until(commits.take(1).for_each(|_| future::ready(()))) } #[test] @@ -1056,66 +1055,63 @@ mod tests { }], }); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); - let global_comms = network.make_global_comms(); let env = Arc::new(Environment::new(network, local_id)); - futures::executor::block_on(::futures::future::lazy(move |_| { - // initialize chain - let last_finalized = env.with_chain(|chain| { - chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); - chain.last_finalized() - }); - // run voter in background. scheduling it to shut down at the end. - let voter = Voter::new( - env.clone(), - voters.clone(), - global_comms, - 0, - Vec::new(), - last_finalized, - last_finalized, - ); - threads_pool.spawn_ok(voter.map(|v| v.expect("Error voting: {:?}"))); - - threads_pool.spawn_ok(routing_task.map(|_| ())); - - threads_pool.spawn_ok(::futures::future::lazy(move |_| { - round_stream.into_future() - .then(|(value, stream)| { // wait for a prevote - assert!(match value { - Some(Ok(SignedMessage { message: Message::Prevote(_), id: Id(5), .. })) => true, - _ => false, - }); - let votes = vec![prevote, precommit].into_iter().map(Result::Ok); - futures::stream::iter(votes).forward(round_sink).map(|_| stream) // send our prevote - }) - .then(|stream| { - stream.take_while(|value| match value { // wait for a precommit - Ok(SignedMessage { message: Message::Precommit(_), id: Id(5), .. }) => future::ready(false), - _ => future::ready(true), - }).for_each(|_| future::ready(())) - }) - .then(|_| { - // send our commit - stream::iter(std::iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1)))).forward(commits_sink) - }) - .map_err(|_| ()) - }).flatten().map(|_| ())); - - // wait for the first commit (ours) - commits_stream.into_future() - .then(|(_, stream)| { - stream.take(1).for_each(|_| future::ready(())) // the second commit should never arrive - .map(|()| Ok::<(), std::io::Error>(())) - .timeout(Duration::from_millis(500)).map_err(|_| ()) + // initialize chain + let last_finalized = env.with_chain(|chain| { + chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); + chain.last_finalized() + }); + + let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); + + // run voter in background. scheduling it to shut down at the end. + let voter = Voter::new( + env.clone(), + voters.clone(), + global_comms, + 0, + last_round_state, + last_finalized, + ); + + let mut pool = LocalPool::new(); + pool.spawner().spawn(voter.map(|v| v.expect("Error voting: {:?}"))).unwrap(); + pool.spawner().spawn(routing_task.map(|_| ())).unwrap(); + + pool.spawner().spawn( + round_stream.into_future() + .then(|(value, stream)| { // wait for a prevote + assert!(match value { + Some(Ok(SignedMessage { message: Message::Prevote(_), id: Id(5), .. })) => true, + _ => false, + }); + let votes = vec![prevote, precommit].into_iter().map(Result::Ok); + futures::stream::iter(votes).forward(round_sink).map(|_| stream) // send our prevote }) - .then(|res| { - assert!(res.is_err()); // so the previous future times out - futures::future::ok::<(), ()>(()) + .then(|stream| { + stream.take_while(|value| match value { // wait for a precommit + Ok(SignedMessage { message: Message::Precommit(_), id: Id(5), .. }) => future::ready(false), + _ => future::ready(true), + }).for_each(|_| future::ready(())) }) - }).flatten()).unwrap(); + .then(|_| { + // send our commit + stream::iter(iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1)))).forward(commits_sink) + }) + .map(|_| ()) + ).unwrap(); + + // wait for the first commit (ours) + let res = pool.run_until(commits_stream.into_future() + .then(|(_, stream)| { + stream.take(1).for_each(|_| future::ready(())) // the second commit should never arrive + .map(|()| Ok::<(), std::io::Error>(())) + .timeout(Duration::from_millis(500)).map_err(|_| ()) + })); + + assert!(res.is_err()) // so the previous future times out } #[test] @@ -1162,7 +1158,7 @@ mod tests { last_finalized, ); - let mut pool = futures::executor::LocalPool::new(); + let mut pool = LocalPool::new(); pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap(); pool.spawner().spawn(routing_task.map(|_| ())).unwrap(); @@ -1172,7 +1168,7 @@ mod tests { .forward(commits_sink) .map(|_| ())).unwrap(); - // Wait for the commit message to be processed which finalized block 6 + // Wait for the commit message to be processed. let finalized = pool.run_until(env.finalized_stream() .into_future() .map(move |(msg, _)| msg.unwrap().2)); @@ -1186,206 +1182,66 @@ mod tests { let voters: VoterSet<_> = (0..3).map(|i| (Id(i), 1)).collect(); let (network, routing_task) = testing::environment::make_network(); - let threads_pool = futures::executor::ThreadPool::new().unwrap(); - - futures::executor::block_on(::futures::future::lazy(move |_| { - threads_pool.spawn_ok(routing_task.map(|_| ())); - - // initialize unsynced voter at round 0 - let mut unsynced_voter = { - let local_id = Id(4); - - let env = Arc::new(Environment::new(network.clone(), local_id)); - let last_finalized = env.with_chain(|chain| { - chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); - chain.last_finalized() - }); - - Voter::new( - env.clone(), - voters.clone(), - network.make_global_comms(), - 0, - Vec::new(), - last_finalized, - last_finalized, - ) - }; - - let pv = |id| crate::SignedPrevote { - prevote: crate::Prevote { target_hash: "C", target_number: 4 }, - id: Id(id), - signature: Signature(99), - }; - - let pc = |id| crate::SignedPrecommit { - precommit: crate::Precommit { target_hash: "C", target_number: 4 }, - id: Id(id), - signature: Signature(99), - }; - - // send in a catch-up message for round 5. - network.send_message(CommunicationIn::CatchUp( - CatchUp { - base_number: 1, - base_hash: GENESIS_HASH, - round_number: 5, - prevotes: vec![pv(0), pv(1), pv(2)], - precommits: vec![pc(0), pc(1), pc(2)], - }, - Callback::Blank, - )); - - // poll until it's caught up. - // should skip to round 6 - ::futures::future::poll_fn(move |cx| -> Poll> { - let poll = Future::poll(Pin::new(&mut unsynced_voter), cx); - if unsynced_voter.best_round.round_number() == 6 { - Poll::Ready(Ok(())) - } else { - futures::ready!(poll).map_err(|_| ())?; - Poll::Ready(Ok(())) - } - }) - }).flatten()).unwrap(); - } - - #[test] - fn pick_up_from_prior_without_grandparent_state() { - let local_id = Id(5); - let voters = std::iter::once((local_id, 100)).collect(); - - let (network, routing_task) = testing::environment::make_network(); - let (signal, exit) = ::exit_future::signal(); - - let global_comms = network.make_global_comms(); - let env = Arc::new(Environment::new(network, local_id)); - current_thread::block_on_all(::futures::future::lazy(move || { - // initialize chain - let last_finalized = env.with_chain(|chain| { - chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); - chain.last_finalized() - }); - - // run voter in background. scheduling it to shut down at the end. - let finalized = env.finalized_stream(); - let voter = Voter::new( - env.clone(), - voters, - global_comms, - 10, - Vec::new(), - last_finalized, - last_finalized, - ); - tokio::spawn(exit.clone() - .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); - - tokio::spawn(exit.until(routing_task).map(|_| ())); - - // wait for the best block to finalize. - finalized - .take_while(|&(_, n, _)| Ok(n < 6)) - .for_each(|_| Ok(())) - .map(|_| signal.fire()) - })).unwrap(); - } + let mut pool = LocalPool::new(); - #[test] - fn pick_up_from_prior_with_grandparent_state() { - let local_id = Id(99); - let voters = (0..100).map(|id| (Id(id), 1)).collect::>(); + pool.spawner().spawn(routing_task.map(|_| ())).unwrap(); - let (network, routing_task) = testing::environment::make_network(); - let (signal, exit) = ::exit_future::signal(); + // initialize unsynced voter at round 0 + let mut unsynced_voter = { + let local_id = Id(4); - let global_comms = network.make_global_comms(); - let env = Arc::new(Environment::new(network.clone(), local_id)); - let outer_env = env.clone(); - current_thread::block_on_all(::futures::future::lazy(move || { - // initialize chain + let env = Arc::new(Environment::new(network.clone(), local_id)); let last_finalized = env.with_chain(|chain| { chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); chain.last_finalized() }); - let mut last_round_votes = Vec::new(); + let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); - // round 1 state on disk: 67 prevotes for "E". 66 precommits for "D". 1 precommit "E". - // the round is completable, but the estimate ("E") is not finalized. - { - for id in 0..67 { - let prevote = Message::Prevote(Prevote { target_hash: "E", target_number: 6 }); - let precommit = if id < 66 { - Message::Precommit(Precommit { target_hash: "D", target_number: 5 }) - } else { - Message::Precommit(Precommit { target_hash: "E", target_number: 6 }) - }; - - last_round_votes.push(SignedMessage { - message: prevote.clone(), - signature: Signature(id), - id: Id(id), - }); - - last_round_votes.push(SignedMessage { - message: precommit.clone(), - signature: Signature(id), - id: Id(id), - }); - - // round 2 has the same votes. - // - // this means we wouldn't be able to start round 3 until - // the estimate of round-1 moves backwards. - let (_, round_sink) = network.make_round_comms(2, Id(id)); - tokio::spawn( - round_sink.send(prevote).and_then(move |sink| sink.send(precommit)) - .map_err(|_| ()) - .map(|_| ()) - ); - } - } - - // round 1 fresh communication. we send one more precommit for "D" so the estimate - // moves backwards. - { - let sender = Id(67); - let (_, round_sink) = network.make_round_comms(1, sender); - let last_precommit = Message::Precommit(Precommit { target_hash: "D", target_number: 3 }); - tokio::spawn(round_sink.send(last_precommit).map(|_| ()).map_err(|_| ())); - } - - // run voter in background. scheduling it to shut down at the end. - let voter = Voter::new( + Voter::new( env.clone(), - voters, - global_comms, - 1, - last_round_votes, - last_finalized, + voters.clone(), + network.make_global_comms(), + 0, + last_round_state, last_finalized, - ); - tokio::spawn(exit.clone() - .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); - - tokio::spawn(exit.until(routing_task).map(|_| ())); + ) + }; - // wait until we see a prevote on round 3 from our local ID, - // indicating that the round 3 has started. + let pv = |id| crate::SignedPrevote { + prevote: crate::Prevote { target_hash: "C", target_number: 4 }, + id: Id(id), + signature: Signature(99), + }; - let (round_stream, _) = network.make_round_comms(3, Id(1000)); - round_stream - .skip_while(move |v| if let Message::Prevote(_) = v.message { - Ok(v.id != local_id) - } else { - Ok(true) - }) - .into_future() - .map(move |(x, _stream)| { signal.fire(); x }) - .map_err(|(err, _stream)| err) - })).unwrap(); + let pc = |id| crate::SignedPrecommit { + precommit: crate::Precommit { target_hash: "C", target_number: 4 }, + id: Id(id), + signature: Signature(99), + }; - assert_eq!(outer_env.last_completed_and_concluded(), (2, 1)); + // send in a catch-up message for round 5. + network.send_message(CommunicationIn::CatchUp( + CatchUp { + base_number: 1, + base_hash: GENESIS_HASH, + round_number: 5, + prevotes: vec![pv(0), pv(1), pv(2)], + precommits: vec![pc(0), pc(1), pc(2)], + }, + Callback::Blank, + )); + + // poll until it's caught up. + // should skip to round 6 + pool.run_until(future::poll_fn(move |cx| -> Poll<()> { + let poll = Future::poll(Pin::new(&mut unsynced_voter), cx); + if unsynced_voter.best_round.round_number() == 6 { + Poll::Ready(()) + } else { + futures::ready!(poll).unwrap(); + Poll::Ready(()) + } + })) } } From 4e369ebaa1794b7b413b0414dd1e6ac02170894a Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Tue, 26 Nov 2019 18:07:56 +0100 Subject: [PATCH 4/7] Upgrade futures and futures-timer. --- Cargo.toml | 4 ++-- src/testing.rs | 8 +++----- src/voter/mod.rs | 28 ++++++++++++++++++---------- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d666df67..b8d54a65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,8 @@ repository = "https://github.com/paritytech/finality-grandpa" edition = "2018" [dependencies] -futures-preview = "0.3.0-alpha.17" -futures-timer = "0.3.0" +futures = "0.3.1" +futures-timer = "2.0.2" log = "0.4" parking_lot = { version = "0.9", optional = true } parity-scale-codec = { version = "1.0.3", optional = true, default-features = false, features = ["derive"] } diff --git a/src/testing.rs b/src/testing.rs index ce47dc38..aad68ac4 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -209,10 +209,8 @@ pub mod environment { let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id); RoundData { voter_id: Some(self.local_id), - prevote_timer: Box::pin(Delay::new(GOSSIP_DURATION) - .map_err(|_| panic!("Timer failed"))), - precommit_timer: Box::pin(Delay::new(GOSSIP_DURATION + GOSSIP_DURATION) - .map_err(|_| panic!("Timer failed"))), + prevote_timer: Box::pin(Delay::new(GOSSIP_DURATION).map(Ok)), + precommit_timer: Box::pin(Delay::new(GOSSIP_DURATION + GOSSIP_DURATION).map(Ok)), incoming: Box::pin(incoming), outgoing: Box::pin(outgoing), } @@ -226,7 +224,7 @@ pub mod environment { let delay = Duration::from_millis( rand::thread_rng().gen_range(0, COMMIT_DELAY_MILLIS)); - Box::pin(Delay::new(delay).map_err(|_| panic!("Timer failed"))) + Box::pin(Delay::new(delay).map(Ok)) } fn completed( diff --git a/src/voter/mod.rs b/src/voter/mod.rs index c7b594a6..4b522ef4 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -899,7 +899,7 @@ mod tests { }; use futures::executor::LocalPool; use futures::task::SpawnExt; - use futures_timer::TryFutureExt as _; + use futures_timer::Delay; use std::iter; use std::time::Duration; @@ -1103,15 +1103,23 @@ mod tests { .map(|_| ()) ).unwrap(); - // wait for the first commit (ours) - let res = pool.run_until(commits_stream.into_future() - .then(|(_, stream)| { - stream.take(1).for_each(|_| future::ready(())) // the second commit should never arrive - .map(|()| Ok::<(), std::io::Error>(())) - .timeout(Duration::from_millis(500)).map_err(|_| ()) - })); - - assert!(res.is_err()) // so the previous future times out + let res = pool.run_until( + // wait for the first commit (ours) + commits_stream.into_future() + .then(|(_, stream)| { + // the second commit should never arrive + let await_second = stream.take(1) + .for_each(|_| future::ready(())); + let delay = Delay::new(Duration::from_millis(500)); + future::select(await_second, delay) + })); + + match res { + future::Either::Right(((), _work)) => { + // the future timed out as expected + } + _ => panic!("Unexpected result") + } } #[test] From b6e99b132ae1be76d5e28c59e701a433e0c91bb7 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Thu, 28 Nov 2019 14:49:38 +0100 Subject: [PATCH 5/7] Update remaining tests. --- src/voter/mod.rs | 164 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 146 insertions(+), 18 deletions(-) diff --git a/src/voter/mod.rs b/src/voter/mod.rs index 4b522ef4..097557c4 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -919,8 +919,6 @@ mod tests { chain.last_finalized() }); - let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); - // run voter in background. scheduling it to shut down at the end. let finalized = env.finalized_stream(); let voter = Voter::new( @@ -928,7 +926,8 @@ mod tests { voters, global_comms, 0, - last_round_state, + Vec::new(), + last_finalized, last_finalized, ); @@ -1002,15 +1001,14 @@ mod tests { chain.last_finalized() }); - let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); - // run voter in background. scheduling it to shut down at the end. let voter = Voter::new( env.clone(), voters.clone(), global_comms, 0, - last_round_state, + Vec::new(), + last_finalized, last_finalized, ); @@ -1064,15 +1062,14 @@ mod tests { chain.last_finalized() }); - let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); - // run voter in background. scheduling it to shut down at the end. let voter = Voter::new( env.clone(), voters.clone(), global_comms, 0, - last_round_state, + Vec::new(), + last_finalized, last_finalized, ); @@ -1154,15 +1151,14 @@ mod tests { chain.last_finalized() }); - let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); - // run voter in background. let voter = Voter::new( env.clone(), voters.clone(), global_comms, 1, - last_round_state, + Vec::new(), + last_finalized, last_finalized, ); @@ -1177,9 +1173,10 @@ mod tests { .map(|_| ())).unwrap(); // Wait for the commit message to be processed. - let finalized = pool.run_until(env.finalized_stream() - .into_future() - .map(move |(msg, _)| msg.unwrap().2)); + let finalized = pool.run_until( + env.finalized_stream() + .into_future() + .map(move |(msg, _)| msg.unwrap().2)); assert_eq!(finalized, commit); } @@ -1204,14 +1201,13 @@ mod tests { chain.last_finalized() }); - let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); - Voter::new( env.clone(), voters.clone(), network.make_global_comms(), 0, - last_round_state, + Vec::new(), + last_finalized, last_finalized, ) }; @@ -1252,4 +1248,136 @@ mod tests { } })) } + + #[test] + fn pick_up_from_prior_without_grandparent_state() { + let local_id = Id(5); + let voters = std::iter::once((local_id, 100)).collect(); + + let (network, routing_task) = testing::environment::make_network(); + + let global_comms = network.make_global_comms(); + let env = Arc::new(Environment::new(network, local_id)); + + // initialize chain + let last_finalized = env.with_chain(|chain| { + chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); + chain.last_finalized() + }); + + // run voter in background. scheduling it to shut down at the end. + let voter = Voter::new( + env.clone(), + voters, + global_comms, + 10, + Vec::new(), + last_finalized, + last_finalized, + ); + + let mut pool = LocalPool::new(); + pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap(); + pool.spawner().spawn(routing_task.map(|_| ())).unwrap(); + + // wait for the best block to finalize. + pool.run_until(env.finalized_stream() + .take_while(|&(_, n, _)| future::ready(n < 6)) + .for_each(|_| future::ready(()))) + } + + #[test] + fn pick_up_from_prior_with_grandparent_state() { + let local_id = Id(99); + let voters = (0..100).map(|id| (Id(id), 1)).collect::>(); + + let (network, routing_task) = testing::environment::make_network(); + + let global_comms = network.make_global_comms(); + let env = Arc::new(Environment::new(network.clone(), local_id)); + let outer_env = env.clone(); + + // initialize chain + let last_finalized = env.with_chain(|chain| { + chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); + chain.last_finalized() + }); + + let mut pool = LocalPool::new(); + let mut last_round_votes = Vec::new(); + + // round 1 state on disk: 67 prevotes for "E". 66 precommits for "D". 1 precommit "E". + // the round is completable, but the estimate ("E") is not finalized. + for id in 0..67 { + let prevote = Message::Prevote(Prevote { target_hash: "E", target_number: 6 }); + let precommit = if id < 66 { + Message::Precommit(Precommit { target_hash: "D", target_number: 5 }) + } else { + Message::Precommit(Precommit { target_hash: "E", target_number: 6 }) + }; + + last_round_votes.push(SignedMessage { + message: prevote.clone(), + signature: Signature(id), + id: Id(id), + }); + + last_round_votes.push(SignedMessage { + message: precommit.clone(), + signature: Signature(id), + id: Id(id), + }); + + // round 2 has the same votes. + // + // this means we wouldn't be able to start round 3 until + // the estimate of round-1 moves backwards. + let (_, round_sink) = network.make_round_comms(2, Id(id)); + let msgs = stream::iter(iter::once(Ok(prevote)).chain(iter::once(Ok(precommit)))); + pool.spawner().spawn(msgs.forward(round_sink).map(|r| r.unwrap())).unwrap(); + } + + // round 1 fresh communication. we send one more precommit for "D" so the estimate + // moves backwards. + let sender = Id(67); + let (_, round_sink) = network.make_round_comms(1, sender); + let last_precommit = Message::Precommit(Precommit { target_hash: "D", target_number: 3 }); + pool.spawner().spawn( + stream::iter(iter::once(Ok(last_precommit))) + .forward(round_sink) + .map(|r| r.unwrap())).unwrap(); + + // run voter in background. scheduling it to shut down at the end. + let voter = Voter::new( + env.clone(), + voters, + global_comms, + 1, + last_round_votes, + last_finalized, + last_finalized, + ); + pool.spawner().spawn(voter.map_err(|_| panic!("Error voting")).map(|_| ())).unwrap(); + + pool.spawner().spawn(routing_task.map(|_| ())).unwrap(); + + // wait until we see a prevote on round 3 from our local ID, + // indicating that the round 3 has started. + + let (round_stream, _) = network.make_round_comms(3, Id(1000)); + pool.run_until(round_stream + .skip_while(move |v| { + let v = v.as_ref().unwrap(); + if let Message::Prevote(_) = v.message { + future::ready(v.id != local_id) + } else { + future::ready(true) + } + }) + .into_future() + .map(move |(x, _stream)| x)); + + assert_eq!(outer_env.last_completed_and_concluded(), (2, 1)); + } + } From 405a3fb0541655012029259e9e02646c88febf89 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Thu, 28 Nov 2019 18:00:01 +0100 Subject: [PATCH 6/7] Small cleanup --- src/testing.rs | 9 ++++----- src/voter/mod.rs | 4 ++-- src/voter/past_rounds.rs | 13 ++----------- src/voter/voting_round.rs | 4 ++-- 4 files changed, 10 insertions(+), 20 deletions(-) diff --git a/src/testing.rs b/src/testing.rs index aad68ac4..8782f9d6 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -134,6 +134,8 @@ pub mod environment { use crate::{Chain, Commit, Error, Equivocation, Message, Prevote, Precommit, PrimaryPropose, SignedMessage, HistoricalVotes}; use futures::prelude::*; use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; + use futures::stream::BoxStream; + use futures::future::BoxFuture; use futures_timer::Delay; use parking_lot::Mutex; use std::collections::HashMap; @@ -196,10 +198,10 @@ pub mod environment { } impl crate::voter::Environment<&'static str, u32> for Environment { - type Timer = Pin> + Send + 'static>>; + type Timer = BoxFuture<'static, Result<(),Error>>; type Id = Id; type Signature = Signature; - type In = Pin,Error>> + Send + 'static>>; + type In = BoxStream<'static, Result,Error>>; type Out = Pin,Error=Error> + Send + 'static>>; type Error = Error; @@ -424,7 +426,4 @@ pub mod environment { Poll::Pending } } - - impl Unpin for NetworkRouting { - } } diff --git a/src/voter/mod.rs b/src/voter/mod.rs index 097557c4..e73573fb 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -703,7 +703,7 @@ impl, GlobalIn, GlobalOut> Voter Result<(), E::Error> { @@ -1239,7 +1239,7 @@ mod tests { // poll until it's caught up. // should skip to round 6 pool.run_until(future::poll_fn(move |cx| -> Poll<()> { - let poll = Future::poll(Pin::new(&mut unsynced_voter), cx); + let poll = unsynced_voter.poll_unpin(cx); if unsynced_voter.best_round.round_number() == 6 { Poll::Ready(()) } else { diff --git a/src/voter/past_rounds.rs b/src/voter/past_rounds.rs index 51dbad28..33cf4296 100644 --- a/src/voter/past_rounds.rs +++ b/src/voter/past_rounds.rs @@ -185,7 +185,7 @@ impl> RoundCommitter where } } - ready!(Future::poll(Pin::new(&mut self.commit_timer), cx))?; + ready!(self.commit_timer.poll_unpin(cx))?; match (self.last_commit.take(), voting_round.finalized()) { (None, Some(_)) => { @@ -225,7 +225,7 @@ impl Future for SelfReturningFuture { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.inner.take() { None => panic!("poll after return is not done in this module; qed"), - Some(mut f) => match Future::poll(Pin::new(&mut f), cx) { + Some(mut f) => match f.poll_unpin(cx) { Poll::Ready(item) => Poll::Ready((item, f)), Poll::Pending => { self.inner = Some(f); @@ -236,9 +236,6 @@ impl Future for SelfReturningFuture { } } -impl Unpin for SelfReturningFuture { -} - /// A stream for past rounds, which produces any commit messages from those /// rounds and drives them to completion. pub(super) struct PastRounds> where @@ -345,9 +342,3 @@ impl> Stream for PastRounds where } } } - -impl> Unpin for PastRounds where - H: Clone + Eq + Ord + ::std::fmt::Debug, - N: Copy + BlockNumberOps + ::std::fmt::Debug, -{ -} diff --git a/src/voter/voting_round.rs b/src/voter/voting_round.rs index 2b212e46..057c09cb 100644 --- a/src/voter/voting_round.rs +++ b/src/voter/voting_round.rs @@ -441,7 +441,7 @@ impl> VotingRound where let state = self.state.take(); let mut handle_prevote = |mut prevote_timer: E::Timer, precommit_timer: E::Timer, proposed| { - let should_prevote = match Future::poll(Pin::new(&mut prevote_timer), cx) { + let should_prevote = match prevote_timer.poll_unpin(cx) { Poll::Ready(Err(e)) => return Err(e), Poll::Ready(Ok(())) => true, Poll::Pending => self.votes.completable(), @@ -499,7 +499,7 @@ impl> VotingRound where p_g == &last_round_estimate || self.env.is_equal_or_descendent_of(last_round_estimate.0, p_g.0.clone()) }) - } && match Future::poll(Pin::new(&mut precommit_timer), cx) { + } && match precommit_timer.poll_unpin(cx) { Poll::Ready(Err(e)) => return Err(e), Poll::Ready(Ok(())) => true, Poll::Pending => self.votes.completable(), From 7e8b46f5a073311872651e2476ffb56c52a3eed1 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Thu, 28 Nov 2019 18:45:52 +0100 Subject: [PATCH 7/7] Small cleanup. --- src/voter/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/voter/mod.rs b/src/voter/mod.rs index e73573fb..a845ec9a 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -1357,8 +1357,8 @@ mod tests { last_finalized, last_finalized, ); - pool.spawner().spawn(voter.map_err(|_| panic!("Error voting")).map(|_| ())).unwrap(); + pool.spawner().spawn(voter.map_err(|_| panic!("Error voting")).map(|_| ())).unwrap(); pool.spawner().spawn(routing_task.map(|_| ())).unwrap(); // wait until we see a prevote on round 3 from our local ID, @@ -1375,7 +1375,7 @@ mod tests { } }) .into_future() - .map(move |(x, _stream)| x)); + .map(|_| ())); assert_eq!(outer_env.last_completed_and_concluded(), (2, 1)); }