Skip to content

Commit

Permalink
Add integration tests. (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
YoEight authored Jul 6, 2024
1 parent 9aa006f commit d8567be
Show file tree
Hide file tree
Showing 29 changed files with 588 additions and 223 deletions.
349 changes: 251 additions & 98 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
[workspace]
resolver = "2"
members = [
"geth-node",
"geth-engine",
"geth-mikoshi",
"geth-repl",
"geth-client",
"geth-common",
"geth-consensus",
"geth-domain",
"geth-client-tests",
"geth-node",
]
29 changes: 29 additions & 0 deletions geth-client-tests/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "geth-client-tests"
version = "0.1.0"
edition = "2021"

[dependencies.geth-client]
path = "../geth-client"

[dependencies.geth-engine]
path = "../geth-engine"

[dependencies.geth-common]
path = "../geth-common"

[dependencies.tokio]
version = "*"
features = ["full"]

[dependencies.fake]
version = "2.9"
features = ["derive"]

[dependencies]
temp-dir = "0.1"
eyre = "0.6"
uuid = "*"
serde = "1"
serde_json = "1"
futures = "0.3"
64 changes: 64 additions & 0 deletions geth-client-tests/src/append_read_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use fake::faker::name::en::Name;
use fake::{Fake, Faker};
use futures::TryStreamExt;
use temp_dir::TempDir;
use uuid::Uuid;

use geth_client::GrpcClient;
use geth_common::{AppendStreamCompleted, Client, Direction, ExpectedRevision, Propose, Revision};

use crate::tests::{client_endpoint, random_valid_options, Toto};

#[tokio::test]
async fn simple_append() -> eyre::Result<()> {
let db_dir = TempDir::new()?;
let options = random_valid_options(&db_dir);

let client = GrpcClient::new(client_endpoint(&options));
tokio::spawn(geth_engine::run(options.clone()));

let stream_name: String = Name().fake();
let event_type: String = Name().fake();
let event_id = Uuid::new_v4();
let expected: Toto = Faker.fake();

let completed = client
.append_stream(
&stream_name,
ExpectedRevision::Any,
vec![Propose {
id: event_id,
r#type: event_type.clone(),
data: serde_json::to_vec(&expected)?.into(),
}],
)
.await?;

let write_result = match completed {
AppendStreamCompleted::Success(r) => r,
AppendStreamCompleted::Error(e) => panic!("error: {}", e),
};

assert_eq!(
ExpectedRevision::Revision(1),
write_result.next_expected_version
);

let mut stream = client
.read_stream(&stream_name, Direction::Forward, Revision::Start, 1)
.await;

let event = stream.try_next().await?.unwrap();

assert_eq!(event_id, event.id);
assert_eq!(event_type, event.r#type);
assert_eq!(stream_name, event.stream_name);
assert_eq!(0, event.revision);

let actual = serde_json::from_slice::<Toto>(&event.data)?;

assert_eq!(expected.key, actual.key);
assert_eq!(expected.value, actual.value);

Ok(())
}
33 changes: 33 additions & 0 deletions geth-client-tests/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#[cfg(test)]
mod append_read_tests;

#[cfg(test)]
pub mod tests {
use fake::{Dummy, Fake};
use serde::{Deserialize, Serialize};
use temp_dir::TempDir;

use geth_common::EndPoint;
use geth_engine::Options;

pub fn random_valid_options(temp_dir: &TempDir) -> Options {
Options {
host: "127.0.0.1".to_string(),
port: (1_113..2_113).fake(),
db: temp_dir.path().as_os_str().to_str().unwrap().to_string(),
}
}

pub fn client_endpoint(options: &Options) -> EndPoint {
EndPoint {
host: options.host.clone(),
port: options.port,
}
}

#[derive(Serialize, Deserialize, Dummy)]
pub struct Toto {
pub key: String,
pub value: u64,
}
}
52 changes: 48 additions & 4 deletions geth-client/src/next/driver.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::collections::HashMap;
use std::time::Duration;

use eyre::bail;
use uuid::Uuid;

use geth_common::generated::next::protocol;
use geth_common::{EndPoint, Operation, OperationIn, OperationOut};

use crate::next::{connect_to_node, Command, Connection, Mailbox};
use crate::next::{connect_to_node, Command, ConnErr, Connection, Mailbox};

pub struct Driver {
endpoint: EndPoint,
Expand Down Expand Up @@ -108,9 +110,51 @@ impl Driver {
.parse()
.unwrap();

let conn = connect_to_node(uri, self.mailbox.clone()).await?;
self.connection = Some(conn);
let mut attempts = 0;
let max_attempts = 10;
loop {
match connect_to_node(&uri, self.mailbox.clone()).await {
Err(e) => match e {
ConnErr::Transport(e) => {
if e.to_string() == "transport error" {
attempts += 1;

if attempts > max_attempts {
tracing::error!(
"max connection attempt reached ({})",
max_attempts
);

bail!("max connection attempt reached");
}

tracing::error!(
"error when connecting to {}:{} {}/{}",
self.endpoint.host,
self.endpoint.port,
attempts,
max_attempts,
);

tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}

bail!("fatal error when connecting: {}", e);
}

Ok(())
ConnErr::Status(e) => {
// TODO - we might retry if the server is not ready or under too much load.
tracing::error!("error when reaching endpoint: {}", e);
bail!("error when reaching endpoint");
}
},

Ok(c) => {
self.connection = Some(c);
return Ok(());
}
}
}
}
}
14 changes: 11 additions & 3 deletions geth-client/src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,21 @@ pub struct Command {
type Connection = UnboundedSender<protocol::OperationIn>;
type Mailbox = UnboundedSender<Msg>;

pub(crate) async fn connect_to_node(uri: Uri, mailbox: Mailbox) -> eyre::Result<Connection> {
let mut client = ProtocolClient::connect(uri).await?;
pub enum ConnErr {
Transport(tonic::transport::Error),
Status(tonic::Status),
}

pub(crate) async fn connect_to_node(uri: &Uri, mailbox: Mailbox) -> Result<Connection, ConnErr> {
let mut client = ProtocolClient::connect(uri.clone())
.await
.map_err(ConnErr::Transport)?;
let (connection, stream_request) = mpsc::unbounded_channel();

let mut stream_response = client
.multiplex(UnboundedReceiverStream::new(stream_request))
.await?
.await
.map_err(ConnErr::Status)?
.into_inner();

tokio::spawn(async move {
Expand Down
1 change: 1 addition & 0 deletions geth-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ impl From<WriteResult> for operation_out::delete_stream_completed::DeleteResult
}
}

#[derive(Debug)]
pub enum AppendStreamCompleted {
Success(WriteResult),
Error(AppendError),
Expand Down
57 changes: 57 additions & 0 deletions geth-engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
[package]
name = "geth-engine"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies.geth-mikoshi]
path = "../geth-mikoshi"

[dependencies.geth-common]
path = "../geth-common"

[dependencies.geth-domain]
path = "../geth-domain"

[dependencies.tokio]
version = "1.20"
features = ["full"]

[dependencies.uuid]
version = "1"
features = ["v4"]

[dependencies.rand]
version = "0.8"
features = ["small_rng"]

[dependencies.pyro-core]
git = "https://github.com/YoEight/pyro.git"

[dependencies.pyro-runtime]
git = "https://github.com/YoEight/pyro.git"

[dependencies.clap]
version = "4.5"
features = ["derive"]

[dependencies]
tonic = "0.10"
prost = "0.12"
prost-types = "0.12"
prost-derive = "0.12"
tracing = "0.1"
futures = "*"
bytes = "1"
eyre = "0.6"
async-stream = "0.3"
byteorder = "1"
chrono = "0.4"
serde_json = "1"
moka = "0.11"
flatbuffers = "23.5.26"
async-trait = "0.1.71"

[build-dependencies]
tonic-build = "0.10"
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
9 changes: 7 additions & 2 deletions geth-node/src/grpc/mod.rs → geth-engine/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ use tonic::transport::{self, Server};
use geth_common::generated::next::protocol::protocol_server::ProtocolServer;
use geth_common::Client;

use crate::options::Options;

mod local;
mod protocol;

pub async fn start_server<C>(client: C) -> Result<(), transport::Error>
pub async fn start_server<C>(options: Options, client: C) -> Result<(), transport::Error>
where
C: Client + Send + Sync + 'static,
{
let addr = "127.0.0.1:2113".parse().unwrap();
let addr = format!("{}:{}", options.host, options.port)
.parse()
.unwrap();

let protocols = protocol::ProtocolImpl::new(client);

tracing::info!("GethDB is listening on {}", addr);
Expand Down
File renamed without changes.
47 changes: 47 additions & 0 deletions geth-engine/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::path::PathBuf;
use std::sync::Arc;

use tokio::select;

use geth_domain::{Lsm, LsmSettings};
use geth_mikoshi::storage::FileSystemStorage;
use geth_mikoshi::wal::chunks::ChunkBasedWAL;
use geth_mikoshi::wal::WALRef;

use crate::domain::index::Index;
pub use crate::options::Options;
use crate::process::{InternalClient, Processes};

mod bus;
mod domain;
mod grpc;
mod messages;
mod names;
mod options;
mod process;
mod services;

pub async fn run(options: Options) -> eyre::Result<()> {
let storage = FileSystemStorage::new(PathBuf::from(&options.db))?;
let lsm = Lsm::load(LsmSettings::default(), storage.clone())?;
let index = Index::new(lsm);
let wal = WALRef::new(ChunkBasedWAL::load(storage.clone())?);
let processes = Processes::new(wal, index.clone());
let sub_client = processes.subscriptions_client().clone();
let client = Arc::new(InternalClient::new(processes));
let services = services::start(client.clone(), index, sub_client);

select! {
server = grpc::start_server(options, client) => {
if let Err(e) = server {
tracing::error!("GethDB node gRPC module crashed: {}", e);
}
}

_ = services.exited() => {
tracing::info!("GethDB node terminated");
}
}

Ok(())
}
Loading

0 comments on commit d8567be

Please sign in to comment.