Skip to content

Commit

Permalink
File deduplication
Browse files Browse the repository at this point in the history
  • Loading branch information
Hocuri committed Dec 11, 2024
1 parent 6468806 commit abbaa2e
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 7 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ toml = "0.8"
url = "2"
uuid = { version = "1", features = ["serde", "v4"] }
webpki-roots = "0.26.7"
blake3 = "1.5.5"

[dev-dependencies]
anyhow = { workspace = true, features = ["backtrace"] } # Enable `backtrace` feature in tests.
Expand Down
79 changes: 76 additions & 3 deletions src/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use std::iter::FusedIterator;
use std::mem;
use std::path::{Path, PathBuf};

use anyhow::{format_err, Context as _, Result};
use anyhow::{bail, format_err, Context as _, Result};
use base64::Engine as _;
use futures::StreamExt;
use image::codecs::jpeg::JpegEncoder;
use image::ImageReader;
use image::{DynamicImage, GenericImage, GenericImageView, ImageFormat, Pixel, Rgba};
use num_traits::FromPrimitive;
use tokio::io::AsyncWriteExt;
use tokio::{fs, io};
use tokio::{fs, io, task};
use tokio_stream::wrappers::ReadDirStream;

use crate::config::Config;
Expand Down Expand Up @@ -139,6 +139,47 @@ impl<'a> BlobObject<'a> {
Ok(blob)
}

/// TODO document
/// TODO what about race conditions when the same file is created multiple times concurrently
pub async fn create_and_deduplicate(
context: &'a Context,
src: &Path,
) -> Result<BlobObject<'a>> {
// `update_reader()` uses std::fs, so we need to call task::block_in_place().
// Tokio io also just calls spawn_blocking() internally (e.g. https://docs.rs/tokio/1.42.0/src/tokio/fs/file.rs.html#606 and https://docs.rs/tokio/latest/src/tokio/fs/mod.rs.html#310)
// so we are doing essentially the same here.

task::block_in_place(|| {
let blobdir = context.get_blobdir();
if !(src.starts_with(blobdir) || src.starts_with("$BLOBDIR/")) {
bail!("The file needs to be in the blob dir already and will be renamed. To attach a different file, copy it to the blobdir first.");
}

let mut src_file = std::fs::File::open(src)
.with_context(|| format!("failed to open file {}", src.display()))?;
let mut hasher = blake3::Hasher::new();
hasher.update_reader(&mut src_file)?;
let hash = hasher.finalize().to_hex();
let hash = hash.as_str();
let new_path = blobdir.join(hash);

// This will also replace an already-existing file:
if let Err(_) = std::fs::rename(src, &new_path) {
// Try a second time in case there was some temporary error.
// There is no need to try and create the blobdir since create_and_deduplicate()
// only works for files that already are in the blobdir, anyway.
std::fs::rename(src, &new_path)?;
};

let blob = BlobObject {
blobdir: blobdir,
name: format!("$BLOBDIR/{hash}"),
};
context.emit_event(EventType::NewBlobFile(blob.as_name().to_string()));
Ok(blob)
})
}

/// Creates a blob from a file, possibly copying it to the blobdir.
///
/// If the source file is not a path into the blob directory
Expand Down Expand Up @@ -1439,7 +1480,7 @@ mod tests {
.await
.context("failed to write file")?;
let mut msg = Message::new(Viewtype::Image);
msg.set_file(file.to_str().unwrap(), None);
msg.set_file(file.to_str().unwrap(), None); // TODO make sure that test also passes with set_file_and_deduplicate
let chat = alice.create_chat(&bob).await;
let sent = alice.send_msg(chat.id, &mut msg).await;
let bob_msg = bob.recv_msg(&sent).await;
Expand Down Expand Up @@ -1490,4 +1531,36 @@ mod tests {

Ok(())
}

/// Exercises `BlobObject::create_and_deduplicate()`: identical contents map to
/// the same blob name, different contents to a different one, and files
/// outside the blobdir are rejected.
// The multi-threaded runtime flavor is required because
// `create_and_deduplicate()` calls `task::block_in_place()`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_deduplication() -> Result<()> {
    let t = TestContext::new().await;
    let in_blobdir = t.get_blobdir().join("anyfile.dat");

    // First import: the file is renamed to its content hash and the old name disappears.
    fs::write(&in_blobdir, b"bla").await?;
    let first = BlobObject::create_and_deduplicate(&t, &in_blobdir).await?;
    assert_eq!(
        first.name,
        "$BLOBDIR/ce940175885d7b78f7b7e9f1396611ff3e6828ebba2ca0d8b6e0f860ef2baf66"
    );
    assert!(!in_blobdir.exists());

    // Same contents again: must resolve to the very same blob name.
    fs::write(&in_blobdir, b"bla").await?;
    let same_content = BlobObject::create_and_deduplicate(&t, &in_blobdir).await?;
    assert_eq!(same_content.name, first.name);

    // A file that does not live in the blobdir must be refused.
    let outside = t.dir.path().join("anyfile.dat");
    fs::write(&outside, b"bla").await?;
    let rejected = BlobObject::create_and_deduplicate(&t, &outside).await;
    assert!(
        rejected.is_err(),
        "Files outside the blobdir should not be allowed in create_and_deduplicate()"
    );

    // Different contents: must get a different blob name.
    fs::write(&in_blobdir, b"blabla").await?;
    let other_content = BlobObject::create_and_deduplicate(&t, &in_blobdir).await?;
    assert_ne!(other_content.name, first.name);

    Ok(())
}
}
20 changes: 20 additions & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,26 @@ impl Message {
self.param.set_optional(Param::MimeType, filemime);
}

/// Sets the file associated with a message, deduplicating it in the blobdir.
///
/// The file is passed to [`BlobObject::create_and_deduplicate`] and the
/// resulting blob name is stored as the message's file.
///
/// The name the file currently has on disk is ignored; `name` is stored
/// as the filename instead. `filemime` is stored as the MIME type if given.
///
/// Core may move the file immediately, so it must not be modified again
/// after this method was called.
///
/// # Errors
///
/// Returns an error if the blob could not be created; in that case the
/// message parameters are left untouched.
// TODO: document this in deltachat.h as well.
pub async fn set_file_and_deduplicate(
    &mut self,
    context: &Context,
    file: &Path,
    filemime: Option<&str>,
    name: &str,
) -> Result<()> {
    // Create the blob first so that a failure does not leave the
    // parameters half-updated.
    let deduplicated = BlobObject::create_and_deduplicate(context, file).await?;

    let params = &mut self.param;
    params.set(Param::Filename, name);
    params.set(Param::File, deduplicated.as_name());
    params.set_optional(Param::MimeType, filemime);

    Ok(())
}

/// Creates a new blob and sets it as a file associated with a message.
pub async fn set_file_from_bytes(
&mut self,
Expand Down

0 comments on commit abbaa2e

Please sign in to comment.