Skip to content

Commit

Permalink
remove unused object fetching fns from authority aggregator (MystenLa…
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Dec 23, 2022
1 parent ddbe478 commit b6f035a
Showing 1 changed file with 2 additions and 75 deletions.
77 changes: 2 additions & 75 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use crate::safe_client::{SafeClient, SafeClientMetrics, SafeClientMetricsBase};
use crate::validator_info::make_committee;

use async_trait::async_trait;
use futures::{future, future::BoxFuture, stream::FuturesUnordered, StreamExt};
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use move_core_types::value::MoveStructLayout;
use mysten_metrics::{monitored_future, spawn_monitored_task};
use mysten_metrics::monitored_future;
use mysten_network::config::Config;
use std::convert::AsRef;
use sui_config::genesis::Genesis;
Expand Down Expand Up @@ -47,15 +47,13 @@ use std::string::ToString;
use std::sync::Arc;
use std::time::Duration;
use sui_types::committee::{CommitteeWithNetAddresses, StakeUnit};
use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, timeout};

use crate::authority::{AuthorityState, AuthorityStore};
use crate::epoch::committee_store::CommitteeStore;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::TapFallible;

const OBJECT_DOWNLOAD_CHANNEL_BOUND: usize = 1024;
pub const DEFAULT_RETRIES: usize = 4;

#[cfg(test)]
Expand Down Expand Up @@ -1506,77 +1504,6 @@ where
Ok(ObjectRead::NotExists(object_id))
}

/// Given a list of object refs, download the objects.
pub fn fetch_objects_from_authorities(
&self,
object_refs: BTreeSet<ObjectRef>,
) -> Receiver<SuiResult<Object>> {
let (sender, receiver) = tokio::sync::mpsc::channel(OBJECT_DOWNLOAD_CHANNEL_BOUND);
for object_ref in object_refs {
let sender = sender.clone();
let client = self.authority_clients.clone();
let timeout = self.timeouts.authority_request_timeout;
spawn_monitored_task!(Self::fetch_one_object(client, object_ref, timeout, sender,));
}
// Close unused channel
drop(sender);
receiver
}

/// This function fetches one object at a time, and sends back the result over the channel
/// The object ids are also returned so the caller can determine which fetches failed
/// NOTE: This function assumes all authorities are honest
async fn fetch_one_object(
authority_clients: BTreeMap<AuthorityName, SafeClient<A>>,
object_ref: ObjectRef,
timeout: Duration,
sender: tokio::sync::mpsc::Sender<Result<Object, SuiError>>,
) {
let object_id = object_ref.0;
// Prepare the request
// TODO: We should let users decide what layout they want in the result.
let request = ObjectInfoRequest::latest_object_info_request(
object_id,
Some(ObjectFormatOptions::default()),
);

// For now assume all authorities. Assume they're all honest
// This assumption is woeful, and should be fixed
// TODO: https://github.com/MystenLabs/sui/issues/320
let results = future::join_all(authority_clients.iter().map(|(_, ac)| {
tokio::time::timeout(
timeout,
ac.handle_object_info_request(request.clone(), false),
)
}))
.await;

let mut ret_val: Result<Object, SuiError> = Err(SuiError::ObjectFetchFailed {
object_id,
err: "No authority returned the correct object".to_string(),
});
// Find the first non-error value
// There are multiple reasons why we might not have an object
// We can timeout, or the authority returns an error or simply no object
// When we get an object back, it also might not match the digest we want
for resp in results.into_iter().flatten().flatten() {
match resp.object_and_lock {
// Either the object is a shared object, in which case we don't care about its content
// because we can never keep shared objects up-to-date.
// Or if it's not shared object, we check if the digest matches.
Some(o) if o.object.is_shared() || o.object.digest() == object_ref.2 => {
ret_val = Ok(o.object);
break;
}
_ => (),
}
}
sender
.send(ret_val)
.await
.expect("Cannot send object on channel after object fetch attempt");
}

pub async fn handle_checkpoint_request(
&self,
request: &CheckpointRequest,
Expand Down

0 comments on commit b6f035a

Please sign in to comment.