Skip to content

Commit

Permalink
Draft of flow activation and deactivation for one node flows
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Apr 25, 2023
1 parent d2d8ddc commit f81bcd7
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions src/system/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ impl RunningFlow {
drain_tx,
stop_tx,
start_tx,
deployment_type: DeploymentType::AllNodes,
deployment_type: DeploymentType::OneNode, // FIXME: make configurab
}
}

Expand Down Expand Up @@ -566,7 +566,7 @@ impl RunningFlow {
let jh =
jumphash::JumpHasher::new_with_keys(8_390_880_576_440_238_080, 128_034_676_764_530);

let activation_tx = self.msg_tx.clone();
// let activation_tx = self.msg_tx.clone();
// We only need ticks for a OneNode deployment
if self.deployment_type == DeploymentType::OneNode {
let tick_tx = self.msg_tx.clone();
Expand Down Expand Up @@ -604,16 +604,12 @@ impl RunningFlow {
intended_active_state,
self.state
);
if self.state == State::Paused
&& intended_active_state == IntendedState::Running
{
self.handle_resume(&prefix).await?;
}
} else {
let (reply_tx, reply_rx) = oneshot();
activation_tx
.send(Msg::ChangeState {
intended_state: IntendedState::Stopped,
reply_tx,
})
.await?;
reply_rx.await??;
// FIXME update state
dbg!(
"passive",
&hash_key,
Expand All @@ -622,6 +618,10 @@ impl RunningFlow {
intended_active_state,
self.state
);
if self.state == State::Running {
self.handle_pause(&prefix).await?;
intended_active_state = IntendedState::Running;
}
}
}
}
Expand All @@ -632,6 +632,8 @@ impl RunningFlow {
// We are always active on a all node deployment
let is_active = self.deployment_type == DeploymentType::AllNodes
|| is_active_node(&current_nodes, slot, node_id);
dbg!(self.state, intended_state, self.deployment_type, is_active);

intended_active_state = intended_state;
match (self.state, intended_state) {
(State::Initializing, IntendedState::Running) if is_active => {
Expand Down

0 comments on commit f81bcd7

Please sign in to comment.