Skip to content

Commit

Permalink
Remove caching mechanism from bee-tangle (#1247)
Browse files Browse the repository at this point in the history
* remove cache

* keep omrsi and ymrsi uniform

* avoid caching milestones

* use `Exist` for contains

* remove unnecesary `async`

* remove `MessageRef`

* remove unnecessary clones

* Removed unused config params

Co-Authored-By: Thoralf-M <46689931+Thoralf-M@users.noreply.github.com>

* Bump bee-tangle

Co-Authored-By: Thoralf-M <46689931+Thoralf-M@users.noreply.github.com>

* Update changelog

Co-Authored-By: Thoralf-M <46689931+Thoralf-M@users.noreply.github.com>

* Update changelog

Co-Authored-By: Thoralf-M <46689931+Thoralf-M@users.noreply.github.com>

* turn insertion errors into warnings

* remove obsolete `TODO`

* test that `insert` overwrites

* fix processor logic

* add `InsertStrict` access operation

* fix data race

* document fix

* fix typo

* document test

* document merge operator

* bump `bee-storage-*` crates versions

* formatting :trollface:

* fix strict insertion test for metadata

Co-authored-by: Thibault Martinez <thibault@iota.org>
Co-authored-by: Thoralf-M <46689931+Thoralf-M@users.noreply.github.com>
  • Loading branch information
3 people committed Mar 18, 2022
1 parent fa853f6 commit 58b3df3
Show file tree
Hide file tree
Showing 86 changed files with 610 additions and 1,104 deletions.
35 changes: 24 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions bee-api/bee-rest-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ bee-message = { version = "0.1.6", path = "../../bee-message", default-features
bee-pow = { version = "0.2.0", path = "../../bee-pow", default-features = false }
bee-protocol = { version = "0.2.2", path = "../../bee-protocol", default-features = false, optional = true }
bee-runtime = { version = "0.1.1-alpha", path = "../../bee-runtime", default-features = false, optional = true }
bee-storage = { version = "0.10.0", path = "../../bee-storage/bee-storage", default-features = false, optional = true }
bee-tangle = { version = "0.2.0", path = "../../bee-tangle", default-features = false, optional = true }
bee-storage = { version = "0.11.0", path = "../../bee-storage/bee-storage", default-features = false, optional = true }
bee-tangle = { version = "0.3.0", path = "../../bee-tangle", default-features = false, optional = true }

async-trait = { version = "0.1.51", default-features = false, optional = true }
bech32 = { version = "0.8.1", default-features = false, optional = true }
Expand Down
12 changes: 7 additions & 5 deletions bee-api/bee-rest-api/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub(crate) type Bech32Hrp = String;

pub(crate) const CONFIRMED_THRESHOLD: u32 = 5;

pub async fn init_full_node<N: Node>(
pub fn init_full_node<N: Node>(
rest_api_config: RestApiConfig,
protocol_config: ProtocolConfig,
network_id: NetworkId,
Expand Down Expand Up @@ -109,7 +109,7 @@ where
requested_messages,
consensus_worker,
)
.recover(handle_rejection);
.recover(|err| async { handle_rejection(err) });

let (_, server) =
warp::serve(routes).bind_with_graceful_shutdown(rest_api_config.bind_socket_addr(), async {
Expand All @@ -125,7 +125,7 @@ where
}
}

async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
let (http_code, err_code, reason) = match err.find() {
// handle custom rejections
Some(CustomRejection::Forbidden) => (StatusCode::FORBIDDEN, "403", "access forbidden"),
Expand Down Expand Up @@ -153,7 +153,7 @@ async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
))
}

pub async fn init_entry_node<N: Node>(rest_api_config: RestApiConfig, node_builder: N::Builder) -> N::Builder
pub fn init_entry_node<N: Node>(rest_api_config: RestApiConfig, node_builder: N::Builder) -> N::Builder
where
N::Backend: StorageBackend,
{
Expand All @@ -174,7 +174,9 @@ where
node.spawn::<Self, _, _>(|shutdown| async move {
info!("Running.");

let health = warp::path("health").map(|| StatusCode::OK).recover(handle_rejection);
let health = warp::path("health")
.map(|| StatusCode::OK)
.recover(|err| async { handle_rejection(err) });

let (_, server) = warp::serve(health).bind_with_graceful_shutdown(config.bind_socket_addr(), async {
shutdown.await.ok();
Expand Down
19 changes: 15 additions & 4 deletions bee-api/bee-rest-api/src/endpoints/routes/api/v1/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,23 @@ pub(crate) fn filter<B: StorageBackend>(
.and(with_protocol_config(protocol_config))
.and(with_node_info(node_info))
.and(with_peer_manager(peer_manager))
.and_then(info)
.and_then(
|tangle, network_id, bech32_hrp, rest_api_config, protocol_config, node_info, peer_manager| async {
info(
tangle,
network_id,
bech32_hrp,
rest_api_config,
protocol_config,
node_info,
peer_manager,
)
},
)
.boxed()
}

pub(crate) async fn info<B: StorageBackend>(
pub(crate) fn info<B: StorageBackend>(
tangle: ResourceHandle<Tangle<B>>,
network_id: NetworkId,
bech32_hrp: Bech32Hrp,
Expand All @@ -66,14 +78,13 @@ pub(crate) async fn info<B: StorageBackend>(
let latest_milestone_index = tangle.get_latest_milestone_index();
let latest_milestone_timestamp = tangle
.get_milestone(latest_milestone_index)
.await
.map(|m| m.timestamp())
.unwrap_or_default();

Ok(warp::reply::json(&SuccessBody::new(InfoResponse {
name: node_info.name.clone(),
version: node_info.version.clone(),
is_healthy: health::is_healthy(&tangle, &peer_manager).await,
is_healthy: health::is_healthy(&tangle, &peer_manager),
network_id: network_id.0,
bech32_hrp,
min_pow_score: protocol_config.minimum_pow_score(),
Expand Down
6 changes: 3 additions & 3 deletions bee-api/bee-rest-api/src/endpoints/routes/api/v1/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ pub(crate) fn filter<B: StorageBackend>(
.and(warp::get())
.and(has_permission(ROUTE_MESSAGE, public_routes, allowed_ips))
.and(with_tangle(tangle))
.and_then(message)
.and_then(|message_id, tangle| async move { message(message_id, tangle) })
.boxed()
}

pub(crate) async fn message<B: StorageBackend>(
pub(crate) fn message<B: StorageBackend>(
message_id: MessageId,
tangle: ResourceHandle<Tangle<B>>,
) -> Result<impl Reply, Rejection> {
match tangle.get(&message_id).await.map(|m| (*m).clone()) {
match tangle.get(&message_id) {
Some(message) => Ok(warp::reply::json(&SuccessBody::new(MessageResponse(MessageDto::from(
&message,
))))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ pub(crate) fn filter<B: StorageBackend>(
.and(warp::get())
.and(has_permission(ROUTE_MESSAGE_CHILDREN, public_routes, allowed_ips))
.and(with_tangle(tangle))
.and_then(message_children)
.and_then(|message_id, tangle| async move { message_children(message_id, tangle) })
.boxed()
}

pub async fn message_children<B: StorageBackend>(
pub fn message_children<B: StorageBackend>(
message_id: MessageId,
tangle: ResourceHandle<Tangle<B>>,
) -> Result<impl Reply, Rejection> {
let mut children = Vec::from_iter(tangle.get_children(&message_id).await.unwrap_or_default());
let mut children = Vec::from_iter(tangle.get_children(&message_id).unwrap_or_default());
let count = children.len();
let max_results = 1000;
children.truncate(max_results);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ pub(crate) fn filter<B: StorageBackend>(
.and(warp::get())
.and(has_permission(ROUTE_MESSAGE_METADATA, public_routes, allowed_ips))
.and(with_tangle(tangle))
.and_then(message_metadata)
.and_then(|message_id, tangle| async move { message_metadata(message_id, tangle) })
.boxed()
}

pub(crate) async fn message_metadata<B: StorageBackend>(
pub(crate) fn message_metadata<B: StorageBackend>(
message_id: MessageId,
tangle: ResourceHandle<Tangle<B>>,
) -> Result<impl Reply, Rejection> {
Expand All @@ -48,11 +48,8 @@ pub(crate) async fn message_metadata<B: StorageBackend>(
)));
}

match tangle.get(&message_id).await.map(|m| (*m).clone()) {
Some(message) => {
// existing message <=> existing metadata, therefore unwrap() is safe
let metadata = tangle.get_metadata(&message_id).await.unwrap();

match tangle.get_message_and_metadata(&message_id) {
Some((message, metadata)) => {
// TODO: access constants from URTS
let ymrsi_delta = 8;
let omrsi_delta = 13;
Expand Down Expand Up @@ -113,13 +110,17 @@ pub(crate) async fn message_metadata<B: StorageBackend>(
conflict_reason = None;

let cmi = *tangle.get_confirmed_milestone_index();

// unwrap() of OMRSI/YMRSI is safe since message is solid
if (cmi - *metadata.omrsi().unwrap().index()) > below_max_depth {
let (omrsi, ymrsi) = metadata
.omrsi_and_ymrsi()
.map(|(o, y)| (*o.index(), *y.index()))
.unwrap();

if (cmi - omrsi) > below_max_depth {
should_promote = Some(false);
should_reattach = Some(true);
} else if (cmi - *metadata.ymrsi().unwrap().index()) > ymrsi_delta
|| (cmi - *metadata.omrsi().unwrap().index()) > omrsi_delta
{
} else if (cmi - ymrsi) > ymrsi_delta || (cmi - omrsi) > omrsi_delta {
should_promote = Some(true);
should_reattach = Some(false);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ pub(crate) fn filter<B: StorageBackend>(
.and(warp::get())
.and(has_permission(ROUTE_MESSAGE_RAW, public_routes, allowed_ips))
.and(with_tangle(tangle))
.and_then(message_raw)
.and_then(|message_id, tangle| async move { message_raw(message_id, tangle) })
.boxed()
}

pub async fn message_raw<B: StorageBackend>(
pub fn message_raw<B: StorageBackend>(
message_id: MessageId,
tangle: ResourceHandle<Tangle<B>>,
) -> Result<impl Reply, Rejection> {
match tangle.get(&message_id).await.map(|m| (*m).clone()) {
match tangle.get(&message_id) {
Some(message) => Ok(Response::builder()
.header("Content-Type", "application/octet-stream")
.body(message.pack_new())),
Expand Down
8 changes: 4 additions & 4 deletions bee-api/bee-rest-api/src/endpoints/routes/api/v1/milestone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ pub(crate) fn filter<B: StorageBackend>(
.and(warp::get())
.and(has_permission(ROUTE_MILESTONE, public_routes, allowed_ips))
.and(with_tangle(tangle))
.and_then(milestone)
.and_then(|milestone_index, tangle| async move { milestone(milestone_index, tangle) })
.boxed()
}

pub(crate) async fn milestone<B: StorageBackend>(
pub(crate) fn milestone<B: StorageBackend>(
milestone_index: MilestoneIndex,
tangle: ResourceHandle<Tangle<B>>,
) -> Result<impl Reply, Rejection> {
match tangle.get_milestone_message_id(milestone_index).await {
Some(message_id) => match tangle.get_metadata(&message_id).await {
match tangle.get_milestone_message_id(milestone_index) {
Some(message_id) => match tangle.get_metadata(&message_id) {
Some(metadata) => Ok(warp::reply::json(&SuccessBody::new(MilestoneResponse {
milestone_index: *milestone_index,
message_id: message_id.to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ pub(crate) fn filter(
.and(warp::delete())
.and(has_permission(ROUTE_REMOVE_PEER, public_routes, allowed_ips))
.and(with_network_command_sender(network_command_sender))
.and_then(remove_peer)
.and_then(|peer_id, network_controller| async move { remove_peer(peer_id, network_controller) })
.boxed()
}

pub(crate) async fn remove_peer(
pub(crate) fn remove_peer(
peer_id: PeerId,
network_controller: ResourceHandle<NetworkCommandSender>,
) -> Result<impl Reply, Rejection> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ pub(crate) async fn submit_message<B: StorageBackend>(
if parsed == 0 { None } else { Some(parsed) }
};

let message = build_message(network_id, parents, payload, nonce, rest_api_config, protocol_config).await?;
let message = build_message(network_id, parents, payload, nonce, rest_api_config, protocol_config)?;
let message_id = forward_to_message_submitter(message, tangle, message_submitter).await?;

Ok(warp::reply::with_status(
Expand All @@ -171,7 +171,7 @@ pub(crate) async fn submit_message<B: StorageBackend>(
))
}

pub(crate) async fn build_message(
pub(crate) fn build_message(
network_id: u64,
parents: Vec<MessageId>,
payload: Option<Payload>,
Expand Down Expand Up @@ -244,7 +244,7 @@ pub(crate) async fn forward_to_message_submitter<B: StorageBackend>(
) -> Result<MessageId, Rejection> {
let (message_id, message_bytes) = message.id();

if tangle.contains(&message_id).await {
if tangle.contains(&message_id) {
return Ok(message_id);
}

Expand Down
Loading

0 comments on commit 58b3df3

Please sign in to comment.