Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2xTx: Refactor to separate Transaction packets from MTU packets #29055

Closed
wants to merge 11 commits into from
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 20 additions & 17 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use {
compute_budget::ComputeBudgetInstruction,
hash::Hash,
message::Message,
packet::TransactionPacket,
pubkey::{self, Pubkey},
signature::{Keypair, Signature, Signer},
system_instruction, system_transaction,
Expand Down Expand Up @@ -178,13 +179,13 @@ fn make_transfer_transaction_with_compute_unit_price(
Transaction::new(&[from_keypair], message, recent_blockhash)
}

struct PacketsPerIteration {
packet_batches: Vec<PacketBatch>,
struct PacketsPerIteration<const N: usize> {
packet_batches: Vec<PacketBatch<N>>,
transactions: Vec<Transaction>,
packets_per_batch: usize,
}

impl PacketsPerIteration {
impl<const N: usize> PacketsPerIteration<N> {
fn new(
packets_per_batch: usize,
batches_per_iteration: usize,
Expand All @@ -203,7 +204,8 @@ impl PacketsPerIteration {
mint_txs_percentage,
);

let packet_batches: Vec<PacketBatch> = to_packet_batches(&transactions, packets_per_batch);
let packet_batches: Vec<PacketBatch<N>> =
to_packet_batches(&transactions, packets_per_batch);
assert_eq!(packet_batches.len(), batches_per_iteration);
Self {
packet_batches,
Expand Down Expand Up @@ -334,18 +336,19 @@ fn main() {
.unwrap()
.set_limits(std::u64::MAX, std::u64::MAX, std::u64::MAX);

let mut all_packets: Vec<PacketsPerIteration> = std::iter::from_fn(|| {
Some(PacketsPerIteration::new(
packets_per_batch,
batches_per_iteration,
genesis_config.hash(),
write_lock_contention,
matches.is_present("simulate_mint"),
mint_txs_percentage,
))
})
.take(num_chunks)
.collect();
let mut all_packets: Vec<PacketsPerIteration<{ TransactionPacket::DATA_SIZE }>> =
std::iter::from_fn(|| {
Some(PacketsPerIteration::new(
packets_per_batch,
batches_per_iteration,
genesis_config.hash(),
write_lock_contention,
matches.is_present("simulate_mint"),
mint_txs_percentage,
))
})
.take(num_chunks)
.collect();

let total_num_transactions: u64 = all_packets
.iter()
Expand Down Expand Up @@ -457,7 +460,7 @@ fn main() {
{
sent += packet_batch.len();
trace!(
"Sending PacketBatch index {}, {}",
"Sending Packet Batch index {}, {}",
packet_batch_index,
timestamp(),
);
Expand Down
1 change: 1 addition & 0 deletions bench-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ publish = false
clap = { version = "3.1.5", features = ["cargo"] }
crossbeam-channel = "0.5"
solana-net-utils = { path = "../net-utils", version = "=1.15.0" }
solana-sdk = { path = "../sdk", version = "=1.15.0" }
solana-streamer = { path = "../streamer", version = "=1.15.0" }
solana-version = { path = "../version", version = "=1.15.0" }

Expand Down
21 changes: 13 additions & 8 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
use {
clap::{crate_description, crate_name, Arg, Command},
crossbeam_channel::unbounded,
solana_sdk::packet::Packet,
solana_streamer::{
packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
packet::{BatchRecycler, PacketBatch},
streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats},
},
std::{
Expand All @@ -22,11 +23,11 @@ use {
fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let batch_size = 10;
let mut packet_batch = PacketBatch::with_capacity(batch_size);
let mut packet_batch = PacketBatch::<{ Packet::DATA_SIZE }>::with_capacity(batch_size);
packet_batch.resize(batch_size, Packet::default());
for w in packet_batch.iter_mut() {
w.meta_mut().size = PACKET_DATA_SIZE;
w.meta_mut().set_socket_addr(addr);
w.meta.size = Packet::DATA_SIZE;
w.meta.set_socket_addr(addr);
}
let packet_batch = Arc::new(packet_batch);
spawn(move || loop {
Expand All @@ -35,8 +36,8 @@ fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
}
let mut num = 0;
for p in packet_batch.iter() {
let a = p.meta().socket_addr();
assert!(p.meta().size <= PACKET_DATA_SIZE);
let a = p.meta.socket_addr();
assert!(p.meta.size <= Packet::DATA_SIZE);
let data = p.data(..).unwrap_or_default();
send.send_to(data, a).unwrap();
num += 1;
Expand All @@ -45,7 +46,11 @@ fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
})
}

fn sink(exit: Arc<AtomicBool>, rvs: Arc<AtomicUsize>, r: PacketBatchReceiver) -> JoinHandle<()> {
fn sink(
exit: Arc<AtomicBool>,
rvs: Arc<AtomicUsize>,
r: PacketBatchReceiver<{ Packet::DATA_SIZE }>,
) -> JoinHandle<()> {
spawn(move || loop {
if exit.load(Ordering::Relaxed) {
return;
Expand Down Expand Up @@ -93,7 +98,7 @@ fn main() -> Result<()> {

let mut read_channels = Vec::new();
let mut read_threads = Vec::new();
let recycler = PacketBatchRecycler::default();
let recycler = BatchRecycler::<Packet>::default();
let (_port, read_sockets) = solana_net_utils::multi_bind_in_range(
ip_addr,
(port, port + num_sockets as u16),
Expand Down
4 changes: 2 additions & 2 deletions cli/src/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use {
loader_instruction,
message::Message,
native_token::Sol,
packet::PACKET_DATA_SIZE,
packet::Packet,
pubkey::Pubkey,
signature::{keypair_from_seed, read_keypair_file, Keypair, Signature, Signer},
system_instruction::{self, SystemError},
Expand Down Expand Up @@ -1728,7 +1728,7 @@ where
})
.unwrap() as usize;
// add 1 byte buffer to account for shortvec encoding
PACKET_DATA_SIZE.saturating_sub(tx_size).saturating_sub(1)
Packet::DATA_SIZE.saturating_sub(tx_size).saturating_sub(1)
}

#[allow(clippy::too_many_arguments)]
Expand Down
30 changes: 16 additions & 14 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ mod tests {
rand::{Rng, SeedableRng},
rand_chacha::ChaChaRng,
solana_sdk::{
packet::TransactionPacket,
pubkey::Pubkey,
quic::{
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
Expand Down Expand Up @@ -657,20 +658,21 @@ mod tests {

let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server(
response_recv_socket,
&keypair2,
response_recv_ip,
sender2,
response_recv_exit.clone(),
1,
staked_nodes,
10,
10,
response_recv_stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
)
.unwrap();
let (response_recv_endpoint, response_recv_thread) =
solana_streamer::quic::spawn_server::<{ TransactionPacket::DATA_SIZE }>(
response_recv_socket,
&keypair2,
response_recv_ip,
sender2,
response_recv_exit.clone(),
1,
staked_nodes,
10,
10,
response_recv_stats,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT_MS,
)
.unwrap();

let connection_cache = ConnectionCache::new_with_endpoint(1, response_recv_endpoint);

Expand Down
13 changes: 9 additions & 4 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use {
genesis_config::GenesisConfig,
hash::Hash,
message::Message,
packet::{Packet, TransactionPacket},
pubkey,
signature::{Keypair, Signature, Signer},
system_instruction, system_transaction,
Expand Down Expand Up @@ -83,7 +84,9 @@ fn bench_consume_buffered(bencher: &mut Bencher) {

let tx = test_tx();
let transactions = vec![tx; 4194304];
let batches = transactions_to_deserialized_packets(&transactions).unwrap();
let batches =
transactions_to_deserialized_packets::<{ TransactionPacket::DATA_SIZE }>(&transactions)
.unwrap();
let batches_len = batches.len();
let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len),
Expand Down Expand Up @@ -251,12 +254,14 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
assert!(r.is_ok(), "sanity parallel execution");
}
bank.clear_signatures();
let verified: Vec<_> = to_packet_batches(&transactions, PACKETS_PER_BATCH);
let verified: Vec<_> =
to_packet_batches::<{ TransactionPacket::DATA_SIZE }, _>(&transactions, PACKETS_PER_BATCH);
let vote_packets = vote_txs.map(|vote_txs| {
let mut packet_batches = to_packet_batches(&vote_txs, PACKETS_PER_BATCH);
let mut packet_batches =
to_packet_batches::<{ Packet::DATA_SIZE }, _>(&vote_txs, PACKETS_PER_BATCH);
for batch in packet_batches.iter_mut() {
for packet in batch.iter_mut() {
packet.meta_mut().set_simple_vote(true);
packet.meta.set_simple_vote(true);
}
}
packet_batches
Expand Down
4 changes: 2 additions & 2 deletions core/benches/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
Shred, ShredFlags, Shredder, DATA_SHREDS_PER_FEC_BLOCK, LEGACY_SHRED_DATA_CAPACITY,
},
solana_perf::test_tx,
solana_sdk::{hash::Hash, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_sdk::{hash::Hash, packet::Packet, signature::Keypair},
test::Bencher,
};

Expand All @@ -22,7 +22,7 @@ use {
// size of nonce: 4
// size of common shred header: 83
// size of coding shred header: 6
const VALID_SHRED_DATA_LEN: usize = PACKET_DATA_SIZE - 4 - 83 - 6;
const VALID_SHRED_DATA_LEN: usize = Packet::DATA_SIZE - 4 - 83 - 6;

fn make_test_entry(txs_per_entry: u64) -> Entry {
Entry {
Expand Down
31 changes: 15 additions & 16 deletions core/benches/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@ use {
distributions::{Distribution, Uniform},
thread_rng, Rng,
},
solana_core::{
sigverify::TransactionSigVerifier,
sigverify_stage::{SigVerifier, SigVerifyStage},
},
solana_core::{sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage},
solana_measure::measure::Measure,
solana_perf::{
packet::{to_packet_batches, PacketBatch},
test_tx::test_tx,
},
solana_sdk::{
hash::Hash,
packet::Packet,
signature::{Keypair, Signer},
system_transaction,
timing::duration_as_ms,
Expand All @@ -35,7 +33,7 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) {
let len = 30 * 1000;
let chunk_size = 1024;
let tx = test_tx();
let mut batches = to_packet_batches(&vec![tx; len], chunk_size);
let mut batches = to_packet_batches::<{ Packet::DATA_SIZE }, _>(&vec![tx; len], chunk_size);

let mut total = 0;

Expand All @@ -51,7 +49,7 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) {
total += batch.len();
for p in batch.iter_mut() {
let ip_index = thread_rng().gen_range(0, ips.len());
p.meta_mut().addr = ips[ip_index];
p.meta.addr = ips[ip_index];
}
}
info!("total packets: {}", total);
Expand All @@ -61,10 +59,10 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) {
let mut num_packets = 0;
for batch in batches.iter_mut() {
for p in batch.iter_mut() {
if !p.meta().discard() {
if !p.meta.discard() {
num_packets += 1;
}
p.meta_mut().set_discard(false);
p.meta.set_discard(false);
}
}
assert_eq!(num_packets, 10_000);
Expand All @@ -91,12 +89,13 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) {
std::net::IpAddr::from(addr)
}
let mut rng = thread_rng();
let mut batches = to_packet_batches(&vec![test_tx(); SIZE], CHUNK_SIZE);
let mut batches =
to_packet_batches::<{ Packet::DATA_SIZE }, _>(&vec![test_tx(); SIZE], CHUNK_SIZE);
let spam_addr = new_rand_addr(&mut rng);
for batch in batches.iter_mut() {
for packet in batch.iter_mut() {
// One spam address, ~1000 unique addresses.
packet.meta_mut().addr = if rng.gen_ratio(1, 30) {
packet.meta.addr = if rng.gen_ratio(1, 30) {
new_rand_addr(&mut rng)
} else {
spam_addr
Expand All @@ -108,17 +107,17 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) {
let mut num_packets = 0;
for batch in batches.iter_mut() {
for packet in batch.iter_mut() {
if !packet.meta().discard() {
if !packet.meta.discard() {
num_packets += 1;
}
packet.meta_mut().set_discard(false);
packet.meta.set_discard(false);
}
}
assert_eq!(num_packets, 10_000);
});
}

fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch> {
fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch<{ Packet::DATA_SIZE }>> {
let len = 4096;
let chunk_size = 1024;
if use_same_tx {
Expand Down Expand Up @@ -186,7 +185,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
stage.join().unwrap();
}

fn prepare_batches(discard_factor: i32) -> (Vec<PacketBatch>, usize) {
fn prepare_batches(discard_factor: i32) -> (Vec<PacketBatch<{ Packet::DATA_SIZE }>>, usize) {
let len = 10_000; // max batch size
let chunk_size = 1024;

Expand All @@ -204,7 +203,7 @@ fn prepare_batches(discard_factor: i32) -> (Vec<PacketBatch>, usize) {
)
})
.collect();
let mut batches = to_packet_batches(&txs, chunk_size);
let mut batches = to_packet_batches::<{ Packet::DATA_SIZE }, _>(&txs, chunk_size);

let mut rng = rand::thread_rng();
let die = Uniform::<i32>::from(1..100);
Expand All @@ -214,7 +213,7 @@ fn prepare_batches(discard_factor: i32) -> (Vec<PacketBatch>, usize) {
batch.iter_mut().for_each(|p| {
let throw = die.sample(&mut rng);
if throw < discard_factor {
p.meta_mut().set_discard(true);
p.meta.set_discard(true);
c += 1;
}
})
Expand Down
Loading