Skip to content

Commit

Permalink
instance pool resource management (#1747)
Browse files Browse the repository at this point in the history
This introduces a resource pool component for wasm instances, which allows oversubscription and but prevents deadlocks by "locking" the pool to the first engine that brings the number of available instances below the number required to execute a single message. When the pool is locked to a specific engine, only that engine will be able to acquire more instances to ensure it can completely execute its message.

Future work:

- Memory limits. Ideally we'd apply the same concept to memory as the current approach could lead to memory exhaustion if the user isn't careful and allows too much parallelism without enough memory to back it up.
- Optimal "locking" strategy. At the moment, we "lock" the pool to an engine when we drop below the number of instances required to execute a single message. However, we don't take the number of instances _already_ acquired by the engine in question when doing so. We _could_ do this, but it requires significantly more tracking.
- Fewer places where we can panic. The current code will panic (safely) if misused. I'd prefer to return results in some of these cases, but threading the errors through the call manager got a bit tricky.
- Fairness. Right now, there is no fairness. If this becomes a problem, we try switching to a fair condition variable (or implement our own fairness). But we should be "ok" for now.

Co-authored-by: Steven Allen <steven@stebalien.com>
  • Loading branch information
vyzo and Stebalien committed Jun 28, 2023
1 parent 7dc747e commit b77d152
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 55 deletions.
75 changes: 75 additions & 0 deletions fvm/src/engine/concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2021-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT
use std::sync::{Condvar, Mutex};

/// An engine concurrency manages the concurrency available for a single engine. It's basically a
/// semaphore that also assigns IDs to new engines.
pub(super) struct EngineConcurrency {
inner: Mutex<EngineConcurrencyInner>,
condv: Condvar,
}

struct EngineConcurrencyInner {
next_id: u64,
limit: u32,
}

impl EngineConcurrency {
pub fn new(concurrency: u32) -> Self {
EngineConcurrency {
inner: Mutex::new(EngineConcurrencyInner {
next_id: 0,
limit: concurrency,
}),
condv: Condvar::new(),
}
}

/// Acquire a new engine (well, an engine ID). This function blocks until we're below the
/// maximum engine concurrency limit.
pub fn acquire(&self) -> u64 {
let mut guard = self
.condv
.wait_while(self.inner.lock().unwrap(), |inner| inner.limit == 0)
.unwrap();
let id = guard.next_id;

guard.limit -= 1;
guard.next_id += 1;

id
}

/// Release the engine. After this is called, the caller should not allocate any more instances
/// or continue to use their engine ID.
pub fn release(&self) {
let mut guard = self.inner.lock().unwrap();
guard.limit += 1;
self.condv.notify_one();
}
}

#[test]
fn test_engine_concurrency() {
let concurrency = EngineConcurrency::new(2);
std::thread::scope(|scope| {
assert_eq!(concurrency.inner.lock().unwrap().limit, 2);
assert_eq!(concurrency.acquire(), 0);
assert_eq!(concurrency.inner.lock().unwrap().limit, 1);
assert_eq!(concurrency.acquire(), 1);
assert_eq!(concurrency.inner.lock().unwrap().limit, 0);
let threads: Vec<_> = std::iter::repeat_with(|| scope.spawn(|| concurrency.acquire()))
.take(10)
.collect();
assert_eq!(concurrency.inner.lock().unwrap().limit, 0);
for _ in &threads {
concurrency.release();
}
let mut ids: Vec<_> = threads.into_iter().map(|t| t.join().unwrap()).collect();
ids.sort();
assert_eq!(ids, (2..12).collect::<Vec<_>>());
assert_eq!(concurrency.inner.lock().unwrap().limit, 0);
concurrency.release();
assert_eq!(concurrency.inner.lock().unwrap().limit, 1);
});
}
124 changes: 124 additions & 0 deletions fvm/src/engine/instance_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2021-2023 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT
use std::sync::{Condvar, Mutex};

/// An instance pool manages the available pool of engine instances.
///
/// - When there are enough instances to execute an entire message (a full call stack), requests to
/// take an instance will succeed immediately.
/// - When the number of available instances drops below the number required to execute a single
/// message, the executor that took that last instance will get an exclusive "lock" on the
/// instance pool. This lock will be released when enough instances become available to execute an
/// entire message.
pub(super) struct InstancePool {
inner: Mutex<InstancePoolInner>,
condv: Condvar,
}

struct InstancePoolInner {
/// The number of instances available in the pool.
available: u32,
/// The maximum number of instances that can be in-use by any given engine. If available drops
/// to this limit, we'll "lock" the pool to the current executor and refuse to lend out any more
/// instances to any _other_ engine until we go back above this number.
per_engine_limit: u32,
/// The ID of the engine currently "locking" the instance pool.
locked: Option<u64>,
}

impl InstancePool {
/// Create a new instance pool.
pub fn new(available: u32, per_engine_limit: u32) -> InstancePool {
InstancePool {
inner: Mutex::new(InstancePoolInner {
available,
per_engine_limit,
locked: None,
}),
condv: Condvar::new(),
}
}

/// Put back an instance into the pool, signaling any engines waiting on an instance if
/// applicable.
pub fn put(&self) {
let mut guard = self.inner.lock().unwrap();
guard.available += 1;

// If we're above the limit, unlock and notify one.
if guard.available >= guard.per_engine_limit {
guard.locked = None;
self.condv.notify_one();
}
}

/// Take an instance out of the instance pool (where `id` is the engine's ID). This function
/// will block if the instance pool is locked to another engine.
///
/// Panics if any engine tries to allocate more than the configured `per_engine_limit`.
pub fn take(&self, id: u64) {
let mut guard = self.inner.lock().unwrap();

// Wait until we have an instance available. Either:
// 1. We own the executor lock.
// 2. We _could_ own the executor lock.
guard = self
.condv
.wait_while(guard, |p| p.locked.unwrap_or(id) != id)
.unwrap();

// We either have, or could, lock the executor. So there should be instances available.
assert!(
guard.available > 0,
"no instances available: we must have exceeded our stack depth"
);

// Take our instance and lock the executor if we're below the reservation limit.
guard.available -= 1;
if guard.available < guard.per_engine_limit {
guard.locked = Some(id);
}
}
}

#[test]
fn test_instance_pool() {
let pool = InstancePool::new(12, 10);
std::thread::scope(|scope| {
pool.take(1);
pool.take(2);
assert_eq!(pool.inner.lock().unwrap().locked, None);
pool.take(1);
assert_eq!(pool.inner.lock().unwrap().locked, Some(1));
let t1 = scope.spawn(|| {
// Take 9 more for engine 2.
for _ in 0..9 {
pool.take(2)
}
});
// give the other thread a chance...
std::thread::sleep(std::time::Duration::from_millis(10));
// Take the remaining 8 for engine 1 (we're allowed 10).
// If this is working, we should make progress.
for _ in 0..8 {
pool.take(1);
}
assert_eq!(pool.inner.lock().unwrap().available, 1);
assert_eq!(pool.inner.lock().unwrap().locked, Some(1));
// Put them all back for engine 1.
for _ in 0..10 {
pool.put();
}
t1.join().unwrap();
assert_eq!(pool.inner.lock().unwrap().locked, Some(2));

// We should have two available.
assert_eq!(pool.inner.lock().unwrap().available, 2);

// Put back enough to unlock.
for _ in 0..8 {
pool.put();
}
assert_eq!(pool.inner.lock().unwrap().locked, None);
});
}
Loading

0 comments on commit b77d152

Please sign in to comment.