Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(files): Move download/upload to new tasks #1727

Merged
merged 15 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
299 changes: 161 additions & 138 deletions common/src/warp_runner/manager/commands/constellation_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
io::Read,
path::{Path, PathBuf},
process::{Command, Stdio},
sync::mpsc,
};

use derive_more::Display;
Expand All @@ -25,7 +26,7 @@ use warp::{
directory::Directory,
file::File,
item::{FormatType, Item, ItemType},
Progression,
ConstellationProgressStream, Progression,
},
error::Error,
logging::tracing::log,
Expand Down Expand Up @@ -370,7 +371,9 @@ async fn upload_files(warp_storage: &mut warp_storage, files_path: Vec<PathBuf>)
};

let max_size_ipfs = warp_storage.max_size();
'files_parth_loop: for file_path in files_path.clone() {
let (tx, rx) = mpsc::channel();

for file_path in files_path.clone() {
let mut filename = match file_path
.file_name()
.map(|file| file.to_string_lossy().to_string())
Expand Down Expand Up @@ -419,155 +422,175 @@ async fn upload_files(warp_storage: &mut warp_storage, files_path: Vec<PathBuf>)
filename = rename_if_duplicate(current_directory.clone(), filename.clone(), file);

match warp_storage.put(&filename, &local_path).await {
Ok(mut upload_progress) => {
let mut previous_percentage: usize = 0;
let mut upload_process_started = false;

while let Some(upload_progress) = upload_progress.next().await {
match upload_progress {
Progression::CurrentProgress {
name,
current,
total,
} => {
log::trace!("starting upload file action listener");
if let Ok(received_tx) = CANCEL_FILE_UPLOADLISTENER
.rx
.clone()
.lock()
.await
.try_recv()
{
if received_tx {
let _ = tx_upload_file.send(UploadFileAction::Cancelling);
continue 'files_parth_loop;
}
}
if !upload_process_started {
upload_process_started = true;
log::info!("Starting upload for {name}");
log::info!("0% completed -> written 0 bytes")
};

if let Some(total) = total {
let current_percentage =
(((current as f64) / (total as f64)) * 100.) as usize;
if previous_percentage != current_percentage {
previous_percentage = current_percentage;
let readable_current = format_size(current, DECIMAL);
let percentage_number =
((current as f64) / (total as f64)) * 100.;
let _ = tx_upload_file.send(UploadFileAction::Uploading((
format!("{}%", percentage_number as usize),
get_local_text("files.uploading-file"),
filename.clone(),
)));
log::info!(
"{}% completed -> written {readable_current}",
percentage_number as usize
)
}
}
}
Progression::ProgressComplete { name, total } => {
let total = total.unwrap_or_default();
let readable_total = format_size(total, DECIMAL);
let _ = tx_upload_file.send(UploadFileAction::Uploading((
"100%".into(),
get_local_text("files.uploading-file"),
filename.clone(),
)));
log::info!("{name} has been uploaded with {}", readable_total);
}
Progression::ProgressFailed {
name,
last_size,
error,
} => {
log::info!(
"{name} failed to upload at {} MB due to: {}",
last_size.unwrap_or_default(),
error.unwrap_or_default()
);
let _ = tx_upload_file.send(UploadFileAction::Error);
continue 'files_parth_loop;
}
Ok(upload_progress) => {
// Handle each upload on another thread
let mut warp_storage = warp_storage.clone();
let res = tx.clone();
tokio::spawn(async move {
handle_upload_progress(
&mut warp_storage,
upload_progress,
filename,
file_path.clone(),
)
.await;
let _ = res.send(file_path);
});
}
Err(error) => log::error!("Error when upload file: {:?}", error),
}
}
let mut warp_storage = warp_storage.clone();
// Spawn a listener for when all files finished uploading
// Listener should automatically finish once all senders are dropped (aka done)
tokio::spawn(async move {
loop {
if rx.recv().is_err() {
// Sender all dropped
break;
}
}
let ret = match get_items_from_current_directory(&mut warp_storage) {
Ok(r) => UploadFileAction::Finished(r),
Err(_) => UploadFileAction::Error,
};

let _ = tx_upload_file.send(ret);
});
}

async fn handle_upload_progress(
warp_storage: &mut warp_storage,
mut upload_progress: ConstellationProgressStream,
filename: String,
file_path: PathBuf,
) {
let tx_upload_file = UPLOAD_FILE_LISTENER.tx.clone();
let mut previous_percentage: usize = 0;
let mut upload_process_started = false;

while let Some(upload_progress) = upload_progress.next().await {
match upload_progress {
Progression::CurrentProgress {
name,
current,
total,
} => {
log::trace!("starting upload file action listener");
if let Ok(received_tx) = CANCEL_FILE_UPLOADLISTENER
.rx
.clone()
.lock()
.await
.try_recv()
{
if received_tx {
let _ = tx_upload_file.send(UploadFileAction::Cancelling);
stavares843 marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}

if !upload_process_started {
upload_process_started = true;
log::info!("Starting upload for {name}");
log::info!("0% completed -> written 0 bytes")
};

if let Some(total) = total {
let current_percentage = (((current as f64) / (total as f64)) * 100.) as usize;
if previous_percentage != current_percentage {
previous_percentage = current_percentage;
let readable_current = format_size(current, DECIMAL);
let percentage_number = ((current as f64) / (total as f64)) * 100.;
let _ = tx_upload_file.send(UploadFileAction::Uploading((
format!("{}%", percentage_number as usize),
get_local_text("files.uploading-file"),
filename.clone(),
)));
log::info!(
"{}% completed -> written {readable_current}",
percentage_number as usize
)
}
}
}
Progression::ProgressComplete { name, total } => {
let total = total.unwrap_or_default();
let readable_total = format_size(total, DECIMAL);
let _ = tx_upload_file.send(UploadFileAction::Uploading((
"100%".into(),
get_local_text("files.checking-thumbnail"),
get_local_text("files.uploading-file"),
filename.clone(),
)));
log::info!("{name} has been uploaded with {}", readable_total);
}
Progression::ProgressFailed {
name,
last_size,
error,
} => {
log::info!(
"{name} failed to upload at {} MB due to: {}",
last_size.unwrap_or_default(),
error.unwrap_or_default()
);
let _ = tx_upload_file.send(UploadFileAction::Error);
break;
}
}
}

let video_formats = VIDEO_FILE_EXTENSIONS.to_vec();
let doc_formats = DOC_EXTENSIONS.to_vec();
let _ = tx_upload_file.send(UploadFileAction::Uploading((
"100%".into(),
get_local_text("files.checking-thumbnail"),
filename.clone(),
)));

let file_extension = std::path::Path::new(&filename)
.extension()
.and_then(OsStr::to_str)
.map(|s| format!(".{s}"))
.unwrap_or_default();
let video_formats = VIDEO_FILE_EXTENSIONS.to_vec();
let doc_formats = DOC_EXTENSIONS.to_vec();

if video_formats.iter().any(|f| f == &file_extension) {
match set_thumbnail_if_file_is_video(
warp_storage,
filename.clone(),
file_path.clone(),
)
.await
{
Ok(_) => {
log::info!("Video Thumbnail uploaded");
let _ = tx_upload_file.send(UploadFileAction::Uploading((
"100%".into(),
get_local_text("files.thumbnail-uploaded"),
filename.clone(),
)));
}
Err(error) => {
log::error!("Not possible to update thumbnail for video: {:?}", error);
}
};
}
let file_extension = std::path::Path::new(&filename)
.extension()
.and_then(OsStr::to_str)
.map(|s| format!(".{s}"))
.unwrap_or_default();

if doc_formats.iter().any(|f| f == &file_extension) {
match set_thumbnail_if_file_is_document(
warp_storage,
filename.clone(),
file_path.clone(),
)
.await
{
Ok(_) => {
log::info!("Document Thumbnail uploaded");
let _ = tx_upload_file.send(UploadFileAction::Uploading((
"100%".into(),
get_local_text("files.thumbnail-uploaded"),
filename.clone(),
)));
}
Err(error) => {
log::error!(
"Not possible to update thumbnail for document: {:?}",
error
);
}
};
}
let _ = tx_upload_file.send(UploadFileAction::Finishing);
log::info!("{:?} file uploaded!", filename);
if video_formats.iter().any(|f| f == &file_extension) {
match set_thumbnail_if_file_is_video(warp_storage, filename.clone(), file_path.clone())
.await
{
Ok(_) => {
log::info!("Video Thumbnail uploaded");
let _ = tx_upload_file.send(UploadFileAction::Uploading((
"100%".into(),
get_local_text("files.thumbnail-uploaded"),
filename.clone(),
)));
}
Err(error) => log::error!("Error when upload file: {:?}", error),
}
Err(error) => {
log::error!("Not possible to update thumbnail for video: {:?}", error);
}
};
}
let ret = match get_items_from_current_directory(warp_storage) {
Ok(r) => UploadFileAction::Finished(r),
Err(_) => UploadFileAction::Error,
};

let _ = tx_upload_file.send(ret);
if doc_formats.iter().any(|f| f == &file_extension) {
match set_thumbnail_if_file_is_document(warp_storage, filename.clone(), file_path.clone())
.await
{
Ok(_) => {
log::info!("Document Thumbnail uploaded");
let _ = tx_upload_file.send(UploadFileAction::Uploading((
"100%".into(),
get_local_text("files.thumbnail-uploaded"),
filename.clone(),
)));
}
Err(error) => {
log::error!("Not possible to update thumbnail for document: {:?}", error);
}
};
}
let _ = tx_upload_file.send(UploadFileAction::Finishing);
log::info!("{:?} file uploaded!", filename);
}

fn rename_if_duplicate(
Expand Down
Loading
Loading