From 1a6a720dc9290f40bb71eb38f66bd5087d47ef42 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sat, 4 May 2024 09:59:25 -0300 Subject: [PATCH] refactor: revisit feature flag naming & grouping --- Cargo.toml | 30 +++----- src/sinks/common/mod.rs | 1 - src/sinks/mod.rs | 156 +++++++++++++++++----------------------- src/sources/mod.rs | 36 +++++----- src/sources/s3.rs | 59 ++++++--------- 5 files changed, 114 insertions(+), 168 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 97cf696e..d9aefef6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,19 +13,14 @@ authors = ["Santiago Carmuega "] [features] deno = ["deno_runtime"] wasm = ["extism"] -sink-file-rotate = ["file-rotate"] -sink-webhook = ["reqwest"] -sink-rabbitmq = ["lapin"] -sink-kafka = ["kafka"] -sink-aws-sqs = ["aws-config", "aws-types", "aws-sdk-sqs"] -sink-aws-lambda = ["aws-config", "aws-types", "aws-sdk-lambda"] -sink-aws-s3 = ["aws-config", "aws-types", "aws-sdk-s3"] -sink-gcp-pubsub = ["google-cloud-pubsub", "google-cloud-googleapis", "google-cloud-default"] -sink-gcp-cloudfunction = ["reqwest", "jsonwebtoken"] -sink-redis = ["r2d2_redis"] -sink-elasticsearch = ["elasticsearch"] -source-utxorpc = ["tonic", "futures"] +aws = ["aws-config", "aws-types", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3"] sql = ["sqlx"] +gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "google-cloud-default", "jsonwebtoken"] +rabbitmq = ["lapin"] +redis = ["r2d2_redis"] +u5c = ["tonic"] +# elasticsearch = auto feature flag +# kafka = auto feature flag [dependencies] # pallas = "0.21.0" @@ -60,13 +55,15 @@ lazy_static = "1.4.0" tracing = "0.1.37" tracing-subscriber = "0.3.17" anyhow = "1.0.77" +file-rotate = { version = "0.7.5" } +reqwest = { version = "0.11", features = ["json", "multipart"] } tokio = { version = "1", features = ["rt", "rt-multi-thread"] } async-trait = "0.1.68" + elasticsearch = { version = "8.5.0-alpha.1", optional = true } murmur3 = { version = "0.5.2", optional = true } openssl = { version = "0.10", optional = true, features = ["vendored"] } lapin = { version = "2.2.1", optional = true } -reqwest = { version = "0.11", features = ["json", "multipart"], optional = true } kafka = { version = "0.9.0", optional = true } google-cloud-pubsub = { version = "0.16.0", optional = true } google-cloud-googleapis = { version = "0.10.0", optional = true } @@ -74,19 +71,12 @@ google-cloud-default = { version = "0.4.0", optional = true, features = ["pubsub r2d2_redis = { version = "0.14.0", optional = true } deno_runtime = { version = "0.126.0", optional = true } jsonwebtoken = { version = "8.3.0", optional = true } -file-rotate = { version = "0.7.5", optional = true } tonic = { version = "0.9.2", features = ["tls", "tls-roots"], optional = true } futures = { version = "0.3.28", optional = true } - -# sql sqlx = { version = "0.7", features = ["runtime-tokio", "tls-native-tls", "any", "sqlite", "postgres"], optional = true } - -# aws aws-config = { version = "^1.1", optional = true } aws-types = { version = "^1.1", optional = true } aws-sdk-s3 = { version = "^1.1", optional = true } aws-sdk-sqs = { version = "^1.1", optional = true } aws-sdk-lambda = { version = "^1.1", optional = true } - -# wasm extism = { version = "1.2.0", optional = true } diff --git a/src/sinks/common/mod.rs b/src/sinks/common/mod.rs index 5be854e9..fdd46037 100644 --- a/src/sinks/common/mod.rs +++ b/src/sinks/common/mod.rs @@ -1,2 +1 @@ -#[cfg(any(feature = "sink-webhook", feature = "sink-gcp-cloudfunction"))] pub mod web; diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 00d98bec..faae723d 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -5,41 +5,37 @@ use crate::framework::*; mod assert; mod common; +mod file_rotate; mod noop; mod stdout; mod terminal; - -#[cfg(feature = "sink-file-rotate")] -mod file_rotate; - -#[cfg(feature = "sink-webhook")] mod webhook; -#[cfg(feature = "sink-rabbitmq")] +#[cfg(feature = "rabbitmq")] mod rabbitmq; -#[cfg(feature = "sink-kafka")] +#[cfg(feature = "kafka")] mod kafka; -#[cfg(feature = "sink-aws-sqs")] +#[cfg(feature = "aws")] mod aws_sqs; -#[cfg(feature = "sink-aws-lambda")] +#[cfg(feature = "aws")] mod aws_lambda; -#[cfg(feature = "sink-aws-s3")] +#[cfg(feature = "aws")] mod aws_s3; -#[cfg(feature = "sink-gcp-pubsub")] +#[cfg(feature = "gcp")] mod gcp_pubsub; -#[cfg(feature = "sink-gcp-cloudfunction")] +#[cfg(feature = "gcp")] mod gcp_cloudfunction; -#[cfg(feature = "sink-redis")] +#[cfg(feature = "redis")] mod redis; -#[cfg(feature = "sink-elasticsearch")] +#[cfg(feature = "elasticsearch")] mod elasticsearch; #[cfg(feature = "sql")] @@ -50,38 +46,34 @@ pub enum Bootstrapper { Stdout(stdout::Stage), Noop(noop::Stage), Assert(assert::Stage), - - #[cfg(feature = "sink-file-rotate")] FileRotate(file_rotate::Stage), - - #[cfg(feature = "sink-webhook")] WebHook(webhook::Stage), - #[cfg(feature = "sink-rabbitmq")] + #[cfg(feature = "rabbitmq")] Rabbitmq(rabbitmq::Stage), - #[cfg(feature = "sink-kafka")] + #[cfg(feature = "kafka")] Kafka(kafka::Stage), - #[cfg(feature = "sink-aws-sqs")] + #[cfg(feature = "aws")] AwsSqs(aws_sqs::Stage), - #[cfg(feature = "sink-aws-lambda")] + #[cfg(feature = "aws")] AwsLambda(aws_lambda::Stage), - #[cfg(feature = "sink-aws-s3")] + #[cfg(feature = "aws")] AwsS3(aws_s3::Stage), - #[cfg(feature = "sink-gcp-pubsub")] + #[cfg(feature = "gcp")] GcpPubSub(gcp_pubsub::Stage), - #[cfg(feature = "sink-gcp-cloudfunction")] + #[cfg(feature = "gcp")] GcpCloudFunction(gcp_cloudfunction::Stage), - #[cfg(feature = "sink-redis")] + #[cfg(feature = "redis")] Redis(redis::Stage), - #[cfg(feature = "sink-elasticsearch")] + #[cfg(feature = "elasticsearch")] ElasticSearch(elasticsearch::Stage), #[cfg(feature = "sql")] @@ -95,38 +87,34 @@ impl Bootstrapper { Bootstrapper::Stdout(p) => &mut p.input, Bootstrapper::Noop(p) => &mut p.input, Bootstrapper::Assert(p) => &mut p.input, - - #[cfg(feature = "sink-file-rotate")] Bootstrapper::FileRotate(p) => &mut p.input, - - #[cfg(feature = "sink-webhook")] Bootstrapper::WebHook(p) => &mut p.input, - #[cfg(feature = "sink-rabbitmq")] + #[cfg(feature = "rabbitmq")] Bootstrapper::Rabbitmq(p) => &mut p.input, - #[cfg(feature = "sink-kafka")] + #[cfg(feature = "kafka")] Bootstrapper::Kafka(p) => &mut p.input, - #[cfg(feature = "sink-aws-sqs")] + #[cfg(feature = "aws")] Bootstrapper::AwsSqs(p) => &mut p.input, - #[cfg(feature = "sink-aws-lambda")] + #[cfg(feature = "aws")] Bootstrapper::AwsLambda(p) => &mut p.input, - #[cfg(feature = "sink-aws-s3")] + #[cfg(feature = "aws")] Bootstrapper::AwsS3(p) => &mut p.input, - #[cfg(feature = "sink-gcp-pubsub")] + #[cfg(feature = "gcp")] Bootstrapper::GcpPubSub(p) => &mut p.input, - #[cfg(feature = "sink-gcp-cloudfunction")] + #[cfg(feature = "gcp")] Bootstrapper::GcpCloudFunction(p) => &mut p.input, - #[cfg(feature = "sink-redis")] + #[cfg(feature = "redis")] Bootstrapper::Redis(p) => &mut p.input, - #[cfg(feature = "sink-elasticsearch")] + #[cfg(feature = "elasticsearch")] Bootstrapper::ElasticSearch(p) => &mut p.input, #[cfg(feature = "sql")] @@ -140,38 +128,34 @@ impl Bootstrapper { Bootstrapper::Stdout(p) => &mut p.cursor, Bootstrapper::Noop(p) => &mut p.cursor, Bootstrapper::Assert(p) => &mut p.cursor, - - #[cfg(feature = "sink-file-rotate")] Bootstrapper::FileRotate(p) => &mut p.cursor, - - #[cfg(feature = "sink-webhook")] Bootstrapper::WebHook(p) => &mut p.cursor, - #[cfg(feature = "sink-rabbitmq")] + #[cfg(feature = "rabbitmq")] Bootstrapper::Rabbitmq(p) => &mut p.cursor, - #[cfg(feature = "sink-kafka")] + #[cfg(feature = "kafka")] Bootstrapper::Kafka(p) => &mut p.cursor, - #[cfg(feature = "sink-aws-sqs")] + #[cfg(feature = "aws")] Bootstrapper::AwsSqs(p) => &mut p.cursor, - #[cfg(feature = "sink-aws-lambda")] + #[cfg(feature = "aws")] Bootstrapper::AwsLambda(p) => &mut p.cursor, - #[cfg(feature = "sink-aws-s3")] + #[cfg(feature = "aws")] Bootstrapper::AwsS3(p) => &mut p.cursor, - #[cfg(feature = "sink-gcp-pubsub")] + #[cfg(feature = "gcp")] Bootstrapper::GcpPubSub(p) => &mut p.cursor, - #[cfg(feature = "sink-gcp-cloudfunction")] + #[cfg(feature = "gcp")] Bootstrapper::GcpCloudFunction(p) => &mut p.cursor, - #[cfg(feature = "sink-redis")] + #[cfg(feature = "redis")] Bootstrapper::Redis(p) => &mut p.cursor, - #[cfg(feature = "sink-elasticsearch")] + #[cfg(feature = "elasticsearch")] Bootstrapper::ElasticSearch(p) => &mut p.cursor, #[cfg(feature = "sql")] @@ -185,38 +169,34 @@ impl Bootstrapper { Bootstrapper::Stdout(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::Noop(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::Assert(x) => gasket::runtime::spawn_stage(x, policy), - - #[cfg(feature = "sink-file-rotate")] Bootstrapper::FileRotate(x) => gasket::runtime::spawn_stage(x, policy), - - #[cfg(feature = "sink-webhook")] Bootstrapper::WebHook(x) => gasket::runtime::spawn_stage(x, policy), - #[cfg(feature = "sink-rabbitmq")] + #[cfg(feature = "rabbitmq")] Bootstrapper::Rabbitmq(x) => gasket::runtime::spawn_stage(x, policy), - #[cfg(feature = "sink-kafka")] + #[cfg(feature = "kafka")] Bootstrapper::Kafka(x) => gasket::runtime::spawn_stage(x, policy), - #[cfg(feature = "sink-aws-sqs")] + #[cfg(feature = "aws")] Bootstrapper::AwsSqs(x) => gasket::runtime::spawn_stage(x, policy), - #[cfg(feature = "sink-aws-lambda")] + #[cfg(feature = "aws")] Bootstrapper::AwsLambda(x) => gasket::runtime::spawn_stage(x, policy), - #[cfg(feature = "sink-aws-s3")] + #[cfg(feature = "aws")] Bootstrapper::AwsS3(x) => gasket::runtime::spawn_stage(x, policy), - #[cfg(feature = "sink-gcp-pubsub")] + #[cfg(feature = "gcp")] Bootstrapper::GcpPubSub(x) => gasket::runtime::spawn_stage(x, policy), - #[cfg(feature = "sink-gcp-cloudfunction")] + #[cfg(feature = "gcp")] Bootstrapper::GcpCloudFunction(x) => gasket::runtime::spawn_stage(x, policy), - #[cfg(feature = "sink-redis")] + #[cfg(feature = "redis")] Bootstrapper::Redis(x) => gasket::runtime::spawn_stage(x, policy), - #[cfg(feature = "sink-elasticsearch")] + #[cfg(feature = "elasticsearch")] Bootstrapper::ElasticSearch(x) => gasket::runtime::spawn_stage(x, policy), #[cfg(feature = "sql")] @@ -232,38 +212,34 @@ pub enum Config { Stdout(stdout::Config), Noop(noop::Config), Assert(assert::Config), - - #[cfg(feature = "sink-file-rotate")] FileRotate(file_rotate::Config), - - #[cfg(feature = "sink-webhook")] WebHook(webhook::Config), - #[cfg(feature = "sink-rabbitmq")] + #[cfg(feature = "rabbitmq")] Rabbitmq(rabbitmq::Config), - #[cfg(feature = "sink-kafka")] + #[cfg(feature = "kafka")] Kafka(kafka::Config), - #[cfg(feature = "sink-aws-sqs")] + #[cfg(feature = "aws")] AwsSqs(aws_sqs::Config), - #[cfg(feature = "sink-aws-lambda")] + #[cfg(feature = "aws")] AwsLambda(aws_lambda::Config), - #[cfg(feature = "sink-aws-s3")] + #[cfg(feature = "aws")] AwsS3(aws_s3::Config), - #[cfg(feature = "sink-gcp-pubsub")] + #[cfg(feature = "gcp")] GcpPubSub(gcp_pubsub::Config), - #[cfg(feature = "sink-gcp-cloudfunction")] + #[cfg(feature = "gcp")] GcpCloudFunction(gcp_cloudfunction::Config), - #[cfg(feature = "sink-redis")] + #[cfg(feature = "redis")] Redis(redis::Config), - #[cfg(feature = "sink-elasticsearch")] + #[cfg(feature = "elasticsearch")] ElasticSearch(elasticsearch::Config), #[cfg(feature = "sql")] @@ -277,38 +253,34 @@ impl Config { Config::Stdout(c) => Ok(Bootstrapper::Stdout(c.bootstrapper(ctx)?)), Config::Noop(c) => Ok(Bootstrapper::Noop(c.bootstrapper(ctx)?)), Config::Assert(c) => Ok(Bootstrapper::Assert(c.bootstrapper(ctx)?)), - - #[cfg(feature = "sink-file-rotate")] Config::FileRotate(c) => Ok(Bootstrapper::FileRotate(c.bootstrapper(ctx)?)), - - #[cfg(feature = "sink-webhook")] Config::WebHook(c) => Ok(Bootstrapper::WebHook(c.bootstrapper(ctx)?)), - #[cfg(feature = "sink-rabbitmq")] + #[cfg(feature = "rabbitmq")] Config::Rabbitmq(c) => Ok(Bootstrapper::Rabbitmq(c.bootstrapper(ctx)?)), - #[cfg(feature = "sink-kafka")] + #[cfg(feature = "kafka")] Config::Kafka(c) => Ok(Bootstrapper::Kafka(c.bootstrapper(ctx)?)), - #[cfg(feature = "sink-aws-sqs")] + #[cfg(feature = "aws")] Config::AwsSqs(c) => Ok(Bootstrapper::AwsSqs(c.bootstrapper(ctx)?)), - #[cfg(feature = "sink-aws-lambda")] + #[cfg(feature = "aws")] Config::AwsLambda(c) => Ok(Bootstrapper::AwsLambda(c.bootstrapper(ctx)?)), - #[cfg(feature = "sink-aws-s3")] + #[cfg(feature = "aws")] Config::AwsS3(c) => Ok(Bootstrapper::AwsS3(c.bootstrapper(ctx)?)), - #[cfg(feature = "sink-gcp-pubsub")] + #[cfg(feature = "gcp")] Config::GcpPubSub(c) => Ok(Bootstrapper::GcpPubSub(c.bootstrapper(ctx)?)), - #[cfg(feature = "sink-gcp-cloudfunction")] + #[cfg(feature = "gcp")] Config::GcpCloudFunction(c) => Ok(Bootstrapper::GcpCloudFunction(c.bootstrapper(ctx)?)), - #[cfg(feature = "sink-redis")] + #[cfg(feature = "redis")] Config::Redis(c) => Ok(Bootstrapper::Redis(c.bootstrapper(ctx)?)), - #[cfg(feature = "sink-elasticsearch")] + #[cfg(feature = "elasticsearch")] Config::ElasticSearch(c) => Ok(Bootstrapper::ElasticSearch(c.bootstrapper(ctx)?)), #[cfg(feature = "sql")] diff --git a/src/sources/mod.rs b/src/sources/mod.rs index 35410cd6..8b0975af 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -9,23 +9,23 @@ use crate::framework::*; pub mod n2c; pub mod n2n; +#[cfg(feature = "u5c")] +pub mod utxorpc; + #[cfg(feature = "aws")] pub mod s3; -#[cfg(feature = "source-utxorpc")] -pub mod utxorpc; - pub enum Bootstrapper { N2N(n2n::Stage), #[cfg(target_family = "unix")] N2C(n2c::Stage), + #[cfg(feature = "u5c")] + UtxoRPC(utxorpc::Stage), + #[cfg(feature = "aws")] S3(s3::Stage), - - #[cfg(feature = "source-utxorpc")] - UtxoRPC(utxorpc::Stage), } impl Bootstrapper { @@ -36,11 +36,11 @@ impl Bootstrapper { #[cfg(target_family = "unix")] Bootstrapper::N2C(p) => &mut p.output, + #[cfg(feature = "u5c")] + Bootstrapper::UtxoRPC(p) => &mut p.output, + #[cfg(feature = "aws")] Bootstrapper::S3(p) => &mut p.output, - - #[cfg(feature = "source-utxorpc")] - Bootstrapper::UtxoRPC(p) => &mut p.output, } } @@ -51,11 +51,11 @@ impl Bootstrapper { #[cfg(target_family = "unix")] Bootstrapper::N2C(x) => gasket::runtime::spawn_stage(x, policy), + #[cfg(feature = "u5c")] + Bootstrapper::UtxoRPC(x) => gasket::runtime::spawn_stage(x, policy), + #[cfg(feature = "aws")] Bootstrapper::S3(x) => gasket::runtime::spawn_stage(x, policy), - - #[cfg(feature = "source-utxorpc")] - Bootstrapper::UtxoRPC(x) => gasket::runtime::spawn_stage(x, policy), } } } @@ -68,11 +68,11 @@ pub enum Config { #[cfg(target_family = "unix")] N2C(n2c::Config), + #[cfg(feature = "u5c")] + UtxoRPC(utxorpc::Config), + #[cfg(feature = "aws")] S3(s3::Config), - - #[cfg(feature = "source-utxorpc")] - UtxoRPC(utxorpc::Config), } impl Config { @@ -83,11 +83,11 @@ impl Config { #[cfg(target_family = "unix")] Config::N2C(c) => Ok(Bootstrapper::N2C(c.bootstrapper(ctx)?)), + #[cfg(feature = "u5c")] + Config::UtxoRPC(c) => Ok(Bootstrapper::UtxoRPC(c.bootstrapper(ctx)?)), + #[cfg(feature = "aws")] Config::S3(c) => Ok(Bootstrapper::S3(c.bootstrapper(ctx)?)), - - #[cfg(feature = "source-utxorpc")] - Config::UtxoRPC(c) => Ok(Bootstrapper::UtxoRPC(c.bootstrapper(ctx)?)), } } } diff --git a/src/sources/s3.rs b/src/sources/s3.rs index f02584e7..4066e0cc 100644 --- a/src/sources/s3.rs +++ b/src/sources/s3.rs @@ -1,18 +1,20 @@ +use aws_config::BehaviorVersion; use aws_sdk_s3::Client as S3Client; use gasket::framework::*; -use gasket::messaging::SendPort; use serde::Deserialize; use crate::framework::*; #[derive(Stage)] +#[stage(name = "source", unit = "KeyBatch", worker = "Worker")] #[stage(name = "source-s3")] pub struct Stage { bucket: String, items_per_batch: u32, - cursor: Cursor, - retry_policy: gasket::retries::Policy, + intersect: IntersectConfig, + + breadcrumbs: Breadcrumbs, pub output: SourceOutputPort, @@ -20,16 +22,6 @@ pub struct Stage { ops_count: gasket::metrics::Counter, } -impl gasket::framework::Stage for Stage { - fn policy(&self) -> gasket::runtime::Policy { - gasket::runtime::Policy { - work_retry: self.retry_policy.clone(), - bootstrap_retry: self.retry_policy.clone(), - ..Default::default() - } - } -} - pub struct Worker { s3_client: S3Client, last_key: String, @@ -40,20 +32,21 @@ pub struct KeyBatch { } #[async_trait::async_trait(?Send)] -impl gasket::framework::Worker for Worker { - type Unit = KeyBatch; - type Stage = Stage; - - async fn bootstrap(stage: &Self::Stage) -> Result { - let sdk_config = aws_config::load_from_env().await; +impl gasket::framework::Worker for Worker { + async fn bootstrap(stage: &Stage) -> Result { + let sdk_config = aws_config::load_defaults(BehaviorVersion::latest()).await; let s3_client = aws_sdk_s3::Client::new(&sdk_config); - let p = stage - .cursor - .latest_known_point() + let breadcrumbs = stage.breadcrumbs.points(); + let intersect = stage.intersect.points(); + + let point = breadcrumbs + .last() + .cloned() + .or_else(|| intersect.and_then(|p| p.last().cloned())) .unwrap_or(pallas::network::miniprotocols::Point::Origin); - let key = match p { + let key = match point { pallas::network::miniprotocols::Point::Origin => "origin".to_owned(), pallas::network::miniprotocols::Point::Specific(slot, _) => format!("{slot}"), }; @@ -64,10 +57,7 @@ impl gasket::framework::Worker for Worker { }) } - async fn schedule( - &mut self, - stage: &mut Self::Stage, - ) -> Result, WorkerError> { + async fn schedule(&mut self, stage: &mut Stage) -> Result, WorkerError> { let result = self .s3_client .list_objects_v2() @@ -88,11 +78,7 @@ impl gasket::framework::Worker for Worker { Ok(WorkSchedule::Unit(KeyBatch { keys })) } - async fn execute( - &mut self, - unit: &Self::Unit, - stage: &mut Self::Stage, - ) -> Result<(), WorkerError> { + async fn execute(&mut self, unit: &KeyBatch, stage: &mut Stage) -> Result<(), WorkerError> { for key in &unit.keys { let object = self .s3_client @@ -125,7 +111,7 @@ impl gasket::framework::Worker for Worker { let event = ChainEvent::Apply(point, Record::CborBlock(body.into_bytes().to_vec())); - stage.output_port.send(event.into()).await.or_panic()?; + stage.output.send(event.into()).await.or_panic()?; } Ok(()) @@ -136,7 +122,6 @@ impl gasket::framework::Worker for Worker { pub struct Config { bucket: String, items_per_batch: u32, - retry_policy: gasket::retries::Policy, } impl Config { @@ -144,9 +129,9 @@ impl Config { let stage = Stage { bucket: self.bucket, items_per_batch: self.items_per_batch, - retry_policy: self.retry_policy, - cursor: ctx.cursor.clone(), - output_port: Default::default(), + breadcrumbs: ctx.breadcrumbs.clone(), + intersect: ctx.intersect.clone(), + output: Default::default(), ops_count: Default::default(), };