diff --git a/src/raft/api/apps.rs b/src/raft/api/apps.rs index b6af2a9e70..60841b32ec 100644 --- a/src/raft/api/apps.rs +++ b/src/raft/api/apps.rs @@ -22,6 +22,7 @@ use crate::{ TremorRequest, TremorResponse, TremorStart, }, }, + system::flow::DeploymentType, }; use axum::{ extract::{self, Json, State}, @@ -218,6 +219,11 @@ async fn start( instance: body.instance.clone(), config: body.config.clone(), state: body.state(), + deployment_type: if body.single_node { + DeploymentType::OneNode + } else { + DeploymentType::AllNodes + }, }); state .raft diff --git a/src/raft/api/client.rs b/src/raft/api/client.rs index 94b5807a55..bf14ec5fde 100644 --- a/src/raft/api/client.rs +++ b/src/raft/api/client.rs @@ -191,11 +191,13 @@ impl Tremor { instance: &AppFlowInstanceId, config: std::collections::HashMap, running: bool, + single_node: bool, ) -> ClientResult { let req = TremorStart { instance: instance.clone(), config, running, + single_node, }; self.api_req::( &format!("api/apps/{}/flows/{flow}", instance.app_id()), diff --git a/src/raft/store.rs b/src/raft/store.rs index fdf238d74d..41d23e52de 100644 --- a/src/raft/store.rs +++ b/src/raft/store.rs @@ -21,7 +21,7 @@ use crate::{ ids::{AppFlowInstanceId, AppId, FlowDefinitionId}, instance::IntendedState, raft::{archive::TremorAppDef, ClusterError}, - system::Runtime, + system::{flow::DeploymentType, Runtime}, }; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use openraft::{ @@ -81,6 +81,7 @@ pub enum AppsRequest { instance: AppFlowInstanceId, config: std::collections::HashMap, state: IntendedState, + deployment_type: DeploymentType, }, /// Stopps and Undeploys an instance of a app @@ -112,6 +113,7 @@ pub(crate) struct TremorStart { pub(crate) instance: AppFlowInstanceId, pub(crate) config: std::collections::HashMap, pub(crate) running: bool, + pub(crate) single_node: bool, } impl TremorStart { pub(crate) fn state(&self) -> IntendedState { diff --git a/src/raft/store/statemachine/apps.rs 
b/src/raft/store/statemachine/apps.rs index 123ab38368..e24f8686c4 100644 --- a/src/raft/store/statemachine/apps.rs +++ b/src/raft/store/statemachine/apps.rs @@ -23,7 +23,7 @@ use crate::{ store_w_err, AppsRequest, StorageResult, TremorResponse, }, }, - system::Runtime, + system::{flow::DeploymentType, Runtime}, }; use rocksdb::ColumnFamily; use std::collections::HashMap; @@ -48,6 +48,7 @@ pub struct FlowInstance { pub definition: FlowDefinitionId, pub config: HashMap, pub state: IntendedState, + pub deployment_type: DeploymentType, } pub type Instances = HashMap; @@ -114,10 +115,11 @@ impl RaftStateMachine for AppsStateMachine { definition, config, state, + deployment_type, }, ) in app_instances { - me.deploy_flow(&app_id, definition, id, config, state) + me.deploy_flow(&app_id, definition, id, config, state, deployment_type) .await .map_err(store::Error::Storage)?; } @@ -169,6 +171,7 @@ impl RaftStateMachine for AppsStateMachine { s_flow.id.clone(), s_flow.config.clone(), // important: this is the new config s_flow.state, + s_flow.deployment_type, ) .await?; } else if s_flow.state != flow.state { @@ -199,6 +202,7 @@ impl RaftStateMachine for AppsStateMachine { AppFlowInstanceId::new(app_id.clone(), s_instance_id.clone()), s_flow.config.clone(), s_flow.state, + s_flow.deployment_type, ) .await?; } @@ -245,9 +249,17 @@ impl RaftStateMachine for AppsStateMachine { instance, config, state, + deployment_type, } => { - self.deploy_flow(app, flow.clone(), instance.clone(), config.clone(), *state) - .await?; + self.deploy_flow( + app, + flow.clone(), + instance.clone(), + config.clone(), + *state, + *deployment_type, + ) + .await?; Ok(TremorResponse::AppFlowInstanceId(instance.clone())) } AppsRequest::Undeploy(instance) => { @@ -316,6 +328,7 @@ impl AppsStateMachine { instance: AppFlowInstanceId, config: HashMap, intended_state: IntendedState, + deployment_type: DeploymentType, ) -> StorageResult<()> { info!("Deploying flow instance {app_id}/{flow}/{instance}"); let app 
= self @@ -386,6 +399,7 @@ impl AppsStateMachine { definition: flow, config, state: intended_state, // we are about to apply this state further below + deployment_type, }, ); let instances = serde_json::to_vec(&app.instances).map_err(sm_w_err)?; @@ -402,7 +416,7 @@ impl AppsStateMachine { // ensure the cluster is running self.world.wait_for_cluster().await; self.world - .deploy_flow(app_id.clone(), &deploy) + .deploy_flow(app_id.clone(), &deploy, deployment_type) .await .map_err(sm_w_err)?; // change the flow state to the intended state diff --git a/src/raft/test/learner.rs b/src/raft/test/learner.rs index 230ad9b109..2cc4479b83 100644 --- a/src/raft/test/learner.rs +++ b/src/raft/test/learner.rs @@ -140,7 +140,9 @@ end; let flow_id = FlowDefinitionId("main".to_string()); let instance = AppFlowInstanceId::new(app_id, "01".to_string()); let config = HashMap::new(); - let instance_id = client0.start(&flow_id, &instance, config, true).await?; + let instance_id = client0 + .start(&flow_id, &instance, config, true, false) + .await?; // wait for the app to be actually started // wait for the file to exist diff --git a/src/system.rs b/src/system.rs index bd9f0dc5fa..bb50def37a 100644 --- a/src/system.rs +++ b/src/system.rs @@ -22,7 +22,7 @@ use std::{ time::Duration, }; -use self::flow::Flow; +use self::flow::{DeploymentType, Flow}; use crate::{ channel::{oneshot, Sender}, connectors, @@ -172,7 +172,8 @@ impl Runtime { let mut count = 0; // first deploy them for flow in deployable.iter_flows() { - self.deploy_flow(AppId::default(), flow).await?; + self.deploy_flow(AppId::default(), flow, DeploymentType::AllNodes) + .await?; } // start flows in a second step for flow in deployable.iter_flows() { @@ -195,6 +196,7 @@ impl Runtime { &self, app_id: AppId, flow: &ast::DeployFlow<'static>, + deployment_type: DeploymentType, ) -> Result { let (tx, rx) = oneshot::channel(); self.flows @@ -203,6 +205,7 @@ impl Runtime { flow: Box::new(flow.clone()), sender: tx, raft: 
self.maybe_get_manager()?.unwrap_or_default(), + deployment_type, }) .await?; match rx.await? { diff --git a/src/system/flow.rs b/src/system/flow.rs index a2e51a810a..2099ade2e9 100644 --- a/src/system/flow.rs +++ b/src/system/flow.rs @@ -211,6 +211,7 @@ impl Flow { connector_id_gen: &mut ConnectorUIdGen, known_connectors: &Known, kill_switch: &KillSwitch, + deployment_type: DeploymentType, ) -> Result { let mut pipelines = HashMap::new(); let mut connectors = HashMap::new(); @@ -266,7 +267,13 @@ impl Flow { link(&connectors, &pipelines, connect).await?; } - let addr = spawn_task(ctx.clone(), pipelines, &connectors, &flow.defn.connections); + let addr = spawn_task( + ctx.clone(), + pipelines, + &connectors, + &flow.defn.connections, + deployment_type, + ); let this = Flow { alias: ctx.id, @@ -415,10 +422,13 @@ enum MsgWrapper { StopResult(ConnectorResult<()>), } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] -enum DeploymentType { +/// How the deployment is distributed on a cluster +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)] +pub enum DeploymentType { #[default] + /// Let the pipeline run on all nodes in the cluster AllNodes, + /// Run the pipeline on a single node in the cluster OneNode, } @@ -446,6 +456,7 @@ impl RunningFlow { app_ctx: AppContext, pipelines: HashMap, connectors: &HashMap, + deployment_type: DeploymentType, links: &[ConnectStmt], ) -> Self { let (msg_tx, msg_rx) = bounded(crate::qsize()); @@ -515,7 +526,7 @@ impl RunningFlow { drain_tx, stop_tx, start_tx, - deployment_type: DeploymentType::OneNode, // FIXME: make configurab + deployment_type, } } @@ -559,7 +570,7 @@ impl RunningFlow { let hash_key = self.app_ctx.id.to_string(); let node_id = self.app_ctx.node_id(); - let mut current_nodes: Vec = vec![node_id]; + let mut current_nodes: Vec = vec![]; let mut slot: usize = 0; let mut intended_active_state = IntendedState::Paused; @@ -570,7 +581,6 @@ impl RunningFlow { // We only need ticks for a 
OneNode deployment if self.deployment_type == DeploymentType::OneNode { let tick_tx = self.msg_tx.clone(); - tick_tx.send(Msg::Tick).await?; task::spawn(async move { time::sleep(Duration::from_secs(20)).await; while tick_tx.send(Msg::Tick).await.is_ok() { @@ -578,46 +588,57 @@ impl RunningFlow { } }); } - dbg!(); while let Some(wrapped) = self.input_channel.next().await { match wrapped { MsgWrapper::Msg(Msg::Tick) => { - if let Ok(Ok(members)) = dbg!( - timeout( - Duration::from_millis(100), - self.app_ctx.raft.get_last_membership() - ) - .await - ) { + if let Ok(Ok(members)) = timeout( + Duration::from_millis(100), + self.app_ctx.raft.get_last_membership(), + ) + .await + { current_nodes = members.into_iter().collect(); slot = jh.slot(&hash_key, current_nodes.len() as u32) as usize; - if is_active_node(&current_nodes, slot, node_id) { - // FIXME update state - - dbg!( - "active", - &hash_key, - slot, - node_id, - intended_active_state, - self.state - ); - if self.state == State::Paused - && intended_active_state == IntendedState::Running - { - self.handle_resume(&prefix).await?; + if is_active_node(&current_nodes, slot, node_id) + && intended_active_state == IntendedState::Running + { + // dbg!( + // "active", + // &hash_key, + // slot, + // node_id, + // intended_active_state, + // self.state + // ); + + match self.state { + State::Paused => { + if let Err(e) = self.handle_resume(&prefix).await { + error!("{prefix} Error during resuming: {e}"); + self.change_state(State::Failed); + } + } + State::Initializing => { + if let Err(e) = self.handle_start(&prefix).await { + error!("{prefix} Error starting: {e}"); + self.change_state(State::Failed); + }; + } + state => { + debug!("not changing from state: {state}"); + } } } else { - dbg!( - "passive", - &hash_key, - slot, - node_id, - intended_active_state, - self.state - ); + // dbg!( + // "passive", + // &hash_key, + // slot, + // node_id, + // intended_active_state, + // self.state + // ); if self.state == State::Running { 
self.handle_pause(&prefix).await?; intended_active_state = IntendedState::Running; @@ -632,7 +653,6 @@ impl RunningFlow { // We are always active on a all node deployment let is_active = self.deployment_type == DeploymentType::AllNodes || is_active_node(&current_nodes, slot, node_id); - dbg!(self.state, intended_state, self.deployment_type, is_active); intended_active_state = intended_state; match (self.state, intended_state) { @@ -1018,8 +1038,9 @@ fn spawn_task( pipelines: HashMap, connectors: &HashMap, links: &[ConnectStmt], + deployment_type: DeploymentType, ) -> Addr { - let flow = RunningFlow::new(app_ctx, pipelines, connectors, links); + let flow = RunningFlow::new(app_ctx, pipelines, connectors, deployment_type, links); let addr = flow.addr(); task::spawn(flow.run()); addr @@ -1027,14 +1048,8 @@ fn spawn_task( fn is_active_node(current_nodes: &[NodeId], slot: usize, node_id: NodeId) -> bool { match current_nodes.get(slot) { - Some(selected) if *selected == node_id => { - dbg!("active", selected); - true - } - Some(selected) => { - dbg!("passive", selected); - false - } + Some(selected) if *selected == node_id => true, + Some(_selected) => false, None => { error!(" Slot {slot} is out of bounds for membership {current_nodes:?}"); false @@ -1215,6 +1230,7 @@ mod tests { &mut connector_id_gen, &known_connectors, &kill_switch, + DeploymentType::AllNodes, ) .await?; diff --git a/src/system/flow_supervisor.rs b/src/system/flow_supervisor.rs index b48028819a..5637f42fb0 100644 --- a/src/system/flow_supervisor.rs +++ b/src/system/flow_supervisor.rs @@ -33,6 +33,8 @@ use tremor_common::uids::{ConnectorUIdGen, OperatorUIdGen}; use tremor_script::ast::DeployFlow; +use super::flow::DeploymentType; + pub(crate) type Channel = Sender; /// This is control plane @@ -48,6 +50,8 @@ pub(crate) enum Msg { sender: OneShotSender>, /// API request sender raft: raft::Cluster, + /// Type of the deployment + deployment_type: DeploymentType, }, /// change instance state 
ChangeInstanceState { @@ -107,6 +111,7 @@ impl FlowSupervisor { sender: oneshot::Sender>, kill_switch: &KillSwitch, raft: raft::Cluster, + deployment_type: DeploymentType, ) { let id = AppFlowInstanceId::from_deploy(app_id, &flow); let res = match self.flows.entry(id.clone()) { @@ -124,6 +129,7 @@ impl FlowSupervisor { &mut self.connector_id_gen, &self.known_connectors, kill_switch, + deployment_type, ) .await .map(|deploy| { @@ -271,9 +277,17 @@ impl FlowSupervisor { flow, sender, raft: raft_api_tx, + deployment_type, } => { - self.handle_deploy(app, *flow, sender, &task_kill_switch, raft_api_tx) - .await; + self.handle_deploy( + app, + *flow, + sender, + &task_kill_switch, + raft_api_tx, + deployment_type, + ) + .await; } Msg::GetFlows(reply_tx) => self.handle_get_flows(reply_tx), Msg::GetFlow(id, reply_tx) => self.handle_get_flow(&id, reply_tx), diff --git a/tests/flows.rs b/tests/flows.rs index 291de8ffb6..168df5efd4 100644 --- a/tests/flows.rs +++ b/tests/flows.rs @@ -18,7 +18,7 @@ use tremor_common::file; use tremor_runtime::{ errors::*, ids::{AppFlowInstanceId, AppId}, - system::{Runtime, ShutdownMode, WorldConfig}, + system::{flow::DeploymentType, Runtime, ShutdownMode, WorldConfig}, }; use tremor_script::{deploy::Deploy, module::Manager}; @@ -57,7 +57,7 @@ macro_rules! 
test_cases { let app_id = AppId::default(); for flow in deployable.iter_flows() { let flow_alias = AppFlowInstanceId::new(app_id.clone(), flow.instance_alias.clone()); - runtime.deploy_flow(app_id.clone(), flow).await?; + runtime.deploy_flow(app_id.clone(), flow, DeploymentType::AllNodes).await?; runtime.start_flow(flow_alias).await?; } runtime.stop(ShutdownMode::Forceful).await?; diff --git a/tremor-cli/src/cli.rs b/tremor-cli/src/cli.rs index 9923dc06fa..4cffa2353c 100644 --- a/tremor-cli/src/cli.rs +++ b/tremor-cli/src/cli.rs @@ -189,6 +189,10 @@ pub(crate) enum AppsCommands { /// Deploys the pipeline in paused state #[clap(short, long, action = clap::ArgAction::SetTrue)] paused: bool, + /// Deploys the flow in single node mode, this means the flow will only run + /// on one node in the cluster and might be balanced to other nodes on resize + #[clap(short, long, action = clap::ArgAction::SetTrue)] + single_node: bool, }, /// Stops and removes an instance of a flow in an installed app diff --git a/tremor-cli/src/cluster.rs b/tremor-cli/src/cluster.rs index 5e49e61e2f..eac736c43a 100644 --- a/tremor-cli/src/cluster.rs +++ b/tremor-cli/src/cluster.rs @@ -270,13 +270,17 @@ impl AppsCommands { instance, config, paused, + single_node, } => { let config: HashMap = config.map_or_else(|| Ok(HashMap::new()), |c| serde_json::from_str(&c))?; let flow = flow.map_or_else(|| FlowDefinitionId::from("main"), FlowDefinitionId); let app_id = AppId(app); let instance_id = AppFlowInstanceId::new(app_id, instance); - match client.start(&flow, &instance_id, config, !paused).await { + match client + .start(&flow, &instance_id, config, !paused, single_node) + .await + { Ok(instance_id) => println!("Instance `{instance_id}` successfully started",), Err(e) => eprintln!("Instance `{instance_id}` failed to start: {e}"), }