diff --git a/Cargo.lock b/Cargo.lock
index 6ff820e..9bf3f11 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -66,6 +66,15 @@ version = "0.20.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
 
+[[package]]
+name = "bincode"
+version = "1.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
+dependencies = [
+ "serde",
+]
+
 [[package]]
 name = "bindgen"
 version = "0.63.0"
@@ -107,6 +116,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "atty",
+ "bincode",
  "blake2",
  "byteorder",
  "chrono",
diff --git a/Cargo.toml b/Cargo.toml
index 50b2a18..c4f511f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
 [dependencies]
 anyhow = "1.0"
 atty = "0.2"
+bincode = "1.3"
 blake2 = "0.10"
 byteorder = "1.4"
 chrono = "0.4"
diff --git a/src/check.rs b/src/check.rs
new file mode 100644
index 0000000..f904f4a
--- /dev/null
+++ b/src/check.rs
@@ -0,0 +1,340 @@
+use anyhow::{anyhow, Context, Result};
+use byteorder::{LittleEndian, ReadBytesExt};
+use nom::AsBytes;
+use serde_derive::{Deserialize, Serialize};
+use std::env;
+use std::fs;
+use std::io::{Read, Seek, Write};
+use std::os::unix::prelude::OpenOptionsExt;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+use bincode::*;
+use clap::ArgMatches;
+
+use crate::hash::*;
+use crate::output::Output;
+use crate::paths;
+use crate::slab::*;
+
+fn remove_incomplete_stream(cp: &CheckPoint) -> Result<()> {
+    // The data and hashes have been validated; remove the stream file directory.
+    let stream_dir_to_del = PathBuf::from(&cp.stream_path).canonicalize()?;
+    fs::remove_dir_all(stream_dir_to_del.clone())?;
+
+    // Everything should be good now, remove the checkpoint.
+    CheckPoint::end()?;
+
+    Ok(())
+}
+
+pub fn run(matches: &ArgMatches, output: Arc<Output>, repair: bool) -> Result<()> {
+    let archive_dir = Path::new(matches.value_of("ARCHIVE").unwrap()).canonicalize()?;
+
+    env::set_current_dir(archive_dir.clone())?;
+
+    let data_path = paths::data_path();
+    let hashes_path = paths::hashes_path();
+
+    output.report.progress(0);
+    output.report.set_title("Verifying archive");
+
+    // Load up a checkpoint if we have one.
+    let cp = CheckPoint::read(&archive_dir)?;
+
+    let num_data_slabs = SlabFile::verify(&data_path, repair);
+    output.report.progress(25);
+    let num_hash_slabs = SlabFile::verify(&hashes_path, repair);
+    output.report.progress(50);
+    if (num_data_slabs.is_err() || num_hash_slabs.is_err())
+        || (num_data_slabs.as_ref().unwrap() != num_hash_slabs.as_ref().unwrap())
+    {
+        if !repair || cp.is_none() {
+            return Err(anyhow!(
+                "The number of slabs in the data file ({}) doesn't match the number in the hashes file ({})!",
+                num_data_slabs?,
+                num_hash_slabs?
+            ));
+        }
+
+        output.report.set_title("Repairing archive ...");
+
+        // We tried a non-destructive fix, which didn't work, so we now have to revert the data
+        // slab to a known good state. It doesn't matter if one of the slabs verifies ok; they
+        // need to be a matched set. So we put both into a known good state, but before we do we
+        // make sure our slabs are at least as big as our starting sizes. If they aren't, there
+        // is nothing we can do to fix this and we won't do anything.
+        let cp = cp.unwrap();
+        let data_meta = fs::metadata(&data_path)?;
+        let hashes_meta = fs::metadata(&hashes_path)?;
+
+        if data_meta.len() >= cp.data_start_size && hashes_meta.len() >= cp.hash_start_size {
+            output
+                .report
+                .info("Rolling back archive to previous state...");
+
+            // Make sure the truncated size verifies by verifying the part we are keeping first.
+            SlabFile::truncate(&data_path, cp.data_start_size, false)?;
+            SlabFile::truncate(&hashes_path, cp.hash_start_size, false)?;
+
+            // Do the actual truncate, which also fixes up the offsets file to match.
+            SlabFile::truncate(data_path, cp.data_start_size, true)?;
+            output.report.progress(75);
+            SlabFile::truncate(hashes_path, cp.hash_start_size, true)?;
+
+            remove_incomplete_stream(&cp)?;
+            output.report.progress(100);
+            output.report.info("Archive restored to previous state.");
+        } else {
+            return Err(anyhow!(
+                "We're unable to repair this archive: the checkpoint sizes are larger than \
+                 the current file sizes, so data is missing!"
+            ));
+        }
+    } else if repair && cp.is_some() {
+        remove_incomplete_stream(&cp.unwrap())?;
+    }
+
+    Ok(())
+}
+
+#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
+pub struct CheckPoint {
+    pub magic: u64,
+    pub version: u32,
+    pub data_start_size: u64,
+    pub hash_start_size: u64,
+    pub data_curr_size: u64,
+    pub hash_curr_size: u64,
+    pub input_offset: u64,
+    pub source_path: String,
+    pub stream_path: String,
+}
+
+pub fn checkpoint_path(root: &str) -> PathBuf {
+    [root, "checkpoint.bin"].iter().collect()
+}
+
+impl CheckPoint {
+    pub const VERSION: u32 = 0;
+    pub const MAGIC: u64 = 0xD00DDEAD10CCD00D;
+    pub const MIN_SIZE: u64 = 76;
+
+    pub fn start(
+        source_path: &str,
+        stream_path: &str,
+        data_start_size: u64,
+        hash_start_size: u64,
+    ) -> Self {
+        CheckPoint {
+            magic: Self::MAGIC,
+            version: Self::VERSION,
+            data_start_size,
+            hash_start_size,
+            data_curr_size: data_start_size,
+            hash_curr_size: hash_start_size,
+            input_offset: 0,
+            source_path: String::from(source_path),
+            stream_path: String::from(stream_path),
+        }
+    }
+
+    pub fn write(&mut self, root: &Path) -> Result<()> {
+        let file_name = checkpoint_path(
+            root.to_str()
+                .ok_or_else(|| anyhow!("Invalid root path {}", root.display()))?,
+        );
+
+        {
+            let ser = bincode::DefaultOptions::new()
+                .with_fixint_encoding()
+                .with_little_endian();
+            let objbytes = ser.serialize(self)?;
+            let checksum = hash_64(&objbytes);
+
+            let mut cpf = fs::OpenOptions::new()
+                .read(false)
+                .write(true)
+                .custom_flags(libc::O_SYNC)
+                .create_new(true)
+                .open(file_name)
+                .context(
+                    "Previous pack operation interrupted, please run 'validate --repair all'",
+                )?;
+
+            cpf.write_all(&objbytes)?;
+            cpf.write_all(checksum.as_bytes())?;
+        }
+
+        // Sync the containing directory to ensure the checkpoint file's entry is on disk.
+        fs::File::open(root)?.sync_all()?;
+
+        Ok(())
+    }
+
+    fn read<P: AsRef<Path>>(root: P) -> Result<Option<Self>> {
+        let file_name = checkpoint_path(root.as_ref().to_str().unwrap());
+
+        let cpf = fs::OpenOptions::new()
+            .read(true)
+            .write(false)
+            .create_new(false)
+            .open(file_name);
+
+        if cpf.is_err() {
+            // No checkpoint
+            Ok(None)
+        } else {
+            let mut cpf = cpf?;
+            let len = cpf.metadata()?.len();
+
+            if len < Self::MIN_SIZE {
+                return Err(anyhow!(
+                    "Checkpoint file is too small to be valid: requires at least {} bytes, found {len} bytes",
+                    Self::MIN_SIZE
+                ));
+            }
+
+            let mut payload = vec![0; (len - 8) as usize];
+            cpf.read_exact(&mut payload)?;
+            let expected_cs = cpf.read_u64::<LittleEndian>()?;
+
+            assert!(cpf.stream_position()? == len);
+
+            let actual_cs = u64::from_le_bytes(hash_64(&payload).into());
+            if actual_cs != expected_cs {
+                return Err(anyhow!("Checkpoint file checksum is invalid!"));
+            }
+
+            let des = bincode::DefaultOptions::new()
+                .with_fixint_encoding()
+                .with_little_endian();
+            let cp: CheckPoint = des.deserialize(&payload)?;
+            if cp.version != Self::VERSION {
+                return Err(anyhow!(
+                    "Incorrect version {}, expected {}",
+                    cp.version,
+                    Self::VERSION
+                ));
+            }
+
+            if cp.magic != Self::MAGIC {
+                return Err(anyhow!(
+                    "Incorrect magic {}, expected {}",
+                    cp.magic,
+                    Self::MAGIC
+                ));
+            }
+            Ok(Some(cp))
+        }
+    }
+
+    pub fn end() -> Result<()> {
+        let root = env::current_dir()?;
+        let root_str = root.clone().into_os_string().into_string().unwrap();
+        let cp_file = checkpoint_path(root_str.as_str());
+
+        fs::remove_file(cp_file).context("error removing checkpoint file!")?;
+
+        fs::File::open(root)?.sync_all()?;
+        Ok(())
+    }
+
+    pub fn interrupted() -> Result<()> {
+        let root = env::current_dir()?;
+        match Self::read(root).context("error while checking for checkpoint file!")? {
+            Some(cp) => Err(anyhow!(
+                "pack operation of {} was interrupted, run 'validate --repair all'",
+                cp.source_path
+            )),
+            None => Ok(()),
+        }
+    }
+}
+
+#[test]
+fn check_ranges() {
+    let test_archive = PathBuf::from("/tmp/check_ranges");
+
+    std::fs::create_dir_all(&test_archive).unwrap();
+
+    let mut t = CheckPoint::start("/tmp/testing", "/tmp/testing/stream", u64::MAX, u64::MAX);
+
+    let write_result = t.write(&test_archive);
+    assert!(
+        write_result.is_ok(),
+        "CheckPoint.write failed {:?}",
+        write_result.unwrap_err()
+    );
+
+    let read_back = CheckPoint::read(&test_archive).unwrap();
+
+    assert!(std::fs::remove_dir_all(&test_archive).is_ok());
+
+    assert!(
+        read_back.is_some(),
+        "CheckPoint::read error {:?}",
+        read_back.unwrap()
+    );
+
+    assert!(read_back.unwrap() == t);
+}
+
+#[test]
+fn check_small() {
+    let test_archive = PathBuf::from("/tmp/check_small");
+
+    std::fs::create_dir_all(&test_archive).unwrap();
+
+    let mut t = CheckPoint::start("", "", u64::MAX, u64::MAX);
+
+    let write_result = t.write(&test_archive);
+    assert!(
+        write_result.is_ok(),
+        "CheckPoint.write failed {:?}",
+        write_result.unwrap_err()
+    );
+
+    let read_back = CheckPoint::read(&test_archive).unwrap();
+
+    assert!(std::fs::remove_dir_all(&test_archive).is_ok());
+
+    assert!(
+        read_back.is_some(),
+        "CheckPoint::read error {:?}",
+        read_back.unwrap()
+    );
+
+    assert!(read_back.unwrap() == t);
+}
+
+#[test]
+fn check_fields() {
+    let test_archive = PathBuf::from("/tmp/check_fields");
+    std::fs::create_dir_all(&test_archive).unwrap();
+
+    let mut t = CheckPoint::start("source/path", "stream/path/yes", 1024, 384);
+
+    t.data_curr_size = t.data_start_size * 2;
+    t.hash_curr_size = t.hash_start_size * 2;
+    t.input_offset = 1024 * 1024 * 1024;
+
+    let write_result = t.write(&test_archive);
+    assert!(
+        write_result.is_ok(),
+        "CheckPoint.write failed {:?}",
+        write_result.unwrap_err()
+    );
+
+    let read_back = CheckPoint::read(&test_archive).unwrap();
+
+    assert!(std::fs::remove_dir_all(&test_archive).is_ok());
+
+    assert!(
+        read_back.is_some(),
+        "CheckPoint::read error {:?}",
+        read_back.unwrap()
+    );
+
+    assert!(read_back.unwrap() == t);
+}
diff --git a/src/cuckoo_filter.rs b/src/cuckoo_filter.rs
index 1663afe..ae10a9b 100644
--- a/src/cuckoo_filter.rs
+++ b/src/cuckoo_filter.rs
@@ -37,7 +37,7 @@ pub enum InsertResult {
 }
 
 pub struct CuckooFilter {
-    rng: ChaCha20Rng,
+    rng: Box<ChaCha20Rng>,
     len: usize,
     scatter: Vec<usize>,
     bucket_counts: Vec,
@@ -95,7 +95,8 @@ pub fn calculate_signature(values: &[usize]) -> u64 {
 }
 
 impl CuckooFilter {
-    fn make_scatter(rng: &mut ChaCha20Rng) -> Vec<usize> {
+    fn make_scatter() -> (Box<ChaCha20Rng>, Vec<usize>) {
+        let mut rng = Box::new(ChaCha20Rng::seed_from_u64(1));
         let scatter: Vec<usize> = repeat_with(|| rng.gen())
             .take(u16::MAX as usize + 1)
             .collect();
@@ -105,15 +106,14 @@ impl CuckooFilter {
         // versions/time
         assert!(4224213928824907068 == calculate_signature(scatter.as_slice()));
 
-        scatter
+        (rng, scatter)
     }
 
     pub fn with_capacity(mut n: usize) -> Self {
         n = (n * 5) / 4;
         n /= ENTRIES_PER_BUCKET;
-        let mut rng = ChaCha20Rng::seed_from_u64(1);
         let nr_buckets = cmp::max(n, 4096).next_power_of_two();
-        let scatter = Self::make_scatter(&mut rng);
+        let (rng, scatter) = Self::make_scatter();
         Self {
             rng,
             len: 0,
@@ -129,8 +129,6 @@ impl CuckooFilter {
         let mut file = SlabFileBuilder::open(path).build()?;
         let input = file.read(0)?;
 
-        let mut rng = ChaCha20Rng::seed_from_u64(1);
-
         let (input, nr_buckets) = parse_nr(&input[..]).map_err(|_| anyhow!("couldn't parse nr"))?;
         let nr_buckets = nr_buckets as usize;
         let (input, bucket_counts) =
@@ -143,7 +141,7 @@ impl CuckooFilter {
             return Err(anyhow!("extra bytes at end of index file"));
         }
 
-        let scatter = Self::make_scatter(&mut rng);
+        let (rng, scatter) = Self::make_scatter();
         if !is_pow2(nr_buckets) {
             return Err(anyhow!("nr_buckets({nr_buckets}) is not a power of 2"));
         }
diff --git a/src/dump_stream.rs b/src/dump_stream.rs
index 2a34195..adc3af6 100644
--- a/src/dump_stream.rs
+++ b/src/dump_stream.rs
@@ -4,6 +4,7 @@ use std::env;
 use std::path::Path;
 use std::sync::Arc;
 
+use crate::check::*;
 use crate::output::Output;
 use crate::stream::*;
 
@@ -14,6 +15,7 @@ pub fn run(matches: &ArgMatches, output: Arc<Output>) -> Result<()> {
     let stream = matches.value_of("STREAM").unwrap();
 
     env::set_current_dir(archive_dir)?;
+    CheckPoint::interrupted()?;
 
     let mut d = Dumper::new(stream)?;
     d.dump(output)
diff --git a/src/hash.rs b/src/hash.rs
index a5211af..401f2b7 100644
--- a/src/hash.rs
+++ b/src/hash.rs
@@ -54,4 +54,15 @@ pub fn hash_32(v: &[u8]) -> Hash32 {
     hasher.finalize()
 }
 
+pub fn hash_256_hash_64_iov(iov: &IoVec) -> (Hash256, u64) {
+    let h = hash_256_iov(iov);
+    let mini_hash = hash_64(&h[..]);
+    (h, u64::from_le_bytes(mini_hash.into()))
+}
+pub fn hash_256_hash_64(v: &[u8]) -> (Hash256, u64) {
+    let h = hash_256(v);
+    let mini_hash = hash_64(&h[..]);
+    (h, u64::from_le_bytes(mini_hash.into()))
+}
+
 //-----------------------------------------
diff --git a/src/lib.rs b/src/lib.rs
index 20842f9..1c273ea 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod check;
 pub mod chunkers;
 pub mod config;
 pub mod content_sensitive_splitter;
diff --git a/src/list.rs b/src/list.rs
index 7eea590..0abcb53 100644
--- a/src/list.rs
+++ b/src/list.rs
@@ -9,6 +9,7 @@ use std::fs;
 use std::path::Path;
 use std::sync::Arc;
 
+use crate::check::*;
 use crate::config;
 use crate::output::Output;
 
@@ -22,6 +23,7 @@ pub fn run(matches: &ArgMatches, output: Arc<Output>) -> Result<()> {
     let archive_dir = Path::new(matches.value_of("ARCHIVE").unwrap()).canonicalize()?;
 
     env::set_current_dir(&archive_dir)?;
+    CheckPoint::interrupted()?;
 
     let paths = fs::read_dir(Path::new("./streams"))?;
     let stream_ids = paths
diff --git a/src/main.rs b/src/main.rs
index c8e823e..5baec9b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,10 +1,11 @@
 use anyhow::Result;
-use clap::{command, Arg, ArgMatches, Command};
+use clap::{command, Arg, ArgMatches, Command, SubCommand};
 use std::env;
 use std::process::exit;
 use std::sync::Arc;
 use thinp::report::*;
 
+use blk_archive::check;
 use blk_archive::create;
 use blk_archive::dump_stream;
 use blk_archive::list;
@@ -64,6 +65,37 @@ fn main_() -> Result<()> {
         .value_name("JSON")
         .takes_value(false);
 
+    let repair: Arg = Arg::new("REPAIR")
+        .help("Attempts a repair of an archive")
+        .required(false)
+        .long("repair")
+        .short('r')
+        .value_name("REPAIR")
+        .takes_value(false);
+
+    let validate_operations = SubCommand::with_name("validate")
+        .about("Validate operations")
+        .arg_required_else_help(true)
+        .arg(repair.clone())
+        .subcommand(
+            SubCommand::with_name("all")
+                .help_template(
+                    "Validates the archive and optionally repairs \n\
+                    corruption that may have occurred during an interrupted 'pack' operation.\n\
+                    Note: Data from a previously interrupted 'pack' operation will be lost. \n\
+                    \nOPTIONS:\n{options}\n USAGE:\n\t{usage}",
+                )
+                .about("Validates an archive")
+                .arg(archive_arg.clone()),
+        )
+        .subcommand(
+            SubCommand::with_name("stream")
+                .help("Validates an individual stream")
+                .about("Validates an individual stream")
+                .arg(stream_arg.clone())
+                .arg(archive_arg.clone()),
+        );
+
     let matches = command!()
         .arg(json)
         .propagate_version(true)
@@ -184,6 +216,7 @@ fn main_() -> Result<()> {
                 .about("lists the streams in the archive")
                 .arg(archive_arg.clone()),
         )
+        .subcommand(validate_operations)
         .get_matches();
 
     let report = mk_report(&matches);
@@ -211,6 +244,17 @@ fn main_() -> Result<()> {
         Some(("dump-stream", sub_matches)) => {
            dump_stream::run(sub_matches, output)?;
         }
+        Some(("validate", sub_matches)) => match sub_matches.subcommand() {
+            Some(("all", args)) => {
+                // Repair is an argument on the 'validate' command, not on 'all'.
+                // Maybe we can figure out how to make it an option of 'all' instead?
+                check::run(args, output, sub_matches.is_present("REPAIR"))?;
+            }
+            Some(("stream", args)) => {
+                unpack::run_verify(args, report)?;
+            }
+            _ => unreachable!("Exhausted list of 'validate' subcommands"),
+        },
         _ => unreachable!("Exhausted list of subcommands and subcommand_required prevents 'None'"),
     }
 
diff --git a/src/pack.rs b/src/pack.rs
index d855fa9..5bdb39b 100644
--- a/src/pack.rs
+++ b/src/pack.rs
@@ -1,5 +1,4 @@
 use anyhow::{anyhow, Context, Result};
-use byteorder::{LittleEndian, ReadBytesExt};
 use chrono::prelude::*;
 use clap::ArgMatches;
 use io::Write;
@@ -12,11 +11,11 @@ use std::boxed::Box;
 use std::env;
 use std::fs::OpenOptions;
 use std::io;
-use std::io::Cursor;
 use std::num::NonZeroUsize;
 use std::path::{Path, PathBuf};
 use std::sync::{Arc, Mutex};
 
+use crate::check::*;
 use crate::chunkers::*;
 use crate::config;
 use crate::content_sensitive_splitter::*;
@@ -162,10 +161,7 @@ impl DedupHandler {
             let buf = hashes_file.read(s as u32)?;
             let hi = ByHash::new(buf)?;
             for i in 0..hi.len() {
-                let h = hi.get(i);
-                let mini_hash = hash_64(&h[..]);
-                let mut c = Cursor::new(&mini_hash);
-                let mini_hash = c.read_u64::<LittleEndian>()?;
+                let (_, mini_hash) = hash_256_hash_64(hi.get(i));
                 seen.test_and_set(mini_hash, s as u32)?;
             }
         }
@@ -285,11 +281,7 @@ impl IoVecHandler for DedupHandler {
             )?;
             self.maybe_complete_stream()?;
         } else {
-            let h = hash_256_iov(iov);
-            let mini_hash = hash_64(&h);
-            let mut c = Cursor::new(&mini_hash);
-            let mini_hash = c.read_u64::<LittleEndian>()?;
-
+            let (h, mini_hash) = hash_256_hash_64_iov(iov);
             let me: MapEntry;
             match self.seen.test_and_set(mini_hash, self.current_slab)? {
                InsertResult::Inserted => {
@@ -432,8 +424,23 @@ impl Packer {
             hashes_file.get_file_size()
         };
 
+        let input_name_string = self
+            .input_path
+            .clone()
+            .into_os_string()
+            .into_string()
+            .unwrap();
+
         let (stream_id, mut stream_path) = new_stream_path()?;
 
+        CheckPoint::start(
+            input_name_string.as_str(),
+            stream_path.as_os_str().to_str().unwrap(),
+            data_size,
+            hashes_size,
+        )
+        .write(env::current_dir()?.as_path())?;
+
         std::fs::create_dir(stream_path.clone())?;
         stream_path.push("stream");
 
@@ -728,6 +735,8 @@ pub fn run(matches: &ArgMatches, output: Arc<Output>) -> Result<()> {
     env::set_current_dir(archive_dir)?;
     let config = config::read_config(".")?;
 
+    CheckPoint::interrupted()?;
+
     output
         .report
        .set_title(&format!("Building packer {} ...", input_file.display()));
@@ -759,7 +768,13 @@ pub fn run(matches: &ArgMatches, output: Arc<Output>) -> Result<()> {
     output
         .report
         .set_title(&format!("Packing {} ...", input_file.display()));
-    packer.pack(hashes_file)
+    packer.pack(hashes_file)?;
+    // packer.pack() needs to return, so the Drop impls run and the output files get
+    // flushed, before we end the pack checkpoint. It's entirely possible that the data
+    // is safely written to disk but we exit before removing the checkpoint file; at
+    // worst that causes us to throw away the last pack operation.
+    CheckPoint::end()?;
+    Ok(())
 }
 
 //-----------------------------------------
diff --git a/src/slab.rs b/src/slab.rs
index 249467c..b17058b 100644
--- a/src/slab.rs
+++ b/src/slab.rs
@@ -8,6 +8,7 @@ use std::path::{Path, PathBuf};
 use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
 use std::sync::{Arc, Mutex};
 use std::thread;
+use std::time::SystemTime;
 use threadpool::ThreadPool;
 
 use crate::hash::*;
@@ -71,7 +72,7 @@ struct SlabOffsets {
 }
 
 impl SlabOffsets {
-    fn read_offset_file<P: AsRef<Path>>(p: P) -> Result<Self> {
+    fn read_offset_file<P: AsRef<Path>>(p: P, data_mtime: Option<SystemTime>) -> Result<Self> {
         let r = OpenOptions::new()
             .read(true)
             .write(false)
@@ -79,6 +80,17 @@ impl SlabOffsets {
             .open(p)
             .context("opening offset file")?;
 
+        let mtime = r.metadata()?.modified()?;
+
+        if let Some(file_mtime) = data_mtime {
+            if file_mtime > mtime {
+                return Err(anyhow!(
+                    "Offsets file modification time is older than the slab data file; \
+                    run 'blk-archive validate --repair all' to correct!",
+                ));
+            }
+        }
+
         let len = r.metadata().context("offset metadata")?.len();
         let mut r = std::io::BufReader::new(r);
         let nr_entries = len / std::mem::size_of::<u64>() as u64;
@@ -124,6 +136,8 @@ impl SlabOffsets {
 
 const FILE_MAGIC: u64 = 0xb927f96a6b611180;
 const SLAB_MAGIC: u64 = 0x20565137a3100a7c;
+const SLAB_FILE_HDR_LEN: u64 = 16;
+const SLAB_HDR_LEN: u64 = 24;
 
 const FORMAT_VERSION: u32 = 0;
 
@@ -215,6 +229,12 @@ fn writer_(shared: Arc>, rx: Receiver) -> Result<()>
     }
 
     assert!(queued.is_empty());
+
+    {
+        let mut shared = shared.lock().unwrap();
+        shared.data.flush()?;
+    }
+
     Ok(())
 }
 
@@ -345,8 +365,10 @@ impl SlabFile {
             (None, tx)
         };
 
-        let offsets = SlabOffsets::read_offset_file(&offsets_path)?;
         let file_size = data.metadata()?.len();
+        let file_mtime = data.metadata()?.modified()?;
+        let offsets = SlabOffsets::read_offset_file(&offsets_path, Some(file_mtime))?;
+
         let shared = Arc::new(Mutex::new(SlabShared {
             data,
             offsets,
@@ -383,8 +405,10 @@ impl SlabFile {
         let compressed = flags == 1;
         let compressor = None;
 
-        let offsets = SlabOffsets::read_offset_file(&offsets_path)?;
         let file_size = data.metadata()?.len();
+        let data_mtime = data.metadata()?.modified()?;
+        let offsets = SlabOffsets::read_offset_file(&offsets_path, Some(data_mtime))?;
+
         let shared = Arc::new(Mutex::new(SlabShared {
             data,
             offsets,
@@ -403,6 +427,156 @@ impl SlabFile {
         })
     }
 
+    fn verify_<P: AsRef<Path>>(data_path: P, check_len: Option<u64>) -> Result<SlabOffsets> {
+        let slab_name = data_path.as_ref().to_path_buf();
+
+        let mut data = OpenOptions::new()
+            .read(true)
+            .write(false)
+            .create(false)
+            .open(data_path)?;
+
+        let file_size = match check_len {
+            Some(len) => len,
+            None => data.metadata()?.len(),
+        };
+
+        if file_size < SLAB_FILE_HDR_LEN {
+            return Err(anyhow!(
+                "File {} isn't large enough to be a slab file, size = {file_size} bytes.",
+                slab_name.to_string_lossy()
+            ));
+        }
+
+        let mut so = SlabOffsets::default();
+        let _flags = read_slab_header(&mut data)?;
+
+        // We don't have any additional data in an empty archive.
+        let mut curr_offset = data.stream_position()?;
+        assert!(curr_offset == SLAB_FILE_HDR_LEN);
+
+        if curr_offset == file_size {
+            return Ok(so);
+        }
+
+        so.offsets.push(SLAB_FILE_HDR_LEN);
+        let mut slab_index = 0;
+
+        loop {
+            let remaining = file_size - curr_offset;
+            if remaining < SLAB_HDR_LEN {
+                return Err(anyhow!(
+                    "Slab {slab_index} is incomplete, not enough bytes remaining for a header, \
+                    {remaining} bytes remaining."
+                ));
+            }
+
+            let magic = data.read_u64::<LittleEndian>()?;
+            let len = data.read_u64::<LittleEndian>()?;
+
+            if magic != SLAB_MAGIC {
+                return Err(anyhow!(
+                    "slab magic incorrect for slab {slab_index} in file {}",
+                    slab_name.to_string_lossy()
+                ));
+            }
+
+            let mut expected_csum: Hash64 = Hash64::default();
+            data.read_exact(&mut expected_csum)?;
+
+            if remaining < SLAB_HDR_LEN + len {
+                return Err(anyhow!(
+                    "Slab {slab_index} is incomplete, payload is truncated, \
+                    needs {} bytes but only {remaining} remain",
+                    SLAB_HDR_LEN + len
+                ));
+            }
+
+            let mut buf = vec![0; len as usize];
+            data.read_exact(&mut buf)?;
+
+            let actual_csum = hash_64(&buf);
+            if actual_csum != expected_csum {
+                return Err(anyhow!(
+                    "slab {slab_index} checksum incorrect for file {}!",
+                    slab_name.to_string_lossy()
+                ));
+            }
+
+            curr_offset = data.stream_position()?;
+            if curr_offset == file_size {
+                break;
+            }
+
+            slab_index += 1;
+            so.offsets.push(curr_offset);
+        }
+
+        Ok(so)
+    }
+
+    pub fn verify<P: AsRef<Path>>(data_path: P, repair: bool) -> Result<usize> {
+        let data_mtime = std::fs::metadata(data_path.as_ref())?.modified()?;
+        let offsets_path = offsets_path(&data_path);
+
+        let actual_offsets = Self::verify_(data_path, None)?;
+        let stored_offsets = SlabOffsets::read_offset_file(&offsets_path, None)?;
+
+        let actual_count = actual_offsets.offsets.len();
+        let stored_count = stored_offsets.offsets.len();
+        let offsets_mtime = std::fs::metadata(offsets_path.clone())?.modified()?;
+
+        let mut result = if actual_count != stored_count {
+            Err(anyhow!(
+                "Offset file {} has an incorrect number of entries {} != {}",
+                offsets_path.to_string_lossy(),
+                actual_count,
+                stored_count
+            ))
+        } else if actual_offsets.offsets != stored_offsets.offsets {
+            Err(anyhow!(
+                "Stored offset values do not match those calculated for file {}",
+                offsets_path.to_string_lossy()
+            ))
+        } else if data_mtime > offsets_mtime {
+            Err(anyhow!(
+                "Offsets file modification time is older than the slab data file; \
+                run 'blk-archive validate --repair all' to correct!",
+            ))
+        } else {
+            Ok(actual_count)
+        };
+
+        if result.is_err() && repair {
+            actual_offsets.write_offset_file(offsets_path)?;
+            result = Ok(actual_count)
+        }
+
+        result
+    }
+
+    pub fn truncate<P: AsRef<Path>>(data_path: P, len: u64, do_truncate: bool) -> Result<()> {
+        if !do_truncate {
+            Self::verify_(data_path, Some(len))?;
+            Ok(())
+        } else {
+            let offsets_file = offsets_path(&data_path);
+            {
+                let mut data = OpenOptions::new()
+                    .read(true)
+                    .write(true)
+                    .create(false)
+                    .open(data_path.as_ref())?;
+                data.set_len(len)?;
+                data.flush()?;
+            }
+
+            let slab_offsets = Self::verify_(data_path, None)?;
+            slab_offsets.write_offset_file(offsets_file)?;
+            Ok(())
+        }
+    }
+
     pub fn close(&mut self) -> Result<()> {
         self.tx = None;
         let mut tid = None;
diff --git a/src/unpack.rs b/src/unpack.rs
index 8cf1cae..fe7e952 100644
--- a/src/unpack.rs
+++ b/src/unpack.rs
@@ -11,6 +11,7 @@ use std::path::Path;
 use std::sync::Arc;
 use thinp::report::*;
 
+use crate::check::*;
 use crate::chunkers::*;
 use crate::config;
 use crate::hash_index::*;
@@ -442,6 +443,9 @@ pub fn run_unpack(matches: &ArgMatches, report: Arc<Report>) -> Result<()> {
     let stream = matches.value_of("STREAM").unwrap();
     let create = matches.is_present("CREATE");
 
+    env::set_current_dir(archive_dir)?;
+    CheckPoint::interrupted()?;
+
     let output = if create {
         fs::OpenOptions::new()
             .read(false)
@@ -456,7 +460,6 @@ pub fn run_unpack(matches: &ArgMatches, report: Arc<Report>) -> Result<()> {
             .open(output_file)
             .context("Couldn't open output")?
     };
-    env::set_current_dir(archive_dir)?;
 
     report.set_title(&format!("Unpacking {} ...", output_file.display()));
     if create {
diff --git a/src/utils.rs b/src/utils.rs
index 15d7b2d..d15152d 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -1,3 +1,13 @@
+#[macro_export]
+macro_rules! tpln {
+    () => {
+        print!("\n")
+    };
+    ($($arg:tt)*) => {{
+        println!("{:?}[{}:{}] {}", ::std::thread::current().id(), file!(), line!(), format!($($arg)*));
+    }};
+}
+
 pub fn round_pow2(i: u32) -> u64 {
     // Round up to the next power of 2
     // https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2