diff --git a/examples/full_flow.rs b/examples/full_flow.rs
new file mode 100644
index 0000000..a062ad3
--- /dev/null
+++ b/examples/full_flow.rs
@@ -0,0 +1,56 @@
+use airtable_api::Record;
+use anyhow::Result;
+use chrono::Utc;
+use dotenv::var;
+use replit_takeout::{
+    airtable::{self, AirtableSyncedUser, ProcessState},
+    replit_graphql::ProfileRepls,
+};
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    env_logger::init();
+    dotenv::dotenv().ok();
+
+    let token = var("REPLIT_TEST_TOKEN")?;
+
+    let fields = AirtableSyncedUser {
+        id: 29999230,
+        token,
+        username: "malted".into(),
+        status: ProcessState::Registered,
+        email: "test@malted.dev".into(),
+        r2_link: "http://example.com".into(),
+        failed_ids: "none".into(),
+        started_at: Some(Utc::now()),
+        finished_at: None,
+        repl_count: 0,
+        file_count: 0,
+        statistics: vec!["recpWEjc0zLoKEtZP".into()],
+    };
+
+    let mut user = Record {
+        id: String::new(),
+        fields,
+        created_time: None,
+    };
+
+    log::info!("Starting...");
+    if let Err(err) = ProfileRepls::download(&user.fields.token, user.clone()).await {
+        log::error!("Error with `{}`'s download: {err:#?}", user.fields.username);
+
+        user.fields.status = ProcessState::ErroredMain;
+        //arst airtable::update_records(vec![user.clone()]).await?;
+
+        // user.fields.failed_ids = errored.join(",");
+
+        // send_email(
+        //     &user.fields.email,
+        //     "Your Replit⠕ export is slightly delayed :/".into(),
+        //     format!("Hey {}, We have run into an issue processing your Replit⠕ takeout 🥡.\nWe will manually review and confirm that all your data is included. If you don't hear back again within a few days email malted@hackclub.com. Sorry for the inconvenience.", user.fields.username),
+        // )
+        // .await;
+    }
+
+    Ok(())
+}
diff --git a/examples/repls.rs b/examples/repls.rs
new file mode 100644
index 0000000..f0c76c9
--- /dev/null
+++ b/examples/repls.rs
@@ -0,0 +1,29 @@
+use std::collections::HashMap;
+
+use anyhow::Result;
+use dotenv::var;
+use log::error;
+use replit_takeout::replit::repls::Repl;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    env_logger::init();
+    dotenv::dotenv().ok();
+
+    let token = var("REPLIT_TEST_TOKEN")?;
+
+    let repls = Repl::fetch(&token, None).await.expect("some repls");
+    error!("got {} repls", repls.len());
+
+    let mut map: HashMap<String, Repl> = HashMap::new();
+
+    for repl in repls {
+        if map.contains_key(&repl.id) {
+            log::error!("ALREADY CONTAINS {:?}", repl.clone());
+        }
+
+        map.insert(repl.id.clone(), repl);
+    }
+
+    Ok(())
+}
diff --git a/src/graphql/repls-query.graphql b/src/graphql/repls-query.graphql
new file mode 100644
index 0000000..0cdb23f
--- /dev/null
+++ b/src/graphql/repls-query.graphql
@@ -0,0 +1,33 @@
+query ReplList($path: String!, $starred: Boolean, $after: String) {
+  currentUser {
+    id
+    username
+    replFolderByPath(path: $path) {
+      ownerId: userId
+      pathnames
+      parent {
+        pathnames
+      }
+      folders {
+        id
+        name
+        pathnames
+        replsCount
+        folderType
+      }
+      repls(starred: $starred, after: $after) {
+        items {
+          id
+          title
+          isPrivate
+          slug
+          url
+          timeCreated
+        }
+        pageInfo {
+          nextCursor
+        }
+      }
+    }
+  }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 3b7c85b..67cb794 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,6 +2,7 @@ pub mod airtable;
 pub mod crosisdownload;
 pub mod email;
 pub mod r2;
+pub mod replit;
 pub mod replit_graphql;
 
 pub mod utils {
diff --git a/src/main.rs b/src/main.rs
index c281e15..44a63f5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -223,7 +223,7 @@ async fn airtable_loop() -> Result<()> {
            error!("Error with `{}`'s download: {err:#?}",
                user.fields.username);
 
            user.fields.status = ProcessState::ErroredMain;
-           airtable::update_records(vec![user.clone()]).await?;
+           //arst airtable::update_records(vec![user.clone()]).await?;
 
            // user.fields.failed_ids = errored.join(",");
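
Note: both here and in src/replit_graphql.rs below, the Airtable writes are disabled by commenting them out with the `//arst` marker so the examples can run against a real account without mutating the base. A minimal sketch of an alternative, using a hypothetical `maybe_update_records` wrapper (not part of this diff) that gates the writes behind an env var instead; it assumes `update_records`'s signature matches what its call sites here imply:

    use airtable_api::Record;
    use replit_takeout::airtable::{self, AirtableSyncedUser};

    // Hypothetical helper: skip Airtable writes when TAKEOUT_DRY_RUN is set,
    // so examples and tests never touch the production base.
    async fn maybe_update_records(records: Vec<Record<AirtableSyncedUser>>) -> anyhow::Result<()> {
        if std::env::var("TAKEOUT_DRY_RUN").is_ok() {
            log::info!("dry run: skipping Airtable update of {} record(s)", records.len());
            return Ok(());
        }
        airtable::update_records(records).await
    }

With that in place, the commented-out call sites could stay live and the behaviour would be toggled per environment rather than per commit.
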
diff --git a/src/replit/mod.rs b/src/replit/mod.rs
new file mode 100644
index 0000000..d43d89e
--- /dev/null
+++ b/src/replit/mod.rs
@@ -0,0 +1,46 @@
+use reqwest::{
+    cookie::Jar,
+    header::{self, HeaderMap},
+    Client, Url,
+};
+use std::sync::Arc;
+
+pub mod repls;
+
+pub static REPLIT_GQL_URL: &str = "https://replit.com/graphql";
+
+pub fn create_client(token: &String, client: Option<Client>) -> Result<Client, reqwest::Error> {
+    if let Some(client) = client {
+        return Ok(client);
+    }
+
+    Client::builder()
+        .user_agent(crate::utils::random_user_agent())
+        .default_headers(create_client_headers())
+        .cookie_provider(create_client_cookie_jar(token))
+        .build()
+}
+
+fn create_client_headers() -> HeaderMap {
+    let mut headers = header::HeaderMap::new();
+    headers.insert(
+        "X-Requested-With",
+        header::HeaderValue::from_static("XMLHttpRequest"),
+    );
+    headers.insert(
+        reqwest::header::REFERER,
+        header::HeaderValue::from_static("https://replit.com/~"),
+    );
+
+    headers
+}
+
+fn create_client_cookie_jar(token: &String) -> Arc<Jar> {
+    let cookie = &format!("connect.sid={token}; Domain=replit.com");
+    let url = REPLIT_GQL_URL.parse::<Url>().unwrap();
+
+    let jar = Jar::default();
+    jar.add_cookie_str(cookie, &url);
+
+    Arc::new(jar)
+}
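
`create_client` builds a reqwest client carrying the `connect.sid` session cookie plus the `X-Requested-With` and referer headers; the `Option<Client>` parameter lets callers build it once and thread it through repeated calls instead of rebuilding the cookie jar each time. A minimal sketch of that reuse pattern, assuming only the public items introduced in this diff:

    use replit_takeout::replit::{create_client, repls::Repl};

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        dotenv::dotenv().ok();
        let token = dotenv::var("REPLIT_TEST_TOKEN")?;

        // Build the authenticated client once...
        let client = create_client(&token, None)?;
        // ...then pass Some(client): create_client inside fetch becomes a no-op.
        let repls = Repl::fetch(&token, Some(client)).await?;
        println!("fetched {} repls", repls.len());
        Ok(())
    }
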
diff --git a/src/replit/repls.rs b/src/replit/repls.rs
new file mode 100644
index 0000000..6125ed5
--- /dev/null
+++ b/src/replit/repls.rs
@@ -0,0 +1,144 @@
+use std::collections::HashSet;
+
+use super::{create_client, REPLIT_GQL_URL};
+use anyhow::Result;
+use graphql_client::{GraphQLQuery, Response};
+use log::{debug, info, trace, warn};
+use reqwest::{Client, StatusCode};
+use tokio::time::{sleep, Duration};
+
+type DateTime = String;
+#[derive(GraphQLQuery)]
+#[graphql(
+    schema_path = "src/graphql/schema 7.graphql",
+    query_path = "src/graphql/repls-query.graphql",
+    response_derives = "Debug"
+)]
+struct ReplList;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Repl {
+    pub id: String,
+    pub title: String,
+    pub slug: String,
+    pub private: bool,
+    pub url: String,
+    pub time_created: String,
+}
+impl Repl {
+    pub async fn fetch(token: &str, client_opt: Option<Client>) -> Result<HashSet<Repl>> {
+        let client = create_client(&token.into(), client_opt)?;
+        let mut all_repls = HashSet::new();
+        let mut visited_folder_ids = HashSet::new();
+
+        Self::fetch_recursive("", "", &client, &mut all_repls, &mut visited_folder_ids).await?;
+
+        info!("got {} repls", all_repls.len());
+
+        Ok(all_repls)
+    }
+
+    async fn fetch_recursive(
+        path: &str,
+        folder_id: &str,
+        client: &Client,
+        all_repls: &mut HashSet<Repl>,
+        visited_folder_ids: &mut HashSet<String>,
+    ) -> Result<()> {
+        if !folder_id.is_empty() && visited_folder_ids.contains(folder_id) {
+            info!("Skipping already visited folder: {} ({})", path, folder_id);
+            return Ok(());
+        }
+
+        if !folder_id.is_empty() {
+            visited_folder_ids.insert(folder_id.to_string());
+        }
+
+        info!("Traversing {} ({folder_id})", path);
+
+        let mut cursor = None;
+        let mut retry_count = 0;
+        let max_retries = 5;
+
+        loop {
+            let folder_query = ReplList::build_query(repl_list::Variables {
+                path: path.to_string(),
+                starred: None,
+                after: cursor.clone(),
+            });
+
+            let folder_data = loop {
+                if retry_count >= max_retries {
+                    return Err(anyhow::anyhow!("Max retries reached for path {path}"));
+                }
+
+                let response =
+                    client.post(REPLIT_GQL_URL).json(&folder_query).send().await;
+
+                match response {
+                    Ok(res) if res.status() == StatusCode::TOO_MANY_REQUESTS => {
+                        let wait_time = Duration::from_secs(2u64.pow(retry_count));
+                        warn!("Rate-limited - waiting {:?} before retrying", wait_time);
+                        sleep(wait_time).await;
+                        retry_count = (retry_count + 1).min(max_retries);
+                        continue;
+                    }
+                    Ok(res) => break res,
+                    Err(e) => {
+                        warn!("Error fetching data: {:?}", e);
+                        let wait_time = Duration::from_secs(2u64.pow(retry_count));
+                        warn!("Waiting {:?} before retrying", wait_time);
+                        sleep(wait_time).await;
+                        retry_count = (retry_count + 1).min(max_retries);
+
+                        continue;
+                    }
+                }
+            };
+
+            let folder_data = folder_data.text().await?;
+
+            let folder: Response<repl_list::ResponseData> = serde_json::from_str(&folder_data)?;
+            log::trace!("{path}-{:#?}", folder);
+
+            let folder = folder
+                .data
+                .and_then(|data| data.current_user)
+                .and_then(|user| user.repl_folder_by_path)
+                .ok_or_else(|| anyhow::anyhow!("Failed to get folder data"))?;
+
+            // Process subfolders
+            for subfolder in folder.folders {
+                Box::pin(Self::fetch_recursive(
+                    &subfolder.pathnames.join("/"),
+                    &subfolder.id,
+                    client,
+                    all_repls,
+                    visited_folder_ids,
+                ))
+                .await?;
+            }
+
+            for repl in folder.repls.items {
+                all_repls.insert(Repl {
+                    id: repl.id,
+                    title: repl.title,
+                    slug: repl.slug,
+                    private: repl.is_private,
+                    url: repl.url,
+                    time_created: repl.time_created,
+                });
+            }
+
+            sleep(Duration::from_millis(250)).await;
+
+            // Check for next page
+            match folder.repls.page_info.next_cursor {
+                Some(next_cursor) => cursor = Some(next_cursor),
+                None => break,
+            }
+        }
+
+        Ok(())
+    }
+}
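
Two details in `fetch_recursive` are worth calling out. First, the retry loop backs off exponentially: the wait is 2^retry_count seconds, so with max_retries = 5 the schedule is 1s, 2s, 4s, 8s, 16s before the "Max retries reached" bail. A standalone sketch of just that schedule:

    use tokio::time::Duration;

    // Same schedule as the retry loop above: 2^retry_count seconds.
    fn backoff(retry_count: u32) -> Duration {
        Duration::from_secs(2u64.pow(retry_count))
    }

    fn main() {
        // Prints the waits 1s, 2s, 4s, 8s, 16s.
        for n in 0..5 {
            println!("retry {n}: wait {:?}", backoff(n));
        }
    }

Note that retry_count is declared outside the page loop and never reset after a successful response, so 429s early in a folder shrink the retry budget for its later pages. Second, the recursive call is wrapped in Box::pin because an async fn cannot await itself directly: its future type would have to contain itself, and boxing introduces the pointer indirection that breaks that cycle.
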
diff --git a/src/replit_graphql.rs b/src/replit_graphql.rs
index e367b3b..bbcc63d 100644
--- a/src/replit_graphql.rs
+++ b/src/replit_graphql.rs
@@ -19,6 +19,7 @@ use crate::{
     crosisdownload::{make_zip, DownloadLocations, DownloadStatus, ReplInfo},
     email::emails::{send_partial_success_email, send_success_email},
     r2,
+    replit::repls::Repl,
 };
 
 static REPLIT_GQL_URL: &str = "https://replit.com/graphql";
@@ -116,59 +117,59 @@ type DateTime = String;
     response_derives = "Debug"
 )]
 pub struct ProfileRepls;
-
 impl ProfileRepls {
-    /// Get one page of repls.
-    pub async fn fetch(
-        token: &String,
-        id: i64,
-        client_opt: Option<Client>,
-        after: Option<String>,
-    ) -> Result<(
-        Vec<profile_repls::ProfileReplsUserProfileReplsItems>,
-        Option<String>,
-    )> {
-        let client = create_client(token, client_opt)?;
-
-        let repls_query = ProfileRepls::build_query(profile_repls::Variables { id, after });
-
-        let repls_data: String = client
-            .post(REPLIT_GQL_URL)
-            .json(&repls_query)
-            .send()
-            .await?
-            .text()
-            .await?;
-        debug!(
-            "{}:{} Raw text repl data: {repls_data}",
-            std::line!(),
-            std::column!()
-        );
-
-        let repls_data_result =
-            match serde_json::from_str::<Response<profile_repls::ResponseData>>(&repls_data) {
-                Ok(data) => data.data,
-                Err(e) => {
-                    error!("Failed to deserialize JSON: {}", e);
-                    return Err(anyhow::Error::new(e));
-                }
-            };
-
-        let next_page = repls_data_result
-            .as_ref()
-            .and_then(|data| {
-                data.user
-                    .as_ref()
-                    .map(|user| user.profile_repls.page_info.next_cursor.clone())
-            })
-            .ok_or(anyhow::Error::msg("Page Info not found during download"))?;
-
-        let repls = repls_data_result
-            .and_then(|data| data.user.map(|user| user.profile_repls.items))
-            .ok_or(anyhow::Error::msg("Repls not found during download"))?;
-
-        Ok((repls, next_page))
-    }
+    // /// Get one page of repls.
+    // #[deprecated]
+    // async fn fetch(
+    //     token: &String,
+    //     id: i64,
+    //     client_opt: Option<Client>,
+    //     after: Option<String>,
+    // ) -> Result<(
+    //     Vec<profile_repls::ProfileReplsUserProfileReplsItems>,
+    //     Option<String>,
+    // )> {
+    //     let client = create_client(token, client_opt)?;
+
+    //     let repls_query = ProfileRepls::build_query(profile_repls::Variables { id, after });
+
+    //     let repls_data: String = client
+    //         .post(REPLIT_GQL_URL)
+    //         .json(&repls_query)
+    //         .send()
+    //         .await?
+    //         .text()
+    //         .await?;
+    //     debug!(
+    //         "{}:{} Raw text repl data: {repls_data}",
+    //         std::line!(),
+    //         std::column!()
+    //     );

+    //     let repls_data_result =
+    //         match serde_json::from_str::<Response<profile_repls::ResponseData>>(&repls_data) {
+    //             Ok(data) => data.data,
+    //             Err(e) => {
+    //                 error!("Failed to deserialize JSON: {}", e);
+    //                 return Err(anyhow::Error::new(e));
+    //             }
+    //         };

+    //     let next_page = repls_data_result
+    //         .as_ref()
+    //         .and_then(|data| {
+    //             data.user
+    //                 .as_ref()
+    //                 .map(|user| user.profile_repls.page_info.next_cursor.clone())
+    //         })
+    //         .ok_or(anyhow::Error::msg("Page Info not found during download"))?;

+    //     let repls = repls_data_result
+    //         .and_then(|data| data.user.map(|user| user.profile_repls.items))
+    //         .ok_or(anyhow::Error::msg("Repls not found during download"))?;

+    //     Ok((repls, next_page))
+    // }
 
     pub async fn download(
         token: &String,
@@ -176,30 +177,17 @@
     ) -> Result<()> {
         synced_user.fields.status = ProcessState::CollectingRepls;
         synced_user.fields.started_at = Some(chrono::offset::Utc::now());
-        airtable::update_records(vec![synced_user.clone()]).await?;
+        //arst airtable::update_records(vec![synced_user.clone()]).await?;
 
         let client = create_client(token, None)?;
         let current_user = QuickUser::fetch(token, Some(client.clone())).await?;
+        log::info!("current user: {:#?}", current_user);
 
         fs::create_dir_all("repls").await?;
         fs::create_dir(format!("repls/{}", current_user.username)).await?;
 
-        let mut repls = Vec::new();
-        let mut cursor = None;
-
-        loop {
-            let (mut page_of_repls, new_cursor) =
-                Self::fetch(token, current_user.id, None, cursor).await?;
-            repls.append(&mut page_of_repls);
-
-            if let Some(next_cursor) = new_cursor {
-                cursor = Some(next_cursor);
-            } else {
-                break;
-            }
-        }
-
+        let repls = Repl::fetch(&token, Some(client.clone())).await?;
         let repl_count = repls.len();
 
         let mut progress = ExportProgress::new(repl_count);
@@ -219,7 +207,7 @@
         }
 
         synced_user.fields.status = ProcessState::NoRepls;
-        airtable::update_records(vec![synced_user.clone()]).await?;
+        //arst airtable::update_records(vec![synced_user.clone()]).await?;
 
         return Ok(());
     }
@@ -336,13 +324,13 @@
             );
 
             synced_user.fields.repl_count += 1;
-            airtable::update_records(vec![synced_user.clone()]).await?;
+            //arst airtable::update_records(vec![synced_user.clone()]).await?;
             progress.report(&current_user);
         }
 
         progress.completed = true;
         progress.report(&current_user);
-        airtable::update_records(vec![synced_user.clone()]).await?;
+        //arst airtable::update_records(vec![synced_user.clone()]).await?;
 
         let path = format!("repls/{}", current_user.username);
         make_zip(path.clone(), format!("repls/{}.zip", current_user.username)).await?;
@@ -359,11 +347,11 @@
         let upload_result = r2::upload(upload_path.clone(), zip_path.clone()).await;
         fs::remove_file(&zip_path).await?;
         synced_user.fields.status = ProcessState::WaitingInR2;
-        airtable::update_records(vec![synced_user.clone()]).await?;
+        //arst airtable::update_records(vec![synced_user.clone()]).await?;
 
         if let Err(upload_err) = upload_result {
            synced_user.fields.status = ProcessState::ErroredR2;
-            airtable::update_records(vec![synced_user.clone()]).await?;
+            //arst airtable::update_records(vec![synced_user.clone()]).await?;
            error!("Failed to upload {upload_path} to R2");
            return Err(upload_err);
        }
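
One behavioural consequence of swapping the page loop for `Repl::fetch`: it returns a `HashSet<Repl>`, so `download` no longer sees repls in the profile order the old pager produced, and iteration order can vary between runs. If a deterministic export order ever matters, one option (an assumption, not something this diff does) is to sort before iterating; the helper below is hypothetical:

    use replit_takeout::replit::repls::Repl;
    use std::collections::HashSet;

    // Hypothetical ordering helper: timeCreated is an ISO-8601 timestamp
    // string, so lexicographic order should match chronological order.
    fn in_creation_order(repls: HashSet<Repl>) -> Vec<Repl> {
        let mut ordered: Vec<Repl> = repls.into_iter().collect();
        ordered.sort_by(|a, b| a.time_created.cmp(&b.time_created));
        ordered
    }
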
@@ -427,8 +415,89 @@
             synced_user.fields.failed_ids = errored.join(",");
         }
         synced_user.fields.finished_at = Some(chrono::offset::Utc::now());
-        airtable::update_records(vec![synced_user]).await?;
+        //arst airtable::update_records(vec![synced_user]).await?;
+
+        Ok(())
+    }
+}
+
+#[derive(GraphQLQuery)]
+#[graphql(
+    schema_path = "src/graphql/schema 7.graphql",
+    query_path = "src/graphql/repls-query.graphql",
+    response_derives = "Debug"
+)]
+pub struct ReplList;
+impl ReplList {
+    /// Get all a user's repls
+    pub async fn fetch(
+        token: &String,
+        client_opt: Option<Client>,
+        after: Option<String>,
+    ) -> Result<()> {
+        /* With this GraphQL query, you get a representation of one level of
+         * the user's repl directory tree, meaning you get a list of repls, and
+         * a list of directories. This function starts from the root path
+         * (where path = ""), collects the (paginated) repls at this level,
+         * then recursively does the same through the directory list. */
+
+        let client = create_client(token, client_opt)?;
+
+        let repls_query = ReplList::build_query(repl_list::Variables {
+            path: "".into(),
+            starred: None,
+            after: None,
+        });
+
+        let repls_data: String = client
+            .post(REPLIT_GQL_URL)
+            .json(&repls_query)
+            .send()
+            .await?
+            .text()
+            .await?;
+        trace!(
+            "{}:{} Raw text repl data: {repls_data}",
+            std::line!(),
+            std::column!()
+        );
+
+        let repls_data_result =
+            match serde_json::from_str::<Response<repl_list::ResponseData>>(&repls_data) {
+                Ok(data) => data.data,
+                Err(e) => {
+                    error!("Failed to deserialize JSON: {}", e);
+                    return Err(anyhow::Error::new(e));
+                }
+            };
+
+        debug!("repls data result: {:#?}", repls_data_result);
+        if let Some(curr) = repls_data_result
+            .map(|r| r.current_user.map(|r| r.repl_folder_by_path))
+            .flatten()
+            .flatten()
+        {
+            info!("curr repls: {:#?}", curr.repls);
+        } else {
+            log::error!("No repls data!")
+        }
+
+        // First, paginate through the repls.
+
+        // let next_page = repls_data_result
+        //     .as_ref()
+        //     .and_then(|data| {
+        //         data.user
+        //             .as_ref()
+        //             .map(|user| user.profile_repls.page_info.next_cursor.clone())
+        //     })
+        //     .ok_or(anyhow::Error::msg("Page Info not found during download"))?;

+        // let repls = repls_data_result
+        //     .and_then(|data| data.user.map(|user| user.profile_repls.items))
+        //     .ok_or(anyhow::Error::msg("Repls not found during download"))?;
+        // Ok((repls, next_page))
 
         Ok(())
     }
 }
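
A final observation on `ReplList::fetch` here: its doc comment promises pagination plus recursion through the folder tree, but the body currently builds the query with `after: None`, ignores its `after` parameter, and only logs the root level; the working traversal is the one in src/replit/repls.rs. For reference, a self-contained skeleton of the cursor loop the comment describes; `next_page` below is a stand-in closure for one ReplList round-trip, not this crate's API:

    use anyhow::Result;

    // Generic cursor pagination: call `next_page` with the current cursor,
    // collect the items it returns, and stop once there is no next cursor.
    async fn collect_all<T, F, Fut>(mut next_page: F) -> Result<Vec<T>>
    where
        F: FnMut(Option<String>) -> Fut,
        Fut: std::future::Future<Output = Result<(Vec<T>, Option<String>)>>,
    {
        let mut all = Vec::new();
        let mut cursor = None;
        loop {
            let (items, next) = next_page(cursor.take()).await?;
            all.extend(items);
            match next {
                Some(c) => cursor = Some(c),
                None => break,
            }
        }
        Ok(all)
    }
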