Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Merge pull request #6 from paritytech/a-ext-pool
Browse files Browse the repository at this point in the history
More generic extrinsic pool
  • Loading branch information
arkpar authored Aug 29, 2018
2 parents d12426b + 26949a0 commit 293af33
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 522 deletions.
79 changes: 39 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

34 changes: 18 additions & 16 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId], local_id: Au
}

/// Polkadot proposer factory.
pub struct ProposerFactory<C, N, P> {
pub struct ProposerFactory<C, N, P>
where
P: PolkadotApi + Send + Sync + 'static
{
/// The client instance.
pub client: Arc<P>,
/// The transaction pool.
Expand Down Expand Up @@ -407,7 +410,7 @@ struct LocalDuty {
}

/// The Polkadot proposer logic.
pub struct Proposer<C: PolkadotApi> {
pub struct Proposer<C: PolkadotApi + Send + Sync> {
client: Arc<C>,
dynamic_inclusion: DynamicInclusion,
local_key: Arc<ed25519::Pair>,
Expand Down Expand Up @@ -587,10 +590,10 @@ impl<C> bft::Proposer<Block> for Proposer<C>

let local_id = self.local_key.public().0.into();
let mut next_index = {
let cur_index = self.transaction_pool.cull_and_get_pending(BlockId::hash(self.parent_hash), |pending| pending
.filter(|tx| tx.sender().map(|s| s == local_id).unwrap_or(false))
let cur_index = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending| pending
.filter(|tx| tx.verified.sender().map(|s| s == local_id).unwrap_or(false))
.last()
.map(|tx| Ok(tx.index()))
.map(|tx| Ok(tx.verified.index()))
.unwrap_or_else(|| self.client.index(&self.parent_id, local_id))
);

Expand Down Expand Up @@ -636,9 +639,8 @@ impl<C> bft::Proposer<Block> for Proposer<C>
index: extrinsic.index,
function: extrinsic.function,
};
let uxt = UncheckedExtrinsic::new(extrinsic, signature);

self.transaction_pool.import_unchecked_extrinsic(BlockId::hash(self.parent_hash), uxt)
let uxt: Vec<u8> = Decode::decode(&mut UncheckedExtrinsic::new(extrinsic, signature).encode().as_slice()).expect("Encoded extrinsic is valid");
self.transaction_pool.submit_one(&BlockId::hash(self.parent_hash), uxt)
.expect("locally signed extrinsic is valid; qed");
}
}
Expand Down Expand Up @@ -720,7 +722,7 @@ impl ProposalTiming {
}

/// Future which resolves upon the creation of a proposal.
pub struct CreateProposal<C: PolkadotApi> {
pub struct CreateProposal<C: PolkadotApi + Send + Sync> {
parent_hash: Hash,
parent_number: BlockNumber,
parent_id: BlockId,
Expand All @@ -732,7 +734,7 @@ pub struct CreateProposal<C: PolkadotApi> {
offline: SharedOfflineTracker,
}

impl<C> CreateProposal<C> where C: PolkadotApi {
impl<C> CreateProposal<C> where C: PolkadotApi + Send + Sync {
fn propose_with(&self, candidates: Vec<CandidateReceipt>) -> Result<Block, Error> {
use polkadot_api::BlockBuilder;
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
Expand Down Expand Up @@ -767,18 +769,18 @@ impl<C> CreateProposal<C> where C: PolkadotApi {

{
let mut unqueue_invalid = Vec::new();
let result = self.transaction_pool.cull_and_get_pending(BlockId::hash(self.parent_hash), |pending_iterator| {
let result = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending_iterator| {
let mut pending_size = 0;
for pending in pending_iterator {
if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }
if pending_size + pending.verified.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }

match block_builder.push_extrinsic(pending.primitive_extrinsic()) {
match block_builder.push_extrinsic(pending.original.clone()) {
Ok(()) => {
pending_size += pending.encoded_size();
pending_size += pending.verified.encoded_size();
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(pending.hash().clone());
unqueue_invalid.push(pending.verified.hash().clone());
}
}
}
Expand Down Expand Up @@ -819,7 +821,7 @@ impl<C> CreateProposal<C> where C: PolkadotApi {
}
}

impl<C> Future for CreateProposal<C> where C: PolkadotApi {
impl<C> Future for CreateProposal<C> where C: PolkadotApi + Send + Sync {
type Item = Block;
type Error = Error;

Expand Down
1 change: 0 additions & 1 deletion consensus/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ impl Service {
let last_agreement = s.last_agreement();
let can_build_upon = last_agreement
.map_or(true, |x| !x.live || x.parent_hash != hash);

if hash == prev_best && can_build_upon {
debug!("Starting consensus round after a timeout");
start_bft(best_block, s.clone());
Expand Down
2 changes: 1 addition & 1 deletion network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub const DOT_PROTOCOL_ID: ::substrate_network::ProtocolId = *b"dot";
type FullStatus = GenericFullStatus<Block>;

/// Specialization of the network service for the polkadot protocol.
pub type NetworkService = ::substrate_network::Service<Block, PolkadotProtocol>;
pub type NetworkService = ::substrate_network::Service<Block, PolkadotProtocol, Hash>;

/// Status of a Polkadot node.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
Expand Down
1 change: 0 additions & 1 deletion service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,5 @@ substrate-runtime-io = { git = "https://github.com/paritytech/substrate" }
substrate-primitives = { git = "https://github.com/paritytech/substrate" }
substrate-network = { git = "https://github.com/paritytech/substrate" }
substrate-client = { git = "https://github.com/paritytech/substrate" }
substrate-codec = { git = "https://github.com/paritytech/substrate" }
substrate-service = { git = "https://github.com/paritytech/substrate" }
substrate-telemetry = { git = "https://github.com/paritytech/substrate" }
159 changes: 31 additions & 128 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ extern crate polkadot_transaction_pool as transaction_pool;
extern crate polkadot_network;
extern crate substrate_primitives as primitives;
extern crate substrate_network as network;
extern crate substrate_codec as codec;
extern crate substrate_client as client;
extern crate substrate_service as service;
extern crate tokio;
Expand All @@ -42,14 +41,12 @@ extern crate hex_literal;
pub mod chain_spec;

use std::sync::Arc;
use std::collections::HashMap;

use codec::{Encode, Decode};
use tokio::prelude::{Stream, Future};
use transaction_pool::TransactionPool;
use polkadot_api::{PolkadotApi, light::RemotePolkadotApiWrapper};
use polkadot_primitives::{parachain, AccountId, Block, BlockId, Hash};
use polkadot_runtime::GenesisConfig;
use client::Client;
use client::{Client, BlockchainEvents};
use polkadot_network::{PolkadotProtocol, consensus::ConsensusNetwork};
use tokio::runtime::TaskExecutor;
use service::FactoryFullConfiguration;
Expand All @@ -63,7 +60,7 @@ pub use client::ExecutionStrategy;
pub type ChainSpec = service::ChainSpec<GenesisConfig>;
/// Polkadot client type for specialised `Components`.
pub type ComponentClient<C> = Client<<C as Components>::Backend, <C as Components>::Executor, Block>;
pub type NetworkService = network::Service<Block, <Factory as service::ServiceFactory>::NetworkProtocol>;
pub type NetworkService = network::Service<Block, <Factory as service::ServiceFactory>::NetworkProtocol, Hash>;

/// A collection of type to generalise Polkadot specific components over full / light client.
pub trait Components: service::Components {
Expand Down Expand Up @@ -106,16 +103,11 @@ pub struct Factory;

impl service::ServiceFactory for Factory {
type Block = Block;
type ExtrinsicHash = Hash;
type NetworkProtocol = PolkadotProtocol;
type RuntimeDispatch = polkadot_executor::Executor;
type FullExtrinsicPool = TransactionPoolAdapter<
service::FullBackend<Self>,
service::FullExecutor<Self>,
service::FullClient<Self>
>;
type LightExtrinsicPool = TransactionPoolAdapter<
service::LightBackend<Self>,
service::LightExecutor<Self>,
type FullExtrinsicPoolApi = transaction_pool::ChainApi<service::FullClient<Self>>;
type LightExtrinsicPoolApi = transaction_pool::ChainApi<
RemotePolkadotApiWrapper<service::LightBackend<Self>, service::LightExecutor<Self>>
>;
type Genesis = GenesisConfig;
Expand All @@ -124,25 +116,17 @@ impl service::ServiceFactory for Factory {
const NETWORK_PROTOCOL_ID: network::ProtocolId = ::polkadot_network::DOT_PROTOCOL_ID;

fn build_full_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc<service::FullClient<Self>>)
-> Result<Self::FullExtrinsicPool, Error>
-> Result<TransactionPool<service::FullClient<Self>>, Error>
{
let api = client.clone();
Ok(TransactionPoolAdapter {
pool: Arc::new(TransactionPool::new(config, api)),
client: client,
imports_external_transactions: true,
})
Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(api)))
}

fn build_light_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc<service::LightClient<Self>>)
-> Result<Self::LightExtrinsicPool, Error>
-> Result<TransactionPool<RemotePolkadotApiWrapper<service::LightBackend<Self>, service::LightExecutor<Self>>>, Error>
{
let api = Arc::new(RemotePolkadotApiWrapper(client.clone()));
Ok(TransactionPoolAdapter {
pool: Arc::new(TransactionPool::new(config, api)),
client: client,
imports_external_transactions: false,
})
Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(api)))
}

fn build_network_protocol(config: &Configuration)
Expand Down Expand Up @@ -182,8 +166,18 @@ impl <C: Components> Service<C> {
pub fn new_light(config: Configuration, executor: TaskExecutor)
-> Result<Service<LightComponents<Factory>>, Error>
{
let service = service::Service::<LightComponents<Factory>>::new(config, executor)?;
let service = service::Service::<LightComponents<Factory>>::new(config, executor.clone())?;
let api = Arc::new(RemotePolkadotApiWrapper(service.client()));
let pool = service.extrinsic_pool();
let events = service.client().import_notification_stream()
.for_each(move |notification| {
// re-verify all transactions without the sender.
pool.retry_verification(&BlockId::hash(notification.hash), None)
.map_err(|e| warn!("Error re-verifying transactions: {:?}", e))?;
Ok(())
})
.then(|_| Ok(()));
executor.spawn(events);
Ok(Service {
client: service.client(),
network: service.network(),
Expand Down Expand Up @@ -212,7 +206,16 @@ pub fn new_full(config: Configuration, executor: TaskExecutor)

let is_validator = (config.roles & Roles::AUTHORITY) == Roles::AUTHORITY;
let service = service::Service::<FullComponents<Factory>>::new(config, executor.clone())?;

let pool = service.extrinsic_pool();
let events = service.client().import_notification_stream()
.for_each(move |notification| {
// re-verify all transactions without the sender.
pool.retry_verification(&BlockId::hash(notification.hash), None)
.map_err(|e| warn!("Error re-verifying transactions: {:?}", e))?;
Ok(())
})
.then(|_| Ok(()));
executor.spawn(events);
// Spin consensus service if configured
let consensus = if is_validator {
// Load the first available key
Expand Down Expand Up @@ -261,103 +264,3 @@ impl<C: Components> ::std::ops::Deref for Service<C> {
&self.inner
}
}

/// Transaction pool adapter.
pub struct TransactionPoolAdapter<B, E, A> where A: Send + Sync, E: Send + Sync {
imports_external_transactions: bool,
pool: Arc<TransactionPool<A>>,
client: Arc<Client<B, E, Block>>,
}

impl<B, E, A> TransactionPoolAdapter<B, E, A>
where
A: Send + Sync,
B: client::backend::Backend<Block, KeccakHasher, RlpCodec> + Send + Sync,
E: client::CallExecutor<Block, KeccakHasher, RlpCodec> + Send + Sync,
{
fn best_block_id(&self) -> Option<BlockId> {
self.client.info()
.map(|info| BlockId::hash(info.chain.best_hash))
.map_err(|e| {
debug!("Error getting best block: {:?}", e);
})
.ok()
}
}

impl<B, E, A> network::TransactionPool<Block> for TransactionPoolAdapter<B, E, A>
where
B: client::backend::Backend<Block, KeccakHasher, RlpCodec> + Send + Sync,
E: client::CallExecutor<Block, KeccakHasher, RlpCodec> + Send + Sync,
A: polkadot_api::PolkadotApi + Send + Sync,
{
fn transactions(&self) -> Vec<(Hash, Vec<u8>)> {
let best_block_id = match self.best_block_id() {
Some(id) => id,
None => return vec![],
};
self.pool.cull_and_get_pending(best_block_id, |pending| pending
.map(|t| {
let hash = t.hash().clone();
(hash, t.primitive_extrinsic())
})
.collect()
).unwrap_or_else(|e| {
warn!("Error retrieving pending set: {}", e);
vec![]
})
}

fn import(&self, transaction: &Vec<u8>) -> Option<Hash> {
if !self.imports_external_transactions {
return None;
}

let encoded = transaction.encode();
if let Some(uxt) = Decode::decode(&mut &encoded[..]) {
let best_block_id = self.best_block_id()?;
match self.pool.import_unchecked_extrinsic(best_block_id, uxt) {
Ok(xt) => Some(*xt.hash()),
Err(e) => match *e.kind() {
transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()),
_ => {
debug!(target: "txpool", "Error adding transaction to the pool: {:?}", e);
None
},
}
}
} else {
debug!(target: "txpool", "Error decoding transaction");
None
}
}

fn on_broadcasted(&self, propagations: HashMap<Hash, Vec<String>>) {
self.pool.on_broadcasted(propagations)
}
}

impl<B, E, A> service::ExtrinsicPool<Block> for TransactionPoolAdapter<B, E, A>
where
B: client::backend::Backend<Block, KeccakHasher, RlpCodec> + Send + Sync + 'static,
E: client::CallExecutor<Block, KeccakHasher, RlpCodec> + Send + Sync + 'static,
A: polkadot_api::PolkadotApi + Send + Sync + 'static,
{
type Api = TransactionPool<A>;

fn prune_imported(&self, hash: &Hash) {
let block = BlockId::hash(*hash);
if let Err(e) = self.pool.cull(block) {
warn!("Culling error: {:?}", e);
}

if let Err(e) = self.pool.retry_verification(block) {
warn!("Re-verifying error: {:?}", e);
}
}

fn api(&self) -> Arc<Self::Api> {
self.pool.clone()
}
}

12 changes: 6 additions & 6 deletions transaction-pool/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use extrinsic_pool::{self, txpool};
use extrinsic_pool;
use polkadot_api;
use primitives::Hash;
use runtime::{Address, UncheckedExtrinsic};

error_chain! {
links {
Pool(txpool::Error, txpool::ErrorKind);
Pool(extrinsic_pool::Error, extrinsic_pool::ErrorKind);
Api(polkadot_api::Error, polkadot_api::ErrorKind);
}
errors {
Expand All @@ -33,7 +33,7 @@ error_chain! {
/// Attempted to queue an inherent transaction.
IsInherent(xt: UncheckedExtrinsic) {
description("Inherent transactions cannot be queued."),
display("Inehrent transactions cannot be queued."),
display("Inherent transactions cannot be queued."),
}
/// Attempted to queue a transaction with bad signature.
BadSignature(e: &'static str) {
Expand Down Expand Up @@ -63,10 +63,10 @@ error_chain! {
}
}

impl extrinsic_pool::api::Error for Error {
fn into_pool_error(self) -> ::std::result::Result<txpool::Error, Self> {
impl extrinsic_pool::IntoPoolError for Error {
fn into_pool_error(self) -> ::std::result::Result<extrinsic_pool::Error, Self> {
match self {
Error(ErrorKind::Pool(e), c) => Ok(txpool::Error(e, c)),
Error(ErrorKind::Pool(e), c) => Ok(extrinsic_pool::Error(e, c)),
e => Err(e),
}
}
Expand Down
Loading

0 comments on commit 293af33

Please sign in to comment.