fix(stats): v2.2.3 fixes #1127

Merged · 5 commits · Nov 19, 2024 (changes shown from 3 commits)

stats/README.md (6 changes: 3 additions & 3 deletions)
@@ -83,9 +83,9 @@ by enabling word wrapping
 | `STATS__FORCE_​UPDATE_ON_START` | | Fully recalculate all charts on start | `false` |
 | `STATS__CONCURRENT_​START_UPDATES` | | Number of concurrent chart updates on start | `3` |
 | `STATS__​DEFAULT_​SCHEDULE` | | Schedule used for update groups with no config | `"0 0 1 * * * *"` |
-| `STATS__LIMITS__REQUESTED_​POINTS_LIMIT` | | Maximum allowed number of requested points | `182500` |
-| `STATS__BLOCKSCOUT_API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. Used for [conditional update start](#conditional-start). | `null` |
-| `STATS__CONDITIONAL_​START__CHECK_PERIOD_SECS` | | Time between start condition checks (while they are not satisfied) | `5` |
+| `STATS__LIMITS__​REQUESTED_POINTS_​LIMIT` | | Maximum allowed number of requested points | `182500` |
+| `STATS__BLOCKSCOUT_​API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. Used for [conditional update start](#conditional-start). | `null` |
+| `STATS__CONDITIONAL_​START__​CHECK_PERIOD_SECS` | | Time between start condition checks (while they are not satisfied) | `5` |
 | `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​ENABLED` | | Enable `blocks_​ratio` threshold | `true` |
 | `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​THRESHOLD` | | Value for `blocks_​ratio` threshold | `0.98` |
 | `STATS__CONDITIONAL_​START__INTERNAL_​TRANSACTIONS_RATIO__​ENABLED` | | Enable `internal_​transactions_​ratio` threshold | `true` |
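For orientation, the conditional-start variables in this table cooperate: the service polls the Blockscout API every `CHECK_PERIOD_SECS` seconds until every enabled ratio threshold is met. A hypothetical `.env` fragment using the documented defaults (the URL is a placeholder, not a value from this PR):

```sh
# Hypothetical sketch: poll Blockscout every 5 s and start chart updates
# once 98% of blocks are indexed; with this PR, a null ratio counts as 0.
STATS__BLOCKSCOUT_API_URL=https://blockscout.example.com
STATS__CONDITIONAL_START__CHECK_PERIOD_SECS=5
STATS__CONDITIONAL_START__BLOCKS_RATIO__ENABLED=true
STATS__CONDITIONAL_START__BLOCKS_RATIO__THRESHOLD=0.98
```
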
stats/stats-server/src/blockscout_waiter.rs (28 changes: 25 additions & 3 deletions)
@@ -23,9 +23,10 @@ fn is_threshold_passed(
         .map(|s| s.parse::<f64>())
         .transpose()
         .context(format!("Parsing `{value_name}`"))?;
-    let Some(value) = value else {
-        anyhow::bail!("Received `null` value of `{value_name}`. Can't determine indexing status.",);
-    };
+    let value = value.unwrap_or_else(|| {
+        info!("Treating `{value_name}=null` as zero.",);
+        0.0
+    });
     if value < threshold {
         info!(
             threshold = threshold,
@@ -245,6 +246,27 @@ mod tests {
.expect_err("must time out");
}

#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_works_with_null_ratios(
wait_config: StartConditionSettings,
) {
test_wait_indexing(
wait_config,
Some(Duration::from_millis(300)),
ResponseTemplate::new(200).set_body_string(
r#"{
"finished_indexing": false,
"finished_indexing_blocks": false,
"indexed_blocks_ratio": null,
"indexed_internal_transactions_ratio": null
}"#,
),
)
.await
.expect_err("must time out and not fall with error");
}

#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_retries_with_error_codes(
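The behavioral change in `is_threshold_passed` above is the heart of this file: a `null` ratio from the Blockscout status endpoint no longer aborts the waiter with an error; it is read as `0.0`, so the start condition simply stays unsatisfied and is re-checked, which is exactly what the new test exercises. A minimal sketch of that semantics with simplified types (not the crate's actual signature):

```rust
// Minimal sketch, with simplified types, of the null-as-zero behavior.
fn threshold_passed(ratio: Option<f64>, threshold: f64) -> bool {
    // Previously a `None` ratio was a hard error; now it reads as 0.0,
    // which fails the comparison and lets the waiter retry later.
    ratio.unwrap_or(0.0) >= threshold
}

fn main() {
    assert!(!threshold_passed(None, 0.98)); // null ratio: keep waiting
    assert!(!threshold_passed(Some(0.50), 0.98)); // not indexed far enough
    assert!(threshold_passed(Some(0.99), 0.98)); // condition satisfied
}
```
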
stats/stats-server/src/update_service.rs (41 changes: 30 additions & 11 deletions)
@@ -4,6 +4,9 @@ use cron::Schedule;
 use sea_orm::{DatabaseConnection, DbErr};
 use stats::data_source::types::{BlockscoutMigrations, UpdateParameters};
 use std::sync::Arc;
+use tokio::task::JoinHandle;

+const FAILED_UPDATERS_UNTIL_PANIC: u64 = 3;
+
 pub struct UpdateService {
     db: Arc<DatabaseConnection>,
@@ -33,14 +36,17 @@ impl UpdateService {
             charts,
         })
     }
+
+    /// Perform the initial update, then run the service in an infinite loop.
+    /// Terminates dependent threads if one fails.
     pub async fn force_async_update_and_run(
         self: Arc<Self>,
         concurrent_tasks: usize,
         default_schedule: Schedule,
         force_update_on_start: Option<bool>,
     ) {
         let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrent_tasks));
-        let tasks = self
+        let (tasks, mut updaters) = self
             .charts
             .update_groups
             .values()
@@ -49,32 +55,45 @@ impl UpdateService {
                 let group_entry = group.clone();
                 let default_schedule = default_schedule.clone();
                 let sema = semaphore.clone();
-                async move {
-                    let _permit = sema.acquire().await.expect("failed to acquire permit");
-                    if let Some(force_full) = force_update_on_start {
-                        this.clone().update(group_entry.clone(), force_full).await
-                    };
-                    this.spawn_group_updater(group_entry, &default_schedule);
-                }
+                (
+                    async move {
+                        let _permit = sema.acquire().await.expect("failed to acquire permit");
+                        if let Some(force_full) = force_update_on_start {
+                            this.clone().update(group_entry.clone(), force_full).await
+                        };
+                    },
+                    self.spawn_group_updater(group.clone(), &default_schedule),
+                )
             })
-            .collect::<Vec<_>>();
+            .collect::<(Vec<_>, Vec<_>)>();
         futures::future::join_all(tasks).await;
         tracing::info!("initial update is done");
+
+        let mut failed = 0;
+        while !updaters.is_empty() {
+            let (res, _, others) = futures::future::select_all(updaters).await;
+            updaters = others;
+            tracing::error!("updater stopped: {:?}", res);
+            failed += 1;
+            if failed >= FAILED_UPDATERS_UNTIL_PANIC {
+                panic!("too many failed updaters");
+            }
+        }
     }

     fn spawn_group_updater(
         self: &Arc<Self>,
         group_entry: UpdateGroupEntry,
         default_schedule: &Schedule,
-    ) {
+    ) -> JoinHandle<()> {
         let this = self.clone();
         let chart = group_entry.clone();
         let schedule = group_entry
             .update_schedule
             .as_ref()
             .unwrap_or(default_schedule)
             .clone();
-        tokio::spawn(async move { this.run_cron(chart, schedule).await });
+        tokio::spawn(async move { this.run_cron(chart, schedule).await })
     }

     async fn update(self: Arc<Self>, group_entry: UpdateGroupEntry, force_full: bool) {
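The supervision loop is the main structural change: `spawn_group_updater` now returns its `JoinHandle`, the initial-update futures and the handles are collected pairwise (an iterator of tuples collects into a tuple of `Vec`s), and after the initial update the service watches the handles with `futures::future::select_all`, panicking once `FAILED_UPDATERS_UNTIL_PANIC` of them have stopped. A self-contained sketch of the same pattern with stand-in tasks (assumes the `tokio` and `futures` crates; the task bodies are illustrative):

```rust
// Sketch of the supervision pattern used above, with stand-in tasks in
// place of the real cron-driven updaters.
use futures::future::select_all;
use std::time::Duration;
use tokio::task::JoinHandle;

const FAILED_UPDATERS_UNTIL_PANIC: u64 = 3;

#[tokio::main]
async fn main() {
    // Real updaters loop forever on a cron schedule; these exit on purpose
    // so the supervisor below has something to observe.
    let mut updaters: Vec<JoinHandle<()>> = (0..4u64)
        .map(|i| {
            tokio::spawn(async move {
                tokio::time::sleep(Duration::from_millis(50 * (i + 1))).await;
            })
        })
        .collect();

    let mut failed = 0;
    while !updaters.is_empty() {
        // select_all resolves with the first handle to finish, its index,
        // and the remaining handles.
        let (res, _index, others) = select_all(updaters).await;
        updaters = others;
        eprintln!("updater stopped: {res:?}");
        failed += 1;
        if failed >= FAILED_UPDATERS_UNTIL_PANIC {
            // Crash loudly so an orchestrator can restart the whole service
            // instead of letting charts go silently stale.
            panic!("too many failed updaters");
        }
    }
}
```

Panicking is presumably preferred here because a crashed process is visible to and restartable by an orchestrator, whereas a silently dead updater would simply stop refreshing its charts.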