Skip to content

Commit

Permalink
feat: add divergence summary notification on a given interval
Browse files Browse the repository at this point in the history
  • Loading branch information
neriumrevolta committed Sep 28, 2023
1 parent e19df4b commit aa293a7
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 50 deletions.
3 changes: 3 additions & 0 deletions subgraph-radio/benches/gossips.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::mpsc;
use graphcast_sdk::networks::NetworkName;
use graphcast_sdk::{BlockPointer, GraphcastNetworkName, LogFormat, NetworkPointer, WakuMessage};
use subgraph_radio::config::{Config, CoverageLevel, GraphStack, RadioInfrastructure, Waku};
use subgraph_radio::operator::notifier::NotificationMode;

fn gossip_poi_bench(c: &mut Criterion) {
let identifiers = black_box(vec!["identifier1".to_string(), "identifier2".to_string()]);
Expand Down Expand Up @@ -71,6 +72,8 @@ fn gossip_poi_bench(c: &mut Criterion) {
log_format: LogFormat::Pretty,
graphcast_network: GraphcastNetworkName::Testnet,
auto_upgrade: CoverageLevel::Comprehensive,
notification_interval: 1,
notification_mode: NotificationMode::PeriodicReport,
},
config_file: None,
});
Expand Down
26 changes: 23 additions & 3 deletions subgraph-radio/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use tracing::{debug, info, trace};

use crate::operator::notifier::NotificationMode;
use crate::state::{panic_hook, PersistedState};
use crate::{active_allocation_hashes, syncing_deployment_hashes};

Expand Down Expand Up @@ -153,7 +154,7 @@ impl Config {
state
} else {
debug!("Created new state");
PersistedState::new(None, None, None, None)
PersistedState::new(None, None, None, None, None)
}
}

Expand Down Expand Up @@ -470,11 +471,28 @@ pub struct RadioInfrastructure {
long,
value_name = "LOG_FORMAT",
env = "LOG_FORMAT",
help = "Support logging formats: pretty, json, full, compact",
long_help = "pretty: verbose and human readable; json: not verbose and parsable; compact: not verbose and not parsable; full: verbose and not parsible",
help = "Supported logging formats: pretty, json, full, compact",
long_help = "pretty: verbose and human readable; json: not verbose and parsable; compact: not verbose and not parsable; full: verbose and not parseable",
default_value = "pretty"
)]
pub log_format: LogFormat,
#[clap(
long,
value_name = "NOTIFICATION_MODE",
env = "NOTIFICATION_MODE",
help = "Supported: live, periodic-report, periodic-update",
long_help = "live: send a notification as soon as it finds a divergence; periodic-report: send a notification on a specified interval (default is 24 hours but can be configured with the NOTIFICATION_INTERVAL variable) with a summary and a list of divergent subgraphs; periodic-update: send a notification on a specified interval (default is 24 hours but can be configured with the NOTIFICATION_INTERVAL variable) containing updates since the previous notification",
default_value = "live"
)]
pub notification_mode: NotificationMode,
#[clap(
long,
value_name = "NOTIFICATION_INTERVAL",
env = "NOTIFICATION_INTERVAL",
help = "Interval (in hours) between sending a divergence summary notification",
default_value = "24"
)]
pub notification_interval: u64,
}

#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)]
Expand Down Expand Up @@ -613,6 +631,8 @@ mod tests {
log_format: LogFormat::Pretty,
graphcast_network: GraphcastNetworkName::Testnet,
auto_upgrade: CoverageLevel::Comprehensive,
notification_mode: NotificationMode::Live,
notification_interval: 24,
},
config_file: None,
}
Expand Down
1 change: 1 addition & 0 deletions subgraph-radio/src/messages/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use ethers_core::types::transaction::eip712::Eip712;
use ethers_derive_eip712::*;
use prost::Message;
use serde::{Deserialize, Serialize};

use tracing::{debug, info};

use graphcast_sdk::{
Expand Down
29 changes: 18 additions & 11 deletions subgraph-radio/src/operator/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use graphcast_sdk::{
graphcast_agent::message_typing::{get_indexer_stake, BuildMessageError, GraphcastMessage},
};

use crate::operator::notifier::NotificationMode;
use crate::{
messages::poi::PublicPoiMessage, metrics::ACTIVE_INDEXERS, state::PersistedState,
OperationError,
Expand Down Expand Up @@ -626,7 +627,7 @@ pub async fn process_comparison_results(
match result {
Ok(comparison_result) => {
let result_type = persisted_state
.handle_comparison_result(comparison_result.clone(), notifier.clone())
.handle_comparison_result(comparison_result.clone())
.await;

match result_type {
Expand All @@ -648,6 +649,12 @@ pub async fn process_comparison_results(
}
}

let notifications = persisted_state.notifications();
if notifier.notification_mode == NotificationMode::Live && !notifications.is_empty() {
notifier.notify(notifications.join("\n")).await;
persisted_state.clear_notifications();
}

info!(
chainhead_blocks = blocks_str,
num_topics,
Expand Down Expand Up @@ -1080,35 +1087,35 @@ mod tests {
);

assert!(!local_attestations.lock().unwrap().is_empty());
assert!(local_attestations.lock().unwrap().len() == 2);
assert!(
assert_eq!(local_attestations.lock().unwrap().len(), 2);
assert_eq!(
local_attestations
.lock()
.unwrap()
.get("0xa1")
.unwrap()
.len()
== 2
.len(),
2
);
assert!(
assert_eq!(
local_attestations
.lock()
.unwrap()
.get("0xa2")
.unwrap()
.len()
== 1
.len(),
1
);
assert!(
assert_eq!(
local_attestations
.lock()
.unwrap()
.get("0xa1")
.unwrap()
.get(&0)
.unwrap()
.ppoi
== *"ppoi-x"
.ppoi,
*"ppoi-x"
);
}
}
78 changes: 65 additions & 13 deletions subgraph-radio/src/operator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,11 @@
use derive_getters::Getters;
use std::sync::{
atomic::{AtomicBool, Ordering},
mpsc::Receiver,
Arc,
};
use std::time::Duration;
use tokio::time::{interval, sleep, timeout};
use tracing::{debug, error, info, trace, warn};

use crate::{
chainhead_block_str,
messages::poi::{process_valid_message, PublicPoiMessage},
metrics::{
CONNECTED_PEERS, DIVERGING_SUBGRAPHS, GOSSIP_PEERS, RECEIVED_MESSAGES, VALIDATED_MESSAGES,
},
operator::{attestation::ComparisonResultType, indexer_management::health_query},
};
use derive_getters::Getters;
use graphcast_sdk::{
graphcast_agent::{
message_typing::check_message_validity,
Expand All @@ -26,14 +16,26 @@ use graphcast_sdk::{
WakuMessage,
};

use tokio::time::{interval, sleep, timeout};
use tracing::{debug, error, info, trace, warn};

use crate::config::Config;
use crate::messages::upgrade::UpgradeIntentMessage;
use crate::metrics::handle_serve_metrics;
use crate::operator::attestation::log_gossip_summary;
use crate::operator::attestation::process_comparison_results;
use crate::operator::notifier::NotificationMode;
use crate::server::run_server;
use crate::state::PersistedState;
use crate::GRAPHCAST_AGENT;
use crate::{
chainhead_block_str,
messages::poi::{process_valid_message, PublicPoiMessage},
metrics::{
CONNECTED_PEERS, DIVERGING_SUBGRAPHS, GOSSIP_PEERS, RECEIVED_MESSAGES, VALIDATED_MESSAGES,
},
operator::{attestation::ComparisonResultType, indexer_management::health_query},
};

use self::notifier::Notifier;

Expand Down Expand Up @@ -189,7 +191,11 @@ impl RadioOperator {

let mut state_update_interval = interval(Duration::from_secs(10));
let mut gossip_poi_interval = interval(Duration::from_secs(30));
let mut comparison_interval = interval(Duration::from_secs(30));
let mut comparison_interval = interval(Duration::from_secs(300));

let mut notification_interval = tokio::time::interval(Duration::from_secs(
self.config.radio_infrastructure.notification_interval * 3600,
));

let iteration_timeout = Duration::from_secs(180);
let update_timeout = Duration::from_secs(5);
Expand Down Expand Up @@ -351,7 +357,7 @@ impl RadioOperator {
identifiers.len(),
comparison_res,
self.notifier.clone(),
self.persisted_state.clone()
self.persisted_state.clone(),
)
}).await;

Expand All @@ -361,6 +367,52 @@ impl RadioOperator {
debug!("compare_poi completed");
}
},
_ = notification_interval.tick() => {
match self.config.radio_infrastructure.notification_mode {
NotificationMode::PeriodicReport => {
let comparison_results = self.persisted_state.comparison_results();
if !comparison_results.is_empty() {
let lines = {
let (mut matching, mut divergent) = (0, 0);
let mut lines = Vec::new();
let total = comparison_results.len();

let divergent_lines: Vec<String> = comparison_results.iter().filter_map(|(identifier, res)| {
match res.result_type {
ComparisonResultType::Match => {
matching += 1;
None
},
ComparisonResultType::Divergent => {
divergent += 1;
Some(format!("{} - {}", identifier, res.block_number))
},
_ => None,
}
}).collect();

lines.push(format!(
"Total subgraphs being cross-checked: {}\nMatching: {}\nDivergent: {}, identifiers and blocks:",
total, matching, divergent
));
lines.extend(divergent_lines);
lines
};

self.notifier.notify(lines.join("\n")).await;
}
},
NotificationMode::PeriodicUpdate=> {
let notifications = self.persisted_state.notifications();
if !notifications.is_empty() {
self.notifier.notify(notifications.join("\n")).await;
self.persisted_state.clear_notifications();
}
},
_ => {}
}
},

else => break,
}

Expand Down
19 changes: 19 additions & 0 deletions subgraph-radio/src/operator/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,29 @@ pub struct Notifier {
discord_webhook: Option<String>,
telegram_token: Option<String>,
telegram_chat_id: Option<i64>,
pub notification_mode: NotificationMode,
pub notification_interval: u64,
}

#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
pub enum NotificationMode {
PeriodicReport,
PeriodicUpdate,
#[default]
Live,
}

impl Notifier {
#[allow(clippy::too_many_arguments)]
pub fn new(
radio_name: String,
slack_token: Option<String>,
slack_channel: Option<String>,
discord_webhook: Option<String>,
telegram_token: Option<String>,
telegram_chat_id: Option<i64>,
notification_mode: NotificationMode,
notification_interval: u64,
) -> Notifier {
Notifier {
radio_name,
Expand All @@ -32,6 +45,8 @@ impl Notifier {
discord_webhook,
telegram_token,
telegram_chat_id,
notification_mode,
notification_interval,
}
}

Expand All @@ -42,6 +57,8 @@ impl Notifier {
let discord_webhook = config.radio_infrastructure().discord_webhook.clone();
let telegram_token = config.radio_infrastructure().telegram_token.clone();
let telegram_chat_id = config.radio_infrastructure().telegram_chat_id;
let notification_mode = config.radio_infrastructure().notification_mode.clone();
let notification_interval = config.radio_infrastructure().notification_interval;

Notifier::new(
radio_name,
Expand All @@ -50,6 +67,8 @@ impl Notifier {
discord_webhook,
telegram_token,
telegram_chat_id,
notification_mode,
notification_interval,
)
}

Expand Down
2 changes: 1 addition & 1 deletion subgraph-radio/src/server/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ impl SubgraphRadioContext {
let msgs = self.remote_ppoi_messages();
let filtered = msgs
.iter()
.filter(|&message| filter_remote_ppoi_messages(message, identifier, block))
.cloned()
.filter(|message| filter_remote_ppoi_messages(message, identifier, block))
.collect::<Vec<_>>();
filtered
}
Expand Down
Loading

0 comments on commit aa293a7

Please sign in to comment.