Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Jul 8, 2024
1 parent 817f02d commit 473f31c
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 74 deletions.
27 changes: 18 additions & 9 deletions src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ use std::{
str::FromStr,
sync::Arc,
};
use tokio::{sync::mpsc, task::spawn};
use tokio::{
sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
},
task::spawn,
};

#[derive(Serialize, Deserialize, Clone)]
pub struct EpochJson {
Expand Down Expand Up @@ -76,6 +82,8 @@ impl Signable for EpochJson {
}
}

/// Message represents an internal message that a new height has been reached on the DA layer
/// and the sync target should be updated.
enum Message {
UpdateTarget(u64),
}
Expand All @@ -92,8 +100,9 @@ pub trait DataAvailabilityLayer: Send + Sync {
pub struct CelestiaConnection {
pub client: celestia_rpc::Client,
pub namespace_id: Namespace,
tx: Arc<mpsc::Sender<Message>>,
rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Message>>>,

synctarget_tx: Arc<Sender<Message>>,
synctarget_rx: Arc<Mutex<Receiver<Message>>>,
}

/// The `NoopDataAvailabilityLayer` is a mock implementation of the `DataAvailabilityLayer` trait.
Expand Down Expand Up @@ -137,7 +146,7 @@ impl CelestiaConnection {
namespace_hex: &String,
) -> Result<Self, DataAvailabilityError> {
// TODO: Make buffer size constant
let (tx, rx) = mpsc::channel(5);
let (tx, rx) = channel(5);

let client = Client::new(&connection_string, auth_token)
.await
Expand Down Expand Up @@ -165,16 +174,16 @@ impl CelestiaConnection {
Ok(CelestiaConnection {
client,
namespace_id,
tx: Arc::new(tx),
rx: Arc::new(tokio::sync::Mutex::new(rx)),
synctarget_tx: Arc::new(tx),
synctarget_rx: Arc::new(Mutex::new(rx)),
})
}
}

#[async_trait]
impl DataAvailabilityLayer for CelestiaConnection {
async fn get_message(&self) -> Result<u64, DataAvailabilityError> {
match self.rx.lock().await.recv().await {
match self.synctarget_rx.lock().await.recv().await {
Some(Message::UpdateTarget(height)) => Ok(height),
None => Err(DataAvailabilityError::ChannelReceiveError),
}
Expand Down Expand Up @@ -256,13 +265,13 @@ impl DataAvailabilityLayer for CelestiaConnection {
))
})?;

let tx1 = self.tx.clone();
let synctarget_buffer = self.synctarget_tx.clone();
spawn(async move {
while let Some(extended_header_result) = header_sub.next().await {
match extended_header_result {
Ok(extended_header) => {
let height = extended_header.header.height.value();
match tx1.send(Message::UpdateTarget(height)).await {
match synctarget_buffer.send(Message::UpdateTarget(height)).await {
Ok(_) => {
debug!("Sent message to channel. Height: {}", height);
}
Expand Down
141 changes: 77 additions & 64 deletions src/node_types.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use crate::error::DataAvailabilityError;
use async_trait::async_trait;
use bellman::groth16::Proof;
use bls12_381::Bls12;
use crypto_hash::{hex_digest, Algorithm};
use ed25519_dalek::{Signer, SigningKey};
use indexed_merkle_tree::{
error::MerkleTreeError,
node::{LeafNode, Node},
tree::IndexedMerkleTree,
use indexed_merkle_tree::{error::MerkleTreeError, node::Node, tree::IndexedMerkleTree};
use std::{self, io::ErrorKind, sync::Arc, time::Duration};
use tokio::{
sync::{
mpsc::{channel, Receiver, Sender},
Mutex,
},
task::spawn,
time::sleep,
};
use std::{self, collections::VecDeque, io::ErrorKind, sync::Arc, time::Duration};
use tokio::{sync::mpsc, task::spawn, time::sleep};

use crate::{
cfg::Config,
Expand All @@ -25,12 +26,20 @@ use crate::{
},
};

/// DA_RETRY_COUNT determines how many times to retry epoch submission.
const DA_RETRY_COUNT: u64 = 5;
/// DA_RETRY_COUNT determines how long to wait between failed submissions.
const DA_RETRY_INTERVAL: Duration = Duration::from_secs(5);
/// CHANNEL_BUFFER_SIZE determines the size of the channel buffer for the DA backlog.
const CHANNEL_BUFFER_SIZE: usize = 5;

#[async_trait]
pub trait NodeType {
async fn start(self: Arc<Self>) -> std::result::Result<(), std::io::Error>;
// async fn stop(&self) -> Result<(), String>;
}

// Message represents an internal message that can be sent between sequencer threads.
enum Message {
FinalizedEpoch(EpochJson),
}
Expand All @@ -42,8 +51,8 @@ pub struct Sequencer {
pub ws: WebServer,
pub key: SigningKey,

tx: Arc<mpsc::Sender<Message>>,
rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Message>>>,
epoch_buffer_tx: Arc<Sender<Message>>,
epoch_buffer_rx: Arc<Mutex<Receiver<Message>>>,
}

pub struct LightClient {
Expand Down Expand Up @@ -71,55 +80,9 @@ impl NodeType for Sequencer {
}
}

let cloned_self = self.clone();

debug!("starting main sequencer loop");
let tx1 = self.tx.clone();
let self_arc = self.clone();
spawn(async move {
loop {
match self_arc.finalize_epoch().await {
Ok(epoch) => {
info!(
"sequencer_loop: finalized epoch {}",
self_arc.db.get_epoch().unwrap()
);
tx1.send(Message::FinalizedEpoch(epoch));
}
Err(e) => error!("sequencer_loop: finalizing epoch: {}", e),
}
// elapsed time/ticker instead of sleep
sleep(Duration::from_secs(self_arc.epoch_duration)).await;
}
});

debug!("starting da submission loop");
spawn(async move {
loop {
let epoch = self.get_message().await.unwrap();
let mut retry_counter = 0;
loop {
// todo: make constant
if retry_counter > 5 {
// todo: graceful shutdown
panic!("da_loop: too many retries, giving up");
}
match self.da.submit(&epoch).await {
Ok(height) => {
info!("da_loop: submitted epoch at height {}", height);
break;
}
Err(e) => {
error!("da_loop: submitting epoch: {}", e);
retry_counter += 1;
}
};
}
}
});

// starting the webserver
cloned_self.ws.start(cloned_self.clone()).await
self.clone().main_loop().await;
self.clone().da_loop().await;
self.clone().ws.start(self.clone()).await
}
}

Expand Down Expand Up @@ -212,18 +175,68 @@ impl Sequencer {
cfg: Config,
key: SigningKey,
) -> Sequencer {
// TODO: Make buffer size constant
let (tx, rx) = mpsc::channel(5);
let (tx, rx) = channel(CHANNEL_BUFFER_SIZE);
Sequencer {
db,
da,
epoch_duration: cfg.epoch_time,
ws: WebServer::new(cfg.webserver.unwrap()),
key,
tx: Arc::new(tx),
rx: Arc::new(tokio::sync::Mutex::new(rx)),
epoch_buffer_tx: Arc::new(tx),
epoch_buffer_rx: Arc::new(tokio::sync::Mutex::new(rx)),
}
}

// main_loop is responsible for finalizing epochs every epoch length and writing them to the buffer for DA submission.
async fn main_loop(self: Arc<Self>) {
info!("starting main sequencer loop");
let tx1 = self.epoch_buffer_tx.clone();
spawn(async move {
loop {
match self.finalize_epoch().await {
Ok(epoch) => {
info!(
"sequencer_loop: finalized epoch {}",
self.db.get_epoch().unwrap()
);
tx1.send(Message::FinalizedEpoch(epoch));
}
Err(e) => error!("sequencer_loop: finalizing epoch: {}", e),
}
// elapsed time/ticker instead of sleep
sleep(Duration::from_secs(self.epoch_duration)).await;
}
});
}

// da_loop is responsible for submitting finalized epochs to the DA layer.
async fn da_loop(self: Arc<Self>) {
info!("starting da submission loop");
spawn(async move {
loop {
let epoch = self.get_message().await.unwrap();
let mut retry_counter = 0;
loop {
if retry_counter > DA_RETRY_COUNT {
// todo: graceful shutdown
panic!("da_loop: too many retries, giving up");
}
match self.da.submit(&epoch).await {
Ok(height) => {
info!("da_loop: submitted epoch at height {}", height);
break;
}
Err(e) => {
error!("da_loop: submitting epoch: {}", e);
retry_counter += 1;
sleep(DA_RETRY_INTERVAL).await;
}
};
}
}
});
}

/// Initializes the epoch state by setting up the input table and incrementing the epoch number.
/// Periodically calls the `set_epoch_commitment` function to update the commitment for the current epoch.
///
Expand Down Expand Up @@ -304,7 +317,7 @@ impl Sequencer {
}

async fn get_message(&self) -> std::result::Result<EpochJson, DataAvailabilityError> {
match self.rx.lock().await.recv().await {
match self.epoch_buffer_rx.lock().await.recv().await {
Some(Message::FinalizedEpoch(epoch)) => Ok(epoch),
None => Err(DataAvailabilityError::ChannelReceiveError),
}
Expand Down
2 changes: 1 addition & 1 deletion src/webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl WebServer {
/* let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
builder.set_private_key_file(env.key_path, SslFiletype::PEM).unwrap();
builder.set_certificate_chain_file(env.cert_path).unwrap(); */
info!("Starting webserver on {}:{}", self.cfg.ip, self.cfg.port);
info!("starting webserver on {}:{}", self.cfg.ip, self.cfg.port);
let ctx = Data::new(session.clone());
let (ip, port) = (self.cfg.ip.clone(), self.cfg.port);

Expand Down

0 comments on commit 473f31c

Please sign in to comment.