Skip to content

Commit

Permalink
Quic update identity (solana-labs#33865)
Browse files Browse the repository at this point in the history
Update the Quic transport layer keypair and identity when the Validator's identity keypair is updated
  • Loading branch information
ryleung-solana authored Dec 8, 2023
1 parent 8c6239c commit 132c910
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 54 deletions.
23 changes: 20 additions & 3 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use {
},
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_sdk::{pubkey::Pubkey, signature::Keypair, transport::Result as TransportResult},
solana_sdk::{
pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair,
transport::Result as TransportResult,
},
solana_streamer::streamer::StakedNodes,
solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
std::{
Expand Down Expand Up @@ -43,6 +46,15 @@ pub enum NonblockingClientConnection {
Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
}

impl NotifyKeyUpdate for ConnectionCache {
fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
match self {
Self::Udp(_) => Ok(()),
Self::Quic(backend) => backend.update_key(key),
}
}
}

impl ConnectionCache {
pub fn new(name: &'static str) -> Self {
if DEFAULT_CONNECTION_CACHE_USE_QUIC {
Expand Down Expand Up @@ -217,7 +229,8 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes,
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
streamer::StakedNodes,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -245,7 +258,11 @@ mod tests {

let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server(
let SpawnServerResult {
endpoint: response_recv_endpoint,
thread: response_recv_thread,
key_updater: _,
} = solana_streamer::quic::spawn_server(
"quic_streamer_test",
response_recv_socket,
&keypair2,
Expand Down
12 changes: 11 additions & 1 deletion connection-cache/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_sdk::timing::AtomicInterval,
solana_sdk::{signature::Keypair, timing::AtomicInterval},
std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc, RwLock},
Expand Down Expand Up @@ -38,6 +38,7 @@ pub trait ConnectionManager: Send + Sync + 'static {

fn new_connection_pool(&self) -> Self::ConnectionPool;
fn new_connection_config(&self) -> Self::NewConnectionConfig;
fn update_key(&self, _key: &Keypair) -> Result<(), Box<dyn std::error::Error>>;
}

pub struct ConnectionCache<
Expand Down Expand Up @@ -137,6 +138,11 @@ where
.unwrap()
}

pub fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
let mut map = self.map.write().unwrap();
map.clear();
self.connection_manager.update_key(key)
}
/// Create a lazy connection object under the exclusive lock of the cache map if there is not
/// enough used connections in the connection pool for the specified address.
/// Returns CreateConnectionResult.
Expand Down Expand Up @@ -636,6 +642,10 @@ mod tests {
fn new_connection_config(&self) -> Self::NewConnectionConfig {
MockUdpConfig::new().unwrap()
}

fn update_key(&self, _key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
}

impl BlockingClientConnection for MockUdpConnection {
Expand Down
3 changes: 2 additions & 1 deletion core/src/admin_rpc_post_init.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
solana_gossip::cluster_info::ClusterInfo,
solana_runtime::bank_forks::BankForks,
solana_sdk::pubkey::Pubkey,
solana_sdk::{pubkey::Pubkey, quic::NotifyKeyUpdate},
std::{
collections::HashSet,
sync::{Arc, RwLock},
Expand All @@ -14,4 +14,5 @@ pub struct AdminRpcRequestMetadataPostInit {
pub bank_forks: Arc<RwLock<BankForks>>,
pub vote_account: Pubkey,
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
pub notifies: Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>,
}
47 changes: 29 additions & 18 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ use {
rpc_subscriptions::RpcSubscriptions,
},
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
quic::{spawn_server, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
Expand Down Expand Up @@ -111,7 +111,7 @@ impl Tpu {
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
block_production_method: BlockProductionMethod,
_generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
) -> Self {
) -> (Self, Vec<Arc<dyn NotifyKeyUpdate + Sync + Send>>) {
let TpuSockets {
transactions: transactions_sockets,
transaction_forwards: tpu_forwards_sockets,
Expand Down Expand Up @@ -148,7 +148,11 @@ impl Tpu {

let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote();

let (_, tpu_quic_t) = spawn_server(
let SpawnServerResult {
endpoint: _,
thread: tpu_quic_t,
key_updater,
} = spawn_server(
"quic_streamer_tpu",
transactions_quic_sockets,
keypair,
Expand All @@ -168,7 +172,11 @@ impl Tpu {
)
.unwrap();

let (_, tpu_forwards_quic_t) = spawn_server(
let SpawnServerResult {
endpoint: _,
thread: tpu_forwards_quic_t,
key_updater: forwards_key_updater,
} = spawn_server(
"quic_streamer_tpu_forwards",
transactions_forwards_quic_sockets,
keypair,
Expand Down Expand Up @@ -259,19 +267,22 @@ impl Tpu {
turbine_quic_endpoint_sender,
);

Self {
fetch_stage,
sigverify_stage,
vote_sigverify_stage,
banking_stage,
cluster_info_vote_listener,
broadcast_stage,
tpu_quic_t,
tpu_forwards_quic_t,
tpu_entry_notifier,
staked_nodes_updater_service,
tracer_thread_hdl,
}
(
Self {
fetch_stage,
sigverify_stage,
vote_sigverify_stage,
banking_stage,
cluster_info_vote_listener,
broadcast_stage,
tpu_quic_t,
tpu_forwards_quic_t,
tpu_entry_notifier,
staked_nodes_updater_service,
tracer_thread_hdl,
},
vec![key_updater, forwards_key_updater],
)
}

pub fn join(self) -> thread::Result<()> {
Expand Down
19 changes: 11 additions & 8 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,13 +1080,6 @@ impl Validator {
exit.clone(),
);

*admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
bank_forks: bank_forks.clone(),
cluster_info: cluster_info.clone(),
vote_account: *vote_account,
repair_whitelist: config.repair_whitelist.clone(),
});

let waited_for_supermajority = wait_for_supermajority(
config,
Some(&mut process_blockstore),
Expand Down Expand Up @@ -1295,7 +1288,7 @@ impl Validator {
};
}

let tpu = Tpu::new(
let (tpu, mut key_notifies) = Tpu::new(
&cluster_info,
&poh_recorder,
entry_receiver,
Expand Down Expand Up @@ -1346,6 +1339,16 @@ impl Validator {
);

*start_progress.write().unwrap() = ValidatorStartProgress::Running;
key_notifies.push(connection_cache);

*admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
bank_forks: bank_forks.clone(),
cluster_info: cluster_info.clone(),
vote_account: *vote_account,
repair_whitelist: config.repair_whitelist.clone(),
notifies: key_notifies,
});

Ok(Self {
stats_reporter_service,
gossip_service,
Expand Down
56 changes: 45 additions & 11 deletions quic-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,39 +84,52 @@ impl ConnectionPool for QuicPool {
}
}

#[derive(Clone)]
pub struct QuicConfig {
client_certificate: Arc<QuicClientCertificate>,
// Arc to prevent having to copy the struct
client_certificate: RwLock<Arc<QuicClientCertificate>>,
maybe_staked_nodes: Option<Arc<RwLock<StakedNodes>>>,
maybe_client_pubkey: Option<Pubkey>,

// The optional specified endpoint for the quic based client connections
// If not specified, the connection cache will create as needed.
client_endpoint: Option<Endpoint>,
addr: IpAddr,
}

impl Clone for QuicConfig {
fn clone(&self) -> Self {
let cert_guard = self.client_certificate.read().unwrap();
QuicConfig {
client_certificate: RwLock::new(cert_guard.clone()),
maybe_staked_nodes: self.maybe_staked_nodes.clone(),
maybe_client_pubkey: self.maybe_client_pubkey,
client_endpoint: self.client_endpoint.clone(),
addr: self.addr,
}
}
}

impl NewConnectionConfig for QuicConfig {
fn new() -> Result<Self, ClientError> {
let (cert, priv_key) =
new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED))?;
let addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
let (cert, priv_key) = new_self_signed_tls_certificate(&Keypair::new(), addr)?;
Ok(Self {
client_certificate: Arc::new(QuicClientCertificate {
client_certificate: RwLock::new(Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
}),
})),
maybe_staked_nodes: None,
maybe_client_pubkey: None,
client_endpoint: None,
addr,
})
}
}

impl QuicConfig {
fn create_endpoint(&self) -> QuicLazyInitializedEndpoint {
QuicLazyInitializedEndpoint::new(
self.client_certificate.clone(),
self.client_endpoint.as_ref().cloned(),
)
let cert_guard = self.client_certificate.read().unwrap();
QuicLazyInitializedEndpoint::new(cert_guard.clone(), self.client_endpoint.as_ref().cloned())
}

fn compute_max_parallel_streams(&self) -> usize {
Expand All @@ -143,7 +156,23 @@ impl QuicConfig {
ipaddr: IpAddr,
) -> Result<(), RcgenError> {
let (cert, priv_key) = new_self_signed_tls_certificate(keypair, ipaddr)?;
self.client_certificate = Arc::new(QuicClientCertificate {
self.addr = ipaddr;

let mut cert_guard = self.client_certificate.write().unwrap();

*cert_guard = Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
});
Ok(())
}

pub fn update_keypair(&self, keypair: &Keypair) -> Result<(), RcgenError> {
let (cert, priv_key) = new_self_signed_tls_certificate(keypair, self.addr)?;

let mut cert_guard = self.client_certificate.write().unwrap();

*cert_guard = Arc::new(QuicClientCertificate {
certificate: cert,
key: priv_key,
});
Expand Down Expand Up @@ -212,6 +241,11 @@ impl ConnectionManager for QuicConnectionManager {
fn new_connection_config(&self) -> QuicConfig {
self.connection_config.clone()
}

fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
self.connection_config.update_keypair(key)?;
Ok(())
}
}

impl QuicConnectionManager {
Expand Down
22 changes: 17 additions & 5 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, streamer::StakedNodes,
tls_certificates::new_self_signed_tls_certificate,
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
streamer::StakedNodes, tls_certificates::new_self_signed_tls_certificate,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -68,7 +68,11 @@ mod tests {
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (s, exit, keypair, ip) = server_args();
let (_, t) = solana_streamer::quic::spawn_server(
let SpawnServerResult {
endpoint: _,
thread: t,
key_updater: _,
} = solana_streamer::quic::spawn_server(
"quic_streamer_test",
s.try_clone().unwrap(),
&keypair,
Expand Down Expand Up @@ -204,7 +208,11 @@ mod tests {
let (sender, receiver) = unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
let (request_recv_socket, request_recv_exit, keypair, request_recv_ip) = server_args();
let (request_recv_endpoint, request_recv_thread) = solana_streamer::quic::spawn_server(
let SpawnServerResult {
endpoint: request_recv_endpoint,
thread: request_recv_thread,
key_updater: _,
} = solana_streamer::quic::spawn_server(
"quic_streamer_test",
request_recv_socket.try_clone().unwrap(),
&keypair,
Expand All @@ -228,7 +236,11 @@ mod tests {
let addr = response_recv_socket.local_addr().unwrap().ip();
let port = response_recv_socket.local_addr().unwrap().port();
let server_addr = SocketAddr::new(addr, port);
let (response_recv_endpoint, response_recv_thread) = solana_streamer::quic::spawn_server(
let SpawnServerResult {
endpoint: response_recv_endpoint,
thread: response_recv_thread,
key_updater: _,
} = solana_streamer::quic::spawn_server(
"quic_streamer_test",
response_recv_socket,
&keypair2,
Expand Down
7 changes: 6 additions & 1 deletion sdk/src/quic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg(feature = "full")]
//! Definitions related to Solana over QUIC.
use std::time::Duration;
use {crate::signer::keypair::Keypair, std::time::Duration};

pub const QUIC_PORT_OFFSET: u16 = 6;
// Empirically found max number of concurrent streams
Expand Down Expand Up @@ -35,3 +36,7 @@ pub const QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO: u64 = 128;
/// The receive window for QUIC connection from maximum staked nodes is
/// set to this ratio times [`solana_sdk::packet::PACKET_DATA_SIZE`]
pub const QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO: u64 = 512;

pub trait NotifyKeyUpdate {
fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>>;
}
Loading

0 comments on commit 132c910

Please sign in to comment.