Skip to content

Commit

Permalink
fix(workflows): add nats for signals
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Sep 14, 2024
1 parent b072af4 commit c4a28eb
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 143 deletions.
42 changes: 2 additions & 40 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ use crate::{
workflow::{Workflow, WorkflowInput},
};

/// Poll interval when polling for signals in-process
const SIGNAL_RETRY: Duration = Duration::from_millis(100);
/// Most in-process signal poll tries
const MAX_SIGNAL_RETRIES: usize = 16;
/// Most in-process sub workflow poll tries
const MAX_SUB_WORKFLOW_RETRIES: usize = 4;
/// Retry interval for failed db actions
Expand Down Expand Up @@ -620,26 +616,9 @@ impl WorkflowCtx {
else {
tracing::info!(name=%self.name, id=%self.workflow_id, "listening for signal");

let mut retries = 0;
let mut interval = tokio::time::interval(SIGNAL_RETRY);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let ctx = ListenCtx::new(self);

loop {
interval.tick().await;

match T::listen(&ctx).await {
Ok(res) => break res,
Err(err) if matches!(err, WorkflowError::NoSignalFound(_)) => {
if retries > MAX_SIGNAL_RETRIES {
return Err(err).map_err(GlobalError::raw);
}
retries += 1;
}
err => return err.map_err(GlobalError::raw),
}
}
T::listen(&ctx).await.map_err(GlobalError::raw)?
};

// Move to next event
Expand Down Expand Up @@ -674,26 +653,9 @@ impl WorkflowCtx {
else {
tracing::info!(name=%self.name, id=%self.workflow_id, "listening for signal");

let mut retries = 0;
let mut interval = tokio::time::interval(SIGNAL_RETRY);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let ctx = ListenCtx::new(self);

loop {
interval.tick().await;

match listener.listen(&ctx).await {
Ok(res) => break res,
Err(err) if matches!(err, WorkflowError::NoSignalFound(_)) => {
if retries > MAX_SIGNAL_RETRIES {
return Err(err).map_err(GlobalError::raw);
}
retries += 1;
}
err => return err.map_err(GlobalError::raw),
}
}
listener.listen(&ctx).await.map_err(GlobalError::raw)?
};

// Move to next event
Expand Down
Loading

0 comments on commit c4a28eb

Please sign in to comment.