
enhancement: add db sync option #92

Merged · 3 commits · Nov 7, 2024
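For context before the diff: only `set_sync` and `sync` below are verbatim API from this PR. Everything else in this usage sketch, the `gdbm_native` crate path, the `OpenOptions` builder calls, the re-exported `Result` alias, and byte-vector keys, is an assumption for illustration.

```rust
use gdbm_native::{Gdbm, OpenOptions, ReadWrite, Result};

// Sketch only: builder method names are guesses; the diff shows just that
// OpenOptions carries a write.sync flag consumed when the db is opened.
fn demo() -> Result<()> {
    let mut db: Gdbm<ReadWrite> = OpenOptions::new()
        .write()
        .sync(true) // flush to stable storage after every mutating call
        .open("example.db")?;

    // insert/remove/convert now call db.sync() internally before returning
    db.insert(b"key".to_vec(), b"value".to_vec())?;

    db.set_sync(false); // switch back to buffered writes at runtime
    db.insert(b"key2".to_vec(), b"value2".to_vec())?;
    db.sync()?; // explicit flush: buckets, directory, header, then fsync

    Ok(())
}
```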
src/error.rs: 2 additions & 0 deletions
@@ -4,6 +4,8 @@ use std::{fmt::Display, fmt::Formatter, io};
pub enum Error {
/// IO error.
Io(io::Error),
/// Database may be inconsistent since an earlier error. Writes are disabled.
Inconsistent,
/// Bucket has too many elements or bucket bits > directory bits.
BadBucket {
/// Bucket file offset.
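The new `Inconsistent` variant lets callers tell "this handle refuses to write because an earlier write failed partway" apart from ordinary I/O errors. A sketch of how calling code might react (crate path and `Result` alias assumed):

```rust
use gdbm_native::{Error, Gdbm, ReadWrite, Result};

// Sketch: a poisoned handle keeps refusing writes, so the safe recovery
// is to drop it and reopen the database file from its last synced state.
fn insert_logged(
    db: &mut Gdbm<ReadWrite>,
    key: Vec<u8>,
    value: Vec<u8>,
) -> Result<Option<Vec<u8>>> {
    match db.insert(key, value) {
        Err(Error::Inconsistent) => {
            eprintln!("handle poisoned by an earlier failed write; reopen before retrying");
            Err(Error::Inconsistent)
        }
        other => other,
    }
}
```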
src/lib.rs: 121 additions & 26 deletions
@@ -11,6 +11,7 @@
extern crate base64;

use base64::Engine;
use std::any::Any;
use std::io::{self, Read, Seek, SeekFrom, Write};

mod avail;
@@ -38,7 +39,6 @@ pub use options::{BlockSize, ConvertOptions, Create, OpenOptions};
use ser::{write32, write64};
pub use ser::{Alignment, Endian, Layout, Offset};
use std::fs::File;
use std::marker::PhantomData;

#[cfg(target_os = "linux")]
use std::os::linux::fs::MetadataExt;
@@ -61,10 +61,21 @@ pub enum ExportBinMode {
Exp64,
}

#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Debug, Default, PartialEq)]
enum WriteState {
#[default]
Clean,
Dirty,
Inconsistent,
}

#[derive(Copy, Clone, Debug, Default)]
pub struct ReadOnly;
#[derive(Copy, Clone, Debug)]
pub struct ReadWrite;
#[derive(Copy, Clone, Debug, Default)]
pub struct ReadWrite {
sync: bool,
state: WriteState,
}

pub trait CacheBucket {
fn cache_bucket(&mut self, offset: u64, bucket: Bucket) -> Result<()>;
@@ -82,14 +93,14 @@ fn read_ofs(f: &mut std::fs::File, ofs: u64, total_size: usize) -> io::Result<Ve
}

// #[derive(Debug)]
pub struct Gdbm<R> {
pub struct Gdbm<R: 'static> {
pathname: String,
f: std::fs::File,
pub header: Header,
pub dir: Directory,
bucket_cache: BucketCache,

rw_phantom: PhantomData<R>,
read_write: R,
}

// cache_bucket for ReadOnly variant ignores (never receives) dirty displaced buckets.
@@ -115,6 +126,7 @@ impl CacheBucket for Gdbm<ReadWrite> {
impl<R> Gdbm<R>
where
Gdbm<R>: CacheBucket,
R: Default,
{
// API: open database file, read and validate header
pub fn open<P: AsRef<std::path::Path>>(
@@ -157,7 +169,7 @@ where
header,
dir,
bucket_cache,
rw_phantom: PhantomData,
read_write: R::default(),
})
}

@@ -448,14 +460,27 @@ impl Gdbm<ReadWrite> {
BucketCache::new(cache_buckets, Some((bucket_offset, bucket)))
};

Ok(Gdbm {
let mut db = Gdbm {
pathname: path.as_ref().to_string_lossy().to_string(),
f,
header,
dir,
bucket_cache,
rw_phantom: PhantomData,
})
read_write: ReadWrite {
sync: open_options.write.sync,
state: WriteState::Dirty,
},
};

if db.read_write.sync {
db.sync()?;
}

Ok(db)
}

pub fn set_sync(&mut self, sync: bool) {
self.read_write.sync = sync;
}

pub fn import_ascii(&mut self, reader: &mut impl Read) -> Result<()> {
@@ -632,28 +657,44 @@ impl Gdbm<ReadWrite> {

// write out any cached, not-yet-written metadata and data to storage
fn write_dirty(&mut self) -> io::Result<()> {
self.read_write.state = WriteState::Inconsistent;

self.write_buckets()?;
self.write_dir()?;
self.write_header()?;

self.read_write.state = WriteState::Clean;

Ok(())
}

// API: ensure database is flushed to stable storage
pub fn sync(&mut self) -> Result<()> {
self.header.increment_numsync();
self.write_dirty()
.and_then(|_| self.f.sync_data())
.map_err(Error::Io)
match self.read_write.state {
WriteState::Clean => Ok(()),
WriteState::Inconsistent => Err(Error::Inconsistent),
WriteState::Dirty => {
self.header.increment_numsync();
self.write_dirty()
.and_then(|_| self.f.sync_data())
.map_err(Error::Io)
}
}
}

// API: remove a key/value pair from db, given a key
pub fn remove<'a, K: Into<BytesRef<'a>>>(&mut self, key: K) -> Result<Option<Vec<u8>>> {
let get_opt = self.int_get(key.into().as_ref())?;
fn int_remove(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let get_opt = self.int_get(key)?;

if get_opt.is_none() {
return Ok(None);
}

if self.read_write.state == WriteState::Inconsistent {
return Err(Error::Inconsistent);
}

self.read_write.state = WriteState::Inconsistent;

let (elem_ofs, data) = get_opt.unwrap();

let elem = self
@@ -665,12 +706,22 @@ impl Gdbm<ReadWrite> {
// release record bytes to available-space pool
self.free_record(elem.data_ofs, elem.key_size + elem.data_size)?;

// flush any dirty pages to OS
self.write_dirty()?;
self.read_write.state = WriteState::Dirty;

Ok(Some(data))
}

// API: remove a key/value pair from db, given a key
pub fn remove<'a, K: Into<BytesRef<'a>>>(&mut self, key: K) -> Result<Option<Vec<u8>>> {
self.int_remove(key.into().as_ref()).and_then(|old_value| {
if old_value.is_some() && self.read_write.sync {
self.sync()?;
}

Ok(old_value)
})
}

fn allocate_record(&mut self, size: u32) -> io::Result<u64> {
let (offset, length) = match self
.bucket_cache
@@ -697,6 +748,12 @@ impl Gdbm<ReadWrite> {
}

fn int_insert(&mut self, key: Vec<u8>, data: Vec<u8>) -> Result<()> {
if self.read_write.state == WriteState::Inconsistent {
return Err(Error::Inconsistent);
}

self.read_write.state = WriteState::Inconsistent;

let offset = self.allocate_record((key.len() + data.len()) as u32)?;

self.f
@@ -717,6 +774,8 @@ impl Gdbm<ReadWrite> {
.unwrap()
.insert(bucket_elem);

self.read_write.state = WriteState::Dirty;

Ok(())
}

@@ -726,10 +785,18 @@ impl Gdbm<ReadWrite> {
value: V,
) -> Result<Option<Vec<u8>>> {
let key = key.into();
self.remove(key.as_ref()).and_then(|oldkey| {
self.int_insert(key.into_vec(), value.into().into_vec())
.map(|_| oldkey)
})
self.int_remove(key.as_ref())
.and_then(|oldvalue| {
self.int_insert(key.into_vec(), value.into().into_vec())
.map(|_| oldvalue)
})
.and_then(|oldvalue| {
if self.read_write.sync {
self.sync()?;
}

Ok(oldvalue)
})
}

pub fn try_insert<K: Into<Bytes>, V: Into<Bytes>>(
@@ -742,7 +809,14 @@ impl Gdbm<ReadWrite> {
Some(_) => Ok((false, olddata)),
_ => self
.int_insert(key.into_vec(), value.into().into_vec())
.map(|_| (true, None)),
.map(|_| (true, None))
.and_then(|result| {
if self.read_write.sync {
self.sync()?;
}

Ok(result)
}),
})
}

@@ -806,15 +880,34 @@

// API: convert
pub fn convert(&mut self, options: &ConvertOptions) -> Result<()> {
if self.read_write.state == WriteState::Inconsistent {
return Err(Error::Inconsistent);
}

self.read_write.state = WriteState::Inconsistent;

self.header
.convert_numsync(options.numsync)
.into_iter()
.try_for_each(|(offset, length)| self.free_record(offset, length))
.map_err(Error::Io)
.map_err(Error::Io)?;

self.read_write.state = WriteState::Dirty;

Ok(())
}
}

impl<R> Drop for Gdbm<R> {
fn drop(&mut self) {
let db: &mut dyn Any = self as &mut dyn Any;
if let Some(db) = db.downcast_mut::<Gdbm<ReadWrite>>() {
let _ = db.sync();
}
}
}

struct GDBMIterator<'a, R> {
struct GDBMIterator<'a, R: 'static> {
key_or_value: KeyOrValue,
db: &'a mut Gdbm<R>,
slot: Option<Result<Slot>>,
@@ -835,6 +928,7 @@ struct Slot {
impl<'a, R> GDBMIterator<'a, R>
where
Gdbm<R>: CacheBucket,
R: Default + 'static,
{
fn next_slot(db: &Gdbm<R>, slot: Slot) -> Option<Slot> {
match slot {
@@ -901,6 +995,7 @@ where
impl<'a, R> Iterator for GDBMIterator<'a, R>
where
Gdbm<R>: CacheBucket,
R: Default + 'static,
{
type Item = Result<(Vec<u8>, Vec<u8>)>;

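Two patterns in this diff may be worth spelling out for reviewers. First, the `WriteState` discipline shared by `int_insert`, `int_remove`, `write_dirty`, and `convert`: mark the handle `Inconsistent` on entry, do the fallible work, and downgrade to `Dirty` only on success, so an early error leaves the handle poisoned. A minimal self-contained sketch (names illustrative, not from the crate):

```rust
#[derive(Copy, Clone, Debug, Default, PartialEq)]
enum WriteState {
    #[default]
    Clean,
    Dirty,
    Inconsistent,
}

struct Db {
    state: WriteState,
}

impl Db {
    fn mutate(&mut self) -> Result<(), &'static str> {
        if self.state == WriteState::Inconsistent {
            return Err("writes disabled: an earlier operation failed midway");
        }
        self.state = WriteState::Inconsistent; // poison in case we bail early
        self.fallible_work()?;                 // on Err, the poison sticks
        self.state = WriteState::Dirty;        // success: flush on next sync
        Ok(())
    }

    fn fallible_work(&mut self) -> Result<(), &'static str> {
        Ok(()) // stand-in for the bucket/directory/header writes
    }
}

fn main() {
    let mut db = Db { state: WriteState::default() };
    assert!(db.mutate().is_ok());
    assert_eq!(db.state, WriteState::Dirty);
}
```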
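Second, the `Drop` change: stable Rust cannot specialize `Drop` for `Gdbm<ReadWrite>` alone, so the PR adds `R: 'static` bounds and downcasts through `dyn Any` at drop time to sync only read-write handles. The same trick in isolation (names illustrative, not from the crate):

```rust
use std::any::Any;

struct ReadOnly;
struct ReadWrite;

struct Handle<M: 'static> {
    _mode: M,
}

impl<M: 'static> Drop for Handle<M> {
    fn drop(&mut self) {
        // No impl specialization on stable Rust, so "sync on drop, but only
        // for read-write handles" is emulated with a runtime downcast.
        let this: &mut dyn Any = self;
        if this.downcast_mut::<Handle<ReadWrite>>().is_some() {
            println!("read-write handle dropped: sync would run here");
        }
    }
}

fn main() {
    let _ro = Handle { _mode: ReadOnly };  // drops silently
    let _rw = Handle { _mode: ReadWrite }; // prints the sync message
}
```

Note that errors from the drop-time `sync()` are necessarily swallowed (`let _ = db.sync()`), so callers who care about durability should still call `sync()` explicitly before dropping.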