Skip to content

Commit

Permalink
Added minor fixes including mocking and naming workers
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi committed Mar 26, 2023
1 parent 18d97b6 commit 079ec70
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"cSpell.words": [
"apalis"
]
}
1 change: 1 addition & 0 deletions packages/apalis-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ metrics-exporter-prometheus = { version = "0.8", optional = true, default-featur
async-trait = { version = "0.1" }
graceful-shutdown = { version = "0.2", features = ["stream", "tokio-timeout" ] }
uuid = { version = "0.8", features = ["serde", "v4"] }
async-stream = "0.3"

[features]
default = [ "tower-util", "storage"]
Expand Down
46 changes: 46 additions & 0 deletions packages/apalis-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,49 @@ pub mod storage;
pub mod monitor;
/// Represents the actual executor of a [Job].
pub mod worker;

/// apalis mocking utilities
pub mod mock {
use futures::{Stream, StreamExt};
use tokio::sync::mpsc::{Receiver, Sender};
use tower::Service;

use crate::{worker::ready::ReadyWorker, job::Job};

fn build_stream<Req: Send + 'static>(mut rx: Receiver<Req>) -> impl Stream<Item = Req> {
let stream = async_stream::stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};
stream.boxed()
}

/// Useful for mocking a worker usually for testing purposes
///
/// # Example
/// ```rust,no_run
/// #[tokio::test(flavor = "current_thread")]
/// async fn test_worker() {
/// let (handle, mut worker) = mock_worker(job_fn(job2));
/// handle.send(TestJob(Utc::now())).await.unwrap();
/// let res = worker.consume_next().await;
/// }
/// ```
pub fn mock_worker<S, Req>(service: S) -> (Sender<Req>, ReadyWorker<impl Stream<Item = Req>, S>)
where
S: Service<Req>,
Req: Job + Send + 'static,
{
let (tx, rx) = tokio::sync::mpsc::channel(10);
let stream = build_stream(rx);
(
tx,
ReadyWorker {
service,
stream,
name: "test-worker".to_string(),
},
)
}
}
4 changes: 2 additions & 2 deletions packages/apalis-core/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Monitor {
<Serv as Service<JobRequest<J>>>::Future: std::marker::Send,
{
let shutdown = self.shutdown.clone();
let name = "worker.name.clone()".to_string();
let name = worker.name();
let handle = tokio::spawn(
self.shutdown
.graceful(worker.start(WorkerContext { shutdown }).map(|_| ())),
Expand Down Expand Up @@ -231,7 +231,7 @@ mod tests {
impl Stream for TestSource {
type Item = Result<TestJob, ()>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Pending
}
}
Expand Down
3 changes: 3 additions & 0 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ pub trait Worker<Job>: Sized {
/// The source type that this worker will use to receive jobs.
type Source;

/// A worker must be named for identification purposes
fn name(&self) -> String;

/// Starts the worker, taking ownership of `self` and the provided `ctx`.
///
/// This method should run indefinitely or until it returns an error.
Expand Down
4 changes: 4 additions & 0 deletions packages/apalis-core/src/worker/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ where
{
type Service = Serv;
type Source = Strm;

fn name(&self) -> String {
self.name.to_string()
}
async fn start(self, ctx: WorkerContext) -> Result<(), super::WorkerError> {
let mut service = self.service;
let mut stream = ctx.shutdown.graceful_stream(self.stream);
Expand Down

0 comments on commit 079ec70

Please sign in to comment.