From 56359af8e524993b41ef1c5f71f7c6655ad52938 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Tue, 10 Sep 2024 00:01:40 +0000 Subject: [PATCH] feat(pegboard): implement ws wf --- docs/libraries/workflow/GOTCHAS.md | 9 +++ lib/chirp-workflow/core/src/ctx/workflow.rs | 16 +++- lib/chirp-workflow/core/src/executable.rs | 25 ++++++- svc/Cargo.lock | 43 ++++++++++- svc/Cargo.toml | 1 + svc/pkg/pegboard/Cargo.toml | 11 +++ svc/pkg/pegboard/Service.toml | 7 ++ svc/pkg/pegboard/src/lib.rs | 14 ++++ svc/pkg/pegboard/src/ops/mod.rs | 1 + svc/pkg/pegboard/src/utils.rs | 1 + svc/pkg/pegboard/src/workflows/mod.rs | 1 + svc/pkg/pegboard/src/workflows/ws.rs | 81 +++++++++++++++++++++ 12 files changed, 205 insertions(+), 5 deletions(-) create mode 100644 svc/pkg/pegboard/Cargo.toml create mode 100644 svc/pkg/pegboard/Service.toml create mode 100644 svc/pkg/pegboard/src/lib.rs create mode 100644 svc/pkg/pegboard/src/ops/mod.rs create mode 100644 svc/pkg/pegboard/src/utils.rs create mode 100644 svc/pkg/pegboard/src/workflows/mod.rs create mode 100644 svc/pkg/pegboard/src/workflows/ws.rs diff --git a/docs/libraries/workflow/GOTCHAS.md b/docs/libraries/workflow/GOTCHAS.md index 7391237de..de1332fae 100644 --- a/docs/libraries/workflow/GOTCHAS.md +++ b/docs/libraries/workflow/GOTCHAS.md @@ -140,3 +140,12 @@ Be careful when writing your struct definitions. When force waking a sleeping workflow by setting `wake_immediate = true`, know that if the workflow is currently on a `sleep` step it will go back to sleep if it has not reached its `wake_deadline` yet. For all other steps, the workflow will continue normally (usually just go back to sleep). + +## Long-lived tasks in `ctx.join` + +When executing multiple long-lived activities in a `ctx.join` call using a tuple, remember that internally it +uses `tokio::join!` and not `tokio::try_join`. This means it will wait until all items finish and does not +short circuit when an `Err` is returned from any branch. + +So if you have an activity that errors immediately and another that takes a while to finish, the `ctx.join` +call will wait until the long task is complete (or errors) before returning. diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 703720558..a534f95f2 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -541,17 +541,29 @@ impl WorkflowCtx { Ok(output) } - /// Joins multiple executable actions (activities, closures) and awaits them simultaneously. + /// Joins multiple executable actions (activities, closures) and awaits them simultaneously. This does not + /// short circuit in the event of an error to make sure activity side effects are recorded. pub async fn join(&mut self, exec: T) -> GlobalResult { exec.execute(self).await } + /// Joins multiple executable actions (activities, closures) and awaits them simultaneously, short + /// circuiting in the event of an error. + /// + /// **BEWARE**: You should almost **never** use `try_join` over `join`. + /// + /// The only possible case for using this over `join` is: + /// - You have long running activities that are cancellable + pub async fn try_join(&mut self, exec: T) -> GlobalResult { + exec.try_execute(self).await + } + /// Spawns a new thread to execute workflow steps in. pub fn spawn(&mut self, f: F) -> tokio::task::JoinHandle> where F: for<'a> FnOnce(&'a mut WorkflowCtx) -> AsyncResult<'a, T> + Send + 'static, { - let mut ctx = self.clone(); + let mut ctx = self.branch(); tokio::task::spawn(async move { closure(f).execute(&mut ctx).await }) } diff --git a/lib/chirp-workflow/core/src/executable.rs b/lib/chirp-workflow/core/src/executable.rs index a7231bfa2..4a3a6e88b 100644 --- a/lib/chirp-workflow/core/src/executable.rs +++ b/lib/chirp-workflow/core/src/executable.rs @@ -8,10 +8,16 @@ use crate::ctx::WorkflowCtx; /// Signifies a retryable executable entity in a workflow. For example: activity, tuple of activities (join), /// closure. #[async_trait] -pub trait Executable: Send { +pub trait Executable: Send + Sized { type Output: Send; async fn execute(self, ctx: &mut WorkflowCtx) -> GlobalResult; + /// In the event that an executable has multiple sub executables (i.e. a tuple of executables), this can + /// be implemented to provide the ability to choose whether to use `tokio::join!` or tokio::try_join!` + /// internally. Default implementation just calls `execute`. + async fn try_execute(self, ctx: &mut WorkflowCtx) -> GlobalResult { + self.execute(ctx).await + } } pub type AsyncResult<'a, T> = Pin> + Send + 'a>>; @@ -59,6 +65,23 @@ macro_rules! impl_tuple { // Handle errors here instead Ok(($($args?),*)) } + + async fn try_execute(self, ctx: &mut WorkflowCtx) -> GlobalResult { + #[allow(non_snake_case)] + let ($($args),*) = self; + + #[allow(non_snake_case)] + let ($(mut $args),*) = ($( + TupleHelper { + branch: ctx.step(), + exec: $args, + } + ),*); + + tokio::try_join!( + $($args.exec.execute(&mut $args.branch)),* + ) + } } } } diff --git a/svc/Cargo.lock b/svc/Cargo.lock index b91948c6f..a44da54e0 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -944,7 +944,7 @@ dependencies = [ "thiserror", "token-create", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.21.0", "tracing", "tracing-subscriber", "trust-dns-resolver", @@ -6474,6 +6474,15 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pegboard" +version = "0.0.1" +dependencies = [ + "chirp-workflow", + "serde", + "tokio-tungstenite 0.23.1", +] + [[package]] name = "pem" version = "1.1.1" @@ -9351,7 +9360,19 @@ dependencies = [ "native-tls", "tokio", "tokio-native-tls", - "tungstenite", + "tungstenite 0.21.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.23.0", ] [[package]] @@ -9656,6 +9677,24 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" diff --git a/svc/Cargo.toml b/svc/Cargo.toml index cd9a1ab6e..db74fb885 100644 --- a/svc/Cargo.toml +++ b/svc/Cargo.toml @@ -178,6 +178,7 @@ members = [ "pkg/monolith/standalone/workflow-worker", "pkg/nomad/standalone/monitor", "pkg/nsfw/ops/image-score", + "pkg/pegboard", "pkg/perf/ops/log-get", "pkg/profanity/ops/check", "pkg/region/ops/get", diff --git a/svc/pkg/pegboard/Cargo.toml b/svc/pkg/pegboard/Cargo.toml new file mode 100644 index 000000000..0650aa1ac --- /dev/null +++ b/svc/pkg/pegboard/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "pegboard" +version = "0.0.1" +edition = "2018" +authors = ["Rivet Gaming, LLC "] +license = "Apache-2.0" + +[dependencies] +chirp-workflow = { path = "../../../lib/chirp-workflow/core" } +tokio-tungstenite = "0.23.1" +serde = { version = "1.0.198", features = ["derive"] } diff --git a/svc/pkg/pegboard/Service.toml b/svc/pkg/pegboard/Service.toml new file mode 100644 index 000000000..e38b34b82 --- /dev/null +++ b/svc/pkg/pegboard/Service.toml @@ -0,0 +1,7 @@ +[service] +name = "pegboard" + +[runtime] +kind = "rust" + +[package] diff --git a/svc/pkg/pegboard/src/lib.rs b/svc/pkg/pegboard/src/lib.rs new file mode 100644 index 000000000..b64c5396e --- /dev/null +++ b/svc/pkg/pegboard/src/lib.rs @@ -0,0 +1,14 @@ +use chirp_workflow::prelude::*; + +pub mod ops; +pub mod utils; +pub mod workflows; + +pub fn registry() -> WorkflowResult { + use workflows::*; + + let mut registry = Registry::new(); + registry.register_workflow::()?; + + Ok(registry) +} diff --git a/svc/pkg/pegboard/src/ops/mod.rs b/svc/pkg/pegboard/src/ops/mod.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/svc/pkg/pegboard/src/ops/mod.rs @@ -0,0 +1 @@ + diff --git a/svc/pkg/pegboard/src/utils.rs b/svc/pkg/pegboard/src/utils.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/svc/pkg/pegboard/src/utils.rs @@ -0,0 +1 @@ + diff --git a/svc/pkg/pegboard/src/workflows/mod.rs b/svc/pkg/pegboard/src/workflows/mod.rs new file mode 100644 index 000000000..6757c9967 --- /dev/null +++ b/svc/pkg/pegboard/src/workflows/mod.rs @@ -0,0 +1 @@ +pub mod ws; diff --git a/svc/pkg/pegboard/src/workflows/ws.rs b/svc/pkg/pegboard/src/workflows/ws.rs new file mode 100644 index 000000000..ed9121140 --- /dev/null +++ b/svc/pkg/pegboard/src/workflows/ws.rs @@ -0,0 +1,81 @@ +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{Arc, RwLock}, +}; + +use chirp_workflow::prelude::*; +use futures_util::FutureExt; +// use tokio::net::{TcpListener, TcpStream}; +// use tokio_tungstenite::tungstenite::protocol::Message; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Input {} + +#[workflow] +pub async fn pegboard_ws(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { + // let addr = "127.0.0.1:8080"; + // let listener = TcpListener::bind(&addr).await?; + // println!("Listening on: {}", addr); + + let conns = Arc::new(RwLock::new(HashMap::<(), ()>::new())); + + ctx.try_join(( + closure(|ctx| socket_thread(ctx, conns.clone()).boxed()), + closure(|ctx| signal_thread(ctx, conns.clone()).boxed()), + )) + .await?; + + Ok(()) +} + +async fn socket_thread( + ctx: &mut WorkflowCtx, + conns: Arc>>, +) -> GlobalResult<()> { + ctx.repeat(|ctx| { + async move { + if let Ok((stream, addr)) = listener.accept().await { + handle_connection(stream, addr).await; + } else { + tracing::error!("failed to connect websocket"); + } + + Ok(Loop::Continue) + }.boxed() + ) + + Ok(()) +} + +async fn signal_thread( + ctx: &mut WorkflowCtx, + conns: Arc>>, +) -> GlobalResult<()> { + Ok(()) +} + +async fn handle_connection(ctx, raw_stream: TcpStream, addr: SocketAddr) { + ctx.spawn(|ctx| async move { + let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; + let (mut write, mut read) = ws_stream.split(); + + println!("New WebSocket connection: {}", addr); + + while let Some(Ok(msg)) = read.next().await { + if msg.is_text() || msg.is_binary() { + write.send(msg).await?; + } + } + + Ok(()) + }.boxed()).await +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +struct FooInput {} + +#[activity(Foo)] +async fn foo(ctx: &ActivityCtx, input: &FooInput) -> GlobalResult<()> { + Ok(()) +}