diff --git a/src/airtable.rs b/src/airtable.rs index 8afb2fe..1b3b64f 100644 --- a/src/airtable.rs +++ b/src/airtable.rs @@ -1,8 +1,11 @@ use airtable_api::{Airtable, Record}; use anyhow::Result; use chrono::{DateTime, Utc}; +use dotenv::var; use once_cell::sync::Lazy; +use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE}; use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; static AIRTABLE: Lazy = Lazy::new(Airtable::new_from_env); static TABLE: &str = "tblZABr7qbdjjZo1G"; @@ -37,10 +40,10 @@ pub struct AirtableSyncedUser { pub finished_at: Option>, #[serde(rename = "Repl Count")] - pub repl_count: Option, + pub repl_count: usize, #[serde(rename = "File Count")] - pub file_count: Option, + pub file_count: usize, } pub async fn add_user(user: AirtableSyncedUser) -> bool { @@ -69,6 +72,10 @@ pub async fn get_records() -> Result>> { "Status", "R2 Link", "Failed Repl IDs", + "Started At", + "Finished At", + "Repl Count", + "File Count", ], ) .await?; @@ -86,6 +93,52 @@ pub async fn update_records(records: Vec>) -> Result< Ok(()) } +pub async fn aggregates() -> Result<()> { + let url = format!( + "https://api.airtable.com/v0/{}/{}/aggregate", + var("AIRTABLE_BASE_ID")?, + TABLE + ); + + // Set up headers + let mut headers = HeaderMap::new(); + headers.insert( + AUTHORIZATION, + HeaderValue::from_str(&format!("Bearer {}", var("AIRTABLE_API_KEY")?))?, + ); + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + + // Payload for the sum aggregation + let payload = json!({ + "aggregations": [ + { + "aggregator": "sum", + "field": "File Count" + } + ] + }); + + // Create a client and send the request + let client = reqwest::Client::new(); + let response = client + .post(&url) + .headers(headers) + .json(&payload) + .send() + .await?; + + // Check if the request was successful + if response.status().is_success() { + let data: Value = response.json().await?; + let column_sum = data["results"][0]["sum"].as_f64().unwrap_or(0.0); + println!("The sum of {} is: {}", "File Count", column_sum); + } else { + println!("Error: {}, {:?}", response.status(), response.text().await?); + } + + Ok(()) +} + use std::fmt; #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] diff --git a/src/crosisdownload/mod.rs b/src/crosisdownload/mod.rs index 6a6f535..06f6e1b 100644 --- a/src/crosisdownload/mod.rs +++ b/src/crosisdownload/mod.rs @@ -686,6 +686,7 @@ async fn handle_file( drop(permit); + log::info!("{} file processed", file_count.load(Ordering::Relaxed)); file_count.fetch_add(1, Ordering::Relaxed); Ok(()) diff --git a/src/main.rs b/src/main.rs index 2d281c7..680b8ca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,13 +45,15 @@ impl SignupResponse { async fn rocket() -> _ { env_logger::init(); dotenv::dotenv().ok(); - tokio::spawn(async { - loop { - if let Err(err) = airtable_loop().await { - error!("Airtable internal loop error (restarting): {err}"); - } - } - }); + + airtable::aggregates().await.expect("fialed to get aggs"); + // tokio::spawn(async { + // loop { + // if let Err(err) = airtable_loop().await { + // error!("Airtable internal loop error (restarting): {err}"); + // } + // } + // }); rocket::build() .mount("/", routes![hello, signup, get_progress]) @@ -173,7 +175,7 @@ async fn airtable_loop() -> Result<()> { loop { let mut user; 'mainloop: loop { - trace!("Getting airtable records"); + info!("Getting airtable records"); let records = airtable::get_records().await?; for record in records { if record.fields.status == ProcessState::Registered { @@ -181,7 +183,7 @@ async fn airtable_loop() -> Result<()> { break 'mainloop; } } - tokio::time::sleep(Duration::from_secs(30)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } if let Err(err) = ProfileRepls::download(&user.fields.token, user.clone()).await { diff --git a/src/replit_graphql.rs b/src/replit_graphql.rs index 5707ca2..3a40b16 100644 --- a/src/replit_graphql.rs +++ b/src/replit_graphql.rs @@ -176,7 +176,6 @@ impl ProfileRepls { ) -> Result<()> { synced_user.fields.status = ProcessState::CollectingRepls; synced_user.fields.started_at = Some(chrono::offset::Utc::now()); - synced_user.fields.repl_count = Some(0); airtable::update_records(vec![synced_user.clone()]).await?; let client = create_client(token, None)?; @@ -291,7 +290,11 @@ impl ProfileRepls { "Downloaded {}::{} (without history) to {}", repl.id, repl.slug, download_zip ); - synced_user.fields.file_count = Some(file_count); + synced_user.fields.file_count += file_count; + info!( + "nohist YO! {file_count}, total: {:?}", + synced_user.fields.file_count + ); no_history_download_count += 1; progress.failed.no_history += 1; @@ -325,7 +328,11 @@ impl ProfileRepls { } Ok(Ok((DownloadStatus::Full, file_count))) => { info!("Downloaded {}::{} to {}", repl.id, repl.slug, main_location); - synced_user.fields.file_count = Some(file_count); + synced_user.fields.file_count += file_count; + info!( + "hist YO! {file_count}, total: {:?}", + synced_user.fields.file_count + ); successful_download_count += 1; progress.successful += 1; } @@ -337,12 +344,13 @@ impl ProfileRepls { "Download stats ({}): {successful_download_count} ({no_history_download_count} without history) correctly downloaded out of {total_download_count} total attempted downloads", current_user.username ); - synced_user.fields.repl_count.map(|mut v| v += 1); + synced_user.fields.repl_count += 1; progress.report(¤t_user); } progress.completed = true; progress.report(¤t_user); + airtable::update_records(vec![synced_user.clone()]).await?; let path = format!("repls/{}", current_user.username); make_zip(path.clone(), format!("repls/{}.zip", current_user.username)).await?;