Skip to content

Commit

Permalink
✨ Hook up file & repl counts to airtable
Browse files Browse the repository at this point in the history
  • Loading branch information
malted committed Sep 5, 2024
1 parent 0b5bd44 commit 3c0cd6d
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 15 deletions.
57 changes: 55 additions & 2 deletions src/airtable.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use airtable_api::{Airtable, Record};
use anyhow::Result;
use chrono::{DateTime, Utc};
use dotenv::var;
use once_cell::sync::Lazy;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

static AIRTABLE: Lazy<Airtable> = Lazy::new(Airtable::new_from_env);
static TABLE: &str = "tblZABr7qbdjjZo1G";
Expand Down Expand Up @@ -37,10 +40,10 @@ pub struct AirtableSyncedUser {
pub finished_at: Option<DateTime<Utc>>,

#[serde(rename = "Repl Count")]
pub repl_count: Option<usize>,
pub repl_count: usize,

#[serde(rename = "File Count")]
pub file_count: Option<usize>,
pub file_count: usize,
}

pub async fn add_user(user: AirtableSyncedUser) -> bool {
Expand Down Expand Up @@ -69,6 +72,10 @@ pub async fn get_records() -> Result<Vec<Record<AirtableSyncedUser>>> {
"Status",
"R2 Link",
"Failed Repl IDs",
"Started At",
"Finished At",
"Repl Count",
"File Count",
],
)
.await?;
Expand All @@ -86,6 +93,52 @@ pub async fn update_records(records: Vec<Record<AirtableSyncedUser>>) -> Result<
Ok(())
}

pub async fn aggregates() -> Result<()> {
let url = format!(
"https://api.airtable.com/v0/{}/{}/aggregate",
var("AIRTABLE_BASE_ID")?,
TABLE
);

// Set up headers
let mut headers = HeaderMap::new();
headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&format!("Bearer {}", var("AIRTABLE_API_KEY")?))?,
);
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));

// Payload for the sum aggregation
let payload = json!({
"aggregations": [
{
"aggregator": "sum",
"field": "File Count"
}
]
});

// Create a client and send the request
let client = reqwest::Client::new();
let response = client
.post(&url)
.headers(headers)
.json(&payload)
.send()
.await?;

// Check if the request was successful
if response.status().is_success() {
let data: Value = response.json().await?;
let column_sum = data["results"][0]["sum"].as_f64().unwrap_or(0.0);
println!("The sum of {} is: {}", "File Count", column_sum);
} else {
println!("Error: {}, {:?}", response.status(), response.text().await?);
}

Ok(())
}

use std::fmt;

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
Expand Down
1 change: 1 addition & 0 deletions src/crosisdownload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ async fn handle_file(

drop(permit);

log::info!("{} file processed", file_count.load(Ordering::Relaxed));
file_count.fetch_add(1, Ordering::Relaxed);

Ok(())
Expand Down
20 changes: 11 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ impl SignupResponse {
async fn rocket() -> _ {
env_logger::init();
dotenv::dotenv().ok();
tokio::spawn(async {
loop {
if let Err(err) = airtable_loop().await {
error!("Airtable internal loop error (restarting): {err}");
}
}
});

airtable::aggregates().await.expect("fialed to get aggs");
// tokio::spawn(async {
// loop {
// if let Err(err) = airtable_loop().await {
// error!("Airtable internal loop error (restarting): {err}");
// }
// }
// });

rocket::build()
.mount("/", routes![hello, signup, get_progress])
Expand Down Expand Up @@ -173,15 +175,15 @@ async fn airtable_loop() -> Result<()> {
loop {
let mut user;
'mainloop: loop {
trace!("Getting airtable records");
info!("Getting airtable records");
let records = airtable::get_records().await?;
for record in records {
if record.fields.status == ProcessState::Registered {
user = record;
break 'mainloop;
}
}
tokio::time::sleep(Duration::from_secs(30)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}

if let Err(err) = ProfileRepls::download(&user.fields.token, user.clone()).await {
Expand Down
16 changes: 12 additions & 4 deletions src/replit_graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ impl ProfileRepls {
) -> Result<()> {
synced_user.fields.status = ProcessState::CollectingRepls;
synced_user.fields.started_at = Some(chrono::offset::Utc::now());
synced_user.fields.repl_count = Some(0);
airtable::update_records(vec![synced_user.clone()]).await?;

let client = create_client(token, None)?;
Expand Down Expand Up @@ -291,7 +290,11 @@ impl ProfileRepls {
"Downloaded {}::{} (without history) to {}",
repl.id, repl.slug, download_zip
);
synced_user.fields.file_count = Some(file_count);
synced_user.fields.file_count += file_count;
info!(
"nohist YO! {file_count}, total: {:?}",
synced_user.fields.file_count
);
no_history_download_count += 1;
progress.failed.no_history += 1;

Expand Down Expand Up @@ -325,7 +328,11 @@ impl ProfileRepls {
}
Ok(Ok((DownloadStatus::Full, file_count))) => {
info!("Downloaded {}::{} to {}", repl.id, repl.slug, main_location);
synced_user.fields.file_count = Some(file_count);
synced_user.fields.file_count += file_count;
info!(
"hist YO! {file_count}, total: {:?}",
synced_user.fields.file_count
);
successful_download_count += 1;
progress.successful += 1;
}
Expand All @@ -337,12 +344,13 @@ impl ProfileRepls {
"Download stats ({}): {successful_download_count} ({no_history_download_count} without history) correctly downloaded out of {total_download_count} total attempted downloads", current_user.username
);

synced_user.fields.repl_count.map(|mut v| v += 1);
synced_user.fields.repl_count += 1;
progress.report(&current_user);
}

progress.completed = true;
progress.report(&current_user);
airtable::update_records(vec![synced_user.clone()]).await?;

let path = format!("repls/{}", current_user.username);
make_zip(path.clone(), format!("repls/{}.zip", current_user.username)).await?;
Expand Down

0 comments on commit 3c0cd6d

Please sign in to comment.