Skip to content

Commit

Permalink
Merge pull request #100 from romanb/new-futures-redux
Browse files Browse the repository at this point in the history
Upgrade to stable futures (#2).
  • Loading branch information
rphmeier authored Jan 15, 2020
2 parents b0c3e60 + 7e8b46f commit a6b6103
Show file tree
Hide file tree
Showing 6 changed files with 500 additions and 504 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ repository = "https://github.com/paritytech/finality-grandpa"
edition = "2018"

[dependencies]
futures = "0.1"
futures = "0.3.1"
futures-timer = "2.0.2"
log = "0.4"
parking_lot = { version = "0.9", optional = true }
parity-scale-codec = { version = "1.0.3", optional = true, default-features = false, features = ["derive"] }
num = { package = "num-traits", version = "0.2", default-features = false }

[dev-dependencies]
exit-future = "0.1.2"
rand = "0.6.0"
tokio = "0.1.8"

[features]
default = ["std"]
Expand Down
24 changes: 12 additions & 12 deletions src/bridge_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ use crate::round::State as RoundState;
use futures::task;
use parking_lot::{RwLock, RwLockReadGuard};
use std::sync::Arc;
use std::task::Context;

// round state bridged across rounds.
struct Bridged<H, N> {
inner: RwLock<RoundState<H, N>>,
task: task::AtomicTask,
waker: task::AtomicWaker,
}

impl<H, N> Bridged<H, N> {
fn new(inner: RwLock<RoundState<H, N>>) -> Self {
Bridged {
inner,
task: task::AtomicTask::new(),
waker: task::AtomicWaker::new(),
}
}
}
Expand All @@ -41,7 +42,7 @@ impl<H, N> PriorView<H, N> {
/// Push an update to the latter view.
pub(crate) fn update(&self, new: RoundState<H, N>) {
*self.0.inner.write() = new;
self.0.task.notify();
self.0.waker.wake();
}
}

Expand All @@ -50,8 +51,8 @@ pub(crate) struct LatterView<H, N>(Arc<Bridged<H, N>>);

impl<H, N> LatterView<H, N> {
/// Fetch a handle to the last round-state.
pub(crate) fn get(&self) -> RwLockReadGuard<RoundState<H, N>> {
self.0.task.register();
pub(crate) fn get(&self, cx: &mut Context) -> RwLockReadGuard<RoundState<H, N>> {
self.0.waker.register(cx.waker());
self.0.inner.read()
}
}
Expand All @@ -72,8 +73,7 @@ pub(crate) fn bridge_state<H, N>(initial: RoundState<H, N>) -> (PriorView<H, N>,

#[cfg(test)]
mod tests {
use futures::prelude::*;
use std::sync::Barrier;
use std::{sync::Barrier, task::Poll};
use super::*;

#[test]
Expand All @@ -86,11 +86,11 @@ mod tests {
};

let (prior, latter) = bridge_state(initial);
let waits_for_finality = ::futures::future::poll_fn(move || -> Poll<(), ()> {
if latter.get().finalized.is_some() {
Ok(Async::Ready(()))
let waits_for_finality = ::futures::future::poll_fn(move |cx| -> Poll<()> {
if latter.get(cx).finalized.is_some() {
Poll::Ready(())
} else {
Ok(Async::NotReady)
Poll::Pending
}
});

Expand All @@ -107,6 +107,6 @@ mod tests {
});

barrier.wait();
waits_for_finality.wait().unwrap();
futures::executor::block_on(waits_for_finality);
}
}
72 changes: 35 additions & 37 deletions src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,16 @@ pub mod environment {
use crate::voter::{RoundData, CommunicationIn, CommunicationOut, Callback};
use crate::{Chain, Commit, Error, Equivocation, Message, Prevote, Precommit, PrimaryPropose, SignedMessage, HistoricalVotes};
use futures::prelude::*;
use futures::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::stream::BoxStream;
use futures::future::BoxFuture;
use futures_timer::Delay;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Instant, Duration};
use tokio::timer::Delay;
use std::task::{Context, Poll};
use std::time::Duration;

#[derive(Hash, Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd)]
pub struct Id(pub u32);
Expand Down Expand Up @@ -194,26 +198,23 @@ pub mod environment {
}

impl crate::voter::Environment<&'static str, u32> for Environment {
type Timer = Box<dyn Future<Item=(),Error=Error> + Send + 'static>;
type Timer = BoxFuture<'static, Result<(),Error>>;
type Id = Id;
type Signature = Signature;
type In = Box<dyn Stream<Item=SignedMessage<&'static str, u32, Signature, Id>,Error=Error> + Send + 'static>;
type Out = Box<dyn Sink<SinkItem=Message<&'static str, u32>,SinkError=Error> + Send + 'static>;
type In = BoxStream<'static, Result<SignedMessage<&'static str, u32, Signature, Id>,Error>>;
type Out = Pin<Box<dyn Sink<Message<&'static str, u32>,Error=Error> + Send + 'static>>;
type Error = Error;

fn round_data(&self, round: u64) -> RoundData<Self::Id, Self::Timer, Self::In, Self::Out> {
const GOSSIP_DURATION: Duration = Duration::from_millis(500);

let now = Instant::now();
let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id);
RoundData {
voter_id: Some(self.local_id),
prevote_timer: Box::new(Delay::new(now + GOSSIP_DURATION)
.map_err(|_| panic!("Timer failed"))),
precommit_timer: Box::new(Delay::new(now + GOSSIP_DURATION + GOSSIP_DURATION)
.map_err(|_| panic!("Timer failed"))),
incoming: Box::new(incoming),
outgoing: Box::new(outgoing),
prevote_timer: Box::pin(Delay::new(GOSSIP_DURATION).map(Ok)),
precommit_timer: Box::pin(Delay::new(GOSSIP_DURATION + GOSSIP_DURATION).map(Ok)),
incoming: Box::pin(incoming),
outgoing: Box::pin(outgoing),
}
}

Expand All @@ -225,8 +226,7 @@ pub mod environment {
let delay = Duration::from_millis(
rand::thread_rng().gen_range(0, COMMIT_DELAY_MILLIS));

let now = Instant::now();
Box::new(Delay::new(now + delay).map_err(|_| panic!("Timer failed")))
Box::pin(Delay::new(delay).map(Ok))
}

fn completed(
Expand Down Expand Up @@ -316,32 +316,31 @@ pub mod environment {

// add a node to the network for a round.
fn add_node<N, F: Fn(N) -> M>(&mut self, f: F) -> (
impl Stream<Item=M,Error=Error>,
impl Sink<SinkItem=N,SinkError=Error>
impl Stream<Item=Result<M,Error>>,
impl Sink<N,Error=Error>
) {
let (tx, rx) = mpsc::unbounded();
let messages_out = self.raw_sender.clone()
.sink_map_err(|e| panic!("Error sending messages: {:?}", e))
.with(move |message| Ok(f(message)));
.with(move |message| future::ready(Ok(f(message))));

// get history to the node.
for prior_message in self.history.iter().cloned() {
let _ = tx.unbounded_send(prior_message);
}

self.senders.push(tx);
let rx = rx.map_err(|e| panic!("Error receiving messages: {:?}", e));

(rx, messages_out)
(rx.map(Ok), messages_out)
}

// do routing work
fn route(&mut self) -> Poll<(), ()> {
fn route(&mut self, cx: &mut Context) -> Poll<()> {
loop {
match self.receiver.poll().map_err(|e| panic!("Error routing messages: {:?}", e))? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some(item)) => {
match Stream::poll_next(Pin::new(&mut self.receiver), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => {
self.history.push(item.clone());
for sender in &self.senders {
let _ = sender.unbounded_send(item.clone());
Expand Down Expand Up @@ -376,8 +375,8 @@ pub mod environment {

impl Network {
pub fn make_round_comms(&self, round_number: u64, node_id: Id) -> (
impl Stream<Item=SignedMessage<&'static str, u32, Signature, Id>,Error=Error>,
impl Sink<SinkItem=Message<&'static str, u32>,SinkError=Error>
impl Stream<Item=Result<SignedMessage<&'static str, u32, Signature, Id>,Error>>,
impl Sink<Message<&'static str, u32>,Error=Error>
) {
let mut rounds = self.rounds.lock();
rounds.entry(round_number)
Expand All @@ -390,8 +389,8 @@ pub mod environment {
}

pub fn make_global_comms(&self) -> (
impl Stream<Item=CommunicationIn<&'static str, u32, Signature, Id>,Error=Error>,
impl Sink<SinkItem=CommunicationOut<&'static str, u32, Signature, Id>,SinkError=Error>
impl Stream<Item=Result<CommunicationIn<&'static str, u32, Signature, Id>,Error>>,
impl Sink<CommunicationOut<&'static str, u32, Signature, Id>,Error=Error>
) {
let mut global_messages = self.global_messages.lock();
global_messages.add_node(|message| match message {
Expand All @@ -412,20 +411,19 @@ pub mod environment {
}

impl Future for NetworkRouting {
type Item = ();
type Error = ();
type Output = ();

fn poll(&mut self) -> Poll<(), ()> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
let mut rounds = self.rounds.lock();
rounds.retain(|_, round| match round.route() {
Ok(Async::Ready(())) | Err(()) => false,
Ok(Async::NotReady) => true,
rounds.retain(|_, round| match round.route(cx) {
Poll::Ready(()) => false,
Poll::Pending => true,
});

let mut global_messages = self.global_messages.lock();
let _ = global_messages.route();
let _ = global_messages.route(cx);

Ok(Async::NotReady)
Poll::Pending
}
}
}
Loading

0 comments on commit a6b6103

Please sign in to comment.