Skip to content

Commit

Permalink
Expose API for single node flow instances
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Apr 25, 2023
1 parent f81bcd7 commit f55341b
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 61 deletions.
6 changes: 6 additions & 0 deletions src/raft/api/apps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
TremorRequest, TremorResponse, TremorStart,
},
},
system::flow::DeploymentType,
};
use axum::{
extract::{self, Json, State},
Expand Down Expand Up @@ -218,6 +219,11 @@ async fn start(
instance: body.instance.clone(),
config: body.config.clone(),
state: body.state(),
deployment_type: if body.single_node {
DeploymentType::OneNode
} else {
DeploymentType::AllNodes
},
});
state
.raft
Expand Down
2 changes: 2 additions & 0 deletions src/raft/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,13 @@ impl Tremor {
instance: &AppFlowInstanceId,
config: std::collections::HashMap<String, OwnedValue>,
running: bool,
single_node: bool,
) -> ClientResult<AppFlowInstanceId> {
let req = TremorStart {
instance: instance.clone(),
config,
running,
single_node,
};
self.api_req::<TremorStart, AppFlowInstanceId>(
&format!("api/apps/{}/flows/{flow}", instance.app_id()),
Expand Down
4 changes: 3 additions & 1 deletion src/raft/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
ids::{AppFlowInstanceId, AppId, FlowDefinitionId},
instance::IntendedState,
raft::{archive::TremorAppDef, ClusterError},
system::Runtime,
system::{flow::DeploymentType, Runtime},
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use openraft::{
Expand Down Expand Up @@ -81,6 +81,7 @@ pub enum AppsRequest {
instance: AppFlowInstanceId,
config: std::collections::HashMap<String, OwnedValue>,
state: IntendedState,
deployment_type: DeploymentType,
},

/// Stopps and Undeploys an instance of a app
Expand Down Expand Up @@ -112,6 +113,7 @@ pub(crate) struct TremorStart {
pub(crate) instance: AppFlowInstanceId,
pub(crate) config: std::collections::HashMap<String, OwnedValue>,
pub(crate) running: bool,
pub(crate) single_node: bool,
}
impl TremorStart {
pub(crate) fn state(&self) -> IntendedState {
Expand Down
24 changes: 19 additions & 5 deletions src/raft/store/statemachine/apps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
store_w_err, AppsRequest, StorageResult, TremorResponse,
},
},
system::Runtime,
system::{flow::DeploymentType, Runtime},
};
use rocksdb::ColumnFamily;
use std::collections::HashMap;
Expand All @@ -48,6 +48,7 @@ pub struct FlowInstance {
pub definition: FlowDefinitionId,
pub config: HashMap<String, simd_json::OwnedValue>,
pub state: IntendedState,
pub deployment_type: DeploymentType,
}
pub type Instances = HashMap<InstanceId, FlowInstance>;

Expand Down Expand Up @@ -114,10 +115,11 @@ impl RaftStateMachine<AppsSnapshot, AppsRequest> for AppsStateMachine {
definition,
config,
state,
deployment_type,
},
) in app_instances
{
me.deploy_flow(&app_id, definition, id, config, state)
me.deploy_flow(&app_id, definition, id, config, state, deployment_type)
.await
.map_err(store::Error::Storage)?;
}
Expand Down Expand Up @@ -169,6 +171,7 @@ impl RaftStateMachine<AppsSnapshot, AppsRequest> for AppsStateMachine {
s_flow.id.clone(),
s_flow.config.clone(), // important: this is the new config
s_flow.state,
s_flow.deployment_type,
)
.await?;
} else if s_flow.state != flow.state {
Expand Down Expand Up @@ -199,6 +202,7 @@ impl RaftStateMachine<AppsSnapshot, AppsRequest> for AppsStateMachine {
AppFlowInstanceId::new(app_id.clone(), s_instance_id.clone()),
s_flow.config.clone(),
s_flow.state,
s_flow.deployment_type,
)
.await?;
}
Expand Down Expand Up @@ -245,9 +249,17 @@ impl RaftStateMachine<AppsSnapshot, AppsRequest> for AppsStateMachine {
instance,
config,
state,
deployment_type,
} => {
self.deploy_flow(app, flow.clone(), instance.clone(), config.clone(), *state)
.await?;
self.deploy_flow(
app,
flow.clone(),
instance.clone(),
config.clone(),
*state,
*deployment_type,
)
.await?;
Ok(TremorResponse::AppFlowInstanceId(instance.clone()))
}
AppsRequest::Undeploy(instance) => {
Expand Down Expand Up @@ -316,6 +328,7 @@ impl AppsStateMachine {
instance: AppFlowInstanceId,
config: HashMap<String, simd_json::OwnedValue>,
intended_state: IntendedState,
deployment_type: DeploymentType,
) -> StorageResult<()> {
info!("Deploying flow instance {app_id}/{flow}/{instance}");
let app = self
Expand Down Expand Up @@ -386,6 +399,7 @@ impl AppsStateMachine {
definition: flow,
config,
state: intended_state, // we are about to apply this state further below
deployment_type,
},
);
let instances = serde_json::to_vec(&app.instances).map_err(sm_w_err)?;
Expand All @@ -402,7 +416,7 @@ impl AppsStateMachine {
// ensure the cluster is running
self.world.wait_for_cluster().await;
self.world
.deploy_flow(app_id.clone(), &deploy)
.deploy_flow(app_id.clone(), &deploy, deployment_type)
.await
.map_err(sm_w_err)?;
// change the flow state to the intended state
Expand Down
4 changes: 3 additions & 1 deletion src/raft/test/learner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ end;
let flow_id = FlowDefinitionId("main".to_string());
let instance = AppFlowInstanceId::new(app_id, "01".to_string());
let config = HashMap::new();
let instance_id = client0.start(&flow_id, &instance, config, true).await?;
let instance_id = client0
.start(&flow_id, &instance, config, true, false)
.await?;

// wait for the app to be actually started
// wait for the file to exist
Expand Down
7 changes: 5 additions & 2 deletions src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
time::Duration,
};

use self::flow::Flow;
use self::flow::{DeploymentType, Flow};
use crate::{
channel::{oneshot, Sender},
connectors,
Expand Down Expand Up @@ -172,7 +172,8 @@ impl Runtime {
let mut count = 0;
// first deploy them
for flow in deployable.iter_flows() {
self.deploy_flow(AppId::default(), flow).await?;
self.deploy_flow(AppId::default(), flow, DeploymentType::AllNodes)
.await?;
}
// start flows in a second step
for flow in deployable.iter_flows() {
Expand All @@ -195,6 +196,7 @@ impl Runtime {
&self,
app_id: AppId,
flow: &ast::DeployFlow<'static>,
deployment_type: DeploymentType,
) -> Result<AppFlowInstanceId> {
let (tx, rx) = oneshot::channel();
self.flows
Expand All @@ -203,6 +205,7 @@ impl Runtime {
flow: Box::new(flow.clone()),
sender: tx,
raft: self.maybe_get_manager()?.unwrap_or_default(),
deployment_type,
})
.await?;
match rx.await? {
Expand Down
Loading

0 comments on commit f55341b

Please sign in to comment.