Compaction foundations #109

Merged · 10 commits · Nov 10, 2021

Add tests for stream subscription functionality
Added tests for stream::subscriber::spawn_group_fetch.

Added tests for stream::subscriber::try_record_delivery_response.

Added tests for stream::subscriber::ensure_subscriber_record.
thedodd committed Nov 9, 2021
commit 75ea29867f962ff403f9e76a85f1def322f654e5
4 changes: 2 additions & 2 deletions hadron-cli/Cargo.lock


8 changes: 4 additions & 4 deletions hadron-operator/Cargo.lock


9 changes: 4 additions & 5 deletions hadron-stream/Cargo.lock


1 change: 0 additions & 1 deletion hadron-stream/Cargo.toml
@@ -13,7 +13,6 @@ codegen-units = 1
anyhow = "1"
arc-swap = "1"
base64 = "0.13"
bytes = "1"
envy = "0.4"
futures = "0.3"
hadron-core = { path = "../hadron-core" }
54 changes: 54 additions & 0 deletions hadron-stream/src/fixtures.rs
@@ -0,0 +1,54 @@
use anyhow::{Context, Result};
use rand::prelude::*;

use crate::grpc::Event;
use crate::models::stream::Subscription;
use crate::stream::{KEY_STREAM_LAST_WRITTEN_OFFSET, PREFIX_STREAM_EVENT, PREFIX_STREAM_SUBS, PREFIX_STREAM_SUB_OFFSETS, PREFIX_STREAM_TS};
use crate::utils;

/// Set up some stream data in the given DB tree, returning the timestamp used for the secondary index and the last written offset.
pub async fn setup_stream_data(db: &sled::Tree) -> Result<(i64, u64)> {
    let mut batch = sled::Batch::default();
    let mut last_offset = 0;
    let ts = time::OffsetDateTime::now_utc().unix_timestamp();
    for offset in 0..rand::thread_rng().gen_range(50..100) {
        let event = Event::new_test(offset, "test", "empty");
        let event_bytes = utils::encode_model(&event)?;
        batch.insert(&utils::encode_byte_prefix(PREFIX_STREAM_EVENT, offset), event_bytes.as_slice());
        last_offset = offset;
    }
    batch.insert(KEY_STREAM_LAST_WRITTEN_OFFSET, &utils::encode_u64(last_offset));
    batch.insert(&utils::encode_byte_prefix_i64(PREFIX_STREAM_TS, ts), &utils::encode_u64(last_offset));
    db.apply_batch(batch)
        .context("error applying batch to write test data to stream")?;
    Ok((ts, last_offset))
}

/// Set up some subscription data in the given DB tree, returning the set of created subscriptions.
pub async fn setup_subs_data(db: &sled::Tree) -> Result<Vec<(Subscription, u64)>> {
    let mut batch = sled::Batch::default();
    let mut subs = vec![];
    for offset in 0..rand::thread_rng().gen_range(50..100) {
        let sub = Subscription { group_name: offset.to_string(), max_batch_size: 50 };
        let sub_encoded = utils::encode_model(&sub)?;
        let sub_model_key = utils::ivec_from_iter(
            PREFIX_STREAM_SUBS
                .iter()
                .copied()
                .chain(sub.group_name.as_bytes().iter().copied()),
        );
        let sub_offset_key = utils::ivec_from_iter(
            PREFIX_STREAM_SUB_OFFSETS
                .iter()
                .copied()
                .chain(sub.group_name.as_bytes().iter().copied()),
        );
        batch.insert(sub_model_key, sub_encoded.as_slice());
        batch.insert(sub_offset_key, &utils::encode_u64(offset));
        subs.push((sub, offset));
    }
    db.apply_batch(batch)
        .context("error applying batch to write subscription test data to stream")?;
    subs.sort_by(|a, b| a.1.cmp(&b.1));
    Ok(subs)
}
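
The subscriber tests added by this commit live in subscriber_test.rs, which is not rendered in this view, and they consume the fixtures above. As a rough illustration of how the pieces fit together — the test below is a hypothetical sketch, not code from this commit; it assumes a temporary sled DB and only the invariants visible in the fixture code above:

#[tokio::test]
async fn fixtures_populate_stream_and_subscriber_state() -> anyhow::Result<()> {
    // Throwaway on-disk state; sled's temporary mode cleans up on drop.
    let db = sled::Config::new().temporary(true).open()?;
    let tree = db.open_tree("stream")?;

    // Hypothetical usage of the fixtures above, from within the crate's test code.
    let (ts, last_offset) = crate::fixtures::setup_stream_data(&tree).await?;
    let subs = crate::fixtures::setup_subs_data(&tree).await?;

    // setup_stream_data writes 50..100 events at offsets starting from 0,
    // so the last written offset lands in 49..=98.
    assert!((49..=98).contains(&last_offset));
    assert!(ts <= time::OffsetDateTime::now_utc().unix_timestamp());

    // setup_subs_data returns subs sorted by offset, with contiguous offsets
    // from 0 and group names derived from those offsets.
    assert_eq!(subs.last().map(|(_, offset)| *offset as usize + 1), Some(subs.len()));
    assert!(subs.iter().all(|(sub, offset)| sub.group_name == offset.to_string()));
    Ok(())
}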
24 changes: 7 additions & 17 deletions hadron-stream/src/grpc/stream.rs
Expand Up @@ -232,21 +232,15 @@ pub mod stream_controller_server {
#[doc = " Open a metadata stream."]
async fn metadata(&self, request: tonic::Request<super::MetadataRequest>) -> Result<tonic::Response<Self::MetadataStream>, tonic::Status>;
#[doc = " Open a stream publisher channel."]
- async fn stream_publish(
-     &self, request: tonic::Request<super::StreamPublishRequest>,
- ) -> Result<tonic::Response<super::StreamPublishResponse>, tonic::Status>;
+ async fn stream_publish(&self, request: tonic::Request<super::StreamPublishRequest>) -> Result<tonic::Response<super::StreamPublishResponse>, tonic::Status>;
#[doc = "Server streaming response type for the StreamSubscribe method."]
type StreamSubscribeStream: futures_core::Stream<Item = Result<super::StreamSubscribeResponse, tonic::Status>> + Send + 'static;
#[doc = " Open a stream subscriber channel."]
- async fn stream_subscribe(
-     &self, request: tonic::Request<tonic::Streaming<super::StreamSubscribeRequest>>,
- ) -> Result<tonic::Response<Self::StreamSubscribeStream>, tonic::Status>;
+ async fn stream_subscribe(&self, request: tonic::Request<tonic::Streaming<super::StreamSubscribeRequest>>) -> Result<tonic::Response<Self::StreamSubscribeStream>, tonic::Status>;
#[doc = "Server streaming response type for the PipelineSubscribe method."]
type PipelineSubscribeStream: futures_core::Stream<Item = Result<super::PipelineSubscribeResponse, tonic::Status>> + Send + 'static;
#[doc = " Open a pipeline subscriber channel."]
- async fn pipeline_subscribe(
-     &self, request: tonic::Request<tonic::Streaming<super::PipelineSubscribeRequest>>,
- ) -> Result<tonic::Response<Self::PipelineSubscribeStream>, tonic::Status>;
+ async fn pipeline_subscribe(&self, request: tonic::Request<tonic::Streaming<super::PipelineSubscribeRequest>>) -> Result<tonic::Response<Self::PipelineSubscribeStream>, tonic::Status>;
}
#[doc = " The Hadron stream controller interface."]
#[derive(Debug)]
@@ -308,8 +302,7 @@ pub mod stream_controller_server {
let inner = inner.0;
let method = MetadataSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc =
-     tonic::server::Grpc::new(codec).apply_compression_config(accept_compression_encodings, send_compression_encodings);
+ let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(accept_compression_encodings, send_compression_encodings);
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
@@ -334,8 +327,7 @@ pub mod stream_controller_server {
let inner = inner.0;
let method = StreamPublishSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc =
-     tonic::server::Grpc::new(codec).apply_compression_config(accept_compression_encodings, send_compression_encodings);
+ let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(accept_compression_encodings, send_compression_encodings);
let res = grpc.unary(method, req).await;
Ok(res)
};
@@ -361,8 +353,7 @@ pub mod stream_controller_server {
let inner = inner.0;
let method = StreamSubscribeSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc =
-     tonic::server::Grpc::new(codec).apply_compression_config(accept_compression_encodings, send_compression_encodings);
+ let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(accept_compression_encodings, send_compression_encodings);
let res = grpc.streaming(method, req).await;
Ok(res)
};
@@ -388,8 +379,7 @@ pub mod stream_controller_server {
let inner = inner.0;
let method = PipelineSubscribeSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc =
-     tonic::server::Grpc::new(codec).apply_compression_config(accept_compression_encodings, send_compression_encodings);
+ let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(accept_compression_encodings, send_compression_encodings);
let res = grpc.streaming(method, req).await;
Ok(res)
};
2 changes: 2 additions & 0 deletions hadron-stream/src/main.rs
@@ -6,6 +6,8 @@ mod config;
mod config_test;
mod database;
mod error;
+ #[cfg(test)]
+ mod fixtures;
mod futures;
mod grpc;
mod models;
33 changes: 9 additions & 24 deletions hadron-stream/src/stream/mod.rs
@@ -28,6 +28,8 @@ mod publisher;
#[cfg(test)]
mod publisher_test;
mod subscriber;
+ #[cfg(test)]
+ mod subscriber_test;

use std::collections::HashMap;
use std::sync::Arc;
@@ -121,8 +123,7 @@ pub struct StreamCtl {
impl StreamCtl {
/// Create a new instance.
pub async fn new(
-     config: Arc<Config>, db: Database, shutdown_tx: broadcast::Sender<()>, requests_tx: mpsc::Sender<StreamCtlMsg>,
-     requests_rx: mpsc::Receiver<StreamCtlMsg>,
+     config: Arc<Config>, db: Database, shutdown_tx: broadcast::Sender<()>, requests_tx: mpsc::Sender<StreamCtlMsg>, requests_rx: mpsc::Receiver<StreamCtlMsg>,
) -> Result<(Self, watch::Receiver<u64>)> {
// Recover stream state.
let partition = config.partition;
@@ -217,9 +218,7 @@ impl StreamCtl {
}

/// Handle a request to setup a subscriber channel.
- async fn handle_request_subscribe(
-     &mut self, tx: mpsc::Sender<RpcResult<StreamSubscribeResponse>>, rx: Streaming<StreamSubscribeRequest>, setup: StreamSubscribeSetup,
- ) {
+ async fn handle_request_subscribe(&mut self, tx: mpsc::Sender<RpcResult<StreamSubscribeResponse>>, rx: Streaming<StreamSubscribeRequest>, setup: StreamSubscribeSetup) {
let _ = self.subs_tx.send(StreamSubCtlMsg::Request { tx, rx, setup }).await;
}

@@ -231,19 +230,11 @@ impl StreamCtl {
return;
}
self.is_compacting = true;
- let (config, tree, ts, stream_tx, shutdown_tx) = (
-     self.config.clone(),
-     self.tree.clone(),
-     self.earliest_timestamp,
-     self.requests_tx.clone(),
-     self.shutdown_tx.clone(),
- );
+ let (config, tree, ts, stream_tx, shutdown_tx) = (self.config.clone(), self.tree.clone(), self.earliest_timestamp, self.requests_tx.clone(), self.shutdown_tx.clone());
let _handle = tokio::spawn(async move {
match compact_stream(config, tree, ts).await {
Ok(earliest_timestamp) => {
- let _res = stream_tx
-     .send(StreamCtlMsg::CompactionFinished { earliest_timestamp })
-     .await;
+ let _res = stream_tx.send(StreamCtlMsg::CompactionFinished { earliest_timestamp }).await;
}
Err(err) => {
tracing::error!(error = ?err, "error during compaction routine, shutting down");
@@ -297,10 +288,7 @@ async fn compact_stream(config: Arc<Config>, tree: Tree, earliest_timestamp: Opt
let ttl = match &config.retention_policy.strategy {
StreamRetentionPolicy::Retain => return Ok(earliest_timestamp),
StreamRetentionPolicy::Time => {
- let ttl = config
-     .retention_policy
-     .retention_seconds
-     .unwrap_or_else(StreamRetentionSpec::retention_seconds_default);
+ let ttl = config.retention_policy.retention_seconds.unwrap_or_else(StreamRetentionSpec::retention_seconds_default);
time::Duration::seconds(i64::try_from(ttl).unwrap_or(i64::MAX))
}
};
@@ -352,8 +340,7 @@ async fn compact_stream(config: Arc<Config>, tree: Tree, earliest_timestamp: Opt

// Apply the batch.
tracing::debug!("compacting timestamp and event records up through offset {}", last_ts_offset);
- tree.apply_batch(batch)
-     .context("error applying compaction batch to stream tree")?;
+ tree.apply_batch(batch).context("error applying compaction batch to stream tree")?;
tree.flush().context(ERR_DB_FLUSH)?;
Ok(next_earliest_timestamp)
})
@@ -367,9 +354,7 @@ async fn recover_stream_state(tree: Tree) -> Result<StreamRecoveryState> {
async fn recover_stream_state(tree: Tree) -> Result<StreamRecoveryState> {
let val = Database::spawn_blocking(move || -> Result<StreamRecoveryState> {
// Fetch next offset info.
- let offset_opt = tree
-     .get(KEY_STREAM_LAST_WRITTEN_OFFSET)
-     .context("error fetching next offset key during recovery")?;
+ let offset_opt = tree.get(KEY_STREAM_LAST_WRITTEN_OFFSET).context("error fetching next offset key during recovery")?;
let last_written_offset = offset_opt
.map(|val| utils::decode_u64(&val).context("error decoding offset value from storage"))
.transpose()?
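
The time-based retention branch in compact_stream above reduces to a simple cutoff computation over unix timestamps. A minimal sketch of that arithmetic — the function name and the saturating fallback are illustrative, not part of this diff:

// Any timestamp-index entry older than this cutoff is a candidate for compaction.
fn compaction_cutoff(now_ts: i64, retention_seconds: u64) -> i64 {
    // Mirror the `unwrap_or(i64::MAX)` clamp used in `compact_stream` above,
    // but saturate the subtraction instead of risking overflow.
    let ttl = i64::try_from(retention_seconds).unwrap_or(i64::MAX);
    now_ts.saturating_sub(ttl)
}

// E.g. with a 7-day policy, records time-stamped before
// compaction_cutoff(now, 7 * 24 * 60 * 60) would be compacted away.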