Skip to content

Commit

Permalink
chore(workflows): tidy up internals (#1199)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Oct 9, 2024
1 parent 2b26b6a commit 3f0ff1e
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 43 deletions.
7 changes: 1 addition & 6 deletions lib/chirp-workflow/core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
builder::common as builder,
builder::BuilderError,
ctx::{
api::WORKFLOW_TIMEOUT,
common,
message::{MessageCtx, SubscriptionHandle},
},
Expand All @@ -28,11 +27,7 @@ pub async fn wait_for_workflow<W: Workflow, B: Debug + Clone>(
) -> GlobalResult<W::Output> {
let db = db_from_ctx(ctx).await?;

tokio::time::timeout(
WORKFLOW_TIMEOUT,
common::wait_for_workflow::<W>(&db, workflow_id),
)
.await?
common::wait_for_workflow::<W>(&db, workflow_id).await
}

/// Dispatch a new workflow and wait for it to complete. Has a 60s timeout.
Expand Down
2 changes: 0 additions & 2 deletions lib/chirp-workflow/core/src/ctx/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use crate::{
workflow::{Workflow, WorkflowInput},
};

pub const WORKFLOW_TIMEOUT: Duration = Duration::from_secs(60);

pub struct ApiCtx {
ray_id: Uuid,
name: &'static str,
Expand Down
32 changes: 19 additions & 13 deletions lib/chirp-workflow/core/src/ctx/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use uuid::Uuid;
pub const SUB_WORKFLOW_RETRY: Duration = Duration::from_millis(150);
/// Time to delay a workflow from retrying after an error
pub const RETRY_TIMEOUT_MS: usize = 2000;
pub const WORKFLOW_TIMEOUT: Duration = Duration::from_secs(60);

use crate::{
ctx::OperationCtx,
Expand All @@ -16,28 +17,33 @@ use crate::{
workflow::Workflow,
};

/// Polls the database for the workflow
/// Polls the database for the workflow.
/// 60 second timeout.
pub async fn wait_for_workflow<W: Workflow>(
db: &DatabaseHandle,
workflow_id: Uuid,
) -> GlobalResult<W::Output> {
tracing::info!(workflow_name=%W::NAME, %workflow_id, "waiting for workflow");

let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
loop {
interval.tick().await;

// Check if state finished
let workflow = db
.get_workflow(workflow_id)
.await
.map_err(GlobalError::raw)?
.ok_or(WorkflowError::WorkflowNotFound)
.map_err(GlobalError::raw)?;
if let Some(output) = workflow.parse_output::<W>().map_err(GlobalError::raw)? {
return Ok(output);
tokio::time::timeout(WORKFLOW_TIMEOUT, async {
loop {
interval.tick().await;

// Check if state finished
let workflow = db
.get_workflow(workflow_id)
.await
.map_err(GlobalError::raw)?
.ok_or(WorkflowError::WorkflowNotFound)
.map_err(GlobalError::raw)?;
if let Some(output) = workflow.parse_output::<W>().map_err(GlobalError::raw)? {
return Ok(output);
}
}
}
})
.await?
}

pub async fn op<I>(
Expand Down
21 changes: 2 additions & 19 deletions lib/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use uuid::Uuid;
use crate::{
builder::common as builder,
ctx::{
common::{self, SUB_WORKFLOW_RETRY},
common,
message::{SubscriptionHandle, TailAnchor, TailAnchorResponse},
MessageCtx,
},
Expand Down Expand Up @@ -92,24 +92,7 @@ impl TestCtx {
&self,
workflow_id: Uuid,
) -> GlobalResult<W::Output> {
tracing::info!(workflow_name=%W::NAME, %workflow_id, "waiting for workflow");

let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
loop {
interval.tick().await;

// Check if state finished
let workflow = self
.db
.get_workflow(workflow_id)
.await
.map_err(GlobalError::raw)?
.ok_or(WorkflowError::WorkflowNotFound)
.map_err(GlobalError::raw)?;
if let Some(output) = workflow.parse_output::<W>().map_err(GlobalError::raw)? {
return Ok(output);
}
}
common::wait_for_workflow::<W>(&self.db, workflow_id).await
}

/// Creates a workflow builder.
Expand Down
6 changes: 3 additions & 3 deletions lib/chirp-workflow/core/src/db/pg_nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const MAX_PULLED_WORKFLOWS: i64 = 50;
const QUERY_RETRY_MS: usize = 750;
// Time in between transaction retries
const TXN_RETRY: Duration = Duration::from_millis(100);
/// Maximum times a query ran bu this database adapter is retried.
/// Maximum times a query ran by this database adapter is retried.
const MAX_QUERY_RETRIES: usize = 16;

pub struct DatabasePgNats {
Expand Down Expand Up @@ -96,7 +96,7 @@ impl DatabasePgNats {

use sqlx::Error::*;
match &err {
// Retry transaction errors immediately
// Retry transaction errors in a tight loop
Database(db_err)
if db_err
.message()
Expand All @@ -105,7 +105,7 @@ impl DatabasePgNats {
tracing::info!(message=%db_err.message(), "transaction retry");
tokio::time::sleep(TXN_RETRY).await;
}
// Retry internal errors with a backoff
// Retry other errors with a backoff
Database(_) | Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
| WorkerCrashed => {
tracing::warn!(?err, "query retry");
Expand Down

0 comments on commit 3f0ff1e

Please sign in to comment.