diff --git a/src/error.rs b/src/error.rs
index 115cb21..311e8e1 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -4,6 +4,8 @@ use std::{fmt::Display, fmt::Formatter, io};
 pub enum Error {
     /// IO error.
     Io(io::Error),
+    /// Database may be inconsistent since an earlier error. Writes are disabled.
+    Inconsistent,
     /// Bucket has too many elements or bucket bits > directory bits.
     BadBucket {
         /// Bucket file offset.
diff --git a/src/lib.rs b/src/lib.rs
index 076ff76..cff2690 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,6 +11,7 @@
 extern crate base64;
 
 use base64::Engine;
+use std::any::Any;
 use std::io::{self, Read, Seek, SeekFrom, Write};
 
 mod avail;
@@ -38,7 +39,6 @@ pub use options::{BlockSize, ConvertOptions, Create, OpenOptions};
 use ser::{write32, write64};
 pub use ser::{Alignment, Endian, Layout, Offset};
 use std::fs::File;
-use std::marker::PhantomData;
 #[cfg(target_os = "linux")]
 use std::os::linux::fs::MetadataExt;
 
@@ -61,10 +61,21 @@ pub enum ExportBinMode {
     Exp64,
 }
 
-#[derive(Copy, Clone, Debug)]
+#[derive(Copy, Clone, Debug, Default, PartialEq)]
+enum WriteState {
+    #[default]
+    Clean,
+    Dirty,
+    Inconsistent,
+}
+
+#[derive(Copy, Clone, Debug, Default)]
 pub struct ReadOnly;
-#[derive(Copy, Clone, Debug)]
-pub struct ReadWrite;
+#[derive(Copy, Clone, Debug, Default)]
+pub struct ReadWrite {
+    sync: bool,
+    state: WriteState,
+}
 
 pub trait CacheBucket {
     fn cache_bucket(&mut self, offset: u64, bucket: Bucket) -> Result<()>;
 }
@@ -82,14 +93,14 @@ fn read_ofs(f: &mut std::fs::File, ofs: u64, total_size: usize) -> io::Result<Vec<u8>>
-pub struct Gdbm<R> {
+pub struct Gdbm<R: 'static> {
     pathname: String,
     f: std::fs::File,
     pub header: Header,
     pub dir: Directory,
     bucket_cache: BucketCache,
-    rw_phantom: PhantomData<R>,
+    read_write: R,
 }
 
 // cache_bucket for ReadOnly variant ignores (never receives) dirty displaced buckets.
@@ -115,6 +126,7 @@ impl CacheBucket for Gdbm<ReadWrite> {
 impl<R> Gdbm<R>
 where
     Gdbm<R>: CacheBucket,
+    R: Default,
 {
     // API: open database file, read and validate header
     pub fn open<P: AsRef<Path>>(
@@ -157,7 +169,7 @@ where
             header,
             dir,
             bucket_cache,
-            rw_phantom: PhantomData,
+            read_write: R::default(),
         })
     }
 
@@ -448,14 +460,27 @@ impl Gdbm<ReadWrite> {
             BucketCache::new(cache_buckets, Some((bucket_offset, bucket)))
         };
 
-        Ok(Gdbm {
+        let mut db = Gdbm {
             pathname: path.as_ref().to_string_lossy().to_string(),
             f,
             header,
             dir,
             bucket_cache,
-            rw_phantom: PhantomData,
-        })
+            read_write: ReadWrite {
+                sync: open_options.write.sync,
+                state: WriteState::Dirty,
+            },
+        };
+
+        if db.read_write.sync {
+            db.sync()?;
+        }
+
+        Ok(db)
+    }
+
+    pub fn set_sync(&mut self, sync: bool) {
+        self.read_write.sync = sync;
     }
 
     pub fn import_ascii(&mut self, reader: &mut impl Read) -> Result<()> {
@@ -632,28 +657,44 @@ impl Gdbm<ReadWrite> {
     // write out any cached, not-yet-written metadata and data to storage
     fn write_dirty(&mut self) -> io::Result<()> {
+        self.read_write.state = WriteState::Inconsistent;
+
         self.write_buckets()?;
         self.write_dir()?;
         self.write_header()?;
 
+        self.read_write.state = WriteState::Clean;
+
         Ok(())
     }
 
     // API: ensure database is flushed to stable storage
     pub fn sync(&mut self) -> Result<()> {
-        self.header.increment_numsync();
-        self.write_dirty()
-            .and_then(|_| self.f.sync_data())
-            .map_err(Error::Io)
+        match self.read_write.state {
+            WriteState::Clean => Ok(()),
+            WriteState::Inconsistent => Err(Error::Inconsistent),
+            WriteState::Dirty => {
+                self.header.increment_numsync();
+                self.write_dirty()
+                    .and_then(|_| self.f.sync_data())
+                    .map_err(Error::Io)
+            }
+        }
     }
 
-    // API: remove a key/value pair from db, given a key
-    pub fn remove<'a, K: Into<Cow<'a, [u8]>>>(&mut self, key: K) -> Result<Option<Vec<u8>>> {
-        let get_opt = self.int_get(key.into().as_ref())?;
+    fn int_remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
+        let get_opt = self.int_get(key)?;
         if get_opt.is_none() {
             return Ok(None);
         }
+
+        if self.read_write.state == WriteState::Inconsistent {
+            return Err(Error::Inconsistent);
+        }
+
+        self.read_write.state = WriteState::Inconsistent;
+
         let (elem_ofs, data) = get_opt.unwrap();
 
         let elem = self
@@ -665,12 +706,22 @@ impl Gdbm<ReadWrite> {
         // release record bytes to available-space pool
         self.free_record(elem.data_ofs, elem.key_size + elem.data_size)?;
 
-        // flush any dirty pages to OS
-        self.write_dirty()?;
+        self.read_write.state = WriteState::Dirty;
 
         Ok(Some(data))
     }
 
+    // API: remove a key/value pair from db, given a key
+    pub fn remove<'a, K: Into<Cow<'a, [u8]>>>(&mut self, key: K) -> Result<Option<Vec<u8>>> {
+        self.int_remove(key.into().as_ref()).and_then(|old_value| {
+            if old_value.is_some() && self.read_write.sync {
+                self.sync()?;
+            }
+
+            Ok(old_value)
+        })
+    }
+
     fn allocate_record(&mut self, size: u32) -> io::Result<u64> {
         let (offset, length) = match self
             .bucket_cache
@@ -697,6 +748,12 @@ impl Gdbm<ReadWrite> {
     }
 
     fn int_insert(&mut self, key: Vec<u8>, data: Vec<u8>) -> Result<()> {
+        if self.read_write.state == WriteState::Inconsistent {
+            return Err(Error::Inconsistent);
+        }
+
+        self.read_write.state = WriteState::Inconsistent;
+
         let offset = self.allocate_record((key.len() + data.len()) as u32)?;
 
         self.f
@@ -717,6 +774,8 @@ impl Gdbm<ReadWrite> {
             .unwrap()
             .insert(bucket_elem);
 
+        self.read_write.state = WriteState::Dirty;
+
         Ok(())
     }
 
@@ -726,10 +785,18 @@ impl Gdbm<ReadWrite> {
         value: V,
     ) -> Result<Option<Vec<u8>>> {
         let key = key.into();
-        self.remove(key.as_ref()).and_then(|oldkey| {
-            self.int_insert(key.into_vec(), value.into().into_vec())
-                .map(|_| oldkey)
-        })
+        self.int_remove(key.as_ref())
+            .and_then(|oldvalue| {
+                self.int_insert(key.into_vec(), value.into().into_vec())
+                    .map(|_| oldvalue)
+            })
+            .and_then(|oldvalue| {
+                if self.read_write.sync {
+                    self.sync()?;
+                }
+
+                Ok(oldvalue)
+            })
     }
 
     pub fn try_insert<'a, K: Into<Cow<'a, [u8]>>, V: Into<Cow<'a, [u8]>>>(
@@ -742,7 +809,14 @@ impl Gdbm<ReadWrite> {
             Some(_) => Ok((false, olddata)),
             _ => self
                 .int_insert(key.into_vec(), value.into().into_vec())
-                .map(|_| (true, None)),
+                .map(|_| (true, None))
+                .and_then(|result| {
+                    if self.read_write.sync {
+                        self.sync()?;
+                    }
+
+                    Ok(result)
+                }),
         })
     }
 
@@ -806,15 +880,34 @@ impl Gdbm<ReadWrite> {
     // API: convert
     pub fn convert(&mut self, options: &ConvertOptions) -> Result<()> {
+        if self.read_write.state == WriteState::Inconsistent {
+            return Err(Error::Inconsistent);
+        }
+
+        self.read_write.state = WriteState::Inconsistent;
+
         self.header
             .convert_numsync(options.numsync)
             .into_iter()
            .try_for_each(|(offset, length)| self.free_record(offset, length))
-            .map_err(Error::Io)
+            .map_err(Error::Io)?;
+
+        self.read_write.state = WriteState::Dirty;
+
+        Ok(())
+    }
+}
+
+impl<R: 'static> Drop for Gdbm<R> {
+    fn drop(&mut self) {
+        let db: &mut dyn Any = self as &mut dyn Any;
+        if let Some(db) = db.downcast_mut::<Gdbm<ReadWrite>>() {
+            let _ = db.sync();
+        }
     }
 }
 
-struct GDBMIterator<'a, R> {
+struct GDBMIterator<'a, R: 'static> {
     key_or_value: KeyOrValue,
     db: &'a mut Gdbm<R>,
     slot: Option<Result<Slot>>,
@@ -835,6 +928,7 @@ struct Slot {
 impl<'a, R> GDBMIterator<'a, R>
 where
     Gdbm<R>: CacheBucket,
+    R: Default + 'static,
 {
     fn next_slot(db: &Gdbm<R>, slot: Slot) -> Option<Slot> {
         match slot {
@@ -901,6 +995,7 @@ where
 impl<'a, R> Iterator for GDBMIterator<'a, R>
 where
     Gdbm<R>: CacheBucket,
+    R: Default + 'static,
 {
     type Item = Result<(Vec<u8>, Vec<u8>)>;
 
diff --git a/src/options.rs b/src/options.rs
index 6a2bb9a..9dbeeba 100644
--- a/src/options.rs
+++ b/src/options.rs
@@ -33,6 +33,7 @@ pub struct NotCreate;
 pub struct NotWrite;
 #[derive(Copy, Clone, Debug, Default)]
 pub struct Write<C> {
+    pub sync: bool,
     pub create: C,
 }
 
@@ -67,7 +68,10 @@ impl OpenOptions<NotWrite> {
         OpenOptions {
             alignment: self.alignment,
             cachesize: self.cachesize,
-            write: Write { create: NotCreate },
+            write: Write {
+                sync: false,
+                create: NotCreate,
+            },
         }
     }
 }
@@ -80,6 +84,17 @@ impl<C> OpenOptions<Write<C>> {
             write: NotWrite,
         }
     }
+
+    pub fn sync(self, sync: bool) -> OpenOptions<Write<C>> {
+        OpenOptions {
+            alignment: self.alignment,
+            cachesize: self.cachesize,
+            write: Write {
+                sync,
+                create: self.write.create,
+            },
+        }
+    }
 }
 
 impl OpenOptions<Write<NotCreate>> {
@@ -89,6 +104,7 @@ impl OpenOptions<Write<NotCreate>> {
             cachesize: self.cachesize,
             write: Write {
                 create: Create::default(),
+                sync: self.write.sync,
             },
         }
     }
@@ -99,7 +115,10 @@ impl OpenOptions<Write<Create>> {
         OpenOptions {
             alignment: self.alignment,
             cachesize: self.cachesize,
-            write: Write { create: NotCreate },
+            write: Write {
+                create: NotCreate,
+                sync: self.write.sync,
+            },
         }
     }
 
@@ -112,6 +131,7 @@ impl OpenOptions<Write<Create>> {
                     offset,
                     ..self.write.create
                 },
+                ..self.write
             },
         }
     }
@@ -125,6 +145,7 @@ impl OpenOptions<Write<Create>> {
                     endian,
                     ..self.write.create
                 },
+                ..self.write
             },
         }
     }
@@ -138,6 +159,7 @@ impl OpenOptions<Write<Create>> {
                     no_numsync: !numsync,
                     ..self.write.create
                 },
+                ..self.write
             },
         }
     }
@@ -151,6 +173,7 @@ impl OpenOptions<Write<Create>> {
                     newdb,
                     ..self.write.create
                 },
+                ..self.write
             },
         }
    }
@@ -164,6 +187,7 @@ impl OpenOptions<Write<Create>> {
                     block_size,
                     ..self.write.create
                 },
+                ..self.write
             },
         }
     }
@@ -187,6 +211,10 @@ impl OpenOptions<Write<NotCreate>> {
             .open(path.as_ref())
             .map_err(Error::Io)
             .and_then(|f| Gdbm::<ReadWrite>::open(f, path, self.alignment, self.cachesize))
+            .map(|mut db| {
+                db.set_sync(self.write.sync);
+                db
+            })
     }
 }
 
@@ -217,6 +245,10 @@ impl OpenOptions<Write<Create>> {
                 })
             })
         }
+        .map(|mut db| {
+            db.set_sync(self.write.sync);
+            db
+        })
     }
 }
 
diff --git a/tests/open.rs b/tests/open.rs
index 6c79680..1f95188 100644
--- a/tests/open.rs
+++ b/tests/open.rs
@@ -122,6 +122,7 @@ fn api_open_bsexact() {
     match OpenOptions::new()
         .write()
         .create()
+        .newdb(true)
         .block_size(BlockSize::Exactly(block_size))
         .open(old_db.path().to_str().unwrap())
     {