From 123cf97c0ed5b0c815acbe9a855d5e32c19ac7ca Mon Sep 17 00:00:00 2001
From: "Heinz N. Gies"
Date: Mon, 8 May 2023 14:36:53 +0200
Subject: [PATCH] Update tests and context passing

Connector aliases are no longer rendered with an "app/flow::" prefix in
error messages, so the affected test expectations are updated. SinkContext's
test constructor now takes the connector type, SourceContext and
ConnectionLostNotifier gain test-only dummy() constructors, and the
preprocessor and source event-builder APIs take the full SourceContext so
that logs and error payloads identify node, flow, and connector.

Signed-off-by: Heinz N. Gies
---
 src/config.rs                           |  2 +-
 src/connectors.rs                       |  3 +-
 src/connectors/impls/elastic.rs         |  4 +-
 src/connectors/impls/gbq/writer/sink.rs | 31 ++++++------
 src/connectors/impls/gcl/writer/sink.rs |  4 +-
 src/connectors/impls/gcs/streamer.rs    | 12 ++---
 src/connectors/impls/kafka.rs           |  2 +-
 src/connectors/sink.rs                  |  6 +--
 src/connectors/source.rs                | 43 +++++++++++------
 src/connectors/tests/http/client.rs     |  2 +-
 src/connectors/tests/kafka/consumer.rs  |  6 +--
 src/connectors/utils/reconnect.rs       |  5 ++
 src/preprocessor.rs                     | 63 +++++++++++++------------
 src/raft/test.rs                        |  6 +--
 src/raft/test/learner.rs                |  4 +-
 src/system/flow.rs                      |  7 +--
 16 files changed, 107 insertions(+), 93 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index dafadc46fd..40afc1130e 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -385,7 +385,7 @@ mod tests {
         let id = Alias::new("my_id");
         let res = Connector::from_config(&id, "fancy_schmancy".into(), &config);
         assert!(res.is_err());
-        assert_eq!(String::from("Invalid Definition for connector \"app/flow::my_id\": Expected type I64 for key metrics_interval_s but got String"), res.err().map(|e| e.to_string()).unwrap_or_default());
+        assert_eq!(String::from("Invalid Definition for connector \"my_id\": Expected type I64 for key metrics_interval_s but got String"), res.err().map(|e| e.to_string()).unwrap_or_default());
     }
 
     #[test]
diff --git a/src/connectors.rs b/src/connectors.rs
index 33fd987ef5..534b9308ae 100644
--- a/src/connectors.rs
+++ b/src/connectors.rs
@@ -48,7 +48,6 @@ use crate::{
     log_error, pipeline, qsize, raft,
     system::{flow::AppContext, KillSwitch, Runtime},
 };
-use beef::Cow;
 use futures::Future;
 use halfbrown::HashMap;
 use simd_json::{Builder, Mutable, ValueAccess};
@@ -315,7 +314,7 @@ pub(crate) trait Context: Display + Clone {
         'ct: 'event,
     {
         let t: &str = self.connector_type().into();
-        event_meta.get(&Cow::borrowed(t))
+        event_meta.get(t)
     }
 }
diff --git a/src/connectors/impls/elastic.rs b/src/connectors/impls/elastic.rs
index 157882c405..b9efd74480 100644
--- a/src/connectors/impls/elastic.rs
+++ b/src/connectors/impls/elastic.rs
@@ -971,9 +971,7 @@ mod tests {
         let connector_config =
             ConnectorConfig::from_config(&alias, builder.connector_type(), &config)?;
         assert_eq!(
-            String::from(
-                "Invalid Definition for connector \"app/flow::my_elastic\": empty nodes provided"
-            ),
+            String::from("Invalid Definition for connector \"my_elastic\": empty nodes provided"),
             builder
                 .build(&alias, &connector_config,)
                 .await
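Note on the connectors.rs hunk above: dropping `beef::Cow` works because the
metadata lookup accepts a borrowed key, so a plain `&str` can index a map
keyed by `Cow<str>` without constructing a temporary wrapper. A std-only
sketch of the same mechanism (illustrative analog with std types, not
tremor's actual `Value`):

    use std::borrow::Cow;
    use std::collections::HashMap;

    fn main() {
        let mut meta: HashMap<Cow<'static, str>, u64> = HashMap::new();
        meta.insert(Cow::Borrowed("kafka_consumer"), 42);
        // Cow<str> implements Borrow<str>, so a bare &str key suffices;
        // no Cow wrapper needs to be built at the call site.
        assert_eq!(meta.get("kafka_consumer"), Some(&42));
    }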
diff --git a/src/connectors/impls/gbq/writer/sink.rs b/src/connectors/impls/gbq/writer/sink.rs
index 9ae1d20f5f..be8e34efb9 100644
--- a/src/connectors/impls/gbq/writer/sink.rs
+++ b/src/connectors/impls/gbq/writer/sink.rs
@@ -781,7 +781,7 @@ mod test {
                 precision: 0,
                 scale: 0,
             }],
-            &SinkContext::dummy(),
+            &SinkContext::dummy("gbq_writer"),
         );
 
         assert_eq!(result.0.field.len(), 0);
@@ -802,7 +802,7 @@ mod test {
                 precision: 0,
                 scale: 0,
             }],
-            &SinkContext::dummy(),
+            &SinkContext::dummy("gbq_writer"),
         );
 
         assert_eq!(result.0.field.len(), 0);
@@ -832,7 +832,7 @@ mod test {
                 precision: 0,
                 scale: 0,
             }],
-            &SinkContext::dummy(),
+            &SinkContext::dummy("gbq_writer"),
         );
 
         assert_eq!(result.1.len(), 1);
@@ -864,7 +864,7 @@ mod test {
                 precision: 0,
                 scale: 0,
             }],
-            &SinkContext::dummy(),
+            &SinkContext::dummy("gbq_writer"),
         );
 
         assert_eq!(result.1.len(), 1);
@@ -1040,9 +1040,8 @@ mod test {
         let mut result = Vec::new();
         assert!(encode_field(&value, &field, &mut result).is_ok());
 
-        // json is currently not supported, so we expect the field to be skipped
-        assert_eq!([] as [u8; 0], result[..]);
+        assert_eq!([10, 2, 123, 125] as [u8; 4], result[..]);
     }
 
     #[test]
@@ -1079,7 +1078,7 @@ mod test {
 
     #[test]
     pub fn mapping_generates_a_correct_descriptor() {
-        let ctx = SinkContext::dummy();
+        let ctx = SinkContext::dummy("gbq_writer");
         let mapping = JsonToProtobufMapping::new(
             &vec![
                 TableFieldSchema {
@@ -1120,7 +1119,7 @@ mod test {
 
     #[test]
     pub fn can_map_json_to_protobuf() -> Result<()> {
-        let ctx = SinkContext::dummy();
+        let ctx = SinkContext::dummy("gbq_writer");
         let mapping = JsonToProtobufMapping::new(
             &vec![
                 TableFieldSchema {
@@ -1157,7 +1156,7 @@ mod test {
 
     #[test]
     fn map_field_ignores_fields_that_are_not_in_definition() -> Result<()> {
-        let ctx = SinkContext::dummy();
+        let ctx = SinkContext::dummy("gbq_writer");
         let mapping = JsonToProtobufMapping::new(
             &vec![
                 TableFieldSchema {
@@ -1195,7 +1194,7 @@ mod test {
 
     #[test]
     fn map_field_ignores_struct_fields_that_are_not_in_definition() -> Result<()> {
-        let ctx = SinkContext::dummy();
+        let ctx = SinkContext::dummy("gbq_writer");
         let mapping = JsonToProtobufMapping::new(
             &vec![TableFieldSchema {
                 name: "a".to_string(),
@@ -1231,7 +1230,7 @@ mod test {
 
     #[test]
     fn fails_on_bytes_type_mismatch() {
-        let ctx = SinkContext::dummy();
+        let ctx = SinkContext::dummy("gbq_writer");
         let mapping = JsonToProtobufMapping::new(
             &vec![TableFieldSchema {
                 name: "a".to_string(),
@@ -1258,7 +1257,7 @@ mod test {
 
     #[test]
     fn fails_if_the_event_is_not_an_object() {
-        let ctx = SinkContext::dummy();
+        let ctx = SinkContext::dummy("gbq_writer");
         let mapping = JsonToProtobufMapping::new(
             &vec![TableFieldSchema {
                 name: "a".to_string(),
@@ -1311,7 +1310,7 @@ mod test {
             .on_event(
                 "",
                 Event::signal_tick(),
-                &SinkContext::dummy(),
+                &SinkContext::dummy("gbq_writer"),
                 &mut EventSerializer::dummy(None)?,
                 0,
             )
@@ -1340,7 +1339,7 @@ mod test {
             .on_event(
                 "",
                 Event::signal_tick(),
-                &SinkContext::dummy(),
+                &SinkContext::dummy("gbq_writer"),
                 &mut EventSerializer::dummy(None)?,
                 0,
             )
@@ -1401,7 +1400,7 @@ mod test {
             }),
         );
 
-        let ctx = SinkContext::dummy();
+        let ctx = SinkContext::dummy("gbq_writer");
 
         sink.connect(&ctx, &Attempt::default()).await?;
@@ -1471,7 +1470,7 @@ mod test {
             }),
         );
 
-        let ctx = SinkContext::dummy();
+        let ctx = SinkContext::dummy("gbq_writer");
 
         sink.connect(&ctx, &Attempt::default()).await?;
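Why the gbq expectation changed to `[10, 2, 123, 125]`: JSON columns are now
encoded instead of skipped, and the empty object `{}` is written as a
protobuf length-delimited field. For field number 1 the tag byte is
`(1 << 3) | 2 = 0x0A`, followed by the payload length and the raw JSON text.
A self-contained sketch of that framing (hypothetical helper, assuming a
single-byte varint length):

    fn encode_len_delimited(field_number: u8, payload: &[u8]) -> Vec<u8> {
        // wire type 2 = length-delimited; a payload shorter than 128 bytes
        // keeps the varint length in a single byte, as in this example
        let mut out = vec![(field_number << 3) | 2, payload.len() as u8];
        out.extend_from_slice(payload);
        out
    }

    fn main() {
        // 123 and 125 are the ASCII bytes of '{' and '}'
        assert_eq!(encode_len_delimited(1, b"{}"), vec![10, 2, 123, 125]);
    }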
EventSerializer::dummy(Some(CodecConfig::from("json")))?; // simulate sink lifecycle @@ -730,7 +730,7 @@ pub(crate) mod tests { ChunkedBuffer, > = YoloSink::new(sink_impl); - let context = SinkContext::dummy(); + let context = SinkContext::dummy("gcs_streamer"); let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?; // simulate sink lifecycle @@ -888,7 +888,7 @@ pub(crate) mod tests { ChunkedBuffer, > = YoloSink::new(sink_impl); - let context = SinkContext::dummy(); + let context = SinkContext::dummy("gcs_streamer"); let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?; // simulate sink lifecycle @@ -967,7 +967,7 @@ pub(crate) mod tests { ChunkedBuffer, > = YoloSink::new(sink_impl); - let context = SinkContext::dummy(); + let context = SinkContext::dummy("gcs_streamer"); // simulate sink lifecycle sink.on_start(&context).await?; @@ -1011,7 +1011,7 @@ pub(crate) mod tests { ChunkedBuffer, > = ConsistentSink::new(sink_impl); - let context = SinkContext::dummy(); + let context = SinkContext::dummy("gcs_streamer"); let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?; // simulate standard sink lifecycle @@ -1220,7 +1220,7 @@ pub(crate) mod tests { ChunkedBuffer, > = ConsistentSink::new(sink_impl); - let context = SinkContext::dummy(); + let context = SinkContext::dummy("gcs_streamer"); let mut serializer = EventSerializer::dummy(Some(CodecConfig::from("json")))?; // simulate standard sink lifecycle diff --git a/src/connectors/impls/kafka.rs b/src/connectors/impls/kafka.rs index 8b4a6be26c..d4af399372 100644 --- a/src/connectors/impls/kafka.rs +++ b/src/connectors/impls/kafka.rs @@ -362,7 +362,7 @@ mod tests { &literal!({ "measurement": "kafka_consumer_stats", "tags": { - "connector": "app/fake::fake" + "connector": "fake" }, "fields": { "rx_msgs": 42, diff --git a/src/connectors/sink.rs b/src/connectors/sink.rs index aae84efe5d..b159a68fe0 100644 --- a/src/connectors/sink.rs +++ b/src/connectors/sink.rs @@ -284,12 +284,12 @@ pub(crate) struct SinkContextInner { pub(crate) struct SinkContext(Arc); impl SinkContext { #[cfg(test)] - pub(crate) fn dummy() -> Self { + pub(crate) fn dummy(ct: &str) -> Self { let (tx, _rx) = bounded(1024); Self::new( SinkUId::default(), - Alias::from("dummy"), - ConnectorType::from("dummy"), + Alias::from(ct), + ConnectorType::from(ct), QuiescenceBeacon::default(), ConnectionLostNotifier::new(tx), AppContext::default(), diff --git a/src/connectors/source.rs b/src/connectors/source.rs index e0f47f82f3..b76622d8b8 100644 --- a/src/connectors/source.rs +++ b/src/connectors/source.rs @@ -303,15 +303,28 @@ pub(crate) struct SourceContext { /// kill switch pub(crate) killswitch: KillSwitch, } + impl SourceContext { pub(crate) fn killswitch(&self) -> KillSwitch { self.killswitch.clone() } + #[cfg(test)] + pub(crate) fn dummy() -> Self { + SourceContext { + alias: Alias::new("test"), + uid: SourceUId::default(), + connector_type: ConnectorType::default(), + quiescence_beacon: QuiescenceBeacon::default(), + notifier: ConnectionLostNotifier::dummy(), + app_ctx: AppContext::default(), + killswitch: KillSwitch::dummy(), + } + } } impl Display for SourceContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}[Source::{}]", self.app_ctx, &self.alias) + write!(f, "[Source::{}::{}]", self.app_ctx, &self.alias) } } @@ -1030,7 +1043,7 @@ where let mut ingest_ns = nanotime(); if let Some(mut stream_state) = self.streams.end_stream(stream_id) { let results = build_last_events( 
diff --git a/src/connectors/sink.rs b/src/connectors/sink.rs
index aae84efe5d..b159a68fe0 100644
--- a/src/connectors/sink.rs
+++ b/src/connectors/sink.rs
@@ -284,12 +284,12 @@ pub(crate) struct SinkContextInner {
 pub(crate) struct SinkContext(Arc<SinkContextInner>);
 impl SinkContext {
     #[cfg(test)]
-    pub(crate) fn dummy() -> Self {
+    pub(crate) fn dummy(ct: &str) -> Self {
         let (tx, _rx) = bounded(1024);
         Self::new(
             SinkUId::default(),
-            Alias::from("dummy"),
-            ConnectorType::from("dummy"),
+            Alias::from(ct),
+            ConnectorType::from(ct),
             QuiescenceBeacon::default(),
             ConnectionLostNotifier::new(tx),
             AppContext::default(),
diff --git a/src/connectors/source.rs b/src/connectors/source.rs
index e0f47f82f3..b76622d8b8 100644
--- a/src/connectors/source.rs
+++ b/src/connectors/source.rs
@@ -303,15 +303,28 @@ pub(crate) struct SourceContext {
     /// kill switch
     pub(crate) killswitch: KillSwitch,
 }
+
 impl SourceContext {
     pub(crate) fn killswitch(&self) -> KillSwitch {
         self.killswitch.clone()
     }
+
+    #[cfg(test)]
+    pub(crate) fn dummy() -> Self {
+        SourceContext {
+            alias: Alias::new("test"),
+            uid: SourceUId::default(),
+            connector_type: ConnectorType::default(),
+            quiescence_beacon: QuiescenceBeacon::default(),
+            notifier: ConnectionLostNotifier::dummy(),
+            app_ctx: AppContext::default(),
+            killswitch: KillSwitch::dummy(),
+        }
+    }
 }
 
 impl Display for SourceContext {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}[Source::{}]", self.app_ctx, &self.alias)
+        write!(f, "[Source::{}::{}]", self.app_ctx, &self.alias)
     }
 }
@@ -1030,7 +1043,7 @@ where
         let mut ingest_ns = nanotime();
         if let Some(mut stream_state) = self.streams.end_stream(stream_id) {
             let results = build_last_events(
-                &self.ctx.alias,
+                &self.ctx,
                 &mut stream_state,
                 &mut ingest_ns,
                 pull_id,
@@ -1100,7 +1113,7 @@ where
         if let Some(stream) = stream {
             let stream_state = self.streams.get_or_create_stream(stream, &self.ctx)?;
             let results = build_events(
-                &self.ctx.alias,
+                &self.ctx,
                 stream_state,
                 &mut ingest_ns,
                 pull_id,
@@ -1127,7 +1140,7 @@ where
             let mut stream_state = self.streams.create_anonymous_stream(codec_overwrite)?;
             let meta = meta.unwrap_or_else(Value::object);
             let mut results = build_events(
-                &self.ctx.alias,
+                &self.ctx,
                 &mut stream_state,
                 &mut ingest_ns,
                 pull_id,
@@ -1139,7 +1152,7 @@ where
             );
             // finish up the stream immediately
             let mut last_events = build_last_events(
-                &self.ctx.alias,
+                &self.ctx,
                 &mut stream_state,
                 &mut ingest_ns,
                 pull_id,
@@ -1259,7 +1272,7 @@ where
 /// preprocessor or codec errors are turned into events to the ERR port of the source/connector
 #[allow(clippy::too_many_arguments)]
 fn build_events(
-    alias: &Alias,
+    ctx: &SourceContext,
     stream_state: &mut StreamState,
     ingest_ns: &mut u64,
     pull_id: u64,
@@ -1273,7 +1286,7 @@ fn build_events(
         stream_state.preprocessors.as_mut_slice(),
         ingest_ns,
         data,
-        alias,
+        ctx,
     ) {
         Ok(processed) => {
             let mut res = Vec::with_capacity(processed.len());
@@ -1293,7 +1306,7 @@ fn build_events(
                     Err(None) => continue,
                     Err(Some(e)) => (
                         ERR,
-                        make_error(alias, &e, stream_state.stream_id, pull_id, meta.clone()),
+                        make_error(ctx, &e, stream_state.stream_id, pull_id, meta.clone()),
                     ),
                 };
                 let event = build_event(
@@ -1310,7 +1323,7 @@ fn build_events(
         }
         Err(e) => {
             // preprocessor error
-            let err_payload = make_error(alias, &e, stream_state.stream_id, pull_id, meta.clone());
+            let err_payload = make_error(ctx, &e, stream_state.stream_id, pull_id, meta.clone());
             let event = build_event(
                 stream_state,
                 pull_id,
@@ -1328,7 +1341,7 @@
 /// preprocessor or codec errors are turned into events to the ERR port of the source/connector
 #[allow(clippy::too_many_arguments)]
 fn build_last_events(
-    alias: &Alias,
+    ctx: &SourceContext,
     stream_state: &mut StreamState,
     ingest_ns: &mut u64,
     pull_id: u64,
@@ -1337,7 +1350,7 @@
     meta: &Value<'static>,
     is_transactional: bool,
 ) -> Vec<(Port<'static>, Event)> {
-    match finish(stream_state.preprocessors.as_mut_slice(), alias) {
+    match finish(stream_state.preprocessors.as_mut_slice(), ctx) {
         Ok(processed) => {
             let mut res = Vec::with_capacity(processed.len());
             for chunk in processed {
@@ -1356,7 +1369,7 @@ fn build_last_events(
                     Err(None) => continue,
                     Err(Some(e)) => (
                         ERR,
-                        make_error(alias, &e, stream_state.stream_id, pull_id, meta.clone()),
+                        make_error(ctx, &e, stream_state.stream_id, pull_id, meta.clone()),
                     ),
                 };
                 let event = build_event(
@@ -1373,7 +1386,7 @@ fn build_last_events(
         }
         Err(e) => {
             // preprocessor error
-            let err_payload = make_error(alias, &e, stream_state.stream_id, pull_id, meta.clone());
+            let err_payload = make_error(ctx, &e, stream_state.stream_id, pull_id, meta.clone());
             let event = build_event(
                 stream_state,
                 pull_id,
@@ -1389,7 +1402,7 @@
 /// create an error payload
 fn make_error(
-    connector_alias: &Alias,
+    ctx: &SourceContext,
     error: &Error,
     stream_id: u64,
     pull_id: u64,
@@ -1398,7 +1411,7 @@
     let e_string = error.to_string();
     let data = literal!({
         "error": e_string.clone(),
-        "source": connector_alias.to_string(),
+        "source": ctx.to_string(),
         "stream_id": stream_id,
         "pull_id": pull_id
    });
diff --git a/src/connectors/tests/http/client.rs b/src/connectors/tests/http/client.rs
index 2f3063905d..afcd4e4ec7 100644
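The new `Display` impl above is what the error payloads pick up: `make_error`
now renders the whole `SourceContext`, so the "source" field carries node,
flow, and alias rather than the bare alias. A std-only sketch reproducing the
format string (stand-in types; `AppCtx`/`SourceCtx` are hypothetical, and the
expected string matches the kafka consumer tests below):

    use std::fmt;

    struct AppCtx { node: u64, flow: String }

    impl fmt::Display for AppCtx {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "[Node::{}][{}]", self.node, self.flow)
        }
    }

    struct SourceCtx { app_ctx: AppCtx, alias: String }

    impl fmt::Display for SourceCtx {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // mirrors the patched write!(f, "[Source::{}::{}]", ...)
            write!(f, "[Source::{}::{}]", self.app_ctx, self.alias)
        }
    }

    fn main() {
        let ctx = SourceCtx {
            app_ctx: AppCtx { node: 0, flow: "default/".to_string() },
            alias: "transactional_retry".to_string(),
        };
        assert_eq!(
            ctx.to_string(),
            "[Source::[Node::0][default/]::transactional_retry]"
        );
    }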
--- a/src/connectors/tests/http/client.rs
+++ b/src/connectors/tests/http/client.rs
@@ -683,7 +683,7 @@ async fn missing_tls_config_https() -> Result<()> {
         .map(|e| e.to_string())
         .unwrap_or_default();
 
-    assert_eq!("Invalid Definition for connector \"app/test::missing_tls_config_https\": missing tls config with 'https' url. Set 'tls' to 'true' or provide a full tls config.", res);
+    assert_eq!("Invalid Definition for connector \"missing_tls_config_https\": missing tls config with 'https' url. Set 'tls' to 'true' or provide a full tls config.", res);
 
     Ok(())
 }
diff --git a/src/connectors/tests/kafka/consumer.rs b/src/connectors/tests/kafka/consumer.rs
index 03b2f82683..3e6a07ed81 100644
--- a/src/connectors/tests/kafka/consumer.rs
+++ b/src/connectors/tests/kafka/consumer.rs
@@ -226,7 +226,7 @@ async fn transactional_retry() -> Result<()> {
     assert_eq!(
         &literal!({
             "error": "SIMD JSON error: InternalError at character 0 ('}')",
-            "source": "app/test::transactional_retry",
+            "source": "[Source::[Node::0][default/]::transactional_retry]",
             "stream_id": 8_589_934_592_u64,
             "pull_id": 1u64
         }),
@@ -443,7 +443,7 @@ async fn custom_no_retry() -> Result<()> {
     assert_eq!(
         &literal!({
             "error": "SIMD JSON error: InternalError at character 0 ('}')",
-            "source": "app/test::custom_no_retry",
+            "source": "[Source::[Node::0][default/]::custom_no_retry]",
             "stream_id": 8_589_934_592_u64,
             "pull_id": 1u64
         }),
@@ -649,7 +649,7 @@ async fn performance() -> Result<()> {
     assert_eq!(
         &literal!({
             "error": "SIMD JSON error: InternalError at character 0 ('}')",
-            "source": "app/test::performance",
+            "source": "[Source::[Node::0][default/]::performance]",
             "stream_id": 8_589_934_592_u64,
             "pull_id": 1u64
         }),
diff --git a/src/connectors/utils/reconnect.rs b/src/connectors/utils/reconnect.rs
index 3dba9d3ea8..d2d2379692 100644
--- a/src/connectors/utils/reconnect.rs
+++ b/src/connectors/utils/reconnect.rs
@@ -210,6 +210,11 @@ impl ConnectionLostNotifier {
         self.0.send(Msg::ConnectionLost).await?;
         Ok(())
     }
+    #[cfg(test)]
+    pub(crate) fn dummy() -> Self {
+        let (tx, _) = bounded(128);
+        Self(tx)
+    }
 }
 
 impl ReconnectRuntime {
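`ConnectionLostNotifier::dummy()` above hands out a sender whose receiver is
dropped immediately: tests that only need to satisfy a constructor get a
handle, and any actual notification surfaces as a send error instead of
touching a runtime. A std-only analog of the shape (hypothetical `Notifier`,
with std `sync_channel` standing in for the crate's `bounded`):

    use std::sync::mpsc::{sync_channel, SyncSender};

    struct Notifier(SyncSender<()>);

    impl Notifier {
        fn dummy() -> Self {
            // receiver dropped on purpose; capacity mirrors the patch's 128
            let (tx, _rx) = sync_channel(128);
            Notifier(tx)
        }

        fn connection_lost(&self) -> Result<(), &'static str> {
            self.0.send(()).map_err(|_| "no runtime listening")
        }
    }

    fn main() {
        let n = Notifier::dummy();
        // with the receive side gone this errors; dummy users never call it
        assert!(n.connection_lost().is_err());
    }
diff --git a/src/preprocessor.rs b/src/preprocessor.rs
index 2849c17999..84338c5a00 100644
--- a/src/preprocessor.rs
+++ b/src/preprocessor.rs
@@ -21,7 +21,9 @@ mod remove_empty;
 pub(crate) mod separate;
 mod textual_length_prefixed;
 
-use crate::{config::Preprocessor as PreprocessorConfig, connectors::Alias, errors::Result};
+use crate::{
+    config::Preprocessor as PreprocessorConfig, connectors::source::SourceContext, errors::Result,
+};
 use std::str;
 
 //pub type Lines = lines::Lines;
@@ -101,11 +103,11 @@ pub fn make_preprocessors(preprocessors: &[PreprocessorConfig]) -> Result<Prepr
 pub fn preprocess(
     preprocessors: &mut [Box<dyn Preprocessor>],
     ingest_ns: &mut u64,
     data: Vec<u8>,
-    alias: &Alias,
+    ctx: &SourceContext,
 ) -> Result<Vec<Vec<u8>>> {
     let mut data = vec![data];
     let mut data1 = Vec::new();
@@ -115,7 +117,7 @@ pub fn preprocess(
             match pp.process(ingest_ns, d) {
                 Ok(mut r) => data1.append(&mut r),
                 Err(e) => {
-                    error!("[Connector::{alias}] Preprocessor [{i}] error: {e}");
+                    error!("{ctx} Preprocessor [{i}] error: {e}");
                     return Err(e);
                 }
             }
@@ -130,15 +132,15 @@ pub fn preprocess(
 /// # Errors
 ///
 /// * If a preprocessor failed
-pub fn finish(preprocessors: &mut [Box<dyn Preprocessor>], alias: &Alias) -> Result<Vec<Vec<u8>>> {
+pub(crate) fn finish(
+    preprocessors: &mut [Box<dyn Preprocessor>],
+    ctx: &SourceContext,
+) -> Result<Vec<Vec<u8>>> {
     if let Some((head, tail)) = preprocessors.split_first_mut() {
         let mut data = match head.finish(None) {
             Ok(d) => d,
             Err(e) => {
-                error!(
-                    "[Connector::{alias}] Preprocessor '{}' finish error: {e}",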
-                    head.name()
-                );
+                error!("[{ctx}] Preprocessor '{}' finish error: {e}", head.name());
                 return Err(e);
             }
         };
@@ -149,10 +151,7 @@ pub(crate) fn finish(
             match pp.finish(Some(d)) {
                 Ok(mut r) => data1.append(&mut r),
                 Err(e) => {
-                    error!(
-                        "[Connector::{alias}] Preprocessor '{}' finish error: {e}",
-                        pp.name()
-                    );
+                    error!("[{ctx}] Preprocessor '{}' finish error: {e}", pp.name());
                     return Err(e);
                 }
             }
@@ -334,18 +333,18 @@ mod test {
         let data = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
         let wire = post_p.process(0, 0, &data)?;
         let (start, end) = wire[0].split_at(7);
-        let alias = Alias::new("test");
+        let ctx = SourceContext::dummy();
         let mut pps: Vec<Box<dyn Preprocessor>> = vec![Box::new(pre_p)];
-        let recv = preprocess(pps.as_mut_slice(), &mut it, start.to_vec(), &alias)?;
+        let recv = preprocess(pps.as_mut_slice(), &mut it, start.to_vec(), &ctx)?;
         assert!(recv.is_empty());
-        let recv = preprocess(pps.as_mut_slice(), &mut it, end.to_vec(), &alias)?;
+        let recv = preprocess(pps.as_mut_slice(), &mut it, end.to_vec(), &ctx)?;
         assert_eq!(recv[0], data);
 
         // incomplete data
-        let processed = preprocess(pps.as_mut_slice(), &mut it, start.to_vec(), &alias)?;
+        let processed = preprocess(pps.as_mut_slice(), &mut it, start.to_vec(), &ctx)?;
         assert!(processed.is_empty());
 
         // not emitted upon finish
-        let finished = finish(pps.as_mut_slice(), &alias)?;
+        let finished = finish(pps.as_mut_slice(), &ctx)?;
         assert!(finished.is_empty());
 
         Ok(())
@@ -598,9 +597,13 @@ mod test {
     #[test]
     fn single_pre_process_head_ok() {
         let pre = Box::new(BadPreprocessor {});
-        let alias = crate::connectors::Alias::new("chucky".to_string());
         let mut ingest_ns = 0_u64;
-        let r = preprocess(&mut [pre], &mut ingest_ns, b"foo".to_vec(), &alias);
+        let r = preprocess(
+            &mut [pre],
+            &mut ingest_ns,
+            b"foo".to_vec(),
+            &SourceContext::dummy(),
+        );
         assert!(r.is_err());
     }
@@ -609,17 +612,20 @@ mod test {
         let noop = Box::new(NoOp {});
         assert_eq!("nily", noop.name());
         let pre = Box::new(BadPreprocessor {});
-        let alias = crate::connectors::Alias::new("chucky".to_string());
         let mut ingest_ns = 0_u64;
-        let r = preprocess(&mut [noop, pre], &mut ingest_ns, b"foo".to_vec(), &alias);
+        let r = preprocess(
+            &mut [noop, pre],
+            &mut ingest_ns,
+            b"foo".to_vec(),
+            &SourceContext::dummy(),
+        );
         assert!(r.is_err());
     }
 
     #[test]
     fn single_pre_finish_ok() {
         let pre = Box::new(BadPreprocessor {});
-        let alias = crate::connectors::Alias::new("chucky".to_string());
-        let r = finish(&mut [pre], &alias);
+        let r = finish(&mut [pre], &SourceContext::dummy());
         assert!(r.is_ok());
     }
@@ -632,28 +638,25 @@ mod test {
     #[test]
     fn preprocess_finish_head_fail() {
-        let alias = crate::connectors::Alias::new("chucky".to_string());
         let pre = Box::new(BadFinisher {});
-        let r = finish(&mut [pre], &alias);
+        let r = finish(&mut [pre], &SourceContext::dummy());
         assert!(r.is_err());
     }
 
     #[test]
     fn preprocess_finish_tail_fail() {
-        let alias = crate::connectors::Alias::new("chucky".to_string());
         let noop = Box::new(NoOp {});
         let pre = Box::new(BadFinisher {});
-        let r = finish(&mut [noop, pre], &alias);
+        let r = finish(&mut [noop, pre], &SourceContext::dummy());
         assert!(r.is_err());
     }
 
     #[test]
     fn preprocess_finish_multi_ok() {
-        let alias = crate::connectors::Alias::new("xyz".to_string());
         let noop1 = Box::new(NoOp {});
         let noop2 = Box::new(NoOp {});
         let noop3 = Box::new(NoOp {});
-        let r = finish(&mut [noop1, noop2, noop3], &alias);
+        let r = finish(&mut [noop1, noop2, noop3], &SourceContext::dummy());
         assert!(r.is_ok());
     }
 }
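For readers new to `preprocess()`: each stage consumes every chunk the
previous stage produced, and the first stage error aborts the whole run, now
logged via the `SourceContext`. A minimal std-only sketch of that chaining
(hypothetical `Pre` trait standing in for the crate's `Preprocessor`):

    trait Pre {
        fn process(&mut self, data: &[u8]) -> Result<Vec<Vec<u8>>, String>;
    }

    fn preprocess(stages: &mut [Box<dyn Pre>], data: Vec<u8>) -> Result<Vec<Vec<u8>>, String> {
        let mut chunks = vec![data];
        for (i, stage) in stages.iter_mut().enumerate() {
            let mut next = Vec::new();
            for chunk in chunks {
                match stage.process(&chunk) {
                    Ok(mut out) => next.append(&mut out),
                    // mirrors the patched error path: report which stage failed, bail out
                    Err(e) => return Err(format!("Preprocessor [{i}] error: {e}")),
                }
            }
            chunks = next;
        }
        Ok(chunks)
    }

    /// toy framing stage: splits its input in half
    struct Halve;

    impl Pre for Halve {
        fn process(&mut self, data: &[u8]) -> Result<Vec<Vec<u8>>, String> {
            let mid = data.len() / 2;
            Ok(vec![data[..mid].to_vec(), data[mid..].to_vec()])
        }
    }

    fn main() -> Result<(), String> {
        let mut stages: Vec<Box<dyn Pre>> = vec![Box::new(Halve), Box::new(Halve)];
        let out = preprocess(&mut stages, b"abcdefgh".to_vec())?;
        assert_eq!(out.len(), 4); // two halving stages fan one buffer out to four
        Ok(())
    }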
diff --git a/src/raft/test.rs b/src/raft/test.rs
index a82f4d9ebb..68adaa6d9a 100644
--- a/src/raft/test.rs
+++ b/src/raft/test.rs
@@ -104,7 +104,7 @@ impl TestNode {
         running.join().await
 
         // only ever destroy the db when the raft node is known to have stopped
-        //rocksdb::DB::destroy(&rocksdb::Options::default(), &path).map_err(ClusterError::Rocks)
+        // rocksdb::DB::destroy(&rocksdb::Options::default(), &path).map_err(ClusterError::Rocks)
     }
 
     fn client(&self) -> &ClusterClient {
@@ -114,7 +114,7 @@ impl TestNode {
 
 #[tokio::test(flavor = "multi_thread")]
 async fn cluster_join_test() -> ClusterResult<()> {
-    let _ = env_logger::try_init();
+    let _: std::result::Result<_, _> = env_logger::try_init();
     let dir0 = tempfile::tempdir()?;
     let dir1 = tempfile::tempdir()?;
     let dir2 = tempfile::tempdir()?;
@@ -160,7 +160,7 @@ async fn cluster_join_test() -> ClusterResult<()> {
 
 #[tokio::test(flavor = "multi_thread")]
 async fn kill_and_restart_voter() -> ClusterResult<()> {
-    let _ = env_logger::try_init();
+    let _: std::result::Result<_, _> = env_logger::try_init();
     let dir0 = tempfile::tempdir()?;
     let dir1 = tempfile::tempdir()?;
     let dir2 = tempfile::tempdir()?;
diff --git a/src/raft/test/learner.rs b/src/raft/test/learner.rs
index 2cc4479b83..e64bbf082c 100644
--- a/src/raft/test/learner.rs
+++ b/src/raft/test/learner.rs
@@ -21,7 +21,7 @@ use std::time::Duration;
 
 #[tokio::test(flavor = "multi_thread")]
 async fn add_learner_test() -> ClusterResult<()> {
-    let _ = env_logger::try_init();
+    let _: std::result::Result<_, _> = env_logger::try_init();
     let dir0 = tempfile::tempdir()?;
     let dir1 = tempfile::tempdir()?;
     let dir2 = tempfile::tempdir()?;
@@ -78,7 +78,7 @@ async fn add_learner_test() -> ClusterResult<()> {
 
 #[tokio::test(flavor = "multi_thread")]
 async fn learner_runs_app() -> ClusterResult<()> {
-    let _ = env_logger::try_init();
+    let _: std::result::Result<_, _> = env_logger::try_init();
     let dir0 = tempfile::tempdir()?;
     let dir1 = tempfile::tempdir()?;
     let dir2 = tempfile::tempdir()?;
diff --git a/src/system/flow.rs b/src/system/flow.rs
index 53b4c1a444..6637f3ba5c 100644
--- a/src/system/flow.rs
+++ b/src/system/flow.rs
@@ -1238,14 +1238,11 @@ mod tests {
         start_rx.await??;
 
         let connector = flow.get_connector("foo".to_string()).await?;
-        assert_eq!(String::from("app/test::foo"), connector.alias.to_string());
+        assert_eq!(String::from("foo"), connector.alias.to_string());
 
         let connectors = flow.get_connectors().await?;
         assert_eq!(1, connectors.len());
-        assert_eq!(
-            String::from("app/test::foo"),
-            connectors[0].alias.to_string()
-        );
+        assert_eq!(String::from("foo"), connectors[0].alias.to_string());
 
         // assert the flow has started and events are flowing
         let event = connector_rx.recv().await.ok_or("empty")?;
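On the `env_logger::try_init()` lines in the raft tests: replacing
`let _ = ...` with an explicitly typed discard is presumably aimed at
clippy's `let_underscore_untyped` restriction lint; spelling out the
`Result` documents that ignoring the Err (a logger that is already
initialized) is deliberate. A sketch of the pattern (with a hypothetical
stand-in for env_logger):

    fn try_init() -> Result<(), &'static str> {
        // stand-in: env_logger::try_init() errors if a logger is already set
        Err("already initialized")
    }

    fn main() {
        // typed discard: the ignored Err is visible and intentional
        let _: std::result::Result<_, _> = try_init();
    }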