Skip to content

Commit

Permalink
Avoid unbounded RwLock guards in PeerManager (#1139)
Browse files Browse the repository at this point in the history
* avoid unbounded `RwLock` guards

* restrict `get_map` function argument to `FnOnce`
Loading branch information
pvdrz committed Feb 21, 2022
1 parent fff487c commit 8ac4b9b
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 98 deletions.
12 changes: 5 additions & 7 deletions bee-api/bee-rest-api/src/endpoints/routes/api/v1/add_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,15 @@ pub(crate) fn add_peer(
}
};

match peer_manager.get(&peer_id) {
Some(peer_entry) => {
peer_manager
.get_map(&peer_id, |peer_entry| {
let peer_dto = PeerDto::from(peer_entry.0.as_ref());
Ok(warp::reply::with_status(
warp::reply::json(&SuccessBody::new(AddPeerResponse(peer_dto))),
StatusCode::OK,
))
}

None => {
})
.unwrap_or_else(|| {
let alias = if alias_v.is_null() {
None
} else {
Expand Down Expand Up @@ -125,6 +124,5 @@ pub(crate) fn add_peer(
}))),
StatusCode::OK,
))
}
}
})
}
13 changes: 7 additions & 6 deletions bee-api/bee-rest-api/src/endpoints/routes/api/v1/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ pub(crate) fn filter(
}

pub(crate) fn peer(peer_id: PeerId, peer_manager: ResourceHandle<PeerManager>) -> Result<impl Reply, Rejection> {
match peer_manager.get(&peer_id) {
Some(peer_entry) => Ok(warp::reply::json(&SuccessBody::new(PeerResponse(PeerDto::from(
peer_entry.0.as_ref(),
))))),
None => Err(reject::custom(CustomRejection::NotFound("peer not found".to_string()))),
}
peer_manager
.get_map(&peer_id, |peer_entry| {
Ok(warp::reply::json(&SuccessBody::new(PeerResponse(PeerDto::from(
peer_entry.0.as_ref(),
)))))
})
.unwrap_or_else(|| Err(reject::custom(CustomRejection::NotFound("peer not found".to_string()))))
}
8 changes: 5 additions & 3 deletions bee-protocol/src/workers/message/hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ where

metrics.known_messages_inc();
if let Some(peer_id) = from {
if let Some(ref peer) = peer_manager.get(&peer_id) {
peer.0.metrics().known_messages_inc();
}
peer_manager
.get_map(&peer_id, |peer| {
peer.0.metrics().known_messages_inc();
})
.unwrap_or_default();
}
continue;
}
Expand Down
6 changes: 4 additions & 2 deletions bee-protocol/src/workers/message/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ where
metrics.known_messages_inc();
if let Some(ref peer_id) = from {
peer_manager
.get(peer_id)
.map(|peer| (*peer).0.metrics().known_messages_inc());
.get_map(peer_id, |peer| {
(*peer).0.metrics().known_messages_inc();
})
.unwrap_or_default();
}
continue;
};
Expand Down
69 changes: 40 additions & 29 deletions bee-protocol/src/workers/peer/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,31 +130,42 @@ where
gossip_in: receiver,
gossip_out: sender,
} => {
// TODO write a get_mut peer manager method
if let Some(mut peer) = peer_manager.get_mut(&peer_id) {
let (shutdown_tx, shutdown_rx) = oneshot::channel();

peer.0.set_connected(true);
peer.1 = Some((sender, shutdown_tx));

tokio::spawn(
PeerWorker::new(
peer.0.clone(),
metrics.clone(),
hasher.clone(),
message_responder.clone(),
milestone_responder.clone(),
milestone_requester.clone(),
)
.run(
tangle.clone(),
requested_milestones.clone(),
receiver,
shutdown_rx,
),
);

info!("Connected peer {}.", peer.0.alias());
{
let metrics = metrics.clone();
let hasher = hasher.clone();
let message_responder = message_responder.clone();
let milestone_responder = milestone_responder.clone();
let milestone_requester = milestone_requester.clone();
let tangle = tangle.clone();
let requested_milestones = requested_milestones.clone();

peer_manager
.get_mut_map(&peer_id, move |peer| {
let (shutdown_tx, shutdown_rx) = oneshot::channel();

peer.0.set_connected(true);
peer.1 = Some((sender, shutdown_tx));

tokio::spawn(
PeerWorker::new(
peer.0.clone(),
metrics,
hasher,
message_responder,
milestone_responder,
milestone_requester,
)
.run(
tangle,
requested_milestones,
receiver,
shutdown_rx,
),
);

info!("Connected peer {}.", peer.0.alias());
})
.unwrap_or_default();
}

// TODO can't do it in the if because of deadlock, but it's not really right to do it here.
Expand All @@ -165,17 +176,17 @@ where
&*metrics,
);
}
NetworkEvent::PeerDisconnected { peer_id } => {
if let Some(mut peer) = peer_manager.get_mut(&peer_id) {
NetworkEvent::PeerDisconnected { peer_id } => peer_manager
.get_mut_map(&peer_id, |peer| {
peer.0.set_connected(false);
if let Some((_, shutdown)) = peer.1.take() {
if let Err(e) = shutdown.send(()) {
warn!("Sending shutdown to {} failed: {:?}.", peer.0.alias(), e);
}
}
info!("Disconnected peer {}.", peer.0.alias());
}
}
})
.unwrap_or_default(),
_ => (), // Ignore all other events for now
}
}
Expand Down
17 changes: 11 additions & 6 deletions bee-protocol/src/workers/peer/manager_res.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use bee_runtime::{node::Node, worker::Worker};
use async_trait::async_trait;
use futures::channel::oneshot;
use log::debug;
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use parking_lot::RwLock;

use std::{
convert::Infallible,
Expand Down Expand Up @@ -101,13 +101,18 @@ impl PeerManager {
self.inner.read().peers.is_empty()
}

// TODO find a way to only return a ref to the peer.
pub fn get(&self, id: &PeerId) -> Option<impl std::ops::Deref<Target = PeerTuple> + '_> {
RwLockReadGuard::try_map(self.inner.read(), |map| map.get(id)).ok()
/// Looks up the peer identified by `id` and, if present, applies `f` to it,
/// returning the closure's result.
///
/// The read guard is held only for the duration of this call (it is a
/// temporary released at the end of the expression), so callers can never
/// keep the `RwLock` locked beyond the lookup itself.
pub fn get_map<T>(&self, id: &PeerId, f: impl FnOnce(&PeerTuple) -> T) -> Option<T> {
    self.inner.read().get(id).map(f)
}

pub fn get_mut(&self, id: &PeerId) -> Option<impl std::ops::DerefMut<Target = PeerTuple> + '_> {
RwLockWriteGuard::try_map(self.inner.write(), |map| map.get_mut(id)).ok()
/// Looks up the peer identified by `id` and, if present, applies `f` to a
/// mutable reference to it, returning the closure's result.
///
/// The write guard is a temporary scoped to this single expression, so the
/// exclusive lock is released as soon as `f` finishes — no guard can escape
/// to the caller.
pub fn get_mut_map<T>(&self, id: &PeerId, f: impl FnOnce(&mut PeerTuple) -> T) -> Option<T> {
    self.inner.write().get_mut(id).map(f)
}

pub fn get_all(&self) -> Vec<Arc<Peer>> {
Expand Down
98 changes: 53 additions & 45 deletions bee-protocol/src/workers/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,73 +26,81 @@ impl Sender<MilestoneRequestPacket> {
peer_manager: &PeerManager,
metrics: &NodeMetrics,
) {
if let Some(ref peer) = peer_manager.get(id) {
if let Some(ref sender) = peer.1 {
match sender.0.send(tlv_to_bytes(packet)) {
Ok(_) => {
peer.0.metrics().milestone_requests_sent_inc();
metrics.milestone_requests_sent_inc();
}
Err(e) => {
warn!("Sending MilestoneRequestPacket to {} failed: {:?}.", id, e);
peer_manager
.get_map(id, |peer| {
if let Some(ref sender) = peer.1 {
match sender.0.send(tlv_to_bytes(packet)) {
Ok(_) => {
peer.0.metrics().milestone_requests_sent_inc();
metrics.milestone_requests_sent_inc();
}
Err(e) => {
warn!("Sending MilestoneRequestPacket to {} failed: {:?}.", id, e);
}
}
}
}
}
})
.unwrap_or_default()
}
}

impl Sender<MessagePacket> {
pub(crate) fn send(packet: &MessagePacket, id: &PeerId, peer_manager: &PeerManager, metrics: &NodeMetrics) {
if let Some(ref peer) = peer_manager.get(id) {
if let Some(ref sender) = peer.1 {
match sender.0.send(tlv_to_bytes(packet)) {
Ok(_) => {
peer.0.metrics().messages_sent_inc();
metrics.messages_sent_inc();
}
Err(e) => {
warn!("Sending MessagePacket to {} failed: {:?}.", id, e);
peer_manager
.get_map(id, |peer| {
if let Some(ref sender) = peer.1 {
match sender.0.send(tlv_to_bytes(packet)) {
Ok(_) => {
peer.0.metrics().messages_sent_inc();
metrics.messages_sent_inc();
}
Err(e) => {
warn!("Sending MessagePacket to {} failed: {:?}.", id, e);
}
}
}
}
}
})
.unwrap_or_default()
}
}

impl Sender<MessageRequestPacket> {
pub(crate) fn send(packet: &MessageRequestPacket, id: &PeerId, peer_manager: &PeerManager, metrics: &NodeMetrics) {
if let Some(ref peer) = peer_manager.get(id) {
if let Some(ref sender) = peer.1 {
match sender.0.send(tlv_to_bytes(packet)) {
Ok(_) => {
peer.0.metrics().message_requests_sent_inc();
metrics.message_requests_sent_inc();
}
Err(e) => {
warn!("Sending MessageRequestPacket to {} failed: {:?}.", id, e);
peer_manager
.get_map(id, |peer| {
if let Some(ref sender) = peer.1 {
match sender.0.send(tlv_to_bytes(packet)) {
Ok(_) => {
peer.0.metrics().message_requests_sent_inc();
metrics.message_requests_sent_inc();
}
Err(e) => {
warn!("Sending MessageRequestPacket to {} failed: {:?}.", id, e);
}
}
}
}
}
})
.unwrap_or_default()
}
}

impl Sender<HeartbeatPacket> {
pub(crate) fn send(packet: &HeartbeatPacket, id: &PeerId, peer_manager: &PeerManager, metrics: &NodeMetrics) {
if let Some(ref peer) = peer_manager.get(id) {
if let Some(ref sender) = peer.1 {
match sender.0.send(tlv_to_bytes(packet)) {
Ok(_) => {
peer.0.metrics().heartbeats_sent_inc();
peer.0.set_heartbeat_sent_timestamp();
metrics.heartbeats_sent_inc();
}
Err(e) => {
warn!("Sending HeartbeatPacket to {} failed: {:?}.", id, e);
peer_manager
.get_map(id, |peer| {
if let Some(ref sender) = peer.1 {
match sender.0.send(tlv_to_bytes(packet)) {
Ok(_) => {
peer.0.metrics().heartbeats_sent_inc();
peer.0.set_heartbeat_sent_timestamp();
metrics.heartbeats_sent_inc();
}
Err(e) => {
warn!("Sending HeartbeatPacket to {} failed: {:?}.", id, e);
}
}
}
}
}
})
.unwrap_or_default();
}
}

0 comments on commit 8ac4b9b

Please sign in to comment.