From 39e4afaed1efff87e380ebf654910dcbbb9bae11 Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Fri, 22 Sep 2023 12:05:05 -0500 Subject: [PATCH 01/13] Create function to create 256 & 64 bit hash tuple Removes some duplicated code. Signed-off-by: Tony Asleson --- src/hash.rs | 11 +++++++++++ src/pack.rs | 13 ++----------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/hash.rs b/src/hash.rs index a5211af..401f2b7 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -54,4 +54,15 @@ pub fn hash_32(v: &[u8]) -> Hash32 { hasher.finalize() } +pub fn hash_256_hash_64_iov(iov: &IoVec) -> (Hash256, u64) { + let h = hash_256_iov(iov); + let mini_hash = hash_64(&h[..]); + (h, u64::from_le_bytes(mini_hash.into())) +} +pub fn hash_256_hash_64(v: &[u8]) -> (Hash256, u64) { + let h = hash_256(v); + let mini_hash = hash_64(&h[..]); + (h, u64::from_le_bytes(mini_hash.into())) +} + //----------------------------------------- diff --git a/src/pack.rs b/src/pack.rs index d855fa9..7468d97 100644 --- a/src/pack.rs +++ b/src/pack.rs @@ -1,5 +1,4 @@ use anyhow::{anyhow, Context, Result}; -use byteorder::{LittleEndian, ReadBytesExt}; use chrono::prelude::*; use clap::ArgMatches; use io::Write; @@ -12,7 +11,6 @@ use std::boxed::Box; use std::env; use std::fs::OpenOptions; use std::io; -use std::io::Cursor; use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; @@ -162,10 +160,7 @@ impl DedupHandler { let buf = hashes_file.read(s as u32)?; let hi = ByHash::new(buf)?; for i in 0..hi.len() { - let h = hi.get(i); - let mini_hash = hash_64(&h[..]); - let mut c = Cursor::new(&mini_hash); - let mini_hash = c.read_u64::()?; + let (_, mini_hash) = hash_256_hash_64(hi.get(i)); seen.test_and_set(mini_hash, s as u32)?; } } @@ -285,11 +280,7 @@ impl IoVecHandler for DedupHandler { )?; self.maybe_complete_stream()?; } else { - let h = hash_256_iov(iov); - let mini_hash = hash_64(&h); - let mut c = Cursor::new(&mini_hash); - let mini_hash = c.read_u64::()?; - + let (h, mini_hash) = hash_256_hash_64_iov(iov); let me: MapEntry; match self.seen.test_and_set(mini_hash, self.current_slab)? 
{ InsertResult::Inserted => { From 7f2e62c7af5426c4791947663e8a562b20c7b0e7 Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Tue, 26 Sep 2023 12:00:45 -0500 Subject: [PATCH 02/13] Cuckoo_filter: Create ChaCha20Rng in one place Signed-off-by: Tony Asleson --- src/cuckoo_filter.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/cuckoo_filter.rs b/src/cuckoo_filter.rs index 1663afe..ae10a9b 100644 --- a/src/cuckoo_filter.rs +++ b/src/cuckoo_filter.rs @@ -37,7 +37,7 @@ pub enum InsertResult { } pub struct CuckooFilter { - rng: ChaCha20Rng, + rng: Box, len: usize, scatter: Vec, bucket_counts: Vec, @@ -95,7 +95,8 @@ pub fn calculate_signature(values: &[usize]) -> u64 { } impl CuckooFilter { - fn make_scatter(rng: &mut ChaCha20Rng) -> Vec { + fn make_scatter() -> (Box, Vec) { + let mut rng = Box::new(ChaCha20Rng::seed_from_u64(1)); let scatter: Vec = repeat_with(|| rng.gen()) .take(u16::MAX as usize + 1) .collect(); @@ -105,15 +106,14 @@ impl CuckooFilter { // versions/time assert!(4224213928824907068 == calculate_signature(scatter.as_slice())); - scatter + (rng, scatter) } pub fn with_capacity(mut n: usize) -> Self { n = (n * 5) / 4; n /= ENTRIES_PER_BUCKET; - let mut rng = ChaCha20Rng::seed_from_u64(1); let nr_buckets = cmp::max(n, 4096).next_power_of_two(); - let scatter = Self::make_scatter(&mut rng); + let (rng, scatter) = Self::make_scatter(); Self { rng, len: 0, @@ -129,8 +129,6 @@ impl CuckooFilter { let mut file = SlabFileBuilder::open(path).build()?; let input = file.read(0)?; - let mut rng = ChaCha20Rng::seed_from_u64(1); - let (input, nr_buckets) = parse_nr(&input[..]).map_err(|_| anyhow!("couldn't parse nr"))?; let nr_buckets = nr_buckets as usize; let (input, bucket_counts) = @@ -143,7 +141,7 @@ impl CuckooFilter { return Err(anyhow!("extra bytes at end of index file")); } - let scatter = Self::make_scatter(&mut rng); + let (rng, scatter) = Self::make_scatter(); if !is_pow2(nr_buckets) { return Err(anyhow!("nr_buckets({nr_buckets}) is not a power of 2")); } From efebb52da0842757c378c3c9fa24a5b94083cc20 Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Wed, 4 Oct 2023 12:35:07 -0500 Subject: [PATCH 03/13] utils: Add macro for debug Signed-off-by: Tony Asleson --- src/utils.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/utils.rs b/src/utils.rs index 15d7b2d..d15152d 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,3 +1,13 @@ +#[macro_export] +macro_rules! tpln { + () => { + print!("\n") + }; + ($($arg:tt)*) => {{ + println!("{:?}[{}:{}] {}", ::std::thread::current().id(), file!(), line!(), format!($($arg)*)); + }}; +} + pub fn round_pow2(i: u32) -> u64 { // Round up to the next power of 2 // https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 From c82b783651a7a59ee08cbe24a1bf6305ef680272 Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Wed, 4 Oct 2023 12:51:38 -0500 Subject: [PATCH 04/13] Add a "verify-all" command Currently just checks the data slab & offsets file and the hashes slab & offsets file. 
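For reviewers, a rough sketch of the on-disk layout this verify pass walks (illustrative only, not part of the patch; it assumes the little-endian encoding used elsewhere in the slab code and elides the hash_64 checksum comparison the real verify performs):

    use std::io::Read;
    use byteorder::{LittleEndian, ReadBytesExt};

    const SLAB_MAGIC: u64 = 0x20565137a3100a7c;

    // After the 16 byte file header, each slab record is: magic, payload
    // length, an 8 byte checksum, then the payload itself.
    fn read_one_slab<R: Read>(r: &mut R) -> anyhow::Result<Vec<u8>> {
        let magic = r.read_u64::<LittleEndian>()?;
        anyhow::ensure!(magic == SLAB_MAGIC, "bad slab magic");
        let len = r.read_u64::<LittleEndian>()?;
        let mut expected_csum = [0u8; 8];
        r.read_exact(&mut expected_csum)?; // the real code checks this against hash_64(payload)
        let mut payload = vec![0u8; len as usize];
        r.read_exact(&mut payload)?;
        Ok(payload)
    }

The verify pass below repeats this until the end of the file and records the start offset of each slab so the stored offsets file can be cross-checked.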
Signed-off-by: Tony Asleson --- src/check.rs | 28 ++++++++++++++++ src/lib.rs | 1 + src/main.rs | 9 ++++++ src/slab.rs | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 128 insertions(+) create mode 100644 src/check.rs diff --git a/src/check.rs b/src/check.rs new file mode 100644 index 0000000..1e12ce4 --- /dev/null +++ b/src/check.rs @@ -0,0 +1,28 @@ +use std::env; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{anyhow, Result}; +use clap::ArgMatches; + +use crate::output::Output; +use crate::paths; +use crate::slab::*; + +pub fn run(matches: &ArgMatches, _output: Arc) -> Result<()> { + let archive_dir = Path::new(matches.value_of("ARCHIVE").unwrap()).canonicalize()?; + + env::set_current_dir(archive_dir.clone())?; + + let data_path = paths::data_path(); + let hashes_path = paths::hashes_path(); + + let num_data_slabs = SlabFile::verify(data_path.clone())?; + let num_hash_slabs = SlabFile::verify(hashes_path.clone())?; + + if num_data_slabs != num_hash_slabs { + return Err(anyhow!("Number of slab entries in data slab {num_data_slabs} != {num_hash_slabs} in hashes file!")); + } + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 20842f9..1c273ea 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod check; pub mod chunkers; pub mod config; pub mod content_sensitive_splitter; diff --git a/src/main.rs b/src/main.rs index c8e823e..32026e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ use std::process::exit; use std::sync::Arc; use thinp::report::*; +use blk_archive::check; use blk_archive::create; use blk_archive::dump_stream; use blk_archive::list; @@ -184,6 +185,11 @@ fn main_() -> Result<()> { .about("lists the streams in the archive") .arg(archive_arg.clone()), ) + .subcommand( + Command::new("verify-all") + .about("verifies the integrity of the archive") + .arg(archive_arg.clone()), + ) .get_matches(); let report = mk_report(&matches); @@ -211,6 +217,9 @@ fn main_() -> Result<()> { Some(("dump-stream", sub_matches)) => { dump_stream::run(sub_matches, output)?; } + Some(("verify-all", sub_matches)) => { + check::run(sub_matches, output)?; + } _ => unreachable!("Exhausted list of subcommands and subcommand_required prevents 'None'"), } diff --git a/src/slab.rs b/src/slab.rs index 249467c..a38862d 100644 --- a/src/slab.rs +++ b/src/slab.rs @@ -124,6 +124,7 @@ impl SlabOffsets { const FILE_MAGIC: u64 = 0xb927f96a6b611180; const SLAB_MAGIC: u64 = 0x20565137a3100a7c; +const SLAB_HDR_LEN: u64 = 16; const FORMAT_VERSION: u32 = 0; @@ -403,6 +404,95 @@ impl SlabFile { }) } + fn verify_>(data_path: P) -> Result { + let slab_name = data_path.as_ref().to_path_buf(); + + let mut data = OpenOptions::new() + .read(true) + .write(false) + .create(false) + .open(data_path)?; + + let mut so = SlabOffsets::default(); + let _flags = read_slab_header(&mut data)?; + let file_size = data.metadata()?.len(); + + data.seek(SeekFrom::Start(SLAB_HDR_LEN))?; + + // We don't have any additional data in an empty archive + if data.stream_position()? 
== file_size { + return Ok(so); + } + + so.offsets.push(SLAB_HDR_LEN); + let mut slab_index = 0; + + loop { + let magic = data.read_u64::()?; + let len = data.read_u64::()?; + + if magic != SLAB_MAGIC { + return Err(anyhow!( + "slab magic incorrect for slab {slab_index} for file {}", + slab_name.to_string_lossy() + )); + } + + let mut expected_csum: Hash64 = Hash64::default(); + data.read_exact(&mut expected_csum)?; + + let mut buf = vec![0; len as usize]; + data.read_exact(&mut buf)?; + + let actual_csum = hash_64(&buf); + if actual_csum != expected_csum { + return Err(anyhow!( + "slab {slab_index} checksum incorrect for file {}!", + slab_name.to_string_lossy() + )); + } + + let curr_offset = data.stream_position()?; + + if curr_offset == file_size { + break; + } + + slab_index += 1; + so.offsets.push(curr_offset); + } + + Ok(so) + } + + pub fn verify>(data_path: P) -> Result { + let offsets_path = offsets_path(&data_path); + + let actual_offsets = Self::verify_(data_path)?; + let stored_offsets = SlabOffsets::read_offset_file(&offsets_path)?; + + let actual_count = actual_offsets.offsets.len(); + let stored_count = stored_offsets.offsets.len(); + + if actual_count != stored_count { + return Err(anyhow!( + "Offset file {} has incorrect number of entries {} != {}", + offsets_path.to_string_lossy(), + actual_count, + stored_count + )); + } + + if actual_offsets.offsets != stored_offsets.offsets { + return Err(anyhow!( + "Stored offset values do not match calculated for file {}", + offsets_path.to_string_lossy() + )); + } + + Ok(actual_count) + } + pub fn close(&mut self) -> Result<()> { self.tx = None; let mut tid = None; From 207a90ba484607ec5dd5857a20eb2adae2ce5d56 Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Thu, 5 Oct 2023 11:29:48 -0500 Subject: [PATCH 05/13] Add repair option to verify-all At the moment we only fix the offsets file for the slab. Signed-off-by: Tony Asleson --- src/check.rs | 10 +++++++--- src/main.rs | 11 ++++++++++- src/slab.rs | 56 +++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 59 insertions(+), 18 deletions(-) diff --git a/src/check.rs b/src/check.rs index 1e12ce4..bca023a 100644 --- a/src/check.rs +++ b/src/check.rs @@ -11,17 +11,21 @@ use crate::slab::*; pub fn run(matches: &ArgMatches, _output: Arc) -> Result<()> { let archive_dir = Path::new(matches.value_of("ARCHIVE").unwrap()).canonicalize()?; + let repair = matches.is_present("REPAIR"); env::set_current_dir(archive_dir.clone())?; let data_path = paths::data_path(); let hashes_path = paths::hashes_path(); - let num_data_slabs = SlabFile::verify(data_path.clone())?; - let num_hash_slabs = SlabFile::verify(hashes_path.clone())?; + let num_data_slabs = SlabFile::verify(data_path.clone(), repair)?; + let num_hash_slabs = SlabFile::verify(hashes_path.clone(), repair)?; if num_data_slabs != num_hash_slabs { - return Err(anyhow!("Number of slab entries in data slab {num_data_slabs} != {num_hash_slabs} in hashes file!")); + return Err(anyhow!( + "Number of slab entries in data slab {num_data_slabs} \ + != {num_hash_slabs} in hashes file!" 
+ )); } Ok(()) diff --git a/src/main.rs b/src/main.rs index 32026e7..4cca5aa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,6 +65,14 @@ fn main_() -> Result<()> { .value_name("JSON") .takes_value(false); + let repair: Arg = Arg::new("REPAIR") + .help("repairs an archive") + .required(false) + .long("repair") + .short('r') + .value_name("REPAIR") + .takes_value(false); + let matches = command!() .arg(json) .propagate_version(true) @@ -188,7 +196,8 @@ fn main_() -> Result<()> { .subcommand( Command::new("verify-all") .about("verifies the integrity of the archive") - .arg(archive_arg.clone()), + .arg(archive_arg.clone()) + .arg(repair.clone()), ) .get_matches(); diff --git a/src/slab.rs b/src/slab.rs index a38862d..528d251 100644 --- a/src/slab.rs +++ b/src/slab.rs @@ -8,6 +8,7 @@ use std::path::{Path, PathBuf}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::{Arc, Mutex}; use std::thread; +use std::time::SystemTime; use threadpool::ThreadPool; use crate::hash::*; @@ -71,7 +72,7 @@ struct SlabOffsets { } impl SlabOffsets { - fn read_offset_file>(p: P) -> Result { + fn read_offset_file>(p: P, data_mtime: Option) -> Result { let r = OpenOptions::new() .read(true) .write(false) @@ -79,6 +80,17 @@ impl SlabOffsets { .open(p) .context("opening offset file")?; + let mtime = r.metadata()?.modified()?; + + if let Some(file_mtime) = data_mtime { + if file_mtime > mtime { + return Err(anyhow!( + "Offsets file modification time is older than slab data file \ + run blk-archive 'verify-all --repair' to correct!", + )); + } + } + let len = r.metadata().context("offset metadata")?.len(); let mut r = std::io::BufReader::new(r); let nr_entries = len / std::mem::size_of::() as u64; @@ -346,8 +358,10 @@ impl SlabFile { (None, tx) }; - let offsets = SlabOffsets::read_offset_file(&offsets_path)?; let file_size = data.metadata()?.len(); + let file_mtime = data.metadata()?.modified()?; + let offsets = SlabOffsets::read_offset_file(&offsets_path, Some(file_mtime))?; + let shared = Arc::new(Mutex::new(SlabShared { data, offsets, @@ -384,8 +398,10 @@ impl SlabFile { let compressed = flags == 1; let compressor = None; - let offsets = SlabOffsets::read_offset_file(&offsets_path)?; let file_size = data.metadata()?.len(); + let data_mtime = data.metadata()?.modified()?; + let offsets = SlabOffsets::read_offset_file(&offsets_path, Some(data_mtime))?; + let shared = Arc::new(Mutex::new(SlabShared { data, offsets, @@ -465,32 +481,44 @@ impl SlabFile { Ok(so) } - pub fn verify>(data_path: P) -> Result { + pub fn verify>(data_path: P, repair: bool) -> Result { + let data_mtime = std::fs::metadata(data_path.as_ref().clone())?.modified()?; let offsets_path = offsets_path(&data_path); let actual_offsets = Self::verify_(data_path)?; - let stored_offsets = SlabOffsets::read_offset_file(&offsets_path)?; + let stored_offsets = SlabOffsets::read_offset_file(&offsets_path, None)?; let actual_count = actual_offsets.offsets.len(); let stored_count = stored_offsets.offsets.len(); + let offsets_mtime = std::fs::metadata(offsets_path.clone())?.modified()?; - if actual_count != stored_count { - return Err(anyhow!( + let mut result = if actual_count != stored_count { + Err(anyhow!( "Offset file {} has incorrect number of entries {} != {}", offsets_path.to_string_lossy(), actual_count, stored_count - )); - } - - if actual_offsets.offsets != stored_offsets.offsets { - return Err(anyhow!( + )) + } else if actual_offsets.offsets != stored_offsets.offsets { + Err(anyhow!( "Stored offset values do not match calculated 
for file {}", offsets_path.to_string_lossy() - )); + )) + } else if data_mtime > offsets_mtime { + Err(anyhow!( + "Offsets file modification time is older than slab data file \ + run blk-archive 'verify-all --repair' to correct!", + )) + } else { + Ok(actual_count) + }; + + if result.is_err() && repair { + actual_offsets.write_offset_file(offsets_path)?; + result = Ok(actual_count) } - Ok(actual_count) + result } pub fn close(&mut self) -> Result<()> { From 4b1873bf26d83706e01bc532e4e8031cda2f3619 Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Fri, 6 Oct 2023 14:34:20 -0500 Subject: [PATCH 06/13] slab: Flush when exiting writer thread Signed-off-by: Tony Asleson --- src/slab.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/slab.rs b/src/slab.rs index 528d251..c10261c 100644 --- a/src/slab.rs +++ b/src/slab.rs @@ -228,6 +228,12 @@ fn writer_(shared: Arc>, rx: Receiver) -> Result<()> } assert!(queued.is_empty()); + + { + let mut shared = shared.lock().unwrap(); + shared.data.flush()?; + } + Ok(()) } From 9a49b0217727bbcee7a48a2ec3763274bc271483 Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Fri, 6 Oct 2023 14:43:12 -0500 Subject: [PATCH 07/13] slab verify: Handle truncated slab file When you read off of the end of the stream we get a cryptic error "failed to fill whole buffer". Before issuing a read, make sure we have enough data to fill the request. Signed-off-by: Tony Asleson --- src/slab.rs | 41 +++++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/src/slab.rs b/src/slab.rs index c10261c..12af52a 100644 --- a/src/slab.rs +++ b/src/slab.rs @@ -136,7 +136,8 @@ impl SlabOffsets { const FILE_MAGIC: u64 = 0xb927f96a6b611180; const SLAB_MAGIC: u64 = 0x20565137a3100a7c; -const SLAB_HDR_LEN: u64 = 16; +const SLAB_FILE_HDR_LEN: u64 = 16; +const SLAB_HDR_LEN: u64 = 24; const FORMAT_VERSION: u32 = 0; @@ -435,21 +436,38 @@ impl SlabFile { .create(false) .open(data_path)?; - let mut so = SlabOffsets::default(); - let _flags = read_slab_header(&mut data)?; let file_size = data.metadata()?.len(); - data.seek(SeekFrom::Start(SLAB_HDR_LEN))?; + if file_size < SLAB_FILE_HDR_LEN { + return Err(anyhow!( + "slab file {} isn't large enough to be a slab file, size = {file_size} bytes.", + slab_name.to_string_lossy() + )); + } + + let mut so = SlabOffsets::default(); + let _flags = read_slab_header(&mut data)?; // We don't have any additional data in an empty archive - if data.stream_position()? == file_size { + let mut curr_offset = data.stream_position()?; + assert!(curr_offset == SLAB_FILE_HDR_LEN); + + if curr_offset == file_size { return Ok(so); } - so.offsets.push(SLAB_HDR_LEN); + so.offsets.push(SLAB_FILE_HDR_LEN); let mut slab_index = 0; loop { + let remaining = file_size - curr_offset; + if remaining < SLAB_HDR_LEN { + return Err(anyhow!( + "Slab {slab_index} is incomplete, not enough remaining for header, \ + {remaining} remaining bytes." 
+ )); + } + let magic = data.read_u64::()?; let len = data.read_u64::()?; @@ -463,6 +481,14 @@ impl SlabFile { let mut expected_csum: Hash64 = Hash64::default(); data.read_exact(&mut expected_csum)?; + if remaining < SLAB_HDR_LEN + len { + return Err(anyhow!( + "Slab {slab_index} is incomplete, payload is truncated, \ + needing {}, remaining {remaining}", + SLAB_HDR_LEN + len + )); + } + let mut buf = vec![0; len as usize]; data.read_exact(&mut buf)?; @@ -474,8 +500,7 @@ impl SlabFile { )); } - let curr_offset = data.stream_position()?; - + curr_offset = data.stream_position()?; if curr_offset == file_size { break; } From 280131eadaef79a3d181993e0bc8f1b2139426c3 Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Mon, 9 Oct 2023 09:23:22 -0500 Subject: [PATCH 08/13] Handle potential corruption for interrupted pack The most important objective is to prevent the data slab and hashes slab from getting corrupted and losing archived data. Incomplete writes during a pack to the slabs should be the only way for the slabs to get in an inconsistent state. To allow us to detect and correct this we introduce a check point file at the root of the archive which is written and sync'd to stable storage before we start the pack operation. This way if the pack operation is interrupted, we can put the slab files back to where they were before we started with a repair option. Moving forward, the idea is we add the ability to periodically update the checkpoint for long running operations by quiescing IO to the data slab, hashes slab, offsets files, and the stream output and recording the offset into the input data. Then we can resume the operation by checking the files, truncating where needed, and then resuming the de-dupe operation. Note: If the slab file and the hashes file have no corruption and the number of slabs match between the data and hash slab, the slab files are not touched! Thus the archive size could be much larger than would be indicated by the listing of the archive as the data for the interrupted pack operation is retained, but the stream is not. 
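To make this concrete, the checkpoint is a small file named checkpoint.toml at the archive root; with the fields introduced below it would look roughly like this (the path and size values here are invented purely for illustration):

    source_path = "/dev/some_vg/some_lv"
    stream_path = "streams/<new stream id>"
    data_start_size = 4194304
    hash_start_size = 131072
    data_curr_size = 4194304
    hash_curr_size = 131072
    input_offset = 0
    checksum = 0

The *_curr_size, input_offset and checksum fields are placeholders for the planned periodic-checkpoint support described above and are not yet meaningful.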
Signed-off-by: Tony Asleson --- src/check.rs | 172 ++++++++++++++++++++++++++++++++++++++++++++++++--- src/pack.rs | 24 ++++++- src/slab.rs | 31 +++++++++- 3 files changed, 213 insertions(+), 14 deletions(-) diff --git a/src/check.rs b/src/check.rs index bca023a..3bf7ae6 100644 --- a/src/check.rs +++ b/src/check.rs @@ -1,15 +1,30 @@ +use anyhow::{anyhow, Context, Result}; +use serde_derive::{Deserialize, Serialize}; use std::env; -use std::path::Path; +use std::fs; +use std::io::Write; +use std::os::unix::prelude::OpenOptionsExt; +use std::path::{Path, PathBuf}; use std::sync::Arc; -use anyhow::{anyhow, Result}; use clap::ArgMatches; use crate::output::Output; use crate::paths; use crate::slab::*; -pub fn run(matches: &ArgMatches, _output: Arc) -> Result<()> { +fn remove_incomplete_stream(cp: &CheckPoint) -> Result<()> { + // Data and hashes has been validated, remove the stream file directory + let stream_dir_to_del = PathBuf::from(&cp.stream_path).canonicalize()?; + fs::remove_dir_all(stream_dir_to_del.clone())?; + + // Everything should be good now, remove the check point + CheckPoint::end()?; + + Ok(()) +} + +pub fn run(matches: &ArgMatches, output: Arc) -> Result<()> { let archive_dir = Path::new(matches.value_of("ARCHIVE").unwrap()).canonicalize()?; let repair = matches.is_present("REPAIR"); @@ -18,15 +33,152 @@ pub fn run(matches: &ArgMatches, _output: Arc) -> Result<()> { let data_path = paths::data_path(); let hashes_path = paths::hashes_path(); - let num_data_slabs = SlabFile::verify(data_path.clone(), repair)?; - let num_hash_slabs = SlabFile::verify(hashes_path.clone(), repair)?; + output.report.progress(0); + output.report.set_title("Verifying archive"); + + // Load up a check point if we have one. + let cp = CheckPoint::read(archive_dir.clone())?; + + let num_data_slabs = SlabFile::verify(data_path.clone(), repair); + output.report.progress(25); + let num_hash_slabs = SlabFile::verify(hashes_path.clone(), repair); + output.report.progress(50); + if (num_data_slabs.is_err() || num_hash_slabs.is_err()) + || (num_data_slabs.as_ref().unwrap() != num_hash_slabs.as_ref().unwrap()) + { + if !repair || cp.is_none() { + return Err(anyhow!( + "The number of slabs in the data file {} != {} number in hashes file!", + num_data_slabs?, + num_hash_slabs? + )); + } + + output.report.set_title("Repairing archive ..."); - if num_data_slabs != num_hash_slabs { - return Err(anyhow!( - "Number of slab entries in data slab {num_data_slabs} \ - != {num_hash_slabs} in hashes file!" - )); + // We tried to do a non-loss data fix which didn't work, we now have to revert the data + // slab to a known good state. It doesn't matter if one of the slabs verifies ok, we need + // to be a matched set. So we put both is known good state, but before we do we will make + // sure our slabs are bigger than our starting sizes. If they aren't there is nothing we + // can do to fix this and we won't do anything. 
+ let cp = cp.unwrap(); + let data_meta = fs::metadata(&data_path)?; + let hashes_meta = fs::metadata(&hashes_path)?; + + if data_meta.len() >= cp.data_start_size && hashes_meta.len() >= cp.hash_start_size { + output + .report + .info("Rolling back archive to previous state..."); + + // Make sure the truncated size verifies by verifying what part we are keeping first + SlabFile::truncate(data_path.clone(), cp.data_start_size, false)?; + SlabFile::truncate(hashes_path.clone(), cp.hash_start_size, false)?; + + // Do the actual truncate which also fixes up the offsets file to match + SlabFile::truncate(data_path, cp.data_start_size, true)?; + output.report.progress(75); + SlabFile::truncate(hashes_path, cp.hash_start_size, true)?; + + remove_incomplete_stream(&cp)?; + output.report.progress(100); + output.report.info("Archive restored to previous state."); + } else { + return Err(anyhow!( + "We're unable to repair this archive, check point sizes > \ + current file sizes, missing data!" + )); + } + } else if repair && cp.is_some() { + remove_incomplete_stream(&cp.unwrap())?; } Ok(()) } + +#[derive(Debug, Deserialize, Serialize)] +pub struct CheckPoint { + pub source_path: String, + pub stream_path: String, + pub data_start_size: u64, + pub hash_start_size: u64, + pub data_curr_size: u64, + pub hash_curr_size: u64, + pub input_offset: u64, + pub checksum: u64, +} + +pub fn checkpoint_path(root: &str) -> PathBuf { + [root, "checkpoint.toml"].iter().collect() +} + +impl CheckPoint { + pub fn start( + source_path: &str, + stream_path: &str, + data_start_size: u64, + hash_start_size: u64, + ) -> Self { + CheckPoint { + source_path: String::from(source_path), + stream_path: String::from(stream_path), + data_start_size, + hash_start_size, + data_curr_size: data_start_size, + hash_curr_size: hash_start_size, + input_offset: 0, + checksum: 0, + } + } + + pub fn write(&mut self, root: &Path) -> Result<()> { + let file_name = checkpoint_path( + root.to_str() + .ok_or_else(|| anyhow!("Invalid root path {}", root.display()))?, + ); + + { + //TODO: make the checksum mean somthing and check it in the read. It's important + // that the values are correct before we destroy data during a repair. Maybe this + // file shouldn't be in a human readable format? 
+ let mut output = fs::OpenOptions::new() + .read(false) + .write(true) + .custom_flags(libc::O_SYNC) + .create_new(true) + .open(file_name) + .context("Previous operation interrupted, please run verify-all")?; + + let toml = toml::to_string(self).unwrap(); + output.write_all(toml.as_bytes())?; + } + + // Sync containing dentry to ensure checkpoint file exists + fs::File::open(root)?.sync_all()?; + + Ok(()) + } + + pub fn end() -> Result<()> { + let root = env::current_dir()?; + let root_str = root.clone().into_os_string().into_string().unwrap(); + let cp_file = checkpoint_path(root_str.as_str()); + + fs::remove_file(cp_file).context("error removing checkpoint file!")?; + + fs::File::open(root)?.sync_all()?; + Ok(()) + } + + fn read>(root: P) -> Result> { + let file_name = checkpoint_path(root.as_ref().to_str().unwrap()); + let content = fs::read_to_string(file_name); + if content.is_err() { + // No checkpoint + Ok(None) + } else { + let cp: CheckPoint = + toml::from_str(&content.unwrap()).context("couldn't parse checkpoint file")?; + Ok(Some(cp)) + } + } +} diff --git a/src/pack.rs b/src/pack.rs index 7468d97..0f6c965 100644 --- a/src/pack.rs +++ b/src/pack.rs @@ -15,6 +15,7 @@ use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; +use crate::check::*; use crate::chunkers::*; use crate::config; use crate::content_sensitive_splitter::*; @@ -423,8 +424,23 @@ impl Packer { hashes_file.get_file_size() }; + let input_name_string = self + .input_path + .clone() + .into_os_string() + .into_string() + .unwrap(); + let (stream_id, mut stream_path) = new_stream_path()?; + CheckPoint::start( + input_name_string.as_str(), + stream_path.as_os_str().to_str().unwrap(), + data_size, + hashes_size, + ) + .write(env::current_dir()?.as_path())?; + std::fs::create_dir(stream_path.clone())?; stream_path.push("stream"); @@ -750,7 +766,13 @@ pub fn run(matches: &ArgMatches, output: Arc) -> Result<()> { output .report .set_title(&format!("Packing {} ...", input_file.display())); - packer.pack(hashes_file) + packer.pack(hashes_file)?; + // The packer.pack needs to return before the Drop(s) get called ensuring the output files have + // been flushed before we can end our pack check point. It's entirely possible that the data + // is safely written to disk and we exit before we remove the checkpoint file, which at worst + // case would cause us to throw away the last pack operation. 
+ CheckPoint::end()?; + Ok(()) } //----------------------------------------- diff --git a/src/slab.rs b/src/slab.rs index 12af52a..6472e88 100644 --- a/src/slab.rs +++ b/src/slab.rs @@ -427,7 +427,7 @@ impl SlabFile { }) } - fn verify_>(data_path: P) -> Result { + fn verify_>(data_path: P, check_len: Option) -> Result { let slab_name = data_path.as_ref().to_path_buf(); let mut data = OpenOptions::new() @@ -436,7 +436,10 @@ impl SlabFile { .create(false) .open(data_path)?; - let file_size = data.metadata()?.len(); + let file_size = match check_len { + Some(len) => len, + None => data.metadata()?.len(), + }; if file_size < SLAB_FILE_HDR_LEN { return Err(anyhow!( @@ -516,7 +519,7 @@ impl SlabFile { let data_mtime = std::fs::metadata(data_path.as_ref().clone())?.modified()?; let offsets_path = offsets_path(&data_path); - let actual_offsets = Self::verify_(data_path)?; + let actual_offsets = Self::verify_(data_path, None)?; let stored_offsets = SlabOffsets::read_offset_file(&offsets_path, None)?; let actual_count = actual_offsets.offsets.len(); @@ -552,6 +555,28 @@ impl SlabFile { result } + pub fn truncate>(data_path: P, len: u64, do_truncate: bool) -> Result<()> { + if !do_truncate { + Self::verify_(data_path, Some(len))?; + Ok(()) + } else { + let offsets_file = offsets_path(&data_path); + { + let mut data = OpenOptions::new() + .read(true) + .write(true) + .create(false) + .open(data_path.as_ref().clone())?; + data.set_len(len)?; + data.flush()?; + } + + let slab_offsets = Self::verify_(data_path, None)?; + slab_offsets.write_offset_file(offsets_file)?; + Ok(()) + } + } + pub fn close(&mut self) -> Result<()> { self.tx = None; let mut tid = None; From e17ffdf76916846092d3729a9874ce552e9f337a Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Mon, 9 Oct 2023 11:42:17 -0500 Subject: [PATCH 09/13] Raise error if check point exists If a checkpoint exists we will raise an error and require the user to correct before they proceed. Signed-off-by: Tony Asleson --- src/check.rs | 11 +++++++++++ src/dump_stream.rs | 2 ++ src/list.rs | 2 ++ src/pack.rs | 2 ++ src/unpack.rs | 5 ++++- 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/check.rs b/src/check.rs index 3bf7ae6..644cab9 100644 --- a/src/check.rs +++ b/src/check.rs @@ -181,4 +181,15 @@ impl CheckPoint { Ok(Some(cp)) } } + + pub fn interrupted() -> Result<()> { + let root = env::current_dir()?; + match Self::read(root).context("error while checking for checkpoint file!")? 
{ Some(cp) => Err(anyhow!( "pack operation of {} was interrupted, run verify-all -r to correct", cp.source_path )), None => Ok(()), } } } diff --git a/src/dump_stream.rs b/src/dump_stream.rs index 2a34195..adc3af6 100644 --- a/src/dump_stream.rs +++ b/src/dump_stream.rs @@ -4,6 +4,7 @@ use std::env; use std::path::Path; use std::sync::Arc; +use crate::check::*; use crate::output::Output; use crate::stream::*; @@ -14,6 +15,7 @@ pub fn run(matches: &ArgMatches, output: Arc) -> Result<()> { let stream = matches.value_of("STREAM").unwrap(); env::set_current_dir(archive_dir)?; + CheckPoint::interrupted()?; let mut d = Dumper::new(stream)?; d.dump(output) diff --git a/src/list.rs b/src/list.rs index 7eea590..0abcb53 100644 --- a/src/list.rs +++ b/src/list.rs @@ -9,6 +9,7 @@ use std::fs; use std::path::Path; use std::sync::Arc; +use crate::check::*; use crate::config; use crate::output::Output; @@ -22,6 +23,7 @@ pub fn run(matches: &ArgMatches, output: Arc) -> Result<()> { let archive_dir = Path::new(matches.value_of("ARCHIVE").unwrap()).canonicalize()?; env::set_current_dir(&archive_dir)?; + CheckPoint::interrupted()?; let paths = fs::read_dir(Path::new("./streams"))?; let stream_ids = paths diff --git a/src/pack.rs b/src/pack.rs index 0f6c965..5bdb39b 100644 --- a/src/pack.rs +++ b/src/pack.rs @@ -735,6 +735,8 @@ pub fn run(matches: &ArgMatches, output: Arc) -> Result<()> { env::set_current_dir(archive_dir)?; let config = config::read_config(".")?; + CheckPoint::interrupted()?; + output .report .set_title(&format!("Building packer {} ...", input_file.display())); diff --git a/src/unpack.rs b/src/unpack.rs index 8cf1cae..fe7e952 100644 --- a/src/unpack.rs +++ b/src/unpack.rs @@ -11,6 +11,7 @@ use std::path::Path; use std::sync::Arc; use thinp::report::*; +use crate::check::*; use crate::chunkers::*; use crate::config; use crate::hash_index::*; @@ -442,6 +443,9 @@ pub fn run_unpack(matches: &ArgMatches, report: Arc) -> Result<()> { let stream = matches.value_of("STREAM").unwrap(); let create = matches.is_present("CREATE"); + env::set_current_dir(archive_dir)?; + CheckPoint::interrupted()?; + let output = if create { fs::OpenOptions::new() .read(false) @@ -456,7 +460,6 @@ pub fn run_unpack(matches: &ArgMatches, report: Arc) -> Result<()> { .open(output_file) .context("Couldn't open output")? }; - env::set_current_dir(archive_dir)?; report.set_title(&format!("Unpacking {} ...", output_file.display())); if create { From 63994f2b3674bf53f60677bfe48a0d2a62c3f7d1 Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Mon, 9 Oct 2023 16:23:45 -0500 Subject: [PATCH 10/13] Use binary file for checkpoint The toml file format uses signed 64 bit integers. Thus we cannot use it to represent unsigned 64 bit integers which are needed. Convert the file to binary and have the entire file protected by an 8 byte checksum. Note: We should investigate using bincode for this to get automatic ser./des. support. 
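For reference, the resulting checkpoint.bin layout (as written by CheckPoint::write below; all integers little-endian, offsets in bytes) is:

    0   magic            u64   0xD00DDEAD10CCD00D
    8   version          u64   currently 0
    16  data_start_size  u64
    24  hash_start_size  u64
    32  data_curr_size   u64
    40  hash_curr_size   u64
    48  input_offset     u64
    56  source_path len  u32
    60  stream_path len  u32
    64  source_path bytes followed by stream_path bytes
    end 8 byte hash_64 checksum covering everything before it

which is also where MIN_SIZE = 72 comes from: the fixed fields plus the trailing checksum with both path strings empty.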
Signed-off-by: Tony Asleson --- Cargo.lock | 10 +++ Cargo.toml | 1 + src/check.rs | 219 ++++++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 203 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ff820e..919868e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,6 +108,7 @@ dependencies = [ "anyhow", "atty", "blake2", + "bytebuffer", "byteorder", "chrono", "clap", @@ -149,6 +150,15 @@ version = "3.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b1ce199063694f33ffb7dd4e0ee620741495c32833cde5aa08f02a0bf96f0c8" +[[package]] +name = "bytebuffer" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7bfaf7cd08cacd74cdc6b521c37ac39cbc92692e5ab5c21ed5657a749b577c" +dependencies = [ + "byteorder", +] + [[package]] name = "bytemuck" version = "1.13.1" diff --git a/Cargo.toml b/Cargo.toml index 50b2a18..5b059c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" anyhow = "1.0" atty = "0.2" blake2 = "0.10" +bytebuffer = "2.2" byteorder = "1.4" chrono = "0.4" clap = { version = "3.1.0", features = ["cargo"] } diff --git a/src/check.rs b/src/check.rs index 644cab9..c877b9d 100644 --- a/src/check.rs +++ b/src/check.rs @@ -1,14 +1,18 @@ use anyhow::{anyhow, Context, Result}; +use byteorder::{LittleEndian, ReadBytesExt}; +use nom::AsBytes; use serde_derive::{Deserialize, Serialize}; use std::env; use std::fs; -use std::io::Write; +use std::io::{Read, Seek, Write}; use std::os::unix::prelude::OpenOptionsExt; use std::path::{Path, PathBuf}; use std::sync::Arc; +use bytebuffer::ByteBuffer; use clap::ArgMatches; +use crate::hash::*; use crate::output::Output; use crate::paths; use crate::slab::*; @@ -95,23 +99,26 @@ pub fn run(matches: &ArgMatches, output: Arc) -> Result<()> { Ok(()) } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct CheckPoint { - pub source_path: String, - pub stream_path: String, pub data_start_size: u64, pub hash_start_size: u64, pub data_curr_size: u64, pub hash_curr_size: u64, pub input_offset: u64, - pub checksum: u64, + pub source_path: String, + pub stream_path: String, } pub fn checkpoint_path(root: &str) -> PathBuf { - [root, "checkpoint.toml"].iter().collect() + [root, "checkpoint.bin"].iter().collect() } impl CheckPoint { + pub const VERSION: u64 = 0; + pub const MAGIC: u64 = 0xD00DDEAD10CCD00D; + pub const MIN_SIZE: u64 = 72; + pub fn start( source_path: &str, stream_path: &str, @@ -126,7 +133,6 @@ impl CheckPoint { data_curr_size: data_start_size, hash_curr_size: hash_start_size, input_offset: 0, - checksum: 0, } } @@ -137,19 +143,33 @@ impl CheckPoint { ); { - //TODO: make the checksum mean somthing and check it in the read. It's important - // that the values are correct before we destroy data during a repair. Maybe this - // file shouldn't be in a human readable format? 
- let mut output = fs::OpenOptions::new() + let mut output = ByteBuffer::new(); + output.set_endian(bytebuffer::Endian::LittleEndian); + + let mut cpf = fs::OpenOptions::new() .read(false) .write(true) .custom_flags(libc::O_SYNC) .create_new(true) .open(file_name) - .context("Previous operation interrupted, please run verify-all")?; + .context("Previous pack operation interrupted, please run verify-all")?; + + output.write_u64(Self::MAGIC); + output.write_u64(Self::VERSION); + output.write_u64(self.data_start_size); + output.write_u64(self.hash_start_size); + output.write_u64(self.data_curr_size); + output.write_u64(self.hash_curr_size); + output.write_u64(self.input_offset); + output.write_u32(self.source_path.len() as u32); + output.write_u32(self.stream_path.len() as u32); + output.write_all(self.source_path.as_bytes())?; + output.write_all(self.stream_path.as_bytes())?; - let toml = toml::to_string(self).unwrap(); - output.write_all(toml.as_bytes())?; + let cs = hash_64(output.as_bytes()); + output.write_all(cs.as_bytes())?; + + cpf.write_all(output.as_bytes())?; } // Sync containing dentry to ensure checkpoint file exists @@ -158,6 +178,77 @@ impl CheckPoint { Ok(()) } + fn read>(root: P) -> Result> { + let file_name = checkpoint_path(root.as_ref().to_str().unwrap()); + + let cpf = fs::OpenOptions::new() + .read(true) + .write(false) + .create_new(false) + .open(file_name); + + if cpf.is_err() { + // No checkpoint + Ok(None) + } else { + let mut cpf = cpf?; + let len = cpf.metadata()?.len(); + + if len < Self::MIN_SIZE { + return Err(anyhow!( + "Checkpoint file is too small to be valid, require {} {len} bytes", + Self::MIN_SIZE + )); + } + + let mut payload = vec![0; (len - 8) as usize]; + cpf.read_exact(&mut payload)?; + let expected_cs = cpf.read_u64::()?; + + assert!(cpf.stream_position()? == len); + + let actual_cs = u64::from_le_bytes(hash_64(&payload).into()); + if actual_cs != expected_cs { + return Err(anyhow!("Checkpoint file checksum is invalid!")); + } + + let mut input = ByteBuffer::from_bytes(&payload); + input.set_endian(bytebuffer::Endian::LittleEndian); + + assert!(input.read_u64()? == Self::MAGIC); + assert!(input.read_u64()? 
== Self::VERSION); + + let data_start_size = input.read_u64()?; + let hash_start_size = input.read_u64()?; + let data_curr_size = input.read_u64()?; + let hash_curr_size = input.read_u64()?; + let input_offset = input.read_u64()?; + + let source_len = input.read_u32()?; + let stream_len = input.read_u32()?; + + let mut source_binary = vec![0u8; source_len as usize]; + input.read_exact(&mut source_binary)?; + let mut stream_binary = vec![0u8; stream_len as usize]; + input.read_exact(&mut stream_binary)?; + + let source_path = String::from_utf8(source_binary)?; + let stream_path = String::from_utf8(stream_binary)?; + + let cp = CheckPoint { + data_start_size, + hash_start_size, + data_curr_size, + hash_curr_size, + input_offset, + source_path, + stream_path, + }; + + Ok(Some(cp)) + } + } + pub fn end() -> Result<()> { let root = env::current_dir()?; let root_str = root.clone().into_os_string().into_string().unwrap(); @@ -169,19 +260,6 @@ impl CheckPoint { Ok(()) } - fn read>(root: P) -> Result> { - let file_name = checkpoint_path(root.as_ref().to_str().unwrap()); - let content = fs::read_to_string(file_name); - if content.is_err() { - // No checkpoint - Ok(None) - } else { - let cp: CheckPoint = - toml::from_str(&content.unwrap()).context("couldn't parse checkpoint file")?; - Ok(Some(cp)) - } - } - pub fn interrupted() -> Result<()> { let root = env::current_dir()?; match Self::read(root).context("error while checking for checkpoint file!")? { @@ -193,3 +271,90 @@ impl CheckPoint { } } } + +#[test] +fn check_ranges() { + let test_archive = PathBuf::from("/tmp/check_ranges"); + + std::fs::create_dir_all(&test_archive).unwrap(); + + let mut t = CheckPoint::start("/tmp/testing", "/tmp/testing/stream", u64::MAX, u64::MAX); + + let write_result = t.write(&test_archive); + assert!( + write_result.is_ok(), + "CheckPoint.write failed {:?}", + write_result.unwrap() + ); + + let read_back = CheckPoint::read(&test_archive).unwrap(); + + assert!(std::fs::remove_dir_all(&test_archive).is_ok()); + + assert!( + read_back.is_some(), + "CheckPoint::read error {:?}", + read_back.unwrap() + ); + + assert!(read_back.unwrap() == t); +} + +#[test] +fn check_small() { + let test_archive = PathBuf::from("/tmp/check_small"); + + std::fs::create_dir_all(&test_archive).unwrap(); + + let mut t = CheckPoint::start("", "", u64::MAX, u64::MAX); + + let write_result = t.write(&test_archive); + assert!( + write_result.is_ok(), + "CheckPoint.write failed {:?}", + write_result.unwrap() + ); + + let read_back = CheckPoint::read(&test_archive).unwrap(); + + assert!(std::fs::remove_dir_all(&test_archive).is_ok()); + + assert!( + read_back.is_some(), + "CheckPoint::read error {:?}", + read_back.unwrap() + ); + + assert!(read_back.unwrap() == t); +} + +#[test] +fn check_fields() { + let test_archive = PathBuf::from("/tmp/check_fields"); + std::fs::create_dir_all(&test_archive).unwrap(); + + let mut t = CheckPoint::start("source/path", "stream/path/yes", 1024, 384); + + t.data_curr_size = t.data_start_size * 2; + t.hash_curr_size = t.hash_start_size * 2; + t.input_offset = 1024 * 1024 * 1024; + + let write_result = t.write(&test_archive); + assert!( + write_result.is_ok(), + "CheckPoint.write failed {:?}", + write_result.unwrap() + ); + + let read_back = CheckPoint::read(&test_archive).unwrap(); + + assert!(std::fs::remove_dir_all(&test_archive).is_ok()); + + assert!( + read_back.is_some(), + "CheckPoint::read error {:?}", + read_back.unwrap() + ); + + assert!(read_back.unwrap() == t); +} From 
130f30a2f6b8c9a1474b2ac3d7b52cc1f69bb63f Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Tue, 10 Oct 2023 09:50:41 -0500 Subject: [PATCH 11/13] Use bincode for checkpoint data Trading our implementation for library code. Signed-off-by: Tony Asleson --- Cargo.lock | 20 ++++++------ Cargo.toml | 2 +- src/check.rs | 89 ++++++++++++++++++++-------------------------------- 3 files changed, 45 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 919868e..9bf3f11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,15 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bindgen" version = "0.63.0" @@ -107,8 +116,8 @@ version = "0.1.0" dependencies = [ "anyhow", "atty", + "bincode", "blake2", - "bytebuffer", "byteorder", "chrono", "clap", @@ -150,15 +159,6 @@ version = "3.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b1ce199063694f33ffb7dd4e0ee620741495c32833cde5aa08f02a0bf96f0c8" -[[package]] -name = "bytebuffer" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7bfaf7cd08cacd74cdc6b521c37ac39cbc92692e5ab5c21ed5657a749b577c" -dependencies = [ - "byteorder", -] - [[package]] name = "bytemuck" version = "1.13.1" diff --git a/Cargo.toml b/Cargo.toml index 5b059c0..c4f511f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,8 @@ edition = "2021" [dependencies] anyhow = "1.0" atty = "0.2" +bincode = "1.3" blake2 = "0.10" -bytebuffer = "2.2" byteorder = "1.4" chrono = "0.4" clap = { version = "3.1.0", features = ["cargo"] } diff --git a/src/check.rs b/src/check.rs index c877b9d..cf11726 100644 --- a/src/check.rs +++ b/src/check.rs @@ -9,7 +9,7 @@ use std::os::unix::prelude::OpenOptionsExt; use std::path::{Path, PathBuf}; use std::sync::Arc; -use bytebuffer::ByteBuffer; +use bincode::*; use clap::ArgMatches; use crate::hash::*; @@ -101,6 +101,8 @@ pub fn run(matches: &ArgMatches, output: Arc) -> Result<()> { #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct CheckPoint { + pub magic: u64, + pub version: u32, pub data_start_size: u64, pub hash_start_size: u64, pub data_curr_size: u64, @@ -115,9 +117,9 @@ pub fn checkpoint_path(root: &str) -> PathBuf { } impl CheckPoint { - pub const VERSION: u64 = 0; + pub const VERSION: u32 = 0; pub const MAGIC: u64 = 0xD00DDEAD10CCD00D; - pub const MIN_SIZE: u64 = 72; + pub const MIN_SIZE: u64 = 76; pub fn start( source_path: &str, @@ -126,13 +128,15 @@ impl CheckPoint { hash_start_size: u64, ) -> Self { CheckPoint { - source_path: String::from(source_path), - stream_path: String::from(stream_path), + magic: Self::MAGIC, + version: Self::VERSION, data_start_size, hash_start_size, data_curr_size: data_start_size, hash_curr_size: hash_start_size, input_offset: 0, + source_path: String::from(source_path), + stream_path: String::from(stream_path), } } @@ -143,8 +147,11 @@ impl CheckPoint { ); { - let mut output = ByteBuffer::new(); - output.set_endian(bytebuffer::Endian::LittleEndian); + let ser = bincode::DefaultOptions::new() + .with_fixint_encoding() + .with_little_endian(); + let objbytes = ser.serialize(self)?; + let checksum = hash_64(&objbytes); let mut cpf = fs::OpenOptions::new() 
.read(false) @@ -154,22 +161,8 @@ impl CheckPoint { .open(file_name) .context("Previous pack operation interrupted, please run verify-all")?; - output.write_u64(Self::MAGIC); - output.write_u64(Self::VERSION); - output.write_u64(self.data_start_size); - output.write_u64(self.hash_start_size); - output.write_u64(self.data_curr_size); - output.write_u64(self.hash_curr_size); - output.write_u64(self.input_offset); - output.write_u32(self.source_path.len() as u32); - output.write_u32(self.stream_path.len() as u32); - output.write_all(self.source_path.as_bytes())?; - output.write_all(self.stream_path.as_bytes())?; - - let cs = hash_64(output.as_bytes()); - output.write_all(cs.as_bytes())?; - - cpf.write_all(output.as_bytes())?; + cpf.write_all(&objbytes)?; + cpf.write_all(checksum.as_bytes())?; } // Sync containing dentry to ensure checkpoint file exists @@ -212,39 +205,25 @@ impl CheckPoint { return Err(anyhow!("Checkpoint file checksum is invalid!")); } - let mut input = ByteBuffer::from_bytes(&payload); - input.set_endian(bytebuffer::Endian::LittleEndian); - - assert!(input.read_u64()? == Self::MAGIC); - assert!(input.read_u64()? == Self::VERSION); - - let data_start_size = input.read_u64()?; - let hash_start_size = input.read_u64()?; - let data_curr_size = input.read_u64()?; - let hash_curr_size = input.read_u64()?; - let input_offset = input.read_u64()?; - - let source_len = input.read_u32()?; - let stream_len = input.read_u32()?; - - let mut source_binary = vec![0u8; source_len as usize]; - input.read_exact(&mut source_binary)?; - let mut stream_binary = vec![0u8; stream_len as usize]; - input.read_exact(&mut stream_binary)?; - - let source_path = String::from_utf8(source_binary)?; - let stream_path = String::from_utf8(stream_binary)?; - - let cp = CheckPoint { - data_start_size, - hash_start_size, - data_curr_size, - hash_curr_size, - input_offset, - source_path, - stream_path, - }; + let des = bincode::DefaultOptions::new() + .with_fixint_encoding() + .with_little_endian(); + let cp: CheckPoint = des.deserialize(&payload)?; + if cp.version != Self::VERSION { + return Err(anyhow!( + "Incorrect version {} expected {}", + cp.version, + Self::VERSION + )); + } + if cp.magic != Self::MAGIC { + return Err(anyhow!( + "Magic incorrect {} expected {}", + cp.magic, + Self::MAGIC + )); + } Ok(Some(cp)) } } From a5bbce631fd505db3248c50943764069db8fb33a Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Tue, 10 Oct 2023 10:37:40 -0500 Subject: [PATCH 12/13] check.rs: Remove needless clones Signed-off-by: Tony Asleson --- src/check.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/check.rs b/src/check.rs index cf11726..02c3ab3 100644 --- a/src/check.rs +++ b/src/check.rs @@ -41,11 +41,11 @@ pub fn run(matches: &ArgMatches, output: Arc) -> Result<()> { output.report.set_title("Verifying archive"); // Load up a check point if we have one. 
- let cp = CheckPoint::read(archive_dir.clone())?; + let cp = CheckPoint::read(&archive_dir)?; - let num_data_slabs = SlabFile::verify(data_path.clone(), repair); + let num_data_slabs = SlabFile::verify(&data_path, repair); output.report.progress(25); - let num_hash_slabs = SlabFile::verify(hashes_path.clone(), repair); + let num_hash_slabs = SlabFile::verify(&hashes_path, repair); output.report.progress(50); if (num_data_slabs.is_err() || num_hash_slabs.is_err()) || (num_data_slabs.as_ref().unwrap() != num_hash_slabs.as_ref().unwrap()) @@ -75,8 +75,8 @@ pub fn run(matches: &ArgMatches, output: Arc) -> Result<()> { .info("Rolling back archive to previous state..."); // Make sure the truncated size verifies by verifying what part we are keeping first - SlabFile::truncate(data_path.clone(), cp.data_start_size, false)?; - SlabFile::truncate(hashes_path.clone(), cp.hash_start_size, false)?; + SlabFile::truncate(&data_path, cp.data_start_size, false)?; + SlabFile::truncate(&hashes_path, cp.hash_start_size, false)?; // Do the actual truncate which also fixes up the offsets file to match SlabFile::truncate(data_path, cp.data_start_size, true)?; From cb364631af36321a30dc3ddec0e8b86e8cec51b8 Mon Sep 17 00:00:00 2001 From: Tony Asleson Date: Thu, 12 Oct 2023 12:26:21 -0500 Subject: [PATCH 13/13] Remove 'verify-all' command, replace with 'validate' Instead of having a 'verify' and 'verify-all', we'll remove the 'verify-all' and add a validate which includes sub commands for all and stream. This preserves backwards compatibility in the command line. Signed-off-by: Tony Asleson --- src/check.rs | 9 +++++---- src/main.rs | 48 +++++++++++++++++++++++++++++++++++++----------- src/slab.rs | 4 ++-- 3 files changed, 44 insertions(+), 17 deletions(-) diff --git a/src/check.rs b/src/check.rs index 02c3ab3..f904f4a 100644 --- a/src/check.rs +++ b/src/check.rs @@ -28,9 +28,8 @@ fn remove_incomplete_stream(cp: &CheckPoint) -> Result<()> { Ok(()) } -pub fn run(matches: &ArgMatches, output: Arc) -> Result<()> { +pub fn run(matches: &ArgMatches, output: Arc, repair: bool) -> Result<()> { let archive_dir = Path::new(matches.value_of("ARCHIVE").unwrap()).canonicalize()?; - let repair = matches.is_present("REPAIR"); env::set_current_dir(archive_dir.clone())?; @@ -159,7 +158,9 @@ impl CheckPoint { .custom_flags(libc::O_SYNC) .create_new(true) .open(file_name) - .context("Previous pack operation interrupted, please run verify-all")?; + .context( + "Previous pack operation interrupted, please run 'validate --repair all'", + )?; cpf.write_all(&objbytes)?; cpf.write_all(checksum.as_bytes())?; @@ -243,7 +244,7 @@ impl CheckPoint { let root = env::current_dir()?; match Self::read(root).context("error while checking for checkpoint file!")? 
{ Some(cp) => Err(anyhow!( - "pack operation of {} was interrupted, run verify-all -r to correct", + "pack operation of {} was interrupted, run 'validate --repair all", cp.source_path )), None => Ok(()), diff --git a/src/main.rs b/src/main.rs index 4cca5aa..5baec9b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use clap::{command, Arg, ArgMatches, Command}; +use clap::{command, Arg, ArgMatches, Command, SubCommand}; use std::env; use std::process::exit; use std::sync::Arc; @@ -66,13 +66,36 @@ fn main_() -> Result<()> { .takes_value(false); let repair: Arg = Arg::new("REPAIR") - .help("repairs an archive") + .help("Attempts a repair of an archive") .required(false) .long("repair") .short('r') .value_name("REPAIR") .takes_value(false); + let validate_operations = SubCommand::with_name("validate") + .about("Validate operations") + .arg_required_else_help(true) + .arg(repair.clone()) + .subcommand( + SubCommand::with_name("all") + .help_template( + "Validates the archive and optionally repairs \n\ + corruption that may have occurred during interrupted 'pack' operation.\n\ + Note: Data for previous interrupted 'pack' operation will be lost. \n\ + \nOPTIONS:\n{options}\n USAGE:\n\t{usage}", + ) + .about("Validates an archive") + .arg(archive_arg.clone()), + ) + .subcommand( + SubCommand::with_name("stream") + .help("Validates an individual stream") + .about("Validates an individual stream") + .arg(stream_arg.clone()) + .arg(archive_arg.clone()), + ); + let matches = command!() .arg(json) .propagate_version(true) @@ -193,12 +216,7 @@ fn main_() -> Result<()> { .about("lists the streams in the archive") .arg(archive_arg.clone()), ) - .subcommand( - Command::new("verify-all") - .about("verifies the integrity of the archive") - .arg(archive_arg.clone()) - .arg(repair.clone()), - ) + .subcommand(validate_operations) .get_matches(); let report = mk_report(&matches); @@ -226,9 +244,17 @@ fn main_() -> Result<()> { Some(("dump-stream", sub_matches)) => { dump_stream::run(sub_matches, output)?; } - Some(("verify-all", sub_matches)) => { - check::run(sub_matches, output)?; - } + Some(("validate", sub_matches)) => match sub_matches.subcommand() { + Some(("all", args)) => { + // Repair is an argument on the validate commands, not the all. + // Maybe we can figure out how to make it an option of all instead? + check::run(args, output, sub_matches.is_present("REPAIR"))?; + } + Some(("stream", args)) => { + unpack::run_verify(args, report)?; + } + _ => unreachable!("Exhauted list of validate sub commands"), + }, _ => unreachable!("Exhausted list of subcommands and subcommand_required prevents 'None'"), } diff --git a/src/slab.rs b/src/slab.rs index 6472e88..b17058b 100644 --- a/src/slab.rs +++ b/src/slab.rs @@ -86,7 +86,7 @@ impl SlabOffsets { if file_mtime > mtime { return Err(anyhow!( "Offsets file modification time is older than slab data file \ - run blk-archive 'verify-all --repair' to correct!", + run blk-archive 'validate --repair all' to correct!", )); } } @@ -541,7 +541,7 @@ impl SlabFile { } else if data_mtime > offsets_mtime { Err(anyhow!( "Offsets file modification time is older than slab data file \ - run blk-archive 'verify-all --repair' to correct!", + run blk-archive 'validate --repair all' to correct!", )) } else { Ok(actual_count)