Skip to content

Commit

Permalink
fix(stats): v2.2.3 fixes (#1127)
Browse files Browse the repository at this point in the history
* test + treat null as zero

* readme

* don't end the process after initial update is done

* fix
  • Loading branch information
bragov4ik authored Nov 19, 2024
1 parent cd4916e commit 480d1e8
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 17 deletions.
6 changes: 3 additions & 3 deletions stats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ by enabling word wrapping
| `STATS__FORCE_​UPDATE_ON_START` | | Fully recalculate all charts on start | `false` |
| `STATS__CONCURRENT_​START_UPDATES` | | Amount of concurrent charts update on start | `3` |
| `STATS__​DEFAULT_​SCHEDULE` | | Schedule used for update groups with no config | `"0 0 1 * * * *"` |
| `STATS__LIMITS__REQUESTED_​POINTS_LIMIT` | | Maximum allowed number of requested points | `182500` |
| `STATS__BLOCKSCOUT_API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. Used for [conditional update start](#conditional-start). | `null` |
| `STATS__CONDITIONAL_​START__CHECK_PERIOD_SECS` | | Time between start condition checking (if they are not satisfied) | `5` |
| `STATS__LIMITS__​REQUESTED_POINTS_​LIMIT` | | Maximum allowed number of requested points | `182500` |
| `STATS__BLOCKSCOUT_​API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. Used for [conditional update start](#conditional-start). | `null` |
| `STATS__CONDITIONAL_​START__​CHECK_PERIOD_SECS` | | Time between start condition checking (if they are not satisfied) | `5` |
| `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​ENABLED` | | Enable `blocks_​ratio` threshold | `true` |
| `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​THRESHOLD` | | Value for `blocks_​ratio` threshold | `0.98` |
| `STATS__CONDITIONAL_​START__INTERNAL_​TRANSACTIONS_RATIO__​ENABLED` | | Enable `internal_​transactions_​ratio` threshold | `true` |
Expand Down
28 changes: 25 additions & 3 deletions stats/stats-server/src/blockscout_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ fn is_threshold_passed(
.map(|s| s.parse::<f64>())
.transpose()
.context(format!("Parsing `{value_name}`"))?;
let Some(value) = value else {
anyhow::bail!("Received `null` value of `{value_name}`. Can't determine indexing status.",);
};
let value = value.unwrap_or_else(|| {
info!("Treating `{value_name}=null` as zero.",);
0.0
});
if value < threshold {
info!(
threshold = threshold,
Expand Down Expand Up @@ -245,6 +246,27 @@ mod tests {
.expect_err("must time out");
}

#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_works_with_null_ratios(
wait_config: StartConditionSettings,
) {
test_wait_indexing(
wait_config,
Some(Duration::from_millis(300)),
ResponseTemplate::new(200).set_body_string(
r#"{
"finished_indexing": false,
"finished_indexing_blocks": false,
"indexed_blocks_ratio": null,
"indexed_internal_transactions_ratio": null
}"#,
),
)
.await
.expect_err("must time out and not fall with error");
}

#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_retries_with_error_codes(
Expand Down
42 changes: 31 additions & 11 deletions stats/stats-server/src/update_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use cron::Schedule;
use sea_orm::{DatabaseConnection, DbErr};
use stats::data_source::types::{BlockscoutMigrations, UpdateParameters};
use std::sync::Arc;
use tokio::task::JoinHandle;

const FAILED_UPDATERS_UNTIL_PANIC: u64 = 3;

pub struct UpdateService {
db: Arc<DatabaseConnection>,
Expand Down Expand Up @@ -33,14 +36,17 @@ impl UpdateService {
charts,
})
}

/// Perform initial update and run the service in infinite loop.
/// Terminates dependant threads if one fails.
pub async fn force_async_update_and_run(
self: Arc<Self>,
concurrent_tasks: usize,
default_schedule: Schedule,
force_update_on_start: Option<bool>,
) {
let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrent_tasks));
let tasks = self
let (tasks, mut updaters) = self
.charts
.update_groups
.values()
Expand All @@ -49,32 +55,46 @@ impl UpdateService {
let group_entry = group.clone();
let default_schedule = default_schedule.clone();
let sema = semaphore.clone();
async move {
let _permit = sema.acquire().await.expect("failed to acquire permit");
if let Some(force_full) = force_update_on_start {
this.clone().update(group_entry.clone(), force_full).await
};
this.spawn_group_updater(group_entry, &default_schedule);
}
(
async move {
let _permit = sema.acquire().await.expect("failed to acquire permit");
if let Some(force_full) = force_update_on_start {
this.clone().update(group_entry.clone(), force_full).await
};
},
self.spawn_group_updater(group.clone(), &default_schedule),
)
})
.collect::<Vec<_>>();
.collect::<(Vec<_>, Vec<_>)>();
futures::future::join_all(tasks).await;
tracing::info!("initial update is done");

let mut failed = 0;
while !updaters.is_empty() {
let (res, _, others) = futures::future::select_all(updaters).await;
updaters = others;
tracing::error!("updater stopped: {:?}", res);

failed += 1;
if failed >= FAILED_UPDATERS_UNTIL_PANIC {
panic!("too many failed updaters");
}
}
}

fn spawn_group_updater(
self: &Arc<Self>,
group_entry: UpdateGroupEntry,
default_schedule: &Schedule,
) {
) -> JoinHandle<()> {
let this = self.clone();
let chart = group_entry.clone();
let schedule = group_entry
.update_schedule
.as_ref()
.unwrap_or(default_schedule)
.clone();
tokio::spawn(async move { this.run_cron(chart, schedule).await });
tokio::spawn(this.run_cron(chart, schedule))
}

async fn update(self: Arc<Self>, group_entry: UpdateGroupEntry, force_full: bool) {
Expand Down

0 comments on commit 480d1e8

Please sign in to comment.