From 3b2af916a09521c013896223a42d4f7fe88b5c30 Mon Sep 17 00:00:00 2001
From: yellowhatpro
Date: Sun, 22 Sep 2024 23:53:49 +0530
Subject: [PATCH] refactor: debug-based print statements

---
 src/app/mod.rs              | 29 ++++++++++++-----------------
 src/archival/client.rs      |  5 ++---
 src/archival/listener.rs    | 18 ++++++++++--------
 src/archival/notifier.rs    |  9 +++++----
 src/archival/retry.rs       | 36 +++++++++++++++++++++++++++++-------
 src/archival/tests/utils.rs |  9 ++++-----
 src/archival/utils.rs       | 28 +++++++++++++++++++++-------
 src/configuration/macros.rs |  8 ++++++++
 src/configuration/mod.rs    |  7 +++++++
 src/main.rs                 | 26 ++++++++++++--------------
 src/poller/looper.rs        | 23 ++++++++++++++---------
 11 files changed, 124 insertions(+), 74 deletions(-)
 create mode 100644 src/configuration/macros.rs

diff --git a/src/app/mod.rs b/src/app/mod.rs
index c4fe241..5a07798 100644
--- a/src/app/mod.rs
+++ b/src/app/mod.rs
@@ -1,6 +1,6 @@
 use crate::archival;
 use crate::archival::notifier::Notifier;
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
 use crate::poller::Poller;
 use sqlx::PgPool;
 use std::sync::Arc;
@@ -40,16 +40,15 @@ pub async fn start(pool: &PgPool) -> Result<(), sqlx::Error> {
 ///
 /// ⚠️ This must be awaited twice. Once to get the `JoinHandle`, and a second to start the task
 pub async fn spawn_poller_task(db_pool: PgPool) -> JoinHandle<()> {
-    let settings = Settings::new().expect("Config settings are not configured properly");
-    let mut poller = Poller::new(settings.poller_task.poll_interval, db_pool.clone())
+    let mut poller = Poller::new(SETTINGS.poller_task.poll_interval, db_pool.clone())
         .await
-        .expect("Could not find rows in edit rows to start poller");
+        .expect("[POLLER] Could not find rows in edit rows to start poller");
 
     tokio::spawn(async move {
         if let Err(e) = poller.poll().await {
-            eprintln!("Cannot poll edit data and edit notes, due to: {}", e);
+            eprintln!("[POLLER] Task Failed, Error: {}", e);
             sentry::capture_message(
-                format!("Cannot poll edit data and edit notes, due to: {}", e).as_str(),
+                format!("[POLLER] Task Failed, Error: {}", e).as_str(),
                 sentry::Level::Warning,
             );
         }
     })
 }
@@ -60,12 +59,11 @@
 ///
 /// ⚠️ This must be awaited twice. Once to get the `JoinHandle`, and a second to start the task
 async fn spawn_notification_task(db_pool: PgPool) -> JoinHandle<()> {
-    let settings = Settings::new().expect("Config settings are not configured properly");
     let notifier = Arc::new(Mutex::new(Notifier::new(db_pool.clone()).await));
 
     tokio::spawn(async move {
         let mut interval =
-            tokio::time::interval(Duration::from_secs(settings.notify_task.notify_interval));
+            tokio::time::interval(Duration::from_secs(SETTINGS.notify_task.notify_interval));
 
         while !db_pool.is_closed() {
             interval.tick().await;
@@ -73,10 +71,8 @@ async fn spawn_notification_task(db_pool: PgPool) -> JoinHandle<()> {
             let mut notifier = notifier.lock().await;
             if notifier.should_notify().await {
-                println!("Notifying");
-
                 if let Err(e) = notifier.notify().await {
-                    eprintln!("Notify failed, error: {}", e);
+                    eprintln!("[NOTIFIER] Task Failed, Error: {}", e);
                     sentry::capture_error(&e);
                 };
             }
@@ -93,7 +89,7 @@ async fn spawn_archiver_task(db_pool: PgPool) -> JoinHandle<()> {
         .await
         .unwrap_or_else(|e| {
             sentry::capture_error(&e);
-            eprintln!("Listener Task Error {}", e)
+            eprintln!("[LISTENER] Task Failed, Error: {}", e)
         })
     })
 }
@@ -102,24 +98,23 @@
 ///
 /// ⚠️ This must be awaited twice. Once to get the `JoinHandle`, and a second to start the task
 async fn spawn_retry_and_cleanup_task(db_pool: PgPool) -> JoinHandle<()> {
-    let settings = Settings::new().expect("Config settings are not configured properly");
     tokio::spawn(async move {
         let mut interval =
-            tokio::time::interval(Duration::from_secs(settings.retry_task.retry_interval));
+            tokio::time::interval(Duration::from_secs(SETTINGS.retry_task.retry_interval));
         while !db_pool.is_closed() {
             interval.tick().await;
-            sentry::capture_message("Retry and Cleanup Task started", sentry::Level::Info);
+            sentry::capture_message("[RETRY_AND_CLEANUP] Task started", sentry::Level::Info);
             let archival_retry_task = archival::retry::start(db_pool.clone()).await;
             match archival_retry_task {
                 Ok(_) => {
                     sentry::capture_message(
-                        "Retry and Cleanup Task Completed",
+                        "[RETRY_AND_CLEANUP] Task Completed",
                         sentry::Level::Info,
                     );
                 }
                 Err(e) => {
                     sentry::capture_error(&e);
-                    eprintln!("Retry and Cleanup Task failed, error: {}", e)
+                    eprintln!("[RETRY_AND_CLEANUP] Task Failed, Error: {}", e)
                 }
             }
         }
diff --git a/src/archival/client.rs b/src/archival/client.rs
index c8f1941..7cf8b3f 100644
--- a/src/archival/client.rs
+++ b/src/archival/client.rs
@@ -1,16 +1,15 @@
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
 use once_cell::sync::Lazy;
 use reqwest::{header, Client};
 
 pub static REQWEST_CLIENT: Lazy<Client> = Lazy::new(|| {
-    let settings = Settings::new().expect("Config needed");
     let mut headers = header::HeaderMap::new();
     headers.insert("Accept", "application/json".parse().unwrap());
     headers.insert(
         "Authorization",
         format!(
             "LOW {}:{}",
-            settings.wayback_machine_api.myaccesskey, settings.wayback_machine_api.mysecret
+            SETTINGS.wayback_machine_api.myaccesskey, SETTINGS.wayback_machine_api.mysecret
         )
         .parse()
         .unwrap(),
diff --git a/src/archival/listener.rs b/src/archival/listener.rs
index 656639b..51ce247 100644
--- a/src/archival/listener.rs
+++ b/src/archival/listener.rs
@@ -4,7 +4,8 @@ use crate::archival::utils::{
 };
 
 use crate::archival::error::ArchivalError;
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
+use crate::debug_println;
 use crate::metrics::Metrics;
 use crate::structs::internet_archive_urls::{ArchivalStatus, InternetArchiveUrls};
 use sentry::Level::Error;
@@ -15,14 +16,15 @@ use tokio::time;
 
 /// Listens to the `archive_urls` postgres channel
 pub async fn listen(pool: PgPool) -> Result<(), ArchivalError> {
-    let settings = Settings::new().expect("Config settings not configured properly");
-
     let mut listener = PgListener::connect_with(&pool).await?;
     listener.listen("archive_urls").await?;
     loop {
         while let Some(notification) = listener.try_recv().await? {
-            time::sleep(Duration::from_secs(settings.listen_task.listen_interval)).await;
-            println!("Notification Payload: {}", notification.payload());
+            time::sleep(Duration::from_secs(SETTINGS.listen_task.listen_interval)).await;
+            debug_println!(
+                "[LISTENER] Notification Payload: {}",
+                notification.payload()
+            );
             let payload: InternetArchiveUrls = serde_json::from_str(notification.payload())?;
             handle_payload(payload, &pool).await?
         }
@@ -52,7 +54,7 @@ pub async fn handle_payload(
     } else {
         let archival_result = archive(url, url_row.id, pool).await;
         if let Err(e) = archival_result {
-            eprintln!("Archival Error : {}", e);
+            eprintln!("[LISTENER] Archival Error for id {}: {}", url_row.id, e);
             set_status_with_message(
                 pool,
                 id,
@@ -79,7 +81,7 @@ pub async fn archive(url: String, id: i32, pool: &PgPool) -> Result<(), Archival
             inc_archive_request_retry_count(&status_pool, id)
                 .await
                 .unwrap_or_else(|e| {
-                    eprintln!("Could not increment archive request retry count");
+                    eprintln!("[LISTENER] Could not increment archive request retry count for internet_archive_urls id: {}", id);
                     sentry::capture_error(&e);
                 });
             set_status_with_message(
@@ -90,7 +92,7 @@ pub async fn archive(url: String, id: i32, pool: &PgPool) -> Result<(), Archival
             )
             .await
             .unwrap_or_else(|e| {
-                eprintln!("Could not increment archive request retry count");
+                eprintln!("[LISTENER] Could not set status for internet_archive_urls id: {}", id);
                 sentry::capture_error(&e);
             });
             sentry::capture_error(&e);
diff --git a/src/archival/notifier.rs b/src/archival/notifier.rs
index e0ed63d..afee693 100644
--- a/src/archival/notifier.rs
+++ b/src/archival/notifier.rs
@@ -1,4 +1,5 @@
 use crate::archival::utils::{get_first_id_to_start_notifier_from, is_row_exists};
+use crate::debug_println;
 use sqlx::{Error, PgPool};
 
 pub struct Notifier {
@@ -10,7 +11,7 @@ impl Notifier {
     pub async fn new(pool: PgPool) -> Notifier {
         let start_notifier_from = get_first_id_to_start_notifier_from(pool.clone()).await;
         if start_notifier_from.is_some() {
-            println!("Notifies starts from : {}", start_notifier_from.unwrap());
+            debug_println!("[NOTIFIER] Starts from: {}", start_notifier_from.unwrap());
         }
         Notifier {
             start_notifier_from,
@@ -27,8 +28,8 @@
             .bind(self.start_notifier_from)
             .execute(&pool)
             .await?;
-        println!(
-            "[start_id from notify], {}",
+        debug_println!(
+            "[NOTIFIER] Start notifier from internet_archive_urls id: {}",
             self.start_notifier_from.unwrap()
         );
@@ -39,7 +40,7 @@
             Ok(())
         } else {
             //Case: It could be that there is no URL in InternetArchiveURL table when we call `notify`, so we check for the id here, to start notifier from it in the next notify call
-            println!("[NOTIFIER] No row detected, checking again");
+            debug_println!("[NOTIFIER] No row detected to archive, checking again");
             self.start_notifier_from = get_first_id_to_start_notifier_from(self.pool.clone()).await;
             Ok(())
         }
     }
diff --git a/src/archival/retry.rs b/src/archival/retry.rs
index 85ae42e..3b94412 100644
--- a/src/archival/retry.rs
+++ b/src/archival/retry.rs
@@ -1,5 +1,6 @@
 use crate::archival::utils::check_if_permanent_error;
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
+use crate::debug_println;
 use crate::structs::internet_archive_urls::{ArchivalStatus, InternetArchiveUrls};
 use chrono::{Duration, Utc};
 use sqlx::{Error, PgPool};
@@ -7,8 +8,7 @@ use std::ops::Sub;
 
 /// Method started by `retry_and_cleanup` task, which reiterates `internet_archive_urls`, and according to the conditions, either re archives or cleans the row
 pub async fn start(pool: PgPool) -> Result<(), Error> {
-    let settings = Settings::new().expect("Config settings not configured properly");
-    let select_limit = settings.retry_task.select_limit;
+    let select_limit = SETTINGS.retry_task.select_limit;
     let mut last_id = 0;
     loop {
         let query = format!(
@@ -31,7 +31,7 @@ pub async fn start(pool: PgPool) -> Result<(), Error> {
         }
         last_id += select_limit;
     }
-    println!("Retry/Cleanup Task Complete");
+    debug_println!("[RETRY_AND_CLEANUP] Task Complete");
     Ok(())
 }
 
@@ -40,7 +40,6 @@ pub async fn retry_and_cleanup_ia_row(
     row: InternetArchiveUrls,
     pool: &PgPool,
 ) -> Result<(), Error> {
-    let settings = Settings::new().expect("Config settings are not configured properly");
     let current_time = Utc::now();
     let created_at = row.created_at.unwrap();
     let duration_since_creation = current_time.sub(created_at);
@@ -49,7 +48,7 @@ pub async fn retry_and_cleanup_ia_row(
         // If the URL status is failed, then we can remove it from the table (Case when still can't archive after 3 retries)
         Ok(ArchivalStatus::Failed) => {
             if duration_since_creation
-                >= Duration::seconds(settings.retry_task.allow_remove_row_after)
+                >= Duration::seconds(SETTINGS.retry_task.allow_remove_row_after)
             {
                 sqlx::query(
                     "DELETE FROM external_url_archiver.internet_archive_urls WHERE id = $1",
                 )
                 .bind(row.id)
                 .execute(pool)
                 .await?;
+                debug_println!(
+                    "[RETRY_AND_CLEANUP] Removing failed URL: {} after {} seconds",
+                    row.url.unwrap_or("".to_string()),
+                    SETTINGS.retry_task.allow_remove_row_after
+                )
             }
         }
         Ok(ArchivalStatus::StatusError) => {
@@ -69,19 +73,27 @@ pub async fn retry_and_cleanup_ia_row(
                 .bind(row.id)
                 .execute(pool)
                 .await?;
+                debug_println!(
+                    "[RETRY_AND_CLEANUP] Removing URL: {} due to Permanent Errors",
+                    row.url.unwrap_or("".to_string())
+                )
             } else {
                 // If the archival status is null, which means the URL could not get archived earlier, therefore, enqueue the row to be sent to get archived
                 sqlx::query("SELECT external_url_archiver.notify_archive_urls($1)")
                     .bind(row.id)
                     .execute(pool)
                     .await?;
+                debug_println!(
+                    "[RETRY_AND_CLEANUP] Retrying notifying URL: {}",
+                    row.url.unwrap_or("".to_string())
+                )
             }
         }
         }
         _ => {
             // In any other case, if the URL has been there for more than the time limit, i.e. 24 hours, we will remove it, else re-archive it
             if duration_since_creation
-                >= Duration::seconds(settings.retry_task.allow_remove_row_after)
+                >= Duration::seconds(SETTINGS.retry_task.allow_remove_row_after)
             {
                 sqlx::query(
                     "DELETE FROM external_url_archiver.internet_archive_urls WHERE id = $1",
                 )
                 .bind(row.id)
                 .execute(pool)
                 .await?;
+                debug_println!(
+                    "[RETRY_AND_CLEANUP] Removing URL: {} after {} seconds",
+                    row.url.unwrap_or("".to_string()),
+                    SETTINGS.retry_task.allow_remove_row_after
+                )
             } else if row.status.try_into() != Ok(ArchivalStatus::Success) {
                 sqlx::query("SELECT external_url_archiver.notify_archive_urls($1)")
                     .bind(row.id)
                     .execute(pool)
                     .await?;
+                debug_println!(
+                    "[RETRY_AND_CLEANUP] Re-archiving URL: {} (younger than {} seconds)",
+                    row.url.unwrap_or("".to_string()),
+                    SETTINGS.retry_task.allow_remove_row_after
+                )
             }
         }
     }
diff --git a/src/archival/tests/utils.rs b/src/archival/tests/utils.rs
index 9f58487..69fd8e1 100644
--- a/src/archival/tests/utils.rs
+++ b/src/archival/tests/utils.rs
@@ -1,5 +1,5 @@
 use super::*;
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
 use sqlx::Error;
 
 #[sqlx::test(fixtures(
@@ -37,11 +37,10 @@ async fn test_make_archival_network_request() -> Result<(), ArchivalError> {
         ..Default::default()
     };
     let mut server = mockito::Server::new_with_opts_async(opts).await;
-    let settings = Settings::new().expect("Config settings are not configured properly");
     let mock = server
         .mock("POST", "/save")
         .match_header("Accept", "application/json")
-        .match_header("Authorization", format!("LOW {}:{}", settings.wayback_machine_api.myaccesskey, settings.wayback_machine_api.mysecret).as_str())
+        .match_header("Authorization", format!("LOW {}:{}", SETTINGS.wayback_machine_api.myaccesskey, SETTINGS.wayback_machine_api.mysecret).as_str())
         .match_header("Content-Type", "application/x-www-form-urlencoded")
         .match_body(format!("url={}", testing_url_invalid).as_str())
         .with_body(r#"{"message":"www.example.om URL syntax is not valid.","status":"error","status_ext":"error:invalid-url-syntax"}"#)
@@ -55,7 +54,7 @@ async fn test_make_archival_network_request() -> Result<(), ArchivalError> {
         "Authorization",
         format!(
             "LOW {}:{}",
-            settings.wayback_machine_api.myaccesskey, settings.wayback_machine_api.mysecret
+            SETTINGS.wayback_machine_api.myaccesskey, SETTINGS.wayback_machine_api.mysecret
         )
         .as_str(),
     )
@@ -71,7 +70,7 @@ async fn test_make_archival_network_request() -> Result<(), ArchivalError> {
         "Authorization",
         format!(
             "LOW {}:{}",
-            settings.wayback_machine_api.myaccesskey, settings.wayback_machine_api.mysecret
+            SETTINGS.wayback_machine_api.myaccesskey, SETTINGS.wayback_machine_api.mysecret
         )
         .as_str(),
     )
diff --git a/src/archival/utils.rs b/src/archival/utils.rs
index d952fcd..5820455 100644
--- a/src/archival/utils.rs
+++ b/src/archival/utils.rs
@@ -5,7 +5,8 @@ use crate::archival::archival_response::{
 };
 use crate::archival::client::REQWEST_CLIENT;
 use crate::archival::error::ArchivalError;
 use crate::archival::error::ArchivalError::SaveRequestError;
-use crate::configuration::Settings;
+use crate::configuration::SETTINGS;
+use crate::debug_println;
 use crate::metrics::Metrics;
 use crate::structs::internet_archive_urls::{ArchivalStatus, InternetArchiveUrls};
 use sqlx::{Error, PgPool};
@@ -80,7 +81,11 @@ pub async fn is_row_exists(pool: &PgPool, row_id: i32) -> bool {
     match is_row_exists_res {
         Ok(_) => true,
         Err(error) => {
-            println!("Cannot notify: {:?}", error);
+            debug_println!(
+                "[NOTIFIER] Cannot notify internet_archive_urls id: {}. Reason: {:?}",
+                row_id,
+                error
+            );
             false
         }
     }
@@ -138,12 +143,14 @@ pub async fn schedule_status_check(
     let metrics = Metrics::new().await;
     metrics.network_request_counter.inc();
     metrics.push_metrics().await;
-    let settings = Settings::new().expect("Config settings are not configured properly");
-    println!("{}", job_id);
+    debug_println!(
+        "[LISTENER] STATUS CHECK: Attempting status check for job_id {}",
+        job_id
+    );
     set_status_with_message(pool, id, ArchivalStatus::Processing as i32, "Processing").await?;
     for attempt in 1..=3 {
         time::sleep(Duration::from_secs(
-            settings.listen_task.sleep_status_interval,
+            SETTINGS.listen_task.sleep_status_interval,
         ))
         .await;
         let archival_status_response = make_archival_status_request(job_id.as_str()).await?;
@@ -156,11 +163,15 @@ pub async fn schedule_status_check(
             )
             .await?;
             metrics.record_archival_status("success").await;
+            debug_println!(
+                "[LISTENER] STATUS CHECK: job_id {} archived successfully",
+                job_id
+            );
             return Ok(());
         } else {
             if attempt == 3 {
                 let status = archival_status_response.status;
-                eprintln!("Error making final status check request: {:?}", &status);
+                eprintln!("[LISTENER] STATUS CHECK: Error making final status check request for job_id {}: {:?}", job_id, &status);
                 inc_archive_request_retry_count(pool, id).await?;
                 set_status_with_message(
                     pool,
                     id,
@@ -171,7 +182,10 @@ pub async fn schedule_status_check(
                 .await?;
             }
             metrics.record_archival_status("error").await;
-            eprintln!("Could not archive: {} attempt", attempt)
+            eprintln!(
+                "[LISTENER] STATUS CHECK: Could not archive job_id {} on attempt {}",
+                job_id, attempt
+            )
         }
     }
     Ok(())
diff --git a/src/configuration/macros.rs b/src/configuration/macros.rs
new file mode 100644
index 0000000..fe416e9
--- /dev/null
+++ b/src/configuration/macros.rs
@@ -0,0 +1,8 @@
+#[macro_export]
debug_println { + ($($arg:tt)*) => { + if $crate::configuration::SETTINGS.debug { + println!($($arg)*); + } + } +} diff --git a/src/configuration/mod.rs b/src/configuration/mod.rs index c2ee101..261140b 100644 --- a/src/configuration/mod.rs +++ b/src/configuration/mod.rs @@ -1,8 +1,14 @@ +mod macros; + use config::{Config, ConfigError, File}; use dotenv::dotenv; +use once_cell::sync::Lazy; use serde::Deserialize; use std::env; +pub static SETTINGS: Lazy = + Lazy::new(|| Settings::new().expect("Failed to load settings")); + #[derive(Debug, Deserialize)] pub struct WaybackMachineApi { pub myaccesskey: String, @@ -55,6 +61,7 @@ pub struct Settings { pub listen_task: ListenTask, pub sentry: Sentry, pub database: Database, + pub debug: bool, } impl Settings { diff --git a/src/main.rs b/src/main.rs index cca7b88..26bbb6a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use crate::configuration::Settings; +use crate::configuration::SETTINGS; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; mod app; @@ -11,12 +11,10 @@ mod configuration; mod metrics; fn main() { - let settings = Settings::new().expect("Failed to load settings"); - - let _guard = if !settings.sentry.url.trim().is_empty() { + let _guard = if !SETTINGS.sentry.url.trim().is_empty() { println!("Initializing Sentry with DSN..."); Some(sentry::init(( - settings.sentry.url.as_str(), + SETTINGS.sentry.url.as_str(), sentry::ClientOptions { release: sentry::release_name!(), ..Default::default() @@ -33,18 +31,18 @@ fn main() { .build() .unwrap() .block_on(async { - let hostname = settings.database.pg_host; - let user = settings.database.pg_user; - let password = settings.database.pg_password; - let port = settings.database.pg_port; - let db = settings.database.pg_database; + let hostname = &SETTINGS.database.pg_host; + let user = &SETTINGS.database.pg_user; + let password = &SETTINGS.database.pg_password; + let port = SETTINGS.database.pg_port; + let db = &SETTINGS.database.pg_database; let connect_options = PgConnectOptions::new() - .host(&hostname) + .host(hostname) .port(port) - .username(&user) - .password(&password) - .database(&db) + .username(user) + .password(password) + .database(db) .statement_cache_capacity(0); let pool = PgPoolOptions::new() diff --git a/src/poller/looper.rs b/src/poller/looper.rs index 5915483..2f9c65c 100644 --- a/src/poller/looper.rs +++ b/src/poller/looper.rs @@ -1,3 +1,4 @@ +use crate::debug_println; use crate::metrics::Metrics; use crate::poller::utils::{ extract_url_from_edit_data, extract_url_from_edit_note, save_url_to_internet_archive_urls, @@ -16,9 +17,10 @@ pub async fn poll_db( edit_data_start_idx: i32, edit_note_start_idx: i32, ) -> Result<(Option, Option), Error> { - println!( - "EditNote: {}, EditData: {}", - edit_note_start_idx, edit_data_start_idx + debug_println!( + "[POLLER] Starting Polling from EditNote: {}, EditData: {}", + edit_note_start_idx, + edit_data_start_idx ); let metrics = Metrics::new().await; let edits = sqlx::query_as::<_, EditData>( @@ -49,26 +51,29 @@ pub async fn poll_db( .fetch_all(pool) .await?; - println!("Edits ->"); for edit in &edits { let urls = extract_url_from_edit_data(edit, pool).await; for url in urls { save_url_to_internet_archive_urls(url.as_str(), "edit_data", edit.edit, pool) .await - .unwrap_or_else(|e| eprintln!("Error saving URL from edit: {}: {}", edit.edit, e)); - println!("{}", url); + .unwrap_or_else(|e| { + eprintln!("[POLLER] Error saving URL from edit: {}: {}", edit.edit, e) + }); + debug_println!("[POLLER] Edit Data URL: {}", url); 
         }
     }
-    println!("Edit Notes ->");
     for note in &notes {
         let urls = extract_url_from_edit_note(note, pool).await;
         for url in urls {
             save_url_to_internet_archive_urls(url.as_str(), "edit_note", note.id, pool)
                 .await
                 .unwrap_or_else(|e| {
-                    eprintln!("Error saving URL from edit note: {}: {}", note.id, e)
+                    eprintln!(
+                        "[POLLER] Error saving URL from edit note: {}: {}",
+                        note.id, e
+                    )
                 });
-            println!("{}", url);
+            debug_println!("[POLLER] Edit Note URL: {}", url);
         }
     }
     metrics.db_poll_counter.inc();
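
The patch's core addition, a debug_println! macro gated on the lazily initialized SETTINGS global, can be sketched in isolation as follows. This is a minimal, self-contained approximation rather than the crate's actual layout: a simplified Settings struct with only the debug field is inlined at the crate root, and the flag is hardcoded to true for demonstration, whereas the real value is loaded by Settings::new() from config files and the environment in src/configuration/mod.rs. The once_cell dependency mirrors what the patch already uses.

    use once_cell::sync::Lazy;

    pub struct Settings {
        pub debug: bool,
    }

    // Stand-in for the crate's SETTINGS static; the real one is built by
    // Settings::new() from config files and environment variables.
    pub static SETTINGS: Lazy<Settings> = Lazy::new(|| Settings { debug: true });

    // Expands to a println! guarded by the runtime debug flag, so verbose
    // logging can be toggled from configuration without touching call sites.
    #[macro_export]
    macro_rules! debug_println {
        ($($arg:tt)*) => {
            if $crate::SETTINGS.debug {
                println!($($arg)*);
            }
        };
    }

    fn main() {
        // Printed only when SETTINGS.debug is true.
        debug_println!("[EXAMPLE] polling starts from id {}", 42);
    }

Because the arguments expand inside the if body, they are only evaluated when the flag is set; a disabled call costs a single branch and never formats its arguments.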