Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework v2 #320

Merged
merged 1 commit into from
Oct 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions lol2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@ http-serde = "1"
log = "0.4"
moka = { version = "0.12", features = ["sync"] }
once_cell = "1.18"
phi-detector = "0.3"
prost = "0.12"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"
shrinkwraprs = "0.3"
spin = "0.9"
tokio = { version = "1", features = ["rt"] }
tokio-retry = "0.3"
tokio-util = "0.7"
tonic = "0.10"
phi-detector = "0.3"
prost = "0.12"
rand = "0.8"
tower = "0.4.13"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }

[build-dependencies]
tonic-build = "0.10"
prost-build = "0.12"
prost-build = "0.12"
3 changes: 2 additions & 1 deletion lol2/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

let mut config = prost_build::Config::new();
config.bytes(&[
".lol2.Request.message",
".lol2.WriteRequest.message",
".lol2.ReadRequest.message",
".lol2.Response.message",
".lol2.KernRequest.message",
".lol2.LogStreamEntry.command",
Expand Down
21 changes: 11 additions & 10 deletions lol2/proto/lol2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ import "google/protobuf/empty.proto";

package lol2;

message Request {
message WriteRequest {
bytes message = 1;
bool mutation = 2;
// Unique identifier of this request.
// Duplicate requests carrying the same identifier are executed only once.
string request_id = 2;
}

message ReadRequest {
bytes message = 1;
}

message Response {
bytes message = 1;
}
Expand Down Expand Up @@ -72,21 +79,15 @@ message RemoveServerRequest {
string server_id = 1;
}

message ClusterInfo {
optional string known_leader_id = 1;
repeated string known_members = 2;
}

service Raft {
rpc Process(Request) returns (Response);
rpc Write(WriteRequest) returns (Response);
rpc Read(ReadRequest) returns (Response);
rpc ProcessKernRequest (KernRequest) returns (google.protobuf.Empty);
rpc GetClusterInfo (google.protobuf.Empty) returns (ClusterInfo);
rpc RequestVote (VoteRequest) returns (VoteResponse);
rpc AddServer (AddServerRequest) returns (google.protobuf.Empty);
rpc RemoveServer (RemoveServerRequest) returns (google.protobuf.Empty);
rpc SendLogStream (stream LogStreamChunk) returns (SendLogStreamResponse);
rpc GetSnapshot (GetSnapshotRequest) returns (stream SnapshotChunk);
rpc SendHeartbeat (Heartbeat) returns (google.protobuf.Empty);
rpc TimeoutNow (google.protobuf.Empty) returns (google.protobuf.Empty);
rpc Noop (google.protobuf.Empty) returns (google.protobuf.Empty);
}
2 changes: 1 addition & 1 deletion lol2/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::*;

pub type RaftClient = raft::raft_client::RaftClient<tonic::transport::channel::Channel>;
pub use raft::{AddServerRequest, ClusterInfo, RemoveServerRequest, Request, Response};
pub use raft::{AddServerRequest, ReadRequest, RemoveServerRequest, Response, WriteRequest};
10 changes: 7 additions & 3 deletions lol2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use process::RaftProcess;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tonic::transport::Uri;
use tonic::transport::{Endpoint, Uri};

mod raft {
tonic::include_proto!("lol2");
Expand All @@ -36,7 +36,11 @@ mod raft {
)]
pub struct NodeId(#[serde(with = "http_serde::uri")] Uri);
impl NodeId {
pub fn new(uri: Uri) -> NodeId {
NodeId(uri)
pub fn new(uri: Uri) -> Self {
Self(uri)
}
pub fn from_str(url: &str) -> Result<Self> {
let url = url.parse()?;
Ok(Self(url))
}
}
11 changes: 5 additions & 6 deletions lol2/src/process/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ use super::*;

pub mod request {
use super::*;
pub struct UserRequest {
pub struct UserWriteRequest {
pub message: Bytes,
pub request_id: String,
}
pub struct UserReadRequest {
pub message: Bytes,
pub mutation: bool,
}
pub struct KernRequest {
pub message: Bytes,
Expand Down Expand Up @@ -42,8 +45,4 @@ pub mod response {
pub success: bool,
pub log_last_index: Index,
}
pub struct ClusterInfo {
pub known_leader: Option<NodeId>,
pub known_members: HashSet<NodeId>,
}
}
12 changes: 8 additions & 4 deletions lol2/src/process/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ use super::*;

#[derive(serde::Serialize, serde::Deserialize)]
pub enum Command<'a> {
Noop,
Snapshot {
Barrier(Term),
ClusterConfiguration {
membership: HashSet<NodeId>,
},
ClusterConfiguration {
Snapshot {
membership: HashSet<NodeId>,
},
Req {
ExecuteRequest {
#[serde(with = "serde_bytes")]
message: &'a [u8],
request_id: String,
},
CompleteRequest {
request_id: String,
},
}

Expand Down
102 changes: 78 additions & 24 deletions lol2/src/process/command_log/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
use super::*;

impl CommandLog {
/// Registers `completion` under the log `index` it belongs to.
///
/// User completions are stored in `user_completions` and kernel completions
/// in `kern_completions`; each map is locked only for the duration of the
/// insert.
pub fn register_completion(&self, index: Index, completion: Completion) {
    match completion {
        Completion::Kern(kern) => {
            let mut pending = self.kern_completions.lock();
            pending.insert(index, kern);
        }
        Completion::User(user) => {
            let mut pending = self.user_completions.lock();
            pending.insert(index, user);
        }
    }
}

pub async fn advance_snapshot_index(&self) -> Result<()> {
let cur_snapshot_index = self.snapshot_index.load(Ordering::SeqCst);
let cur_snapshot_index = self.snapshot_pointer.load(Ordering::SeqCst);
let proposed_snapshot_index = self.app.propose_new_snapshot().await?;
if proposed_snapshot_index > cur_snapshot_index {
info!("find a newer proposed snapshot@{proposed_snapshot_index}. will move the snapshot index.");
Expand All @@ -27,6 +38,7 @@ impl CommandLog {
..old_entry
}
};
// TODO wait for follower catch up
self.insert_snapshot(new_snapshot_entry).await?;
}
Ok(())
Expand All @@ -40,49 +52,91 @@ impl CommandLog {

let process_index = cur_user_index + 1;
let e = self.get_entry(process_index).await?;
let command = Command::deserialize(&e.command);

debug!("process user@{process_index}");
match Command::deserialize(&e.command) {
Command::Snapshot { .. } => {
app.install_snapshot(process_index).await?;
}
Command::Req { message } => {
let resp = app.process_write(message, process_index).await?;
if let Some(user_completion) =
self.user_completions.lock().unwrap().remove(&process_index)
{
user_completion.complete_with(resp);
let do_process = match command {
Command::ExecuteRequest { .. } => true,
Command::CompleteRequest { .. } => true,
Command::Snapshot { .. } => true,
_ => false,
};

if do_process {
debug!("process user@{process_index}");
match command {
Command::Snapshot { .. } => {
app.install_snapshot(process_index).await?;
}
Command::ExecuteRequest {
message,
request_id,
} => {
// If the request has never been executed, we should execute it.
if self.response_cache.should_execute(&request_id) {
let resp = app.process_write(message, process_index).await?;
self.response_cache
.insert_response(request_id.clone(), resp);
}

// Leader may have the completion for the request.
if let Some(user_completion) =
self.user_completions.lock().remove(&process_index)
{
if let Some(resp) = self.response_cache.get_response(&request_id) {
user_completion.complete_with(resp);
// After the request is completed, we queue a `CompleteRequest` command to terminate the context.
// It must be queued and replicated to the followers; otherwise the followers
// will never learn that the request is completed and the context will never be terminated.
let command = Command::CompleteRequest { request_id };
self.append_new_entry(Command::serialize(command), None)
.await
.ok();
}
}
}
Command::CompleteRequest { request_id } => {
self.response_cache.complete_response(&request_id);
}
_ => {}
}
_ => {}
}

self.user_pointer.fetch_max(process_index, Ordering::SeqCst);
self.user_pointer.store(process_index, Ordering::SeqCst);

Ok(true)
}

pub(crate) async fn advance_kern_process(&self, voter: Voter) -> Result<bool> {
let cur_kern_index = self.kern_pointer.load(Ordering::SeqCst);
if cur_kern_index >= self.commit_index.load(Ordering::SeqCst) {
if cur_kern_index >= self.commit_pointer.load(Ordering::SeqCst) {
return Ok(false);
}

let process_index = cur_kern_index + 1;
let e = self.get_entry(process_index).await?;
let command = Command::deserialize(&e.command);

debug!("process kern@{process_index}");
if std::matches!(Command::deserialize(&e.command), Command::Noop) {
let term = e.this_clock.term;
voter.commit_safe_term(term);
}
let do_process = match command {
Command::Barrier { .. } => true,
Command::ClusterConfiguration { .. } => true,
_ => false,
};

if let Some(kern_completion) = self.kern_completions.lock().unwrap().remove(&process_index)
{
kern_completion.complete();
if do_process {
debug!("process kern@{process_index}");
match command {
Command::Barrier(term) => {
voter.commit_safe_term(term);
}
Command::ClusterConfiguration { .. } => {}
_ => {}
}
if let Some(kern_completion) = self.kern_completions.lock().remove(&process_index) {
kern_completion.complete();
}
}

self.kern_pointer.fetch_max(process_index, Ordering::SeqCst);
self.kern_pointer.store(process_index, Ordering::SeqCst);

Ok(true)
}
Expand Down
Loading
Loading