Skip to content

Commit

Permalink
[LogServer] Stub log server service
Browse files Browse the repository at this point in the history
New `log-server` role and associated service to serve as the server-side of the replicated loglet
  • Loading branch information
AhmedSoliman committed Jun 24, 2024
1 parent c8999d8 commit a40701e
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 2 deletions.
45 changes: 45 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ description = "Restate makes distributed applications easy!"
# Own crates
codederror = { path = "crates/codederror" }
restate-admin = { path = "crates/admin" }
restate-admin-rest-model = { path = "crates/admin-rest-model" }
restate-base64-util = { path = "crates/base64-util" }
restate-bifrost = { path = "crates/bifrost" }
restate-cli-util = { path = "crates/cli-util" }
Expand All @@ -45,7 +46,7 @@ restate-ingress-http = { path = "crates/ingress-http" }
restate-ingress-kafka = { path = "crates/ingress-kafka" }
restate-invoker-api = { path = "crates/invoker-api" }
restate-invoker-impl = { path = "crates/invoker-impl" }
restate-admin-rest-model = { path = "crates/admin-rest-model" }
restate-log-server = { path = "crates/log-server" }
restate-metadata-store = { path = "crates/metadata-store" }
restate-node = { path = "crates/node" }
restate-partition-store = { path = "crates/partition-store" }
Expand Down
1 change: 1 addition & 0 deletions crates/bifrost/src/loglets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@

pub mod local_loglet;
pub mod memory_loglet;
pub mod replicated_loglet;
9 changes: 9 additions & 0 deletions crates/bifrost/src/loglets/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
61 changes: 61 additions & 0 deletions crates/log-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
[package]
name = "restate-log-server"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
publish = false

[features]
default = []
options_schema = ["dep:schemars"]
test-util = []

[dependencies]
restate-core = { workspace = true }
restate-rocksdb = { workspace = true }
restate-types = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true, features = ["serde"] }
codederror = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
enum-map = { workspace = true, features = ["serde"] }
futures = { workspace = true }
humantime = { workspace = true }
metrics = { workspace = true }
once_cell = { workspace = true }
pin-project = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
smallvec = { workspace = true }
static_assertions = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true }
tracing = { workspace = true }


[dev-dependencies]
restate-core = { workspace = true, features = ["test-util"] }
restate-metadata-store = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }

criterion = { workspace = true, features = ["async_tokio"] }
googletest = { workspace = true }
test-log = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }
29 changes: 29 additions & 0 deletions crates/log-server/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use codederror::CodedError;

use restate_core::ShutdownError;

/// Result type for log-server operations.
pub type Result<T, E = LogServerError> = std::result::Result<T, E>;

#[derive(Debug, thiserror::Error, CodedError)]
pub enum LogServerBuildError {
#[error("unknown")]
#[code(unknown)]
Unknown,
}

#[derive(thiserror::Error, Debug, Clone)]
pub enum LogServerError {
#[error("operation failed due to an ongoing shutdown")]
Shutdown(#[from] ShutdownError),
}
16 changes: 16 additions & 0 deletions crates/log-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod error;
mod metric_definitions;
mod service;

pub use error::{LogServerBuildError, LogServerError, Result};
pub use service::LogServerService;
11 changes: 11 additions & 0 deletions crates/log-server/src/metric_definitions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub fn describe_metrics() {}
53 changes: 53 additions & 0 deletions crates/log-server/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_core::network::{MessageRouterBuilder, Networking};
use restate_core::{Metadata, TaskCenter, TaskKind};
use restate_types::config::UpdateableConfiguration;

use crate::error::LogServerBuildError;
use crate::metric_definitions::describe_metrics;

pub struct LogServerService {
_updateable_config: UpdateableConfiguration,
task_center: TaskCenter,
_metadata: Metadata,
}

impl LogServerService {
pub async fn create(
updateable_config: UpdateableConfiguration,
task_center: TaskCenter,
metadata: Metadata,
_router_builder: &mut MessageRouterBuilder,
_networking: Networking,
) -> Result<Self, LogServerBuildError> {
describe_metrics();

Ok(Self {
_updateable_config: updateable_config,
task_center,
_metadata: metadata,
})
}

pub async fn start(self) -> anyhow::Result<()> {
let tc = self.task_center.clone();
tc.spawn(TaskKind::SystemService, "log-server", None, async {
self.run().await
})?;

Ok(())
}

async fn run(self) -> anyhow::Result<()> {
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ restate-admin = { workspace = true, features = ["servers"] }
restate-bifrost = { workspace = true }
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-log-server = { workspace = true }
restate-metadata-store = { workspace = true }
restate-rocksdb = { workspace = true }
restate-service-client = { workspace = true }
Expand Down
36 changes: 35 additions & 1 deletion crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::future::Future;
use std::time::Duration;

use codederror::CodedError;
use restate_log_server::LogServerService;
use tokio::time::Instant;
use tracing::{debug, error, info, trace};

Expand Down Expand Up @@ -80,6 +81,12 @@ pub enum BuildError {
#[code]
restate_metadata_store::local::BuildError,
),
#[error("building log-server failed: {0}")]
LogServer(
#[from]
#[code]
restate_log_server::LogServerBuildError,
),
#[error("node neither runs cluster controller nor its address has been configured")]
#[code(unknown)]
UnknownClusterController,
Expand All @@ -98,6 +105,7 @@ pub struct Node {
metadata_store_role: Option<LocalMetadataStoreService>,
admin_role: Option<AdminRole>,
worker_role: Option<WorkerRole>,
log_server: Option<LogServerService>,
server: NetworkServer,
}

Expand Down Expand Up @@ -144,9 +152,25 @@ impl Node {
let updating_schema_information = metadata.schema_updateable();
let bifrost = BifrostService::new(metadata.clone());

let tc = task_center();
let log_server = if config.has_role(Role::LogServer) {
Some(
LogServerService::create(
updateable_config.clone(),
tc.clone(),
metadata.clone(),
&mut router_builder,
networking.clone(),
)
.await?,
)
} else {
None
};

let admin_role = if config.has_role(Role::Admin) {
Some(AdminRole::new(
task_center(),
tc.clone(),
updateable_config.clone(),
metadata.clone(),
networking.clone(),
Expand Down Expand Up @@ -204,6 +228,7 @@ impl Node {
metadata_store_role,
admin_role,
worker_role,
log_server,
server,
})
}
Expand Down Expand Up @@ -311,6 +336,15 @@ impl Node {
tc.run_in_scope("bifrost-init", None, self.bifrost.start())
.await?;

if let Some(log_server) = self.log_server {
tc.spawn(
TaskKind::SystemBoot,
"log-server-init",
None,
log_server.start(),
)?;
}

if let Some(admin_role) = self.admin_role {
tc.spawn(
TaskKind::SystemBoot,
Expand Down
3 changes: 3 additions & 0 deletions crates/types/src/nodes_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ pub enum Role {
Worker,
/// Admin runs cluster controller and user-facing admin APIs
Admin,
/// Serves the metadata store
MetadataStore,
/// Serves a log server for replicated loglets
LogServer,
}

#[serde_as]
Expand Down

0 comments on commit a40701e

Please sign in to comment.