diff --git a/subgraph-radio/benches/gossips.rs b/subgraph-radio/benches/gossips.rs index cb2b17b..f5165b8 100644 --- a/subgraph-radio/benches/gossips.rs +++ b/subgraph-radio/benches/gossips.rs @@ -12,6 +12,7 @@ use std::sync::mpsc; use graphcast_sdk::networks::NetworkName; use graphcast_sdk::{BlockPointer, GraphcastNetworkName, LogFormat, NetworkPointer, WakuMessage}; use subgraph_radio::config::{Config, CoverageLevel, GraphStack, RadioInfrastructure, Waku}; +use subgraph_radio::operator::notifier::NotificationMode; fn gossip_poi_bench(c: &mut Criterion) { let identifiers = black_box(vec!["identifier1".to_string(), "identifier2".to_string()]); @@ -71,6 +72,8 @@ fn gossip_poi_bench(c: &mut Criterion) { log_format: LogFormat::Pretty, graphcast_network: GraphcastNetworkName::Testnet, auto_upgrade: CoverageLevel::Comprehensive, + notification_interval: 1, + notification_mode: NotificationMode::PeriodicReport, }, config_file: None, }); diff --git a/subgraph-radio/src/config.rs b/subgraph-radio/src/config.rs index fe6231e..6955bb1 100644 --- a/subgraph-radio/src/config.rs +++ b/subgraph-radio/src/config.rs @@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashSet; use tracing::{debug, info, trace}; +use crate::operator::notifier::NotificationMode; use crate::state::{panic_hook, PersistedState}; use crate::{active_allocation_hashes, syncing_deployment_hashes}; @@ -153,7 +154,7 @@ impl Config { state } else { debug!("Created new state"); - PersistedState::new(None, None, None, None) + PersistedState::new(None, None, None, None, None) } } @@ -470,11 +471,28 @@ pub struct RadioInfrastructure { long, value_name = "LOG_FORMAT", env = "LOG_FORMAT", - help = "Support logging formats: pretty, json, full, compact", - long_help = "pretty: verbose and human readable; json: not verbose and parsable; compact: not verbose and not parsable; full: verbose and not parsible", + help = "Supported logging formats: pretty, json, full, compact", + long_help = "pretty: verbose and human readable; json: not verbose and parsable; compact: not verbose and not parsable; full: verbose and not parseable", default_value = "pretty" )] pub log_format: LogFormat, + #[clap( + long, + value_name = "NOTIFICATION_MODE", + env = "NOTIFICATION_MODE", + help = "Supported: live, periodic-report, periodic-update", + long_help = "live: send a notification as soon as it finds a divergence; periodic-report: send a notification on a specified interval (default is 24 hours but can be configured with the NOTIFICATION_INTERVAL variable) with a summary and a list of divergent subgraphs; periodic-update: send a notification on a specified interval (default is 24 hours but can be configured with the NOTIFICATION_INTERVAL variable) containing updates since the previous notification", + default_value = "live" + )] + pub notification_mode: NotificationMode, + #[clap( + long, + value_name = "NOTIFICATION_INTERVAL", + env = "NOTIFICATION_INTERVAL", + help = "Interval (in hours) between sending a divergence summary notification", + default_value = "24" + )] + pub notification_interval: u64, } #[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] @@ -613,6 +631,8 @@ mod tests { log_format: LogFormat::Pretty, graphcast_network: GraphcastNetworkName::Testnet, auto_upgrade: CoverageLevel::Comprehensive, + notification_mode: NotificationMode::Live, + notification_interval: 24, }, config_file: None, } diff --git a/subgraph-radio/src/messages/upgrade.rs b/subgraph-radio/src/messages/upgrade.rs index c6717b0..bb839a9 100644 --- a/subgraph-radio/src/messages/upgrade.rs +++ b/subgraph-radio/src/messages/upgrade.rs @@ -4,6 +4,7 @@ use ethers_core::types::transaction::eip712::Eip712; use ethers_derive_eip712::*; use prost::Message; use serde::{Deserialize, Serialize}; + use tracing::{debug, info}; use graphcast_sdk::{ diff --git a/subgraph-radio/src/operator/attestation.rs b/subgraph-radio/src/operator/attestation.rs index 4dbae71..becf8ac 100644 --- a/subgraph-radio/src/operator/attestation.rs +++ b/subgraph-radio/src/operator/attestation.rs @@ -17,6 +17,7 @@ use graphcast_sdk::{ graphcast_agent::message_typing::{get_indexer_stake, BuildMessageError, GraphcastMessage}, }; +use crate::operator::notifier::NotificationMode; use crate::{ messages::poi::PublicPoiMessage, metrics::ACTIVE_INDEXERS, state::PersistedState, OperationError, @@ -626,7 +627,7 @@ pub async fn process_comparison_results( match result { Ok(comparison_result) => { let result_type = persisted_state - .handle_comparison_result(comparison_result.clone(), notifier.clone()) + .handle_comparison_result(comparison_result.clone()) .await; match result_type { @@ -648,6 +649,12 @@ pub async fn process_comparison_results( } } + let notifications = persisted_state.notifications(); + if notifier.notification_mode == NotificationMode::Live && !notifications.is_empty() { + notifier.notify(notifications.join("\n")).await; + persisted_state.clear_notifications(); + } + info!( chainhead_blocks = blocks_str, num_topics, @@ -1080,26 +1087,26 @@ mod tests { ); assert!(!local_attestations.lock().unwrap().is_empty()); - assert!(local_attestations.lock().unwrap().len() == 2); - assert!( + assert_eq!(local_attestations.lock().unwrap().len(), 2); + assert_eq!( local_attestations .lock() .unwrap() .get("0xa1") .unwrap() - .len() - == 2 + .len(), + 2 ); - assert!( + assert_eq!( local_attestations .lock() .unwrap() .get("0xa2") .unwrap() - .len() - == 1 + .len(), + 1 ); - assert!( + assert_eq!( local_attestations .lock() .unwrap() @@ -1107,8 +1114,8 @@ mod tests { .unwrap() .get(&0) .unwrap() - .ppoi - == *"ppoi-x" + .ppoi, + *"ppoi-x" ); } } diff --git a/subgraph-radio/src/operator/mod.rs b/subgraph-radio/src/operator/mod.rs index d1f5fe9..14e24cc 100644 --- a/subgraph-radio/src/operator/mod.rs +++ b/subgraph-radio/src/operator/mod.rs @@ -1,21 +1,11 @@ -use derive_getters::Getters; use std::sync::{ atomic::{AtomicBool, Ordering}, mpsc::Receiver, Arc, }; use std::time::Duration; -use tokio::time::{interval, sleep, timeout}; -use tracing::{debug, error, info, trace, warn}; -use crate::{ - chainhead_block_str, - messages::poi::{process_valid_message, PublicPoiMessage}, - metrics::{ - CONNECTED_PEERS, DIVERGING_SUBGRAPHS, GOSSIP_PEERS, RECEIVED_MESSAGES, VALIDATED_MESSAGES, - }, - operator::{attestation::ComparisonResultType, indexer_management::health_query}, -}; +use derive_getters::Getters; use graphcast_sdk::{ graphcast_agent::{ message_typing::check_message_validity, @@ -26,14 +16,26 @@ use graphcast_sdk::{ WakuMessage, }; +use tokio::time::{interval, sleep, timeout}; +use tracing::{debug, error, info, trace, warn}; + use crate::config::Config; use crate::messages::upgrade::UpgradeIntentMessage; use crate::metrics::handle_serve_metrics; use crate::operator::attestation::log_gossip_summary; use crate::operator::attestation::process_comparison_results; +use crate::operator::notifier::NotificationMode; use crate::server::run_server; use crate::state::PersistedState; use crate::GRAPHCAST_AGENT; +use crate::{ + chainhead_block_str, + messages::poi::{process_valid_message, PublicPoiMessage}, + metrics::{ + CONNECTED_PEERS, DIVERGING_SUBGRAPHS, GOSSIP_PEERS, RECEIVED_MESSAGES, VALIDATED_MESSAGES, + }, + operator::{attestation::ComparisonResultType, indexer_management::health_query}, +}; use self::notifier::Notifier; @@ -189,7 +191,11 @@ impl RadioOperator { let mut state_update_interval = interval(Duration::from_secs(10)); let mut gossip_poi_interval = interval(Duration::from_secs(30)); - let mut comparison_interval = interval(Duration::from_secs(30)); + let mut comparison_interval = interval(Duration::from_secs(300)); + + let mut notification_interval = tokio::time::interval(Duration::from_secs( + self.config.radio_infrastructure.notification_interval * 3600, + )); let iteration_timeout = Duration::from_secs(180); let update_timeout = Duration::from_secs(5); @@ -351,7 +357,7 @@ impl RadioOperator { identifiers.len(), comparison_res, self.notifier.clone(), - self.persisted_state.clone() + self.persisted_state.clone(), ) }).await; @@ -361,6 +367,52 @@ impl RadioOperator { debug!("compare_poi completed"); } }, + _ = notification_interval.tick() => { + match self.config.radio_infrastructure.notification_mode { + NotificationMode::PeriodicReport => { + let comparison_results = self.persisted_state.comparison_results(); + if !comparison_results.is_empty() { + let lines = { + let (mut matching, mut divergent) = (0, 0); + let mut lines = Vec::new(); + let total = comparison_results.len(); + + let divergent_lines: Vec = comparison_results.iter().filter_map(|(identifier, res)| { + match res.result_type { + ComparisonResultType::Match => { + matching += 1; + None + }, + ComparisonResultType::Divergent => { + divergent += 1; + Some(format!("{} - {}", identifier, res.block_number)) + }, + _ => None, + } + }).collect(); + + lines.push(format!( + "Total subgraphs being cross-checked: {}\nMatching: {}\nDivergent: {}, identifiers and blocks:", + total, matching, divergent + )); + lines.extend(divergent_lines); + lines + }; + + self.notifier.notify(lines.join("\n")).await; + } + }, + NotificationMode::PeriodicUpdate=> { + let notifications = self.persisted_state.notifications(); + if !notifications.is_empty() { + self.notifier.notify(notifications.join("\n")).await; + self.persisted_state.clear_notifications(); + } + }, + _ => {} + } + }, + else => break, } diff --git a/subgraph-radio/src/operator/notifier.rs b/subgraph-radio/src/operator/notifier.rs index 9f4432a..2f33776 100644 --- a/subgraph-radio/src/operator/notifier.rs +++ b/subgraph-radio/src/operator/notifier.rs @@ -14,9 +14,20 @@ pub struct Notifier { discord_webhook: Option, telegram_token: Option, telegram_chat_id: Option, + pub notification_mode: NotificationMode, + pub notification_interval: u64, +} + +#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default, PartialEq)] +pub enum NotificationMode { + PeriodicReport, + PeriodicUpdate, + #[default] + Live, } impl Notifier { + #[allow(clippy::too_many_arguments)] pub fn new( radio_name: String, slack_token: Option, @@ -24,6 +35,8 @@ impl Notifier { discord_webhook: Option, telegram_token: Option, telegram_chat_id: Option, + notification_mode: NotificationMode, + notification_interval: u64, ) -> Notifier { Notifier { radio_name, @@ -32,6 +45,8 @@ impl Notifier { discord_webhook, telegram_token, telegram_chat_id, + notification_mode, + notification_interval, } } @@ -42,6 +57,8 @@ impl Notifier { let discord_webhook = config.radio_infrastructure().discord_webhook.clone(); let telegram_token = config.radio_infrastructure().telegram_token.clone(); let telegram_chat_id = config.radio_infrastructure().telegram_chat_id; + let notification_mode = config.radio_infrastructure().notification_mode.clone(); + let notification_interval = config.radio_infrastructure().notification_interval; Notifier::new( radio_name, @@ -50,6 +67,8 @@ impl Notifier { discord_webhook, telegram_token, telegram_chat_id, + notification_mode, + notification_interval, ) } diff --git a/subgraph-radio/src/server/model/mod.rs b/subgraph-radio/src/server/model/mod.rs index 2fe2d0e..67c6815 100644 --- a/subgraph-radio/src/server/model/mod.rs +++ b/subgraph-radio/src/server/model/mod.rs @@ -241,8 +241,8 @@ impl SubgraphRadioContext { let msgs = self.remote_ppoi_messages(); let filtered = msgs .iter() + .filter(|&message| filter_remote_ppoi_messages(message, identifier, block)) .cloned() - .filter(|message| filter_remote_ppoi_messages(message, identifier, block)) .collect::>(); filtered } diff --git a/subgraph-radio/src/state.rs b/subgraph-radio/src/state.rs index 5b91cc3..68ae79c 100644 --- a/subgraph-radio/src/state.rs +++ b/subgraph-radio/src/state.rs @@ -22,7 +22,6 @@ use crate::{ operator::attestation::{ clear_local_attestation, Attestation, ComparisonResult, ComparisonResultType, }, - operator::notifier::Notifier, RADIO_OPERATOR, }; @@ -31,12 +30,15 @@ type Remote = Arc>>>; type UpgradeMessages = Arc>>>; type ComparisonResults = Arc>>; +type Notifications = Arc>>; + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct PersistedState { pub local_attestations: Local, pub remote_ppoi_messages: Remote, pub upgrade_intent_messages: UpgradeMessages, pub comparison_results: ComparisonResults, + pub notifications: Notifications, } impl PersistedState { @@ -45,6 +47,7 @@ impl PersistedState { remote: Option, upgrade_intent_messages: Option, comparison_results: Option, + notifications: Option, ) -> PersistedState { let local_attestations = local.unwrap_or(Arc::new(SyncMutex::new(HashMap::new()))); let remote_ppoi_messages = remote.unwrap_or(Arc::new(SyncMutex::new(vec![]))); @@ -52,12 +55,13 @@ impl PersistedState { upgrade_intent_messages.unwrap_or(Arc::new(SyncMutex::new(HashMap::new()))); let comparison_results = comparison_results.unwrap_or(Arc::new(SyncMutex::new(HashMap::new()))); - + let notifications = notifications.unwrap_or(Arc::new(SyncMutex::new(HashMap::new()))); PersistedState { local_attestations, remote_ppoi_messages, upgrade_intent_messages, comparison_results, + notifications, } } @@ -68,6 +72,7 @@ impl PersistedState { remote_ppoi_messages: Option, upgrade_intent_messages: Option, comparison_results: Option, + notifications: Option, ) -> PersistedState { let local_attestations = match local_attestations { None => self.local_attestations.clone(), @@ -85,11 +90,16 @@ impl PersistedState { None => self.comparison_results.clone(), Some(r) => r, }; + let notifications = match notifications { + None => self.notifications.clone(), + Some(n) => n, + }; PersistedState { local_attestations, remote_ppoi_messages, upgrade_intent_messages, comparison_results, + notifications, } } @@ -158,6 +168,20 @@ impl PersistedState { matched_type } + /// Getter for notifications, return only the values + pub fn notifications(&self) -> Vec { + self.notifications + .lock() + .unwrap() + .values() + .cloned() + .collect() + } + + pub fn clear_notifications(&self) { + self.notifications.lock().unwrap().clear(); + } + /// Update local_attestations pub async fn update_local(&mut self, local_attestations: Local) { self.local_attestations = local_attestations; @@ -226,6 +250,13 @@ impl PersistedState { .insert(deployment, comparison_result); } + pub fn add_notification(&self, deployment: String, notification: String) { + self.notifications + .lock() + .unwrap() + .insert(deployment, notification); + } + pub async fn valid_ppoi_messages( &mut self, graph_node_endpoint: &str, @@ -250,7 +281,6 @@ impl PersistedState { pub async fn handle_comparison_result( &self, new_comparison_result: ComparisonResult, - notifier: Notifier, ) -> ComparisonResultType { let (should_notify, updated_comparison_result, result_type) = { let mut results = self.comparison_results.lock().unwrap(); @@ -296,7 +326,10 @@ impl PersistedState { }; if should_notify { - notifier.notify(updated_comparison_result.to_string()).await; + self.add_notification( + updated_comparison_result.deployment.clone(), + updated_comparison_result.to_string(), + ); } result_type @@ -359,7 +392,7 @@ impl PersistedState { "No persisted state file provided, create an empty state" ); // No state persisted, create new - let state = PersistedState::new(None, None, None, None); + let state = PersistedState::new(None, None, None, None, None); state.update_cache(path); return state; } @@ -374,7 +407,7 @@ impl PersistedState { err = e.to_string(), "Could not parse persisted state file, created an empty state", ); - PersistedState::new(None, None, None, None) + PersistedState::new(None, None, None, None, None) } }; state @@ -406,9 +439,8 @@ pub fn panic_cache(panic_info: &PanicInfo<'_>, file_path: &str) { #[cfg(test)] mod tests { use super::*; - use graphcast_sdk::networks::NetworkName; - use crate::operator::attestation::{save_local_attestation, ComparisonResultType}; + use graphcast_sdk::networks::NetworkName; /// Tests for load, update, and store cache #[tokio::test] @@ -490,6 +522,7 @@ mod tests { Some(ppoi_messages.clone()), None, Some(comparison_results.clone()), + None, ) .await; @@ -515,28 +548,28 @@ mod tests { assert_eq!(state.remote_ppoi_messages.lock().unwrap().len(), 1); assert_eq!(state.upgrade_intent_messages.lock().unwrap().len(), 1); assert!(!state.local_attestations.lock().unwrap().is_empty()); - assert!(state.local_attestations.lock().unwrap().len() == 2); - assert!( + assert_eq!(state.local_attestations.lock().unwrap().len(), 2); + assert_eq!( state .local_attestations .lock() .unwrap() .get("0xa1") .unwrap() - .len() - == 2 + .len(), + 2 ); - assert!( + assert_eq!( state .local_attestations .lock() .unwrap() .get("0xa2") .unwrap() - .len() - == 1 + .len(), + 1 ); - assert!( + assert_eq!( state .local_attestations .lock() @@ -545,8 +578,8 @@ mod tests { .unwrap() .get(&0) .unwrap() - .ppoi - == *"ppoi-x" + .ppoi, + *"ppoi-x" ); assert_eq!(state.comparison_results.lock().unwrap().len(), 1); @@ -576,16 +609,18 @@ mod tests { #[tokio::test] async fn handle_comparison_result_new_deployment() { - let notifier = Notifier::new("not-a-real-radio".to_string(), None, None, None, None, None); let local_attestations = Arc::new(SyncMutex::new(HashMap::new())); let remote_ppoi_messages = Arc::new(SyncMutex::new(Vec::new())); let upgrade_intent_messages = Arc::new(SyncMutex::new(HashMap::new())); let comparison_results = Arc::new(SyncMutex::new(HashMap::new())); + let notifications = Arc::new(SyncMutex::new(HashMap::new())); + let state = PersistedState { local_attestations, remote_ppoi_messages, upgrade_intent_messages, comparison_results, + notifications, }; let new_result = ComparisonResult { @@ -596,7 +631,7 @@ mod tests { attestations: Vec::new(), }; - state.handle_comparison_result(new_result, notifier).await; + state.handle_comparison_result(new_result).await; let comparison_results = state.comparison_results.lock().unwrap(); assert!(comparison_results.contains_key(&String::from("new_deployment"))); @@ -604,16 +639,18 @@ mod tests { #[tokio::test] async fn handle_comparison_result_change_result_type() { - let notifier = Notifier::new("not-a-real-radio".to_string(), None, None, None, None, None); let local_attestations = Arc::new(SyncMutex::new(HashMap::new())); let remote_ppoi_messages = Arc::new(SyncMutex::new(Vec::new())); let upgrade_intent_messages = Arc::new(SyncMutex::new(HashMap::new())); let comparison_results = Arc::new(SyncMutex::new(HashMap::new())); + let notifications = Arc::new(SyncMutex::new(HashMap::new())); + let state = PersistedState { local_attestations, remote_ppoi_messages, upgrade_intent_messages, comparison_results, + notifications, }; let old_result = ComparisonResult { @@ -637,7 +674,7 @@ mod tests { .lock() .unwrap() .insert(String::from("existing_deployment"), old_result.clone()); - state.handle_comparison_result(new_result, notifier).await; + state.handle_comparison_result(new_result).await; let comparison_results = state.comparison_results.lock().unwrap(); let result = comparison_results @@ -653,12 +690,15 @@ mod tests { let remote_ppoi_messages = Arc::new(SyncMutex::new(Vec::new())); let upgrade_intent_messages = Arc::new(SyncMutex::new(HashMap::new())); let comparison_results = Arc::new(SyncMutex::new(HashMap::new())); + let notifications = Arc::new(SyncMutex::new(HashMap::new())); + let test_id = "AAAMdCkW3pr619gsJVtUPAWxspALPdCMw6o7obzYBNp3".to_string(); let state = PersistedState { local_attestations, remote_ppoi_messages, upgrade_intent_messages, comparison_results, + notifications, }; // Make 2 msgs @@ -805,6 +845,7 @@ mod tests { local_attestations: Arc::new(SyncMutex::new(HashMap::new())), remote_ppoi_messages: Arc::new(SyncMutex::new(Vec::new())), upgrade_intent_messages: Arc::new(SyncMutex::new(HashMap::new())), + notifications: Arc::new(SyncMutex::new(HashMap::new())), }; let results = state.comparison_result_typed(ComparisonResultType::Match); diff --git a/test-utils/src/config.rs b/test-utils/src/config.rs index e74895f..8659e74 100644 --- a/test-utils/src/config.rs +++ b/test-utils/src/config.rs @@ -4,6 +4,7 @@ use graphcast_sdk::{ }; use serde::{Deserialize, Serialize}; use subgraph_radio::config::{Config, CoverageLevel, GraphStack, RadioInfrastructure, Waku}; +use subgraph_radio::operator::notifier::NotificationMode; #[derive(Clone, Debug, Parser, Serialize, Deserialize)] #[clap(name = "test-sender", about = "Mock message sender")] @@ -78,6 +79,8 @@ pub fn test_config() -> Config { id_validation: IdentityValidation::ValidAddress, topic_update_interval: 600, auto_upgrade: CoverageLevel::OnChain, + notification_mode: NotificationMode::Live, + notification_interval: 24, } }, config_file: None,