Skip to content

Commit

Permalink
chore: move copy to dynamic
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 15, 2024
1 parent 454fbf8 commit 90d847e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 42 deletions.
47 changes: 45 additions & 2 deletions fusio/src/dynamic/fs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::pin::Pin;
use std::{cmp, pin::Pin, sync::Arc};

use futures_core::Stream;

use super::MaybeSendFuture;
use crate::{
buf::IoBufMut,
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
DynRead, DynWrite, Error, IoBuf, MaybeSend, MaybeSync, Read, Write,
};
Expand Down Expand Up @@ -48,6 +48,8 @@ impl<'write> Write for Box<dyn DynFile + 'write> {
}

pub trait DynFs: MaybeSend + MaybeSync {
fn file_system(&self) -> FileSystemTag;

fn open<'s, 'path: 's>(
&'s self,
path: &'path Path,
Expand Down Expand Up @@ -99,6 +101,10 @@ pub trait DynFs: MaybeSend + MaybeSync {
}

impl<F: Fs> DynFs for F {
fn file_system(&self) -> FileSystemTag {
Fs::file_system(self)
}

fn open_options<'s, 'path: 's>(
&'s self,
path: &'path Path,
Expand Down Expand Up @@ -160,6 +166,43 @@ impl<F: Fs> DynFs for F {
}
}

pub async fn copy(
from_fs: &Arc<dyn DynFs>,
from: &Path,
to_fs: &Arc<dyn DynFs>,
to: &Path,
) -> Result<(), Error> {
if from_fs.file_system() == to_fs.file_system() {
from_fs.copy(from, to).await?;
return Ok(());
}
let mut from_file = from_fs
.open_options(from, OpenOptions::default().read(true))
.await?;
let from_file_size = DynRead::size(&from_file).await? as usize;

let mut to_file = to_fs
.open_options(to, OpenOptions::default().create(true).write(true))
.await?;
let buf_size = cmp::min(from_file_size, 4 * 1024);
let mut buf = Some(vec![0u8; buf_size]);
let mut read_pos = 0u64;

while (read_pos as usize) < from_file_size - 1 {
let tmp = buf.take().unwrap();
let (result, tmp) = Read::read_exact_at(&mut from_file, tmp, read_pos).await;
result?;
read_pos += tmp.bytes_init() as u64;

let (result, tmp) = Write::write_all(&mut to_file, tmp).await;
result?;
buf = Some(tmp);
}
DynWrite::close(&mut to_file).await?;

Ok(())
}

#[cfg(test)]
mod tests {

Expand Down
45 changes: 5 additions & 40 deletions fusio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

mod options;

use std::{cmp, future::Future};
use std::future::Future;

use futures_core::Stream;
pub use options::*;
Expand Down Expand Up @@ -55,44 +55,9 @@ pub trait Fs: MaybeSend + MaybeSync {
fn link(&self, from: &Path, to: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;
}

pub async fn copy<F, T>(from_fs: &F, from: &Path, to_fs: &T, to: &Path) -> Result<(), Error>
where
F: Fs,
T: Fs,
{
if from_fs.file_system() == to_fs.file_system() {
from_fs.copy(from, to).await?;
return Ok(());
}
let mut from_file = from_fs
.open_options(from, OpenOptions::default().read(true))
.await?;
let from_file_size = from_file.size().await? as usize;

let mut to_file = to_fs
.open_options(to, OpenOptions::default().create(true).write(true))
.await?;
let buf_size = cmp::min(from_file_size, 4 * 1024);
let mut buf = Some(vec![0u8; buf_size]);
let mut read_pos = 0u64;

while (read_pos as usize) < from_file_size - 1 {
let tmp = buf.take().unwrap();
let (result, tmp) = from_file.read_exact_at(tmp, read_pos).await;
result?;
read_pos += tmp.len() as u64;

let (result, tmp) = to_file.write_all(tmp).await;
result?;
buf = Some(tmp);
}
to_file.close().await?;

Ok(())
}

#[cfg(test)]
mod tests {
use crate::disk::TokioFs;

#[ignore]
#[cfg(all(
Expand Down Expand Up @@ -141,8 +106,8 @@ mod tests {
checksum: false,
};

let s3_fs = AmazonS3::new(Box::new(client), options);
let local_fs = TokioFs;
let s3_fs = Arc::new(AmazonS3::new(Box::new(client), options));
let local_fs = Arc::new(TokioFs);

{
let mut local_file = local_fs
Expand All @@ -154,7 +119,7 @@ mod tests {
.0?;
local_file.close().await.unwrap();
}
fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?;
crate::dynamic::fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?;

let mut s3 = S3File::new(s3_fs, s3_path.clone());

Expand Down

0 comments on commit 90d847e

Please sign in to comment.