Skip to content

Commit

Permalink
Merge pull request #89 from paritytech/revert-81-new-futures
Browse files Browse the repository at this point in the history
Revert "Update to stables futures"
  • Loading branch information
rphmeier authored Nov 9, 2019
2 parents 286133e + f9e48eb commit e00b623
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 258 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ repository = "https://github.com/paritytech/finality-grandpa"
edition = "2018"

[dependencies]
futures-preview = "0.3.0-alpha.17"
futures-timer = "0.3.0"
futures = "0.1"
log = "0.4"
parking_lot = { version = "0.9", optional = true }
parity-scale-codec = { version = "1.0.3", optional = true, default-features = false, features = ["derive"] }
num = { package = "num-traits", version = "0.2", default-features = false }
hashbrown = { version = "0.6" }

[dev-dependencies]
exit-future = "0.1.2"
rand = "0.6.0"
tokio = "0.1.8"

[features]
default = ["std"]
Expand Down
23 changes: 11 additions & 12 deletions src/bridge_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@ use crate::round::State as RoundState;
use futures::task;
use parking_lot::{RwLock, RwLockReadGuard};
use std::sync::Arc;
use std::task::Context;

// round state bridged across rounds.
struct Bridged<H, N> {
inner: RwLock<RoundState<H, N>>,
waker: task::AtomicWaker,
task: task::AtomicTask,
}

impl<H, N> Bridged<H, N> {
fn new(inner: RwLock<RoundState<H, N>>) -> Self {
Bridged {
inner,
waker: task::AtomicWaker::new(),
task: task::AtomicTask::new(),
}
}
}
Expand All @@ -42,7 +41,7 @@ impl<H, N> PriorView<H, N> {
/// Push an update to the latter view.
pub(crate) fn update(&self, new: RoundState<H, N>) {
*self.0.inner.write() = new;
self.0.waker.wake();
self.0.task.notify();
}
}

Expand All @@ -51,8 +50,8 @@ pub(crate) struct LatterView<H, N>(Arc<Bridged<H, N>>);

impl<H, N> LatterView<H, N> {
/// Fetch a handle to the last round-state.
pub(crate) fn get(&self, cx: &mut Context) -> RwLockReadGuard<RoundState<H, N>> {
self.0.waker.register(cx.waker());
pub(crate) fn get(&self) -> RwLockReadGuard<RoundState<H, N>> {
self.0.task.register();
self.0.inner.read()
}
}
Expand All @@ -74,7 +73,7 @@ pub(crate) fn bridge_state<H, N>(initial: RoundState<H, N>) -> (PriorView<H, N>,
#[cfg(test)]
mod tests {
use futures::prelude::*;
use std::{sync::Barrier, task::Poll};
use std::sync::Barrier;
use super::*;

#[test]
Expand All @@ -87,11 +86,11 @@ mod tests {
};

let (prior, latter) = bridge_state(initial);
let waits_for_finality = ::futures::future::poll_fn(move |cx| -> Poll<()> {
if latter.get(cx).finalized.is_some() {
Poll::Ready(())
let waits_for_finality = ::futures::future::poll_fn(move || -> Poll<(), ()> {
if latter.get().finalized.is_some() {
Ok(Async::Ready(()))
} else {
Poll::Pending
Ok(Async::NotReady)
}
});

Expand All @@ -108,6 +107,6 @@ mod tests {
});

barrier.wait();
futures::executor::block_on(waits_for_finality);
waits_for_finality.wait().unwrap();
}
}
71 changes: 35 additions & 36 deletions src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,12 @@ pub mod environment {
use crate::voter::{RoundData, CommunicationIn, CommunicationOut, Callback};
use crate::{Chain, Commit, Error, Equivocation, Message, Prevote, Precommit, PrimaryPropose, SignedMessage, HistoricalVotes};
use futures::prelude::*;
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures_timer::Delay;
use futures::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::{Instant, Duration};
use tokio::timer::Delay;

#[derive(Hash, Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd)]
pub struct Id(pub u32);
Expand Down Expand Up @@ -189,25 +187,26 @@ pub mod environment {
}

impl crate::voter::Environment<&'static str, u32> for Environment {
type Timer = Pin<Box<dyn Future<Output=Result<(),Error>> + Send + 'static>>;
type Timer = Box<dyn Future<Item=(),Error=Error> + Send + 'static>;
type Id = Id;
type Signature = Signature;
type In = Pin<Box<dyn Stream<Item=Result<SignedMessage<&'static str, u32, Signature, Id>,Error>> + Send + 'static>>;
type Out = Pin<Box<dyn Sink<Message<&'static str, u32>,Error=Error> + Send + 'static>>;
type In = Box<dyn Stream<Item=SignedMessage<&'static str, u32, Signature, Id>,Error=Error> + Send + 'static>;
type Out = Box<dyn Sink<SinkItem=Message<&'static str, u32>,SinkError=Error> + Send + 'static>;
type Error = Error;

fn round_data(&self, round: u64) -> RoundData<Self::Id, Self::Timer, Self::In, Self::Out> {
const GOSSIP_DURATION: Duration = Duration::from_millis(500);

let now = Instant::now();
let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id);
RoundData {
voter_id: Some(self.local_id),
prevote_timer: Box::pin(Delay::new(GOSSIP_DURATION)
prevote_timer: Box::new(Delay::new(now + GOSSIP_DURATION)
.map_err(|_| panic!("Timer failed"))),
precommit_timer: Box::pin(Delay::new(GOSSIP_DURATION + GOSSIP_DURATION)
precommit_timer: Box::new(Delay::new(now + GOSSIP_DURATION + GOSSIP_DURATION)
.map_err(|_| panic!("Timer failed"))),
incoming: Box::pin(incoming),
outgoing: Box::pin(outgoing),
incoming: Box::new(incoming),
outgoing: Box::new(outgoing),
}
}

Expand All @@ -219,7 +218,8 @@ pub mod environment {
let delay = Duration::from_millis(
rand::thread_rng().gen_range(0, COMMIT_DELAY_MILLIS));

Box::pin(Delay::new(delay).map_err(|_| panic!("Timer failed")))
let now = Instant::now();
Box::new(Delay::new(now + delay).map_err(|_| panic!("Timer failed")))
}

fn completed(
Expand Down Expand Up @@ -297,31 +297,32 @@ pub mod environment {

// add a node to the network for a round.
fn add_node<N, F: Fn(N) -> M>(&mut self, f: F) -> (
impl Stream<Item=Result<M,Error>>,
impl Sink<N,Error=Error>
impl Stream<Item=M,Error=Error>,
impl Sink<SinkItem=N,SinkError=Error>
) {
let (tx, rx) = mpsc::unbounded();
let messages_out = self.raw_sender.clone()
.sink_map_err(|e| panic!("Error sending messages: {:?}", e))
.with(move |message| future::ready(Ok(f(message))));
.with(move |message| Ok(f(message)));

// get history to the node.
for prior_message in self.history.iter().cloned() {
let _ = tx.unbounded_send(prior_message);
}

self.senders.push(tx);
let rx = rx.map_err(|e| panic!("Error receiving messages: {:?}", e));

(rx.map(Ok), messages_out)
(rx, messages_out)
}

// do routing work
fn route(&mut self, cx: &mut Context) -> Poll<()> {
fn route(&mut self) -> Poll<(), ()> {
loop {
match Stream::poll_next(Pin::new(&mut self.receiver), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => {
match self.receiver.poll().map_err(|e| panic!("Error routing messages: {:?}", e))? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some(item)) => {
self.history.push(item.clone());
for sender in &self.senders {
let _ = sender.unbounded_send(item.clone());
Expand Down Expand Up @@ -356,8 +357,8 @@ pub mod environment {

impl Network {
pub fn make_round_comms(&self, round_number: u64, node_id: Id) -> (
impl Stream<Item=Result<SignedMessage<&'static str, u32, Signature, Id>,Error>>,
impl Sink<Message<&'static str, u32>,Error=Error>
impl Stream<Item=SignedMessage<&'static str, u32, Signature, Id>,Error=Error>,
impl Sink<SinkItem=Message<&'static str, u32>,SinkError=Error>
) {
let mut rounds = self.rounds.lock();
rounds.entry(round_number)
Expand All @@ -370,8 +371,8 @@ pub mod environment {
}

pub fn make_global_comms(&self) -> (
impl Stream<Item=Result<CommunicationIn<&'static str, u32, Signature, Id>,Error>>,
impl Sink<CommunicationOut<&'static str, u32, Signature, Id>,Error=Error>
impl Stream<Item=CommunicationIn<&'static str, u32, Signature, Id>,Error=Error>,
impl Sink<SinkItem=CommunicationOut<&'static str, u32, Signature, Id>,SinkError=Error>
) {
let mut global_messages = self.global_messages.lock();
global_messages.add_node(|message| match message {
Expand All @@ -392,22 +393,20 @@ pub mod environment {
}

impl Future for NetworkRouting {
type Output = ();
type Item = ();
type Error = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
fn poll(&mut self) -> Poll<(), ()> {
let mut rounds = self.rounds.lock();
rounds.retain(|_, round| match round.route(cx) {
Poll::Ready(()) => false,
Poll::Pending => true,
rounds.retain(|_, round| match round.route() {
Ok(Async::Ready(())) | Err(()) => false,
Ok(Async::NotReady) => true,
});

let mut global_messages = self.global_messages.lock();
let _ = global_messages.route(cx);
let _ = global_messages.route();

Poll::Pending
Ok(Async::NotReady)
}
}

impl Unpin for NetworkRouting {
}
}
Loading

0 comments on commit e00b623

Please sign in to comment.