Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement file blockstore #660

Merged
merged 20 commits into from
Jan 9, 2025
Merged
46 changes: 19 additions & 27 deletions mater/lib/src/stores/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@ struct FileBlockstoreInner {
// The byte length of the CARv1 payload. This is used by the indexing, so we
// know the locations of each blocks in the file.
data_size: u64,
// Is true if the blockstore was finalized.
is_finalized: bool,
}

impl FileBlockstore {
/// Create a new blockstore. If file at the path already exists it is truncated.
/// Create a new blockstore. If file at the path already exists the error is thrown.
pub async fn new<P>(path: P, roots: Vec<Cid>) -> Result<Self, Error>
where
P: AsRef<Path>,
{
if roots.is_empty() {
return Err(crate::Error::EmptyRootsError);
}

let mut file = File::options()
.create_new(true)
.write(true)
Expand All @@ -60,12 +62,11 @@ impl FileBlockstore {

// Write headers
v2::write_header(&mut file, &CarV2Header::default()).await?;
let written = v1::write_header(&mut file, &CarV1Header::new(roots.clone())).await?;
let written = v1::write_header(&mut file, &CarV1Header::new(roots)).await?;

let inner = FileBlockstoreInner {
store: file,
data_size: written as u64,
is_finalized: false,
};

Ok(Self {
Expand All @@ -74,6 +75,7 @@ impl FileBlockstore {
})
}

/// Check if the store contains a block with the cid.
async fn has(&self, cid: Cid) -> Result<bool, Error> {
Ok(self.index.read().await.get(&cid).is_some())
}
Expand Down Expand Up @@ -129,17 +131,12 @@ impl FileBlockstore {
Ok(())
}

/// Finalize this blockstore by writing the CARv2 header, along with
/// flattened index for more efficient subsequent read.
/// Finalize this blockstore by writing the CARv2 header, along with index
cernicc marked this conversation as resolved.
Show resolved Hide resolved
/// for more efficient subsequent read.
async fn finalize(self) -> Result<(), Error> {
// Locked underlying file handler
let mut inner = self.inner.lock().await;

// The blockstore was already finalized
if inner.is_finalized {
return Ok(());
}

// Correct CARv2 header
let header = CarV2Header {
characteristics: Characteristics::EMPTY,
cernicc marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -152,7 +149,7 @@ impl FileBlockstore {
inner.store.rewind().await?;
v2::write_header(&mut inner.store, &header).await?;

// Flatten and write the index
// Write the index
inner
.store
.seek(SeekFrom::Start(header.index_offset))
Expand All @@ -171,7 +168,6 @@ impl FileBlockstore {

// Flush underlying writer
inner.store.flush().await?;
inner.is_finalized = true;

Ok(())
}
Expand Down Expand Up @@ -211,7 +207,9 @@ impl blockstore::Blockstore for FileBlockstore {
}

async fn remove<const S: usize>(&self, _cid: &CidGeneric<S>) -> Result<(), blockstore::Error> {
unimplemented!("Operation not supported")
Err(blockstore::Error::FatalDatabaseError(
"remove operation not supported".to_string(),
))
}

async fn close(self) -> Result<(), blockstore::Error> {
Expand All @@ -231,7 +229,7 @@ mod tests {
io::{AsyncReadExt, AsyncSeekExt},
};

use crate::{CarV2Reader, Error, FileBlockstore};
use crate::{CarV2Reader, FileBlockstore};

#[tokio::test]
async fn file_exists() {
Expand All @@ -244,12 +242,9 @@ mod tests {
#[tokio::test]
async fn test_blockstore() {
// Car file
let mut file =
File::open(PathBuf::from_str("tests/fixtures/car_v2/spaceglenda.car").unwrap())
.await
.unwrap();
let mut original_archive = Vec::new();
file.read_to_end(&mut original_archive).await.unwrap();
let original_archive = tokio::fs::read("tests/fixtures/car_v2/spaceglenda.car")
.await
.unwrap();

let mut reader = CarV2Reader::new(Cursor::new(original_archive.clone()));
reader.read_pragma().await.unwrap();
Expand All @@ -264,7 +259,6 @@ mod tests {
.unwrap();

loop {
// NOTE(@jmg-duarte,22/05/2024): review this
match reader.read_block().await {
Ok((cid, data)) => {
// Add block to the store
Expand All @@ -284,10 +278,8 @@ mod tests {
break;
}
}
else_ => {
// With the length check above this branch should actually be unreachable
assert!(matches!(else_, Err(Error::IoError(_))));
break;
_ => {
unreachable!();
cernicc marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down