Skip to content

Commit

Permalink
lock the versions wtf
Browse files Browse the repository at this point in the history
  • Loading branch information
Autoparallel committed Jan 10, 2024
1 parent f96fd34 commit eaa504c
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 31 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions arbiter-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ readme = "../README.md"

[dependencies]
ethers.workspace = true
artemis-core = { git = "https://github.com/paradigmxyz/artemis.git" }
# artemis-core = { path = "../../../artemis/crates/artemis-core" }
# artemis-core = { git = "https://github.com/paradigmxyz/artemis.git" }
artemis-core = { path = "../../../artemis/crates/artemis-core" }
futures-util.workspace = true
async-trait.workspace = true
serde_json.workspace = true
Expand Down
18 changes: 9 additions & 9 deletions arbiter-engine/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ impl Agent {
self.dependents.push(dependent.to_owned());
}

pub(crate) async fn run(&mut self) -> Vec<JoinSet<()>> {
let mut join_sets = vec![];
for behavior in self.behaviors.iter_mut() {
trace!("Running behavior");
let joinset = behavior.await.unwrap();
join_sets.push(joinset);
}
join_sets
}
// pub(crate) async fn run(&mut self) -> Vec<JoinSet<()>> {
// let mut join_sets = vec![];
// for behavior in self.behaviors.iter_mut() {
// trace!("Running behavior");
// let joinset = behavior.await.unwrap();
// join_sets.push(joinset);
// }
// join_sets
// }
}

type Behavior = Pin<Box<dyn Future<Output = Result<JoinSet<()>, Box<dyn Error>>> + Send>>;
Expand Down
4 changes: 1 addition & 3 deletions arbiter-engine/src/examples/timed_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,5 @@ async fn echoer() {
let send_result = world.messager.execute(message).await;
debug!("Start message sent {:?}", send_result);

for task in tasks {
task.await.unwrap();
}
world.join().await;
}
6 changes: 4 additions & 2 deletions arbiter-engine/src/examples/token_minter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,13 @@ async fn token_minter_simulation() {
requester_agent.add_behavior(mint_behavior);

// Run the world and send the start message
let tasks = world.run().await;

let message = Message {
from: "host".to_owned(),
to: To::Agent(REQUESTER_ID.to_owned()),
data: "Start".to_owned(),
};
// TODO: Messages like this could probably be put in the `world.run()`
world.messager.execute(message).await;

let message = Message {
Expand All @@ -328,5 +329,6 @@ async fn token_minter_simulation() {
};
world.messager.execute(message).await;

futures::future::join_all(tasks).await;
let tasks = world.run().await;
world.join().await;
}
33 changes: 19 additions & 14 deletions arbiter-engine/src/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use ethers::{
abi::Hash,
providers::{Provider, PubsubClient},
};
use tokio::task::JoinSet;

use super::*;
use crate::{
Expand Down Expand Up @@ -42,6 +43,8 @@ pub struct World<P> {

/// The messaging layer for the world.
pub messager: Messager, // TODO: Use this as the message executor that can be given to all agents and give each agent their specific collector.

pub joinsets: Option<Vec<JoinSet<()>>>,
}

// TODO: Can add a messager as an interconnect and have the manager give each
Expand All @@ -60,6 +63,7 @@ where
agents: HashMap::new(),
provider,
messager: Messager::new(),
joinsets: None,
}
}

Expand All @@ -72,27 +76,28 @@ where
}

/// Runs the agents in the world.
pub async fn run(&mut self) -> Vec<tokio::task::JoinHandle<()>> {
pub async fn run(&mut self) {
debug!("Running world: {}", self.id);
debug!("Agents in world: {:?}", self.agents.keys());

let mut tasks = Vec::new();

let mut join_sets = vec![];
for agent in self.agents.values_mut() {
trace!("Running agent: {}", agent.id);
let join_sets = Box::leak(Box::new(agent.run().await));
for set in join_sets.iter_mut() {
let task = tokio::spawn(async move {
while let Some(next) = set.join_next().await {
if let Err(e) = next {
panic!("Error: {:?}", e);
}
}
});
tasks.push(task);

for behavior in agent.behaviors.iter_mut() {
trace!("Running behavior");
let joinset = behavior.await.unwrap();
join_sets.push(joinset);
}
}
tasks
self.joinsets = Some(join_sets);
}

pub async fn join(&mut self) {
std::thread::sleep(std::time::Duration::from_secs(3));
for joinset in self.joinsets.as_mut().unwrap().iter_mut() {
while let Some(_) = joinset.join_next().await {}
}
}
}

Expand Down

0 comments on commit eaa504c

Please sign in to comment.