Skip to content

Commit

Permalink
feat: start refactoring celestia.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
sebasti810 authored and distractedm1nd committed Aug 3, 2024
1 parent ea13d15 commit afdcac4
Showing 1 changed file with 81 additions and 84 deletions.
165 changes: 81 additions & 84 deletions src/da/celestia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
da::{DataAvailabilityLayer, FinalizedEpoch},
error::{DAResult, DataAvailabilityError, GeneralError},
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use borsh::from_slice;
use celestia_rpc::{BlobClient, Client, HeaderClient};
Expand All @@ -19,20 +20,18 @@ use tokio::{
};

impl TryFrom<&Blob> for FinalizedEpoch {
type Error = GeneralError;
type Error = anyhow::Error;

fn try_from(value: &Blob) -> Result<Self, GeneralError> {
from_slice::<Self>(&value.data)
.map_err(|e| GeneralError::DecodingError(format!("decoding blob: {}", e)))
fn try_from(value: &Blob) -> Result<Self, Self::Error> {
from_slice::<Self>(&value.data).with_context(|| "Failed to decode blob into FinalizedEpoch")
}
}

impl TryFrom<&Blob> for Operation {
type Error = GeneralError;
type Error = anyhow::Error;

fn try_from(value: &Blob) -> Result<Self, GeneralError> {
from_slice::<Self>(&value.data)
.map_err(|e| GeneralError::DecodingError(format!("decoding blob: {}", e)))
fn try_from(value: &Blob) -> Result<Self, Self::Error> {
from_slice::<Self>(&value.data).with_context(|| "Failed to decode blob into Operation")
}
}

Expand All @@ -51,16 +50,17 @@ impl CelestiaConnection {

let client = Client::new(&config.connection_string, auth_token)
.await
.map_err(|e| {
DataAvailabilityError::ConnectionError(format!(
"websocket initialization failed: {}",
e
))
})?;

let snark_namespace = create_namespace(&config.snark_namespace_id)?;
.context("Failed to initialize websocket connection")
.map_err(|e| DataAvailabilityError::ConnectionError(e.to_string()))?;

let snark_namespace = create_namespace(&config.snark_namespace_id)
.context("Failed to create snark namespace")
.map_err(|e| DataAvailabilityError::InitializationError(e.to_string()))?;

let operation_namespace = match &config.operation_namespace_id {
Some(id) => create_namespace(id)?,
Some(id) => create_namespace(id)
.context("Failed to create operation namespace")
.map_err(|e| DataAvailabilityError::InitializationError(e.to_string()))?,
None => snark_namespace,
};

Expand All @@ -75,19 +75,23 @@ impl CelestiaConnection {
}

fn create_namespace(namespace_hex: &str) -> DAResult<Namespace> {
let decoded_hex = hex::decode(namespace_hex).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::DecodingError(format!(
"decoding namespace '{}': {}",
namespace_hex, e
)))
})?;

Namespace::new_v0(&decoded_hex).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::EncodingError(format!(
"creating namespace '{}': {}",
namespace_hex, e
)))
})
let decoded_hex = hex::decode(namespace_hex)
.context(format!(
"Failed to decode namespace hex '{}'",
namespace_hex
))
.map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::DecodingError(e.to_string()))
})?;

Namespace::new_v0(&decoded_hex)
.context(format!(
"Failed to create namespace from '{}'",
namespace_hex
))
.map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::EncodingError(e.to_string()))
})
}

#[async_trait]
Expand All @@ -100,13 +104,11 @@ impl DataAvailabilityLayer for CelestiaConnection {
}

async fn initialize_sync_target(&self) -> DAResult<u64> {
match HeaderClient::header_network_head(&self.client).await {
Ok(extended_header) => Ok(extended_header.header.height.value()),
Err(err) => Err(DataAvailabilityError::NetworkError(format!(
"getting network head from da layer: {}",
err
))),
}
HeaderClient::header_network_head(&self.client)
.await
.context("Failed to get network head from DA layer")
.map(|extended_header| extended_header.header.height.value())
.map_err(|e| DataAvailabilityError::NetworkError(e.to_string()))
}

async fn get_snarks(&self, height: u64) -> DAResult<Vec<FinalizedEpoch>> {
Expand Down Expand Up @@ -186,46 +188,48 @@ impl DataAvailabilityLayer for CelestiaConnection {

async fn get_operations(&self, height: u64) -> DAResult<Vec<Operation>> {
trace!("searching for operations on da layer at height {}", height);
match BlobClient::blob_get_all(&self.client, height, &[self.operation_namespace]).await {
Ok(blobs) => {
let mut operations = Vec::new();
for blob in blobs.iter() {
match Operation::try_from(blob) {
Ok(operation) => operations.push(operation),
Err(_) => {
debug!(
"marshalling blob from height {} to operation failed: {:?}",
height, &blob
)
}
}
let blobs = BlobClient::blob_get_all(&self.client, height, &[self.operation_namespace])
.await
.with_context(|| format!("Failed to get operation blobs at height {}", height))
.map_err(|e| DataAvailabilityError::DataRetrievalError(height, e.to_string()))?;

let operations = blobs
.iter()
.filter_map(|blob| match Operation::try_from(blob) {
Ok(operation) => Some(operation),
Err(e) => {
warn!(
"Failed to parse blob from height {} to operation: {:?}",
height, e
);
None
}
Ok(operations)
}
// TODO: replace this error
Err(err) => Err(DataAvailabilityError::DataRetrievalError(
height,
format!("getting operations from da layer: {}", err),
)),
}
})
.collect();

Ok(operations)
}

async fn submit_operations(&self, operations: Vec<Operation>) -> DAResult<u64> {
debug!("posting {} operations to DA layer", operations.len());
let blobs: Result<Vec<Blob>, DataAvailabilityError> = operations
let blobs: Result<Vec<Blob>, _> = operations
.iter()
.map(|operation| {
let data = borsh::to_vec(operation).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(format!(
"serializing operation {}: {}",
operation, e
)))
})?;
Blob::new(self.operation_namespace, data).map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
})
let data = borsh::to_vec(operation)
.with_context(|| format!("Failed to serialize operation {}", operation))
.map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::ParsingError(
e.to_string(),
))
})?;

Blob::new(self.operation_namespace, data)
.with_context(|| format!("Failed to create blob for operation {}", operation))
.map_err(|e| {
DataAvailabilityError::GeneralError(GeneralError::BlobCreationError(
e.to_string(),
))
})
})
.collect();

Expand All @@ -235,25 +239,18 @@ impl DataAvailabilityLayer for CelestiaConnection {
trace!("blob {}: {:?}", i, blob);
}

match self.client.blob_submit(&blobs, GasPrice::from(-1.0)).await {
Ok(height) => Ok(height),
Err(err) => Err(DataAvailabilityError::SubmissionError(
// todo: fucking submission error is yikes, we need anyhow
0,
err.to_string(),
)),
}
self.client
.blob_submit(&blobs, GasPrice::from(-1.0))
.await
.context("Failed to submit operation blobs")
.map_err(|e| DataAvailabilityError::SubmissionError(0, e.to_string()))
}

async fn start(&self) -> DAResult<()> {
let mut header_sub = HeaderClient::header_subscribe(&self.client)
.await
.map_err(|e| {
DataAvailabilityError::NetworkError(format!(
"subscribing to headers from da layer: {}",
e
))
})?;
.context("Failed to subscribe to headers from DA layer")
.map_err(|e| DataAvailabilityError::NetworkError(e.to_string()))?;

let synctarget_buffer = self.sync_target_tx.clone();
spawn(async move {
Expand Down

0 comments on commit afdcac4

Please sign in to comment.