Split node initialization and start #789

Merged · 1 commit · Mar 31, 2020
4 changes: 4 additions & 0 deletions oak/server/rust/oak_runtime/src/node/mod.rs
@@ -29,6 +29,10 @@ mod wasm;
/// Nodes must not do any work until the [`Node::start`] method is invoked.
pub trait Node: Send + Sync {
/// Starts executing the node.
///
/// This method will be invoked in a blocking fashion by the [`Runtime`], therefore node
/// implementations should make sure that they create separate background threads to execute
/// actual work, and wait on them to terminate when [`Node::stop`] is called.
fn start(&mut self) -> Result<(), OakStatus>;

/// Stops executing the node.
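The start/stop contract described in the new doc comment can be sketched as a minimal standalone implementation. This is illustrative only: `OakStatus` and the `Node` trait below are simplified stand-ins for the real oak_runtime types, and `ExampleNode` is hypothetical.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread::{self, JoinHandle};

// Simplified stand-ins for the real oak_runtime types.
#[derive(Debug, PartialEq)]
enum OakStatus {
    #[allow(dead_code)]
    ErrTerminated,
}

trait Node: Send + Sync {
    fn start(&mut self) -> Result<(), OakStatus>;
    fn stop(&mut self);
}

// A node that honours the contract: `start` spawns a background worker and
// returns immediately; `stop` signals the worker and joins it.
struct ExampleNode {
    terminate: Arc<AtomicBool>,
    worker: Option<JoinHandle<()>>,
}

impl Node for ExampleNode {
    fn start(&mut self) -> Result<(), OakStatus> {
        let terminate = self.terminate.clone();
        self.worker = Some(thread::spawn(move || {
            // Placeholder for actual work: loop until told to terminate.
            while !terminate.load(Ordering::SeqCst) {
                thread::yield_now();
            }
        }));
        Ok(())
    }

    fn stop(&mut self) {
        self.terminate.store(true, Ordering::SeqCst);
        if let Some(worker) = self.worker.take() {
            worker.join().expect("worker thread panicked");
        }
    }
}

fn main() {
    let mut node = ExampleNode {
        terminate: Arc::new(AtomicBool::new(false)),
        worker: None,
    };
    // `start` returns promptly even though the worker keeps running.
    assert!(node.start().is_ok());
    // `stop` blocks until the worker has terminated.
    node.stop();
    assert!(node.worker.is_none());
}
```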
205 changes: 148 additions & 57 deletions oak/server/rust/oak_runtime/src/runtime/mod.rs
@@ -33,12 +33,7 @@ use crate::node;
mod channel;
pub use channel::{Handle, HandleDirection};

struct Node {
#[allow(dead_code)]
reference: NodeId,

// An option type allows the instance to be swapped out during `runtime.stop`
instance: Option<Box<dyn crate::node::Node>>,
struct NodeInfo {
/// The Label associated with this node.
///
/// This is set at node creation time and does not change after that.
@@ -83,7 +78,11 @@ pub struct Runtime {

channels: channel::ChannelMapping,

nodes: RwLock<HashMap<NodeId, Node>>,
/// Runtime-specific state for each node instance.
node_infos: RwLock<HashMap<NodeId, NodeInfo>>,

/// Currently running node instances, so that [`Runtime::stop`] can terminate all of them.
node_instances: Mutex<HashMap<NodeId, Box<dyn crate::node::Node>>>,
Contributor:

These two fields look like they should be covered by the same Mutex.

(In C++ this would probably be a base class that holds data (≈ NodeInfo), but which still has pure virtual functions to be overridden (≈ node::Node). Not completely sure what the Rust equivalent would be – maybe to extend the node::Node trait to cover behaviour encompassed by what's in NodeInfo, and then each implementor of the trait has-a NodeInfo for that.)
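A sketch of one Rust rendering of that C++ shape (all names here are hypothetical, and `NodeInfo` is reduced to a plain label string): the trait carries the behaviour, and each implementor has-a `NodeInfo` by composition, exposed through an accessor.

```rust
// Shared per-node data, analogous to a C++ base class's data members.
struct NodeInfo {
    label: String,
}

// The behaviour, analogous to the pure virtual functions; implementors
// expose their owned `NodeInfo` through an accessor.
trait Node {
    fn info(&self) -> &NodeInfo;
    fn start(&mut self) -> Result<(), String>;
    fn stop(&mut self);
}

// Each implementor owns a NodeInfo by composition.
struct WasmNode {
    info: NodeInfo,
    running: bool,
}

impl Node for WasmNode {
    fn info(&self) -> &NodeInfo {
        &self.info
    }
    fn start(&mut self) -> Result<(), String> {
        self.running = true;
        Ok(())
    }
    fn stop(&mut self) {
        self.running = false;
    }
}

fn main() {
    let mut node = WasmNode {
        info: NodeInfo { label: "public_trusted".to_string() },
        running: false,
    };
    assert!(node.start().is_ok());
    // The shared data is reachable through the trait, as with a base class.
    assert_eq!(node.info().label, "public_trusted");
    node.stop();
    assert!(!node.running);
}
```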

Collaborator Author:

I am not sure I see the problem with the current split, do you have an example of something that may go wrong in this case? (perhaps I'll even add a test to make sure it does not happen, if it's doable)

Anyways, I suspect what you are suggesting is actually similar to the previous version, in which Node (now NodeInfo) had an instance field itself, but to me, separating them this way makes Runtime::stop cleaner (assuming it's not incorrect).

Contributor:

Having small granular locks for things that may need to be updated in a synchronized fashion implies a locking hierarchy, and there's nothing here that documents such a locking hierarchy.

The obvious example is if there's (say) a current chunk of code that adds a new Node, and which does so by first adding to node_infos then adds to node_instances. If someone later adds different code that happens to add things to node_instances before node_infos, then there's a deadlock.

If all related state – here, everything that's indexed by NodeId – is under the same lock, then there's no trap for unwary maintainers in the future.
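The hazard can be made concrete with a sketch (names hypothetical): two sibling maps under separate locks, mirroring `node_infos` / `node_instances`. Code paths are deadlock-free only if every path that holds both locks acquires them in one documented order, or never holds both at once.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

type NodeId = u64;

struct Registry {
    infos: Mutex<HashMap<NodeId, String>>,
    instances: Mutex<HashMap<NodeId, String>>,
}

impl Registry {
    // Holds both locks: must follow the documented order (infos, then
    // instances). A second path taking them in the opposite order could
    // deadlock against this one.
    fn add(&self, id: NodeId, info: String, instance: String) {
        let mut infos = self.infos.lock().unwrap();
        let mut instances = self.instances.lock().unwrap();
        infos.insert(id, info);
        instances.insert(id, instance);
    }

    // Never holds both locks at once, so no ordering constraint applies;
    // this is the discipline the PR's decoupled fields rely on.
    fn remove(&self, id: NodeId) {
        self.infos.lock().unwrap().remove(&id);
        self.instances.lock().unwrap().remove(&id);
    }
}

fn main() {
    let registry = Registry {
        infos: Mutex::new(HashMap::new()),
        instances: Mutex::new(HashMap::new()),
    };
    registry.add(1, "info".to_string(), "instance".to_string());
    assert!(registry.infos.lock().unwrap().contains_key(&1));
    registry.remove(1);
    assert!(registry.instances.lock().unwrap().is_empty());
}
```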

Collaborator Author:

But the point of this PR is exactly to decouple those two fields, so that they can be updated independently (no need to ever hold a lock over both). I have added a helper method add_node_instance to make things clearer now, and added more comments. PTAL.

next_node_id: AtomicU64,
}

@@ -94,7 +93,9 @@ impl Runtime {
configuration,
terminating: AtomicBool::new(false),
channels: channel::ChannelMapping::new(),
nodes: RwLock::new(HashMap::new()),

node_infos: RwLock::new(HashMap::new()),
node_instances: Mutex::new(HashMap::new()),

// NodeId(0) reserved for RUNTIME_NODE_ID.
next_node_id: AtomicU64::new(1),
@@ -143,18 +144,9 @@ impl Runtime {
/// Thread safe method for signaling termination to a [`Runtime`] and waiting for its node
/// threads to terminate.
pub fn stop(&self) {
// Take the list of nodes out of the runtime instance, and set the terminating flag; this
// will prevent additional nodes from starting to wait again, because `wait_on_channels`
// will return immediately with `OakStatus::ErrTerminated`.
let instances: Vec<_> = {
let mut nodes = self.nodes.write().unwrap();
self.terminating.store(true, SeqCst);

nodes
.values_mut()
.filter_map(|n| n.instance.take())
.collect()
};
// Set the terminating flag; this will prevent additional nodes from starting to wait again,
// because `wait_on_channels` will return immediately with `OakStatus::ErrTerminated`.
self.terminating.store(true, SeqCst);

// Unpark any threads that are blocked waiting on any channels.
for channel in self
Expand All @@ -181,7 +173,12 @@ impl Runtime {
// for any additional work to be finished here. This may take an arbitrary amount of time,
// depending on the work that the node thread has to perform, but at least we know that
// it will not be able to enter a blocking state again.
for mut instance in instances {
for instance in self
.node_instances
.lock()
.expect("could not acquire lock on node_instances")
.values_mut()
{
instance.stop();
}
}
@@ -196,12 +193,12 @@ impl Runtime {
return;
}

let nodes = self.nodes.read().unwrap();
let node = nodes
let node_infos = self.node_infos.read().unwrap();
let node_info = node_infos
.get(&node_id)
.expect("Invalid node_id passed into track_handles_in_node!");

let mut tracked_handles = node.handles.lock().unwrap();
let mut tracked_handles = node_info.handles.lock().unwrap();
for handle in handles {
tracked_handles.insert(handle);
}
@@ -215,12 +212,12 @@ impl Runtime {
return Ok(());
}

let nodes = self.nodes.read().unwrap();
let node_infos = self.node_infos.read().unwrap();
Contributor:

(pre-existing) This lock scope extends to the end of the method, which is needed because of the use of the NodeInfo that's owned by the node_infos field. I wonder if a with_node_info(fn) accessor might make this pattern clearer/safer?

Collaborator Author:

What would we provide as a function in that case? If you are talking just about extracting tracked_handles, I don't think that would improve things, we would still either return a reference, or have to clone it. I am probably missing your point though, could you clarify?

Contributor:

I was thinking about something analogous to the ChannelMapping::with_channel method; the function would just be the code in the rest of the block, but with the scope of lock & reference ownership clearer:

    fn with_node_info<U, F: FnOnce(&NodeInfo) -> U>(&self, node_id: NodeId, f: F) -> U {
        let node_infos = self.node_infos.read().unwrap();
        let node_info = node_infos
            .get(&node_id)
            .expect("Invalid node_id passed into with_node_info");
        f(node_info)
    }

    /// Validate the [`NodeId`] has access to [`Handle`], returning `Err(OakStatus::ErrBadHandle)`
    /// if access is not allowed.
    fn validate_handle_access(&self, node_id: NodeId, handle: Handle) -> Result<(), OakStatus> {
        // Allow RUNTIME_NODE_ID access to all handles.
        if node_id == RUNTIME_NODE_ID {
            return Ok(());
        }

        self.with_node_info(node_id, |node_info| {
            let tracked_handles = node_info.handles.lock().unwrap();

            // Check the handle exists in the handles associated with a node, otherwise
            // return ErrBadHandle.
            if tracked_handles.contains(&handle) {
                Ok(())
            } else {
                error!(
                    "validate_handle_access: handle {:?} not found in node {:?}",
                    handle, node_id
                );
                Err(OakStatus::ErrBadHandle)
            }
        })
    }

Probably for another day though (maybe under #779)

Collaborator Author:

This seems like it is just re-implementing lexical scoping? I agree with the sentiment, but I think we should instead be more conscious of how we scope variables (especially lock guards), possibly by adding explicit anonymous blocks to delimit scopes (with appropriate comments). But happy to keep discussing separately.
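That suggestion about explicit anonymous blocks can be sketched with std's `Mutex` (a generic illustration, not oak_runtime code): the block delimits the guard's lifetime, so the lock is visibly released before further work.

```rust
use std::sync::Mutex;

fn main() {
    let handles = Mutex::new(vec![1u64, 2, 3]);

    // An explicit anonymous block delimits the guard's scope: the lock is
    // released at the closing brace, before any further (possibly slow, or
    // lock-acquiring) work happens.
    let snapshot: Vec<u64> = {
        let guard = handles.lock().unwrap();
        guard.iter().copied().collect()
    }; // `guard` is dropped here, so `handles` is unlocked again.

    // Re-acquiring the lock is now safe; holding the guard across this call
    // would deadlock, since std's Mutex is not reentrant.
    handles.lock().unwrap().push(4);

    assert_eq!(snapshot, vec![1, 2, 3]);
    assert_eq!(handles.lock().unwrap().len(), 4);
}
```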

Contributor @blaxill (Mar 31, 2020):

@daviddrysdale what do you think about going even further and turning even e.g. let tracked_handles = node_info.handles.lock().unwrap(); into closure-passing style (e.g. node_info.with_handles(|handles|{handles.contains(handle)}))? Some consistency around locking patterns might be nice, but I'm not sure if it's worth going this far.

I'm also interested if scoping locks under lifetimes could be more ergonomic, something like:

    pub fn with_all_channels<'a>(
        &'a self,
    ) -> std::sync::RwLockReadGuard<'a, HashMap<ChannelId, Channel>> {
        self.channels.read().unwrap()
    }

    // this would need something like MappedRwLockReadGuard from lock_api: https://docs.rs/lock_api/0.3.3/lock_api/struct.MappedRwLockReadGuard.html
    pub fn with_channel<'a>(
        &'a self,
        channel_id: ChannelId,
    ) -> MappedRwLockReadGuard<'a, Result<Channel, OakStatus>> {
        self.channels.read().unwrap().map(|channels|{
            channels.get(&channel_id).ok_or(OakStatus::ErrBadHandle)
        }
    }

Although this would reintroduce lock scopes being overly large if not careful. (edit: lexical scoping)

// Lookup the node_id in the runtime's nodes hashmap.
let node = nodes
let node_info = node_infos
.get(&node_id)
.expect("Invalid node_id passed into validate_handle_access!");
let tracked_handles = node.handles.lock().unwrap();
let tracked_handles = node_info.handles.lock().unwrap();

// Check the handle exists in the handles associated with a node, otherwise
// return ErrBadHandle.
Expand All @@ -246,12 +243,12 @@ impl Runtime {
return Ok(());
}

let nodes = self.nodes.read().unwrap();
let node = nodes
let node_infos = self.node_infos.read().unwrap();
let node_info = node_infos
.get(&node_id)
.expect("Invalid node_id passed into filter_optional_handles!");

let tracked_handles = node.handles.lock().unwrap();
let tracked_handles = node_info.handles.lock().unwrap();
for handle in handles {
// Check handle is accessible by the node.
if !tracked_handles.contains(&handle) {
@@ -558,9 +555,11 @@ impl Runtime {

if node_id != RUNTIME_NODE_ID {
// Remove the handle from the node's available handles
let nodes = self.nodes.read().unwrap();
let node = nodes.get(&node_id).expect("channel_close: No such node_id");
let mut handles = node.handles.lock().unwrap();
let node_infos = self.node_infos.read().unwrap();
let node_info = node_infos
.get(&node_id)
.expect("channel_close: No such node_id");
let mut handles = node_info.handles.lock().unwrap();
handles.remove(&reference);
}

@@ -582,11 +581,11 @@ impl Runtime {

// Close any remaining handles
let remaining_handles: Vec<_> = {
let nodes = self.nodes.read().unwrap();
let node = nodes
let node_infos = self.node_infos.read().unwrap();
let node_info = node_infos
.get(&node_id)
.expect("remove_node_id: No such node_id");
let handles = node.handles.lock().unwrap();
let handles = node_info.handles.lock().unwrap();
handles.iter().copied().collect()
};

@@ -601,17 +600,28 @@ impl Runtime {
}
}

let mut nodes = self.nodes.write().unwrap();
nodes
let mut node_infos = self.node_infos.write().unwrap();
node_infos
.remove(&node_id)
.expect("remove_node_id: Node didn't exist!");
}

/// Add an [`NodeId`] [`Node`] pair to the [`Runtime`]. This method temporarily holds the node
/// write lock.
fn add_running_node(&self, reference: NodeId, node: Node) {
let mut nodes = self.nodes.write().unwrap();
nodes.insert(reference, node);
/// Add a [`NodeId`]/[`NodeInfo`] pair to the [`Runtime`]. This method temporarily holds the
/// [`Runtime::node_infos`] write lock.
fn add_node_info(&self, reference: NodeId, node_info: NodeInfo) {
self.node_infos
.write()
.expect("could not acquire lock on node_infos")
.insert(reference, node_info);
}

/// Add a [`NodeId`]/[`crate::node::Node`] pair to the [`Runtime`]. This method temporarily
/// holds the [`Runtime::node_instances`] lock.
fn add_node_instance(&self, node_reference: NodeId, node_instance: Box<dyn crate::node::Node>) {
self.node_instances
.lock()
.expect("could not acquire lock on node_instances")
.insert(node_reference, node_instance);
}

/// Thread safe method that attempts to create a node within the [`Runtime`] corresponding to a
@@ -645,7 +655,7 @@ impl Runtime {

let reader = self.channels.duplicate_reference(reader)?;

let mut instance = self
let instance = self
.configuration
.nodes
.get(module_name)
@@ -661,22 +671,103 @@ impl Runtime {
)
})?;

// Try starting the node instance first. If this fails, then directly return the error to
// the caller.
instance.start()?;

// If the node was successfully started, insert it in the list of currently running
// nodes.
self.add_running_node(
reference,
Node {
reference,
instance: Some(instance),
self.node_start_instance(reference, instance, label, vec![reader])?;

Ok(())
}

/// Starts a newly created node instance, by first initializing the necessary [`NodeInfo`] data
/// structure in [`Runtime`], allowing it to access the provided [`Handle`]s, then calling
/// [`Node::start`] on the instance, and finally storing a reference to the running instance
/// in [`Runtime::node_instances`] so that it can later be terminated.
fn node_start_instance<I>(
&self,
node_reference: NodeId,
mut node_instance: Box<dyn crate::node::Node>,
label: &oak_abi::label::Label,
initial_handles: I,
) -> Result<(), OakStatus>
where
I: IntoIterator<Item = Handle>,
{
// First create the necessary info data structure in the Runtime, otherwise calls that the
// node makes to the Runtime during `Node::start` (synchronously or asynchronously) may
// fail.
self.add_node_info(
node_reference,
NodeInfo {
label: label.clone(),
handles: Mutex::new(vec![reader].into_iter().collect()),
handles: Mutex::new(HashSet::new()),
},
);

Ok(())
// Make sure that the provided initial handles are tracked in the newly created node from
// the start.
self.track_handles_in_node(node_reference, initial_handles);

// Try to start the node instance, and store the result in a temporary variable to be
// returned later.
//
// In order for this to work correctly, the `NodeInfo` entry must already exist in
// `Runtime`, which is why we could not start this instance before the call to
// `Runtime::add_node_info` above.
//
// On the other hand, we also cannot start it after the call to `Runtime::add_node_instance`
// below, because that takes ownership of the instance itself.
//
// We also want no locks to be held while the instance is starting.
let result = node_instance.start();
Contributor:

The general advice for writing clean code is that concurrency is a separate concern and usually deserves to be handled in a separate class (a dedicated data structure). I am not sure if this advice is applicable to Rust code, but in any other programming language I've used, I'd write a wrapper class to hold node_infos and node_instances with high-level methods for get, insert, remove, etc. All the concurrency logic (including any necessary syncing between node_infos and node_instances) would be handled in that class. The only problem I currently see with this approach is if node_instance.start() must absolutely be called in between updating node_infos and updating node_instances. Is that the case?

Collaborator Author:

The main point of this PR is so that the node instance can be started so that the relevant data structures (NodeInfo in this case) are available to the runtime. I agree with the desire of encapsulating any synchronization in high level accessors, but I'm not entirely sure how to go about it here, perhaps let's discuss separately? If you have any idea in mind and you would like to prototype it in a PR, that would be great!

Contributor:

Sure. Let's discuss it separately. What I am thinking of is just a bit of refactoring that can be done separately, after you are done with this PR. And I think it might address some of the comments from @daviddrysdale too (but I am not a true Rustacean yet, so I could be wrong).


// Regardless of the result of `Node::start`, insert the now running instance to the list of
// running instances (by moving it), so that `Node::stop` will be called on it eventually.
self.add_node_instance(node_reference, node_instance);

// Return the result of `Node::start`.
result
}
}

#[test]
fn create_channel() {
let configuration = crate::runtime::Configuration {
nodes: HashMap::new(),
entry_module: "test_module".to_string(),
entrypoint: "test_function".to_string(),
};
let runtime = Arc::new(crate::runtime::Runtime::create(configuration));

// Define a node implementation for test that exercises parts of the [`Runtime`] ABI.
struct TestNode {
node_id: NodeId,
runtime: Arc<Runtime>,
};

impl crate::node::Node for TestNode {
fn start(&mut self) -> Result<(), OakStatus> {
// Attempt to perform an operation that requires the [`Runtime`] to have created an
// appropriate [`NodeInfo`] instance.
let (_write_handle, _read_handle) = self
.runtime
.new_channel(self.node_id, &oak_abi::label::Label::public_trusted());
Ok(())
}
fn stop(&mut self) {}
}

// Manually allocate a new [`NodeId`].
let node_reference = runtime.new_node_reference();

// Create an instance of [`TestNode`] without starting it.
let node_instance = TestNode {
node_id: node_reference,
runtime: runtime.clone(),
};

let result = runtime.node_start_instance(
node_reference,
Box::new(node_instance),
&oak_abi::label::Label::public_trusted(),
vec![],
);
assert_eq!(Ok(()), result);
}