Skip to content

Commit

Permalink
[config] Adds invoker segment queue limit to configuration file
Browse files Browse the repository at this point in the history
This adds a new configuration key `in-memory-queue-length-limit` under `[worker.invoker]` to enable configuring the segment queue from the configuration file. The default value matches the currently hard coded value.
  • Loading branch information
AhmedSoliman authored and tillrohrmann committed Jun 20, 2024
1 parent 98d92f0 commit c6aea0c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
3 changes: 2 additions & 1 deletion crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,9 @@ where
let shutdown = cancellation_watcher();
tokio::pin!(shutdown);

let in_memory_limit = updateable_options.load().in_memory_queue_length_limit();
// Prepare the segmented queue
let mut segmented_input_queue = SegmentQueue::init(tmp_dir, 1_056_784)
let mut segmented_input_queue = SegmentQueue::init(tmp_dir, in_memory_limit)
.await
.expect("Cannot initialize input spillable queue");

Expand Down
12 changes: 12 additions & 0 deletions crates/types/src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ pub struct InvokerOptions {
/// If empty, the system temporary directory will be used instead.
tmp_dir: Option<PathBuf>,

/// # Spill invocations to disk
///
/// Defines the threshold after which queues invocations will spill to disk at
/// the path defined in `tmp-dir`. In other words, this is the number of invocations
/// that can be kept in memory before spilling to disk.
in_memory_queue_length_limit: NonZeroUsize,

/// # Limit number of concurrent invocations from this node
///
/// Number of concurrent invocations that can be processed by the invoker.
Expand All @@ -147,6 +154,10 @@ impl InvokerOptions {
self.concurrent_invocations_limit.map(Into::into)
}

pub fn in_memory_queue_length_limit(&self) -> usize {
self.in_memory_queue_length_limit.into()
}

pub fn message_size_limit(&self) -> Option<usize> {
self.message_size_limit.map(Into::into)
}
Expand All @@ -162,6 +173,7 @@ impl Default for InvokerOptions {
None,
Some(Duration::from_secs(10)),
),
in_memory_queue_length_limit: NonZeroUsize::new(1_056_784).unwrap(),
inactivity_timeout: Duration::from_secs(60).into(),
abort_timeout: Duration::from_secs(60).into(),
message_size_warning: NonZeroUsize::new(10_000_000).unwrap(), // 10MB
Expand Down

0 comments on commit c6aea0c

Please sign in to comment.