This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

More generic extrinsic pool #6

Merged
merged 1 commit on Aug 29, 2018
79 changes: 39 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

34 changes: 18 additions & 16 deletions consensus/src/lib.rs
@@ -231,7 +231,10 @@ fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId], local_id: Au
}

/// Polkadot proposer factory.
pub struct ProposerFactory<C, N, P> {
pub struct ProposerFactory<C, N, P>
where
P: PolkadotApi + Send + Sync + 'static
{
/// The client instance.
pub client: Arc<P>,
/// The transaction pool.
@@ -407,7 +410,7 @@ struct LocalDuty {
}

/// The Polkadot proposer logic.
pub struct Proposer<C: PolkadotApi> {
pub struct Proposer<C: PolkadotApi + Send + Sync> {
client: Arc<C>,
dynamic_inclusion: DynamicInclusion,
local_key: Arc<ed25519::Pair>,
@@ -587,10 +590,10 @@ impl<C> bft::Proposer<Block> for Proposer<C>

let local_id = self.local_key.public().0.into();
let mut next_index = {
let cur_index = self.transaction_pool.cull_and_get_pending(BlockId::hash(self.parent_hash), |pending| pending
.filter(|tx| tx.sender().map(|s| s == local_id).unwrap_or(false))
let cur_index = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending| pending
.filter(|tx| tx.verified.sender().map(|s| s == local_id).unwrap_or(false))
.last()
.map(|tx| Ok(tx.index()))
.map(|tx| Ok(tx.verified.index()))
.unwrap_or_else(|| self.client.index(&self.parent_id, local_id))
);

@@ -636,9 +639,8 @@ impl<C> bft::Proposer<Block> for Proposer<C>
index: extrinsic.index,
function: extrinsic.function,
};
let uxt = UncheckedExtrinsic::new(extrinsic, signature);

self.transaction_pool.import_unchecked_extrinsic(BlockId::hash(self.parent_hash), uxt)
let uxt: Vec<u8> = Decode::decode(&mut UncheckedExtrinsic::new(extrinsic, signature).encode().as_slice()).expect("Encoded extrinsic is valid");
self.transaction_pool.submit_one(&BlockId::hash(self.parent_hash), uxt)
.expect("locally signed extrinsic is valid; qed");
}
}
@@ -720,7 +722,7 @@ impl ProposalTiming {
}

/// Future which resolves upon the creation of a proposal.
pub struct CreateProposal<C: PolkadotApi> {
pub struct CreateProposal<C: PolkadotApi + Send + Sync> {
parent_hash: Hash,
parent_number: BlockNumber,
parent_id: BlockId,
@@ -732,7 +734,7 @@ pub struct CreateProposal<C: PolkadotApi> {
offline: SharedOfflineTracker,
}

impl<C> CreateProposal<C> where C: PolkadotApi {
impl<C> CreateProposal<C> where C: PolkadotApi + Send + Sync {
fn propose_with(&self, candidates: Vec<CandidateReceipt>) -> Result<Block, Error> {
use polkadot_api::BlockBuilder;
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
@@ -767,18 +769,18 @@ impl<C> CreateProposal<C> where C: PolkadotApi {

{
let mut unqueue_invalid = Vec::new();
let result = self.transaction_pool.cull_and_get_pending(BlockId::hash(self.parent_hash), |pending_iterator| {
let result = self.transaction_pool.cull_and_get_pending(&BlockId::hash(self.parent_hash), |pending_iterator| {
let mut pending_size = 0;
for pending in pending_iterator {
if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }
if pending_size + pending.verified.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }

match block_builder.push_extrinsic(pending.primitive_extrinsic()) {
match block_builder.push_extrinsic(pending.original.clone()) {
Ok(()) => {
pending_size += pending.encoded_size();
pending_size += pending.verified.encoded_size();
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(pending.hash().clone());
unqueue_invalid.push(pending.verified.hash().clone());
}
}
}
@@ -819,7 +821,7 @@ impl<C> CreateProposal<C> where C: PolkadotApi {
}
}

impl<C> Future for CreateProposal<C> where C: PolkadotApi {
impl<C> Future for CreateProposal<C> where C: PolkadotApi + Send + Sync {
type Item = Block;
type Error = Error;

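The changes to `consensus/src/lib.rs` track the pool's new transaction shape: each pending entry now exposes the opaque submitted bytes as `original` and a chain-specific decoded view as `verified`, so the proposer reads `pending.verified.index()` for nonce tracking but pushes `pending.original` into the block. A minimal, self-contained sketch of that split, using stand-in types rather than the actual Substrate/Polkadot API:

```rust
// Stand-in types, for illustration only -- not the real extrinsic-pool API.
struct Verified {
    sender: [u8; 32], // decoded sender (assumed field name)
    index: u64,       // decoded account nonce (assumed field name)
}

struct Pending {
    original: Vec<u8>,  // opaque encoded extrinsic, exactly as submitted
    verified: Verified, // decoded view used for ordering and validation
}

fn main() {
    let local_id = [0u8; 32];
    let pending = Pending {
        original: vec![1, 2, 3, 4],
        verified: Verified { sender: local_id, index: 41 },
    };

    // Block building pushes the original bytes (cf. `push_extrinsic(pending.original.clone())`)...
    let block_body: Vec<Vec<u8>> = vec![pending.original.clone()];

    // ...while nonce tracking reads the verified view (cf. `pending.verified.index()`).
    let is_local = pending.verified.sender == local_id;
    let next_index = pending.verified.index + 1;

    println!("{} extrinsics, local sender: {}, next index: {}", block_body.len(), is_local, next_index);
}
```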
1 change: 0 additions & 1 deletion consensus/src/service.rs
@@ -227,7 +227,6 @@ impl Service {
let last_agreement = s.last_agreement();
let can_build_upon = last_agreement
.map_or(true, |x| !x.live || x.parent_hash != hash);

if hash == prev_best && can_build_upon {
debug!("Starting consensus round after a timeout");
start_bft(best_block, s.clone());
2 changes: 1 addition & 1 deletion network/src/lib.rs
@@ -73,7 +73,7 @@ pub const DOT_PROTOCOL_ID: ::substrate_network::ProtocolId = *b"dot";
type FullStatus = GenericFullStatus<Block>;

/// Specialization of the network service for the polkadot protocol.
pub type NetworkService = ::substrate_network::Service<Block, PolkadotProtocol>;
pub type NetworkService = ::substrate_network::Service<Block, PolkadotProtocol, Hash>;

/// Status of a Polkadot node.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
1 change: 0 additions & 1 deletion service/Cargo.toml
@@ -24,6 +24,5 @@ substrate-runtime-io = { git = "https://github.com/paritytech/substrate" }
substrate-primitives = { git = "https://github.com/paritytech/substrate" }
substrate-network = { git = "https://github.com/paritytech/substrate" }
substrate-client = { git = "https://github.com/paritytech/substrate" }
substrate-codec = { git = "https://github.com/paritytech/substrate" }
substrate-service = { git = "https://github.com/paritytech/substrate" }
substrate-telemetry = { git = "https://github.com/paritytech/substrate" }
159 changes: 31 additions & 128 deletions service/src/lib.rs
@@ -29,7 +29,6 @@ extern crate polkadot_transaction_pool as transaction_pool;
extern crate polkadot_network;
extern crate substrate_primitives as primitives;
extern crate substrate_network as network;
extern crate substrate_codec as codec;
extern crate substrate_client as client;
extern crate substrate_service as service;
extern crate tokio;
@@ -42,14 +41,12 @@ extern crate hex_literal;
pub mod chain_spec;

use std::sync::Arc;
use std::collections::HashMap;

use codec::{Encode, Decode};
use tokio::prelude::{Stream, Future};
use transaction_pool::TransactionPool;
use polkadot_api::{PolkadotApi, light::RemotePolkadotApiWrapper};
use polkadot_primitives::{parachain, AccountId, Block, BlockId, Hash};
use polkadot_runtime::GenesisConfig;
use client::Client;
use client::{Client, BlockchainEvents};
use polkadot_network::{PolkadotProtocol, consensus::ConsensusNetwork};
use tokio::runtime::TaskExecutor;
use service::FactoryFullConfiguration;
@@ -63,7 +60,7 @@ pub use client::ExecutionStrategy;
pub type ChainSpec = service::ChainSpec<GenesisConfig>;
/// Polkadot client type for specialised `Components`.
pub type ComponentClient<C> = Client<<C as Components>::Backend, <C as Components>::Executor, Block>;
pub type NetworkService = network::Service<Block, <Factory as service::ServiceFactory>::NetworkProtocol>;
pub type NetworkService = network::Service<Block, <Factory as service::ServiceFactory>::NetworkProtocol, Hash>;

/// A collection of type to generalise Polkadot specific components over full / light client.
pub trait Components: service::Components {
@@ -106,16 +103,11 @@ pub struct Factory;

impl service::ServiceFactory for Factory {
type Block = Block;
type ExtrinsicHash = Hash;
type NetworkProtocol = PolkadotProtocol;
type RuntimeDispatch = polkadot_executor::Executor;
type FullExtrinsicPool = TransactionPoolAdapter<
service::FullBackend<Self>,
service::FullExecutor<Self>,
service::FullClient<Self>
>;
type LightExtrinsicPool = TransactionPoolAdapter<
service::LightBackend<Self>,
service::LightExecutor<Self>,
type FullExtrinsicPoolApi = transaction_pool::ChainApi<service::FullClient<Self>>;
type LightExtrinsicPoolApi = transaction_pool::ChainApi<
RemotePolkadotApiWrapper<service::LightBackend<Self>, service::LightExecutor<Self>>
>;
type Genesis = GenesisConfig;
@@ -124,25 +116,17 @@ impl service::ServiceFactory for Factory {
const NETWORK_PROTOCOL_ID: network::ProtocolId = ::polkadot_network::DOT_PROTOCOL_ID;

fn build_full_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc<service::FullClient<Self>>)
-> Result<Self::FullExtrinsicPool, Error>
-> Result<TransactionPool<service::FullClient<Self>>, Error>
{
let api = client.clone();
Ok(TransactionPoolAdapter {
pool: Arc::new(TransactionPool::new(config, api)),
client: client,
imports_external_transactions: true,
})
Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(api)))
}

fn build_light_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc<service::LightClient<Self>>)
-> Result<Self::LightExtrinsicPool, Error>
-> Result<TransactionPool<RemotePolkadotApiWrapper<service::LightBackend<Self>, service::LightExecutor<Self>>>, Error>
{
let api = Arc::new(RemotePolkadotApiWrapper(client.clone()));
Ok(TransactionPoolAdapter {
pool: Arc::new(TransactionPool::new(config, api)),
client: client,
imports_external_transactions: false,
})
Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(api)))
}

fn build_network_protocol(config: &Configuration)
@@ -182,8 +166,18 @@ impl <C: Components> Service<C> {
pub fn new_light(config: Configuration, executor: TaskExecutor)
-> Result<Service<LightComponents<Factory>>, Error>
{
let service = service::Service::<LightComponents<Factory>>::new(config, executor)?;
let service = service::Service::<LightComponents<Factory>>::new(config, executor.clone())?;
let api = Arc::new(RemotePolkadotApiWrapper(service.client()));
let pool = service.extrinsic_pool();
let events = service.client().import_notification_stream()
.for_each(move |notification| {
// re-verify all transactions without the sender.
pool.retry_verification(&BlockId::hash(notification.hash), None)
.map_err(|e| warn!("Error re-verifying transactions: {:?}", e))?;
Ok(())
})
.then(|_| Ok(()));
executor.spawn(events);
Ok(Service {
client: service.client(),
network: service.network(),
@@ -212,7 +206,16 @@ pub fn new_full(config: Configuration, executor: TaskExecutor)

let is_validator = (config.roles & Roles::AUTHORITY) == Roles::AUTHORITY;
let service = service::Service::<FullComponents<Factory>>::new(config, executor.clone())?;

let pool = service.extrinsic_pool();
let events = service.client().import_notification_stream()
.for_each(move |notification| {
// re-verify all transactions without the sender.
pool.retry_verification(&BlockId::hash(notification.hash), None)
.map_err(|e| warn!("Error re-verifying transactions: {:?}", e))?;
Ok(())
})
.then(|_| Ok(()));
executor.spawn(events);
// Spin consensus service if configured
let consensus = if is_validator {
// Load the first available key
@@ -261,103 +264,3 @@ impl<C: Components> ::std::ops::Deref for Service<C> {
&self.inner
}
}

/// Transaction pool adapter.
pub struct TransactionPoolAdapter<B, E, A> where A: Send + Sync, E: Send + Sync {
imports_external_transactions: bool,
pool: Arc<TransactionPool<A>>,
client: Arc<Client<B, E, Block>>,
}

impl<B, E, A> TransactionPoolAdapter<B, E, A>
where
A: Send + Sync,
B: client::backend::Backend<Block, KeccakHasher, RlpCodec> + Send + Sync,
E: client::CallExecutor<Block, KeccakHasher, RlpCodec> + Send + Sync,
{
fn best_block_id(&self) -> Option<BlockId> {
self.client.info()
.map(|info| BlockId::hash(info.chain.best_hash))
.map_err(|e| {
debug!("Error getting best block: {:?}", e);
})
.ok()
}
}

impl<B, E, A> network::TransactionPool<Block> for TransactionPoolAdapter<B, E, A>
where
B: client::backend::Backend<Block, KeccakHasher, RlpCodec> + Send + Sync,
E: client::CallExecutor<Block, KeccakHasher, RlpCodec> + Send + Sync,
A: polkadot_api::PolkadotApi + Send + Sync,
{
fn transactions(&self) -> Vec<(Hash, Vec<u8>)> {
let best_block_id = match self.best_block_id() {
Some(id) => id,
None => return vec![],
};
self.pool.cull_and_get_pending(best_block_id, |pending| pending
.map(|t| {
let hash = t.hash().clone();
(hash, t.primitive_extrinsic())
})
.collect()
).unwrap_or_else(|e| {
warn!("Error retrieving pending set: {}", e);
vec![]
})
}

fn import(&self, transaction: &Vec<u8>) -> Option<Hash> {
if !self.imports_external_transactions {
return None;
}

let encoded = transaction.encode();
if let Some(uxt) = Decode::decode(&mut &encoded[..]) {
let best_block_id = self.best_block_id()?;
match self.pool.import_unchecked_extrinsic(best_block_id, uxt) {
Ok(xt) => Some(*xt.hash()),
Err(e) => match *e.kind() {
transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()),
_ => {
debug!(target: "txpool", "Error adding transaction to the pool: {:?}", e);
None
},
}
}
} else {
debug!(target: "txpool", "Error decoding transaction");
None
}
}

fn on_broadcasted(&self, propagations: HashMap<Hash, Vec<String>>) {
self.pool.on_broadcasted(propagations)
}
}

impl<B, E, A> service::ExtrinsicPool<Block> for TransactionPoolAdapter<B, E, A>
where
B: client::backend::Backend<Block, KeccakHasher, RlpCodec> + Send + Sync + 'static,
E: client::CallExecutor<Block, KeccakHasher, RlpCodec> + Send + Sync + 'static,
A: polkadot_api::PolkadotApi + Send + Sync + 'static,
{
type Api = TransactionPool<A>;

fn prune_imported(&self, hash: &Hash) {
let block = BlockId::hash(*hash);
if let Err(e) = self.pool.cull(block) {
warn!("Culling error: {:?}", e);
}

if let Err(e) = self.pool.retry_verification(block) {
warn!("Re-verifying error: {:?}", e);
}
}

fn api(&self) -> Arc<Self::Api> {
self.pool.clone()
}
}
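Both `new_light` and `new_full` now register the same hook: every block import notification triggers `retry_verification(&BlockId::hash(notification.hash), None)` on the pool, so pending transactions are re-checked against the new head (per the in-diff comment, the `None` argument means no particular sender is singled out). A self-contained model of that pattern with stand-in types, since the real code runs inside the futures 0.1 stream shown above:

```rust
// Stand-in types, for illustration only -- not the real service or pool API.
struct ImportNotification {
    hash: u64, // stand-in for the imported block's hash
}

struct Pool;

impl Pool {
    fn retry_verification(&self, at_block: u64, sender: Option<[u8; 32]>) -> Result<(), String> {
        let scope = if sender.is_some() { "one sender" } else { "all senders" };
        println!("re-verifying pending transactions ({}) at block {}", scope, at_block);
        Ok(())
    }
}

fn on_block_imported(pool: &Pool, notification: ImportNotification) {
    // As in the diff: verification errors are logged, never propagated out of the handler.
    if let Err(e) = pool.retry_verification(notification.hash, None) {
        eprintln!("Error re-verifying transactions: {:?}", e);
    }
}

fn main() {
    let pool = Pool;
    for block in 1..=3u64 {
        on_block_imported(&pool, ImportNotification { hash: block });
    }
}
```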

12 changes: 6 additions & 6 deletions transaction-pool/src/error.rs
@@ -14,14 +14,14 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use extrinsic_pool::{self, txpool};
use extrinsic_pool;
use polkadot_api;
use primitives::Hash;
use runtime::{Address, UncheckedExtrinsic};

error_chain! {
links {
Pool(txpool::Error, txpool::ErrorKind);
Pool(extrinsic_pool::Error, extrinsic_pool::ErrorKind);
Api(polkadot_api::Error, polkadot_api::ErrorKind);
}
errors {
@@ -33,7 +33,7 @@ error_chain! {
/// Attempted to queue an inherent transaction.
IsInherent(xt: UncheckedExtrinsic) {
description("Inherent transactions cannot be queued."),
display("Inehrent transactions cannot be queued."),
display("Inherent transactions cannot be queued."),
}
/// Attempted to queue a transaction with bad signature.
BadSignature(e: &'static str) {
@@ -63,10 +63,10 @@ error_chain! {
}
}

impl extrinsic_pool::api::Error for Error {
fn into_pool_error(self) -> ::std::result::Result<txpool::Error, Self> {
impl extrinsic_pool::IntoPoolError for Error {
fn into_pool_error(self) -> ::std::result::Result<extrinsic_pool::Error, Self> {
match self {
Error(ErrorKind::Pool(e), c) => Ok(txpool::Error(e, c)),
Error(ErrorKind::Pool(e), c) => Ok(extrinsic_pool::Error(e, c)),
e => Err(e),
}
}
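In `error.rs` the error-chain link moves from the old `txpool` module to the top-level `extrinsic_pool` error, and the conversion trait becomes `extrinsic_pool::IntoPoolError`: a chain-specific error either unwraps into the shared pool error or is handed back unchanged. A self-contained illustration of that pattern with stand-in enums (the real types are generated by `error_chain!`):

```rust
// Stand-in enums, for illustration only -- the real error types come from `error_chain!`.
#[derive(Debug)]
enum PoolError {
    AlreadyImported,
}

#[derive(Debug)]
enum ChainError {
    Pool(PoolError),            // wraps the generic pool error
    BadSignature(&'static str), // stays specific to this chain
}

trait IntoPoolError: Sized {
    // Ok: the generic pool can handle it; Err: hand it back to chain-specific code.
    fn into_pool_error(self) -> Result<PoolError, Self>;
}

impl IntoPoolError for ChainError {
    fn into_pool_error(self) -> Result<PoolError, Self> {
        match self {
            ChainError::Pool(e) => Ok(e),
            e => Err(e),
        }
    }
}

fn main() {
    for err in vec![ChainError::Pool(PoolError::AlreadyImported), ChainError::BadSignature("bad sig")] {
        match err.into_pool_error() {
            Ok(pool_err) => println!("handled generically: {:?}", pool_err),
            Err(chain_err) => println!("stays chain-specific: {:?}", chain_err),
        }
    }
}
```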