Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Apr 15, 2024
1 parent eda609d commit 64963c5
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 3 deletions.
12 changes: 9 additions & 3 deletions chain/ethereum/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ use anyhow::{anyhow, bail};
use graph::cheap_clone::CheapClone;
use graph::endpoint::EndpointMetrics;
use graph::firehose::{AvailableCapacity, SubgraphLimit};
use graph::parking_lot::RwLock;
use graph::prelude::chrono::{DateTime, Utc};
use graph::prelude::rand::seq::IteratorRandom;
use graph::prelude::rand::{self, Rng};
use jsonrpc_core::futures_util::future::join_all;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::Arc;

pub use graph::impl_slog_value;
Expand Down Expand Up @@ -46,9 +50,10 @@ impl EthereumNetworkAdapter {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct EthereumNetworkAdapters {
pub adapters: Vec<EthereumNetworkAdapter>,
// provider_status: ProviderGenesisVerification,
adapters: Vec<EthereumNetworkAdapter>,
call_only_adapters: Vec<EthereumNetworkAdapter>,
// Percentage of request that should be used to retest errored adapters.
retest_percent: f64,
Expand All @@ -66,6 +71,7 @@ impl EthereumNetworkAdapters {
adapters: vec![],
call_only_adapters: vec![],
retest_percent: retest_percent.unwrap_or(DEFAULT_ADAPTER_ERROR_RETEST_PERCENT),
// provider_status: ProviderGenesisVerification::default(),
}
}

Expand All @@ -76,7 +82,7 @@ impl EthereumNetworkAdapters {
self.adapters.push(adapter);
}
}
pub fn all_cheapest_with(
fn all_cheapest_with(
&self,
required_capabilities: &NodeCapabilities,
) -> impl Iterator<Item = &EthereumNetworkAdapter> + '_ {
Expand Down
208 changes: 208 additions & 0 deletions graph/src/components/adapter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
use std::collections::HashMap;

use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use itertools::Itertools;

use slog::Logger;
use thiserror::Error;

use crate::{blockchain::ChainIdentifier, prelude::error, tokio::sync::RwLock};

const VALIDATION_ATTEMPT_TTL_SECONDS: Duration = Duration::seconds(60 * 5);

#[derive(Debug, Error)]
pub enum ProviderManagerError {
#[error("unknown error {0}")]
Unknown(#[from] anyhow::Error),
#[error("all providers for chain_id {0} have failed")]
AllProvidersFailed(ChainId),
}

#[async_trait]
trait NetIdentifiable: Clone {
async fn net_identifiers(&self) -> Result<ChainIdentifier, anyhow::Error>;
fn provider_name(&self) -> ProviderName;
}

pub type ProviderName = String;
pub type ChainId = String;

#[derive(Debug, PartialEq, Eq)]
struct Ident {
provider: ProviderName,
chain_id: ChainId,
}

/// ProviderCorrectness will maintain a list of providers which have had their
/// ChainIdentifiers checked. The first identifier is considered correct, if a later
/// provider for the same chain offers a different ChainIdentifier, this will be considered a
/// failed validation and it will be disabled.
#[derive(Debug)]
struct ProviderManager<T: NetIdentifiable> {
logger: Logger,
// usize is the status index so we can quickly check
adapters: HashMap<ChainId, Vec<(usize, T)>>,
status: Vec<(Ident, RwLock<GenesisCheckStatus>)>,
// When an adapter is not available at the start and is a new chain, an identifier will not
// be available. This will be set to None
expected_idents: HashMap<Ident, Option<ChainIdentifier>>,
}

impl<T: NetIdentifiable> ProviderManager<T> {
pub fn new(logger: Logger) -> Self {
Self {
logger,
adapters: HashMap::default(),
status: Vec::new(),
expected_idents: HashMap::default(),
}
}

pub fn add_adapter(&mut self, chain_id: ChainId, adapter: T) {
let provider = adapter.provider_name();
let ident = Ident {
provider,
chain_id: chain_id.clone(),
};

let index = match self.status.iter().find_position(|s| ident.eq(&s.0)) {
Some((index, _)) => index,
None => {
let index = self.status.len();
self.status
.push((ident, RwLock::new(GenesisCheckStatus::NotChecked)));

index
}
};

match self.adapters.get_mut(&chain_id) {
Some(entry) => entry.push((index, adapter)),
None => {
self.adapters.insert(chain_id, vec![(index, adapter)]);
}
}
}

async fn is_all_verified(&self, adapters: &Vec<(usize, T)>) -> bool {
for (index, _) in adapters.iter() {
let status = self.status.get(*index).unwrap().1.read().await;
if *status == GenesisCheckStatus::Valid {
return false;
}
}

true
}

async fn get_verified_for_chain(&self, chain_id: &ChainId) -> Vec<&T> {
let mut out = vec![];
let adapters = match self.adapters.get(chain_id) {
Some(adapters) => adapters,
None => return vec![],
};

for (index, adapter) in adapters.iter() {
let status = self.status.get(*index).unwrap().1.read().await;
if *status != GenesisCheckStatus::Valid {
continue;
}
out.push(adapter);
}

out
}

async fn get_status(&self, index: usize) -> GenesisCheckStatus {
match self.status.get(index) {
Some(status) => status.1.read().await.clone(),
None => GenesisCheckStatus::Failed,
}
}

fn ttl_has_elapsed(ttl: &DateTime<Utc>) -> bool {
ttl.checked_add_signed(VALIDATION_ATTEMPT_TTL_SECONDS)
.unwrap()
> Utc::now()
}

fn should_verify(status: &GenesisCheckStatus) -> bool {
match status {
GenesisCheckStatus::TemporaryFailure { checked_at }
if Self::ttl_has_elapsed(checked_at) =>
{
true
}
// Let check the provider
GenesisCheckStatus::NotChecked => true,
_ => false,
}
}

async fn verify_provider(&self, index: usize, adapter: T) -> Result<(), ProviderManagerError> {
let status = self.get_status(index).await;
if !Self::should_verify(&status) {
return Ok(());
}

// unwrap: If index didn't exist it would have failed the previous check so it's safe
// to unwrap.
let status = self.status.get(index).unwrap().1.write().await;
// double check nothing has changed.
if !Self::should_verify(&status) {
return Ok(());
}

let chain_ident = adapter.net_identifiers().await?;

Ok(())
}

async fn verify(&self, adapters: &Vec<(usize, T)>) -> Result<(), ProviderManagerError> {
let mut tasks = vec![];

for (index, adapter) in adapters.iter() {}

Ok(())
}

pub async fn get_all(&self, chain_id: ChainId) -> Vec<&T> {
let adapters = match self.adapters.get(&chain_id) {
Some(adapters) => adapters,
None => return vec![],
};

if self.is_all_verified(&adapters).await {
return adapters.iter().map(|v| &v.1).collect();
}

match self.verify(adapters).await {
Ok(_) => {}
Err(error) => error!(
self.logger,
"unable to verify genesis for adapter: {}",
error.to_string()
),
}

self.get_verified_for_chain(&chain_id).await
}
}

#[derive(Debug)]
struct Item<T> {
index: u8,
item: T,
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum GenesisCheckStatus {
NotChecked,
TemporaryFailure { checked_at: DateTime<Utc> },
Valid,
Failed,
}

#[cfg(test)]
mod test {}
2 changes: 2 additions & 0 deletions graph/src/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub mod metrics;
/// Components dealing with versioning
pub mod versions;

pub mod adapter;

/// A component that receives events of type `T`.
pub trait EventConsumer<E> {
/// Get the event sink.
Expand Down

0 comments on commit 64963c5

Please sign in to comment.