Split node initialization and start #789
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,12 +33,7 @@ use crate::node; | |
mod channel; | ||
pub use channel::{Handle, HandleDirection}; | ||
|
||
struct Node { | ||
#[allow(dead_code)] | ||
reference: NodeId, | ||
|
||
// An option type allows the instance to be swapped out during `runtime.stop` | ||
instance: Option<Box<dyn crate::node::Node>>, | ||
struct NodeInfo { | ||
/// The Label associated with this node. | ||
/// | ||
/// This is set at node creation time and does not change after that. | ||
|
@@ -83,7 +78,11 @@ pub struct Runtime { | |
|
||
channels: channel::ChannelMapping, | ||
|
||
nodes: RwLock<HashMap<NodeId, Node>>, | ||
/// Runtime-specific state for each node instance. | ||
node_infos: RwLock<HashMap<NodeId, NodeInfo>>, | ||
|
||
/// Currently running node instances, so that [`Runtime::stop`] can terminate all of them. | ||
node_instances: Mutex<HashMap<NodeId, Box<dyn crate::node::Node>>>, | ||
next_node_id: AtomicU64, | ||
} | ||
|
||
|
@@ -94,7 +93,9 @@ impl Runtime { | |
configuration, | ||
terminating: AtomicBool::new(false), | ||
channels: channel::ChannelMapping::new(), | ||
nodes: RwLock::new(HashMap::new()), | ||
|
||
node_infos: RwLock::new(HashMap::new()), | ||
node_instances: Mutex::new(HashMap::new()), | ||
|
||
// NodeId(0) reserved for RUNTIME_NODE_ID. | ||
next_node_id: AtomicU64::new(1), | ||
|
@@ -143,18 +144,9 @@ impl Runtime { | |
/// Thread safe method for signaling termination to a [`Runtime`] and waiting for its node | ||
/// threads to terminate. | ||
pub fn stop(&self) { | ||
// Take the list of nodes out of the runtime instance, and set the terminating flag; this | ||
// will prevent additional nodes from starting to wait again, because `wait_on_channels` | ||
// will return immediately with `OakStatus::ErrTerminated`. | ||
let instances: Vec<_> = { | ||
let mut nodes = self.nodes.write().unwrap(); | ||
self.terminating.store(true, SeqCst); | ||
|
||
nodes | ||
.values_mut() | ||
.filter_map(|n| n.instance.take()) | ||
.collect() | ||
}; | ||
// Set the terminating flag; this will prevent additional nodes from starting to wait again, | ||
// because `wait_on_channels` will return immediately with `OakStatus::ErrTerminated`. | ||
self.terminating.store(true, SeqCst); | ||
|
||
// Unpark any threads that are blocked waiting on any channels. | ||
for channel in self | ||
|
@@ -181,7 +173,12 @@ impl Runtime { | |
// for any additional work to be finished here. This may take an arbitrary amount of time, | ||
// depending on the work that the node thread has to perform, but at least we know that | ||
// it will not be able to enter again in a blocking state. | ||
for mut instance in instances { | ||
for instance in self | ||
.node_instances | ||
.lock() | ||
.expect("could not acquire lock on node_instances") | ||
.values_mut() | ||
{ | ||
instance.stop(); | ||
} | ||
} | ||
|
@@ -196,12 +193,12 @@ impl Runtime { | |
return; | ||
} | ||
|
||
let nodes = self.nodes.read().unwrap(); | ||
let node = nodes | ||
let node_infos = self.node_infos.read().unwrap(); | ||
let node_info = node_infos | ||
.get(&node_id) | ||
.expect("Invalid node_id passed into track_handles_in_node!"); | ||
|
||
let mut tracked_handles = node.handles.lock().unwrap(); | ||
let mut tracked_handles = node_info.handles.lock().unwrap(); | ||
for handle in handles { | ||
tracked_handles.insert(handle); | ||
} | ||
|
@@ -215,12 +212,12 @@ impl Runtime { | |
return Ok(()); | ||
} | ||
|
||
let nodes = self.nodes.read().unwrap(); | ||
let node_infos = self.node_infos.read().unwrap(); | ||
(pre-existing) This lock scope extends to the end of the method, which is needed because of the use of the

What would we provide as a function in that case? If you are talking just about extracting

I was thinking about something analogous to the

fn with_node_info<U, F: FnOnce(&NodeInfo) -> U>(&self, node_id: NodeId, f: F) -> U {
let node_infos = self.node_infos.read().unwrap();
let node_info = node_infos
.get(&node_id)
.expect("Invalid node_id passed into with_node_info");
f(node_info)
}
/// Validate the [`NodeId`] has access to [`Handle`], returning `Err(OakStatus::ErrBadHandle)`
/// if access is not allowed.
fn validate_handle_access(&self, node_id: NodeId, handle: Handle) -> Result<(), OakStatus> {
// Allow RUNTIME_NODE_ID access to all handles.
if node_id == RUNTIME_NODE_ID {
return Ok(());
}
self.with_node_info(node_id, |node_info| {
let tracked_handles = node_info.handles.lock().unwrap();
// Check the handle exists in the handles associated with a node, otherwise
// return ErrBadHandle.
if tracked_handles.contains(&handle) {
Ok(())
} else {
error!(
"validate_handle_access: handle {:?} not found in node {:?}",
handle, node_id
);
Err(OakStatus::ErrBadHandle)
}
})
}

Probably for another day though (maybe under #779)

This seems like it is just re-implementing lexical scoping? I agree with the sentiment, but I think we should instead be more conscious of how we scope variables (especially lock guards), possibly by adding explicit anonymous blocks to delimit scopes (with appropriate comments). But happy to keep discussing separately.

@daviddrysdale what do you think about going even further and turning even e.g.

I'm also interested in whether scoping locks under lifetimes could be more ergonomic, something like:

pub fn with_all_channels<'a>(
&'a self,
) -> std::sync::RwLockReadGuard<'a, HashMap<ChannelId, Channel>> {
self.channels.read().unwrap()
}
// this would need something like MappedRwLockReadGuard from lock_api: https://docs.rs/lock_api/0.3.3/lock_api/struct.MappedRwLockReadGuard.html
pub fn with_channel<'a>(
&'a self,
channel_id: ChannelId,
) -> MappedRwLockReadGuard<'a, Result<Channel, OakStatus>> {
self.channels.read().unwrap().map(|channels|{
channels.get(&channel_id).ok_or(OakStatus::ErrBadHandle)
})
}

Although this would reintroduce overly large lock scopes if not careful. (edit: lexical scoping) |
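The suggestion above to delimit lock-guard scopes with explicit anonymous blocks could look roughly like the following. This is only a minimal sketch: `Registry`, `add_node`, the `u64` aliases for `NodeId` and `Handle`, and the `String` error type are hypothetical stand-ins, not the runtime's real types, and an unknown node is reported as an error here instead of panicking.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Mutex, RwLock};

// Hypothetical stand-ins for the runtime's `NodeId` and `Handle` types.
type NodeId = u64;
type Handle = u64;

struct NodeInfo {
    handles: Mutex<HashSet<Handle>>,
}

// Hypothetical container; the PR keeps this map directly on `Runtime`.
struct Registry {
    node_infos: RwLock<HashMap<NodeId, NodeInfo>>,
}

impl Registry {
    fn new() -> Self {
        Registry {
            node_infos: RwLock::new(HashMap::new()),
        }
    }

    fn add_node(&self, node_id: NodeId, handles: Vec<Handle>) {
        let info = NodeInfo {
            handles: Mutex::new(handles.into_iter().collect()),
        };
        // The write guard is a temporary and is released at the end of this statement.
        self.node_infos.write().unwrap().insert(node_id, info);
    }

    fn validate_handle_access(&self, node_id: NodeId, handle: Handle) -> Result<(), String> {
        // Explicit block: the read guard (and the inner mutex guard) are released at the
        // closing brace, before the result is inspected or any error is built.
        let found = {
            let node_infos = self.node_infos.read().unwrap();
            let tracked = node_infos
                .get(&node_id)
                .map(|info| info.handles.lock().unwrap().contains(&handle));
            tracked.unwrap_or(false)
        };

        // No lock is held from here on.
        if found {
            Ok(())
        } else {
            Err(format!("handle {} not found in node {}", handle, node_id))
        }
    }
}
```

Compared with a closure-taking helper, the block keeps the control flow inline, at the cost of relying on each author to place the braces deliberately.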
||
// Lookup the node_id in the runtime's nodes hashmap. | ||
let node = nodes | ||
let node_info = node_infos | ||
.get(&node_id) | ||
.expect("Invalid node_id passed into validate_handle_access!"); | ||
let tracked_handles = node.handles.lock().unwrap(); | ||
let tracked_handles = node_info.handles.lock().unwrap(); | ||
|
||
// Check the handle exists in the handles associated with a node, otherwise | ||
// return ErrBadHandle. | ||
|
@@ -246,12 +243,12 @@ impl Runtime { | |
return Ok(()); | ||
} | ||
|
||
let nodes = self.nodes.read().unwrap(); | ||
let node = nodes | ||
let node_infos = self.node_infos.read().unwrap(); | ||
let node_info = node_infos | ||
.get(&node_id) | ||
.expect("Invalid node_id passed into filter_optional_handles!"); | ||
|
||
let tracked_handles = node.handles.lock().unwrap(); | ||
let tracked_handles = node_info.handles.lock().unwrap(); | ||
for handle in handles { | ||
// Check handle is accessible by the node. | ||
if !tracked_handles.contains(&handle) { | ||
|
@@ -558,9 +555,11 @@ impl Runtime { | |
|
||
if node_id != RUNTIME_NODE_ID { | ||
// Remove handle from the nodes available handles | ||
let nodes = self.nodes.read().unwrap(); | ||
let node = nodes.get(&node_id).expect("channel_close: No such node_id"); | ||
let mut handles = node.handles.lock().unwrap(); | ||
let node_infos = self.node_infos.read().unwrap(); | ||
let node_info = node_infos | ||
.get(&node_id) | ||
.expect("channel_close: No such node_id"); | ||
let mut handles = node_info.handles.lock().unwrap(); | ||
handles.remove(&reference); | ||
} | ||
|
||
|
@@ -582,11 +581,11 @@ impl Runtime { | |
|
||
// Close any remaining handles | ||
let remaining_handles: Vec<_> = { | ||
let nodes = self.nodes.read().unwrap(); | ||
let node = nodes | ||
let node_infos = self.node_infos.read().unwrap(); | ||
let node_info = node_infos | ||
.get(&node_id) | ||
.expect("remove_node_id: No such node_id"); | ||
let handles = node.handles.lock().unwrap(); | ||
let handles = node_info.handles.lock().unwrap(); | ||
handles.iter().copied().collect() | ||
}; | ||
|
||
|
@@ -601,17 +600,28 @@ impl Runtime { | |
} | ||
} | ||
|
||
let mut nodes = self.nodes.write().unwrap(); | ||
nodes | ||
let mut node_infos = self.node_infos.write().unwrap(); | ||
node_infos | ||
.remove(&node_id) | ||
.expect("remove_node_id: Node didn't exist!"); | ||
} | ||
|
||
/// Add an [`NodeId`] [`Node`] pair to the [`Runtime`]. This method temporarily holds the node | ||
/// write lock. | ||
fn add_running_node(&self, reference: NodeId, node: Node) { | ||
let mut nodes = self.nodes.write().unwrap(); | ||
nodes.insert(reference, node); | ||
/// Add a [`NodeId`] [`NodeInfo`] pair to the [`Runtime`]. This method temporarily holds the | ||
/// [`Runtime::node_infos`] write lock. | ||
fn add_node_info(&self, reference: NodeId, node_info: NodeInfo) { | ||
self.node_infos | ||
.write() | ||
.expect("could not acquire lock on node_infos") | ||
.insert(reference, node_info); | ||
} | ||
|
||
/// Add a [`NodeId`] [`crate::node::Node`] pair to the [`Runtime`]. This method temporarily | ||
/// holds the [`Runtime::node_instances`] lock. | ||
fn add_node_instance(&self, node_reference: NodeId, node_instance: Box<dyn crate::node::Node>) { | ||
self.node_instances | ||
.lock() | ||
.expect("could not acquire lock on node_instances") | ||
.insert(node_reference, node_instance); | ||
} | ||
|
||
/// Thread safe method that attempts to create a node within the [`Runtime`] corresponding to a | ||
|
@@ -645,7 +655,7 @@ impl Runtime { | |
|
||
let reader = self.channels.duplicate_reference(reader)?; | ||
|
||
let mut instance = self | ||
let instance = self | ||
.configuration | ||
.nodes | ||
.get(module_name) | ||
|
@@ -661,22 +671,103 @@ impl Runtime { | |
) | ||
})?; | ||
|
||
// Try starting the node instance first. If this fails, then directly return the error to | ||
// the caller. | ||
instance.start()?; | ||
|
||
// If the node was successfully started, insert it in the list of currently running | ||
// nodes. | ||
self.add_running_node( | ||
reference, | ||
Node { | ||
reference, | ||
instance: Some(instance), | ||
self.node_start_instance(reference, instance, label, vec![reader])?; | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Starts a newly created node instance, by first initializing the necessary [`NodeInfo`] data | ||
/// structure in [`Runtime`], allowing it to access the provided [`Handle`]s, then calling | ||
/// [`Node::start`] on the instance, and finally storing a reference to the running instance | ||
/// in [`Runtime::node_instances`] so that it can later be terminated. | ||
fn node_start_instance<I>( | ||
&self, | ||
node_reference: NodeId, | ||
mut node_instance: Box<dyn crate::node::Node>, | ||
label: &oak_abi::label::Label, | ||
initial_handles: I, | ||
) -> Result<(), OakStatus> | ||
where | ||
I: IntoIterator<Item = Handle>, | ||
{ | ||
// First create the necessary info data structure in the Runtime, otherwise calls that the | ||
// node makes to the Runtime during `Node::start` (synchronously or asynchronously) may | ||
// fail. | ||
self.add_node_info( | ||
node_reference, | ||
NodeInfo { | ||
label: label.clone(), | ||
handles: Mutex::new(vec![reader].into_iter().collect()), | ||
handles: Mutex::new(HashSet::new()), | ||
}, | ||
); | ||
|
||
Ok(()) | ||
// Make sure that the provided initial handles are tracked in the newly created node from | ||
// the start. | ||
self.track_handles_in_node(node_reference, initial_handles); | ||
|
||
// Try to start the node instance, and store the result in a temporary variable to be | ||
// returned later. | ||
// | ||
// In order for this to work correctly, the `NodeInfo` entry must already exist in | ||
// `Runtime`, which is why we could not start this instance before the call to | ||
// `Runtime::add_node_info` above. | ||
// | ||
// On the other hand, we also cannot start it after the call to `Runtime::add_node_instance` | ||
// below, because that takes ownership of the instance itself. | ||
// | ||
// We also want no locks to be held while the instance is starting. | ||
let result = node_instance.start(); | ||
The general advice for writing clean code is that concurrency is a separate concern and usually deserves to be handled in a separate class (a dedicated data structure). I am not sure if this advice is applicable to Rust code, but in any other programming language I've used, I'd write a wrapper class to hold

The main point of this PR is so that the node instance can be started so that the relevant data structures (

Sure. Let's discuss it separately. What I am thinking of is just a bit of refactoring that can be done separately, after you are done with this PR. And I think it might address some of the comments from @daviddrysdale too (but I am not a true Rustacean yet, so I could be wrong). |
||
|
||
// Regardless of the result of `Node::start`, insert the now running instance to the list of | ||
// running instances (by moving it), so that `Node::stop` will be called on it eventually. | ||
self.add_node_instance(node_reference, node_instance); | ||
|
||
// Return the result of `Node::start`. | ||
result | ||
} | ||
} | ||
|
||
#[test] | ||
fn create_channel() { | ||
let configuration = crate::runtime::Configuration { | ||
nodes: HashMap::new(), | ||
entry_module: "test_module".to_string(), | ||
entrypoint: "test_function".to_string(), | ||
}; | ||
let runtime = Arc::new(crate::runtime::Runtime::create(configuration)); | ||
|
||
// Define a node implementation for test that exercises parts of the [`Runtime`] ABI. | ||
struct TestNode { | ||
node_id: NodeId, | ||
runtime: Arc<Runtime>, | ||
}; | ||
|
||
impl crate::node::Node for TestNode { | ||
fn start(&mut self) -> Result<(), OakStatus> { | ||
// Attempt to perform an operation that requires the [`Runtime`] to have created an | ||
// appropriate [`NodeInfo`] instance. | ||
let (_write_handle, _read_handle) = self | ||
.runtime | ||
.new_channel(self.node_id, &oak_abi::label::Label::public_trusted()); | ||
Ok(()) | ||
} | ||
fn stop(&mut self) {} | ||
} | ||
|
||
// Manually allocate a new [`NodeId`]. | ||
let node_reference = runtime.new_node_reference(); | ||
|
||
// Create an instance of [`TestNode`] without starting it. | ||
let node_instance = TestNode { | ||
node_id: node_reference, | ||
runtime: runtime.clone(), | ||
}; | ||
|
||
let result = runtime.node_start_instance( | ||
node_reference, | ||
Box::new(node_instance), | ||
&oak_abi::label::Label::public_trusted(), | ||
vec![], | ||
); | ||
assert_eq!(Ok(()), result); | ||
} |
These two fields look like they should be covered by the same `Mutex`.

(In C++ this would probably be a base class that holds data (≈ `NodeInfo`), but which still has pure virtual functions to be overridden (≈ `node::Node`). Not completely sure what the Rust equivalent would be – maybe to extend the `node::Node` trait to cover behaviour encompassed by what's in `NodeInfo`, and then each implementor of the trait has-a `NodeInfo` for that.)
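One way to read the "trait plus has-a `NodeInfo`" idea is sketched below. All names and signatures here (`EchoNode`, `Registry`, `can_access`, `label`, `is_running`, the `String` error type, a plain `HashSet` of handles) are assumptions made for illustration; the real `node::Node` trait only appears in this PR with `start` and `stop`.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

// Hypothetical stand-ins for the runtime's `NodeId` and `Handle` types.
type NodeId = u64;
type Handle = u64;

/// Hypothetical per-node data, loosely modelled on the PR's `NodeInfo`.
struct NodeInfo {
    label: String,
    handles: HashSet<Handle>,
}

/// Hypothetical extension of the node trait: every implementor has-a `NodeInfo`.
trait Node {
    fn info(&self) -> &NodeInfo;
    fn start(&mut self) -> Result<(), String>;
    fn stop(&mut self);
    fn is_running(&self) -> bool;

    /// Shared behaviour built on the data that each implementor exposes.
    fn can_access(&self, handle: Handle) -> bool {
        self.info().handles.contains(&handle)
    }

    fn label(&self) -> &str {
        &self.info().label
    }
}

struct EchoNode {
    info: NodeInfo,
    running: bool,
}

impl Node for EchoNode {
    fn info(&self) -> &NodeInfo {
        &self.info
    }
    fn start(&mut self) -> Result<(), String> {
        self.running = true;
        Ok(())
    }
    fn stop(&mut self) {
        self.running = false;
    }
    fn is_running(&self) -> bool {
        self.running
    }
}

/// With data and behaviour behind one trait object, a single `Mutex` covers both.
struct Registry {
    nodes: Mutex<HashMap<NodeId, Box<dyn Node>>>,
}
```

A consequence of this shape is that the single lock would have to be held while calling `start` on a stored instance, which is the situation the PR tries to avoid.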

I am not sure I see the problem with the current split; do you have an example of something that may go wrong in this case? (Perhaps I'll even add a test to make sure it does not happen, if it's doable.)

Anyway, I suspect what you are suggesting is actually similar to the previous version, in which `Node` (now `NodeInfo`) had an `instance` field itself, but to me, separating them this way makes `Runtime::stop` cleaner (assuming it's not incorrect).

Having small granular locks for things that may need to be updated in a synchronized fashion implies a locking hierarchy, and there's nothing here that documents such a locking hierarchy.

The obvious example is if there's (say) a current chunk of code that adds a new Node, and which does so by first adding to `node_infos` and then adding to `node_instances`. If someone later adds different code that happens to add things to `node_instances` before `node_infos`, then there's a deadlock.

If all related state – here, everything that's indexed by `NodeId` – is under the same lock, then there's no trap for unwary maintainers in the future.
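For comparison, the single-lock arrangement described in this comment might be sketched as follows. `NodeState`, `NodeInstance`, `Registry`, and their methods are hypothetical names chosen for this example, and the instance is reduced to a flag so that the block stays self-contained.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical stand-in for the runtime's `NodeId` type.
type NodeId = u64;

struct NodeInfo {
    label: String,
}

// Hypothetical placeholder for a running node instance.
struct NodeInstance {
    running: bool,
}

/// Everything indexed by `NodeId`, grouped so that one lock protects all of it.
#[derive(Default)]
struct NodeState {
    infos: HashMap<NodeId, NodeInfo>,
    instances: HashMap<NodeId, NodeInstance>,
}

struct Registry {
    state: Mutex<NodeState>,
}

impl Registry {
    fn new() -> Self {
        Registry {
            state: Mutex::new(NodeState::default()),
        }
    }

    /// Both maps are updated under the same guard, so no lock ordering can go wrong.
    fn add_node(&self, id: NodeId, label: &str) {
        let mut state = self.state.lock().unwrap();
        state.infos.insert(id, NodeInfo { label: label.to_string() });
        state.instances.insert(id, NodeInstance { running: true });
    }

    /// Removes both entries together and returns the label, if the node existed.
    fn remove_node(&self, id: NodeId) -> Option<String> {
        let mut state = self.state.lock().unwrap();
        state.instances.remove(&id);
        state.infos.remove(&id).map(|info| info.label)
    }

    fn is_running(&self, id: NodeId) -> bool {
        let state = self.state.lock().unwrap();
        state.instances.get(&id).map(|i| i.running).unwrap_or(false)
    }

    fn counts(&self) -> (usize, usize) {
        let state = self.state.lock().unwrap();
        (state.infos.len(), state.instances.len())
    }
}
```

The trade-off is that every access to either map now contends on the same lock, including read-only handle checks that previously only needed the `RwLock` read side.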

But the point of this PR is exactly to decouple those two fields, so that they can be updated independently (no need to ever hold a lock over both). I have added a helper method `add_node_instance` to make things clearer now, and added more comments. PTAL.
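The decoupling argument can be checked in isolation with a small model of the two maps. The sketch below is an assumption-laden simplification rather than the PR's code: `Runtime` is reduced to its two maps, `ProbeNode`, `locks_are_free`, `label_of`, `start_instance`, and `stop_all` are hypothetical names, and errors are plain `String`s. Each helper takes exactly one lock for the duration of a single statement, and the probe node reports an error if either lock is held while it starts.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

// Hypothetical stand-in for the runtime's `NodeId` type.
type NodeId = u64;

struct NodeInfo {
    label: String,
}

trait Node {
    fn start(&mut self) -> Result<(), String>;
    fn stop(&mut self);
}

struct Runtime {
    node_infos: RwLock<HashMap<NodeId, NodeInfo>>,
    node_instances: Mutex<HashMap<NodeId, Box<dyn Node>>>,
}

impl Runtime {
    fn new() -> Self {
        Runtime {
            node_infos: RwLock::new(HashMap::new()),
            node_instances: Mutex::new(HashMap::new()),
        }
    }

    // Each helper acquires one lock and releases it at the end of the statement.
    fn add_node_info(&self, id: NodeId, info: NodeInfo) {
        self.node_infos.write().unwrap().insert(id, info);
    }

    fn add_node_instance(&self, id: NodeId, instance: Box<dyn Node>) {
        self.node_instances.lock().unwrap().insert(id, instance);
    }

    fn label_of(&self, id: NodeId) -> Option<String> {
        self.node_infos.read().unwrap().get(&id).map(|info| info.label.clone())
    }

    /// True when neither lock is currently held by anyone.
    fn locks_are_free(&self) -> bool {
        let infos_free = self.node_infos.try_write().is_ok();
        let instances_free = self.node_instances.try_lock().is_ok();
        infos_free && instances_free
    }

    /// Register the info, start with no lock held, then store the instance either way.
    fn start_instance(&self, id: NodeId, mut instance: Box<dyn Node>, label: &str) -> Result<(), String> {
        self.add_node_info(id, NodeInfo { label: label.to_string() });
        let result = instance.start();
        self.add_node_instance(id, instance);
        result
    }

    fn stop_all(&self) {
        for instance in self.node_instances.lock().unwrap().values_mut() {
            instance.stop();
        }
    }
}

/// Hypothetical node that fails to start if a runtime lock is held or its info is missing.
struct ProbeNode {
    id: NodeId,
    runtime: Arc<Runtime>,
}

impl Node for ProbeNode {
    fn start(&mut self) -> Result<(), String> {
        if !self.runtime.locks_are_free() {
            return Err("a runtime lock was held during start".to_string());
        }
        match self.runtime.label_of(self.id) {
            Some(_) => Ok(()),
            None => Err("NodeInfo was not registered before start".to_string()),
        }
    }
    fn stop(&mut self) {}
}
```

Because no code path in this model holds both locks at once, there is no ordering between them to document; that property would have to be preserved by any later change that touches both maps.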