Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a WASI file system that grants access to IPFS #11

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[workspace]
members = ["lib/net", "lib/proc"]
members = ["lib/fs", "lib/net", "lib/proc"]

[dependencies]
anyhow = "1"
Expand All @@ -18,5 +18,6 @@ rand = "0.8"
tokio = { version = "1.x", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
fs = { path = "lib/fs" }
net = { path = "lib/net" }
proc = { path = "lib/proc" }
21 changes: 21 additions & 0 deletions lib/fs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "fs"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1"
bytes = "1.7.1"
futures = "0.3.29"
ipfs-api-backend-hyper = "0.6.0"
ipfs-api-prelude = "0.6.0"
libp2p = { version = "0.54.0", features = ["full"] }
rand = "0.8"
tokio = { version = "1.36", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
wasmer = { version = "5.0.0-rc.1", features = ["sys"] }
wasmer-wasix = { version = "0.29" }
net = { path = "../../lib/net" }
271 changes: 271 additions & 0 deletions lib/fs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
use futures::executor::block_on;
use futures::future::BoxFuture;
use std::fmt;
use std::io::{self, Cursor, SeekFrom};
use std::marker::{Send, Sync};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use tracing::instrument;

use wasmer_wasix::{virtual_fs, FsError};

use net::ipfs::Client;

pub struct IpfsFs {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a doc string to this type too?

client: Client,
}

impl IpfsFs {
pub fn new(client: Client) -> IpfsFs {
return IpfsFs { client: client };
}
}

// We need to implement Debug to ble able to implement the other traits.
impl fmt::Debug for IpfsFs {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
Ok(())
}
}

impl virtual_fs::FileSystem for IpfsFs {
#[instrument(level = "trace", skip_all, fields(?path), ret)]
fn readlink(&self, path: &Path) -> virtual_fs::Result<PathBuf> {
Err(FsError::Unsupported)
}

#[instrument(level = "trace", skip_all, fields(?path), ret)]
fn read_dir(&self, path: &Path) -> virtual_fs::Result<virtual_fs::ReadDir> {
Err(FsError::Unsupported)
}

#[instrument(level = "trace", skip_all, fields(?path), ret)]
fn create_dir(&self, path: &Path) -> virtual_fs::Result<()> {
Err(FsError::Unsupported)
}

#[instrument(level = "trace", skip_all, fields(?path), ret)]
fn remove_dir(&self, path: &Path) -> virtual_fs::Result<()> {
Err(FsError::Unsupported)
}

#[instrument(level = "trace", skip_all, fields(?from, ?to), ret)]
fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> BoxFuture<'a, virtual_fs::Result<()>> {
Box::pin(async { Err(FsError::Unsupported) })
}

#[instrument(level = "trace", skip_all, fields(?path), ret)]
fn metadata(&self, path: &Path) -> virtual_fs::Result<virtual_fs::Metadata> {
Ok(virtual_fs::Metadata::default())
}

#[instrument(level = "trace", skip_all, fields(?path), ret)]
fn symlink_metadata(&self, path: &Path) -> virtual_fs::Result<virtual_fs::Metadata> {
Err(FsError::Unsupported)
}

#[instrument(level = "trace", skip_all, fields(?path), ret)]
fn remove_file(&self, path: &Path) -> virtual_fs::Result<()> {
Err(FsError::Unsupported)
}

#[instrument(level = "trace", skip_all, fields(), ret)]
fn new_open_options(&self) -> virtual_fs::OpenOptions {
let mut file_opener = virtual_fs::OpenOptions::new(self);
file_opener.read(true);
file_opener
}

#[instrument(level = "trace", skip_all, fields(?_name, ?_path, ?_fs), ret)]
fn mount(
&self,
_name: String,
_path: &Path,
_fs: Box<dyn virtual_fs::FileSystem + Send + Sync>,
) -> virtual_fs::Result<()> {
Err(FsError::Unsupported)
}
}

impl virtual_fs::FileOpener for IpfsFs {
#[instrument(level = "trace", skip_all, fields(?path, ?conf), ret)]
fn open(
&self,
path: &Path,
conf: &virtual_fs::OpenOptionsConfig,
) -> virtual_fs::Result<Box<dyn virtual_fs::VirtualFile + Send + Sync + 'static>> {
let path_str = path.to_str().ok_or(FsError::EntryNotFound)?;
let bytes_future = self.client.get_file(path_str);

let bytes = block_on(bytes_future);

let ipfs_file = match bytes {
Ok(b) => IpfsFile::new(path_str.to_owned(), b),
Err(_) => return Err(FsError::IOError), // TODO: use a proper error.
};
Ok(Box::new(ipfs_file))
}
}

// unsafe impl Send for IpfsFs {}

// unsafe impl Sync for IpfsFs {}

pub struct IpfsFile {
// bytes: Vec<u8>,
path: String,
size: usize,
cursor: Cursor<Vec<u8>>,
}

impl IpfsFile {
#[instrument(level = "trace", skip_all, fields(?bytes), ret)]
pub fn new(path: String, bytes: Vec<u8>) -> IpfsFile {
IpfsFile {
path: path,
size: bytes.len(),
cursor: Cursor::new(bytes),
}
}
}

impl fmt::Debug for IpfsFile {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
Ok(())
}
}

impl AsyncRead for IpfsFile {
#[instrument(level = "trace", skip_all, fields(?cx, ?buf), ret)]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

// TODO
impl AsyncSeek for IpfsFile {
#[instrument(level = "trace", skip_all, fields(?position), ret)]
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
Ok(())
}

#[instrument(level = "trace", skip_all, fields(?cx), ret)]
fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Poll::Ready(Ok(0))
}
}

impl AsyncWrite for IpfsFile {
#[instrument(level = "trace", skip_all, fields(?cx, ?buf), ret)]
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Poll::Ready(Err(io::Error::new(
io::ErrorKind::Unsupported,
FsError::Unsupported,
)))
}

#[instrument(level = "trace", skip_all, fields(?cx), ret)]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Err(io::Error::new(
io::ErrorKind::Unsupported,
FsError::Unsupported,
)))
}

#[instrument(level = "trace", skip_all, fields(?cx), ret)]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Err(io::Error::new(
io::ErrorKind::Unsupported,
FsError::Unsupported,
)))
}
}

// unsafe impl Send for IpfsFile {}

// impl Unpin for IpfsFile {}

impl virtual_fs::VirtualFile for IpfsFile {
#[instrument(level = "trace", skip_all, fields(), ret)]
fn last_accessed(&self) -> u64 {
0
}

#[instrument(level = "trace", skip_all, fields(), ret)]
fn last_modified(&self) -> u64 {
0
}

#[instrument(level = "trace", skip_all, fields(), ret)]
fn created_time(&self) -> u64 {
0
}

#[allow(unused_variables)]
#[instrument(level = "trace", skip_all, fields(?atime, ?mtime), ret)]
fn set_times(&mut self, atime: Option<u64>, mtime: Option<u64>) -> virtual_fs::Result<()> {
Err(FsError::Unsupported)
}

#[instrument(level = "trace", skip_all, fields(), ret)]
fn size(&self) -> u64 {
self.size as u64
}

#[instrument(level = "trace", skip_all, fields(?new_size), ret)]
fn set_len(&mut self, new_size: u64) -> virtual_fs::Result<()> {
Err(FsError::Unsupported)
}

#[instrument(level = "trace", skip_all, fields(), ret)]
fn unlink(&mut self) -> virtual_fs::Result<()> {
Err(FsError::Unsupported)
}

#[instrument(level = "trace", skip_all, fields(), ret)]
fn is_open(&self) -> bool {
true
}

#[instrument(level = "trace", skip_all, fields(), ret)]
fn get_special_fd(&self) -> Option<u32> {
None
}

#[instrument(level = "trace", skip_all, fields(?_offset, ?_len), ret)]
fn write_from_mmap(&mut self, _offset: u64, _len: u64) -> std::io::Result<()> {
Err(std::io::ErrorKind::Unsupported.into())
}

#[instrument(level = "trace", skip_all, fields(?src), ret)]
fn copy_reference(
&mut self,
mut src: Box<dyn virtual_fs::VirtualFile + Send + Sync + 'static>,
) -> BoxFuture<'_, std::io::Result<()>> {
Box::pin(async move {
let bytes_written = tokio::io::copy(&mut src, self).await?;
tracing::trace!(bytes_written, "Copying file into host filesystem",);
Ok(())
})
}

#[instrument(level = "trace", skip_all, fields(?cx), ret)]
fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(0))
}

#[instrument(level = "trace", skip_all, fields(?cx), ret)]
fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(0))
}
}
18 changes: 16 additions & 2 deletions lib/net/src/ipfs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bytes::Bytes;
use futures::TryStreamExt;
use ipfs_api_backend_hyper::{Error, IpfsApi, IpfsClient, TryFromUri};
use ipfs_api_prelude::BoxStream;
use libp2p::Multiaddr;
Expand All @@ -16,7 +17,20 @@ impl Client {
}
}

pub fn get_file(&self, path: String) -> BoxStream<Bytes, Error> {
self.client.cat(path.as_str())
pub fn open_stream(&self, path: &str) -> BoxStream<Bytes, Error> {
self.client.cat(path)
}

pub async fn consume_stream(&self, stream: BoxStream<Bytes, Error>) -> Result<Vec<u8>, Error> {
stream.map_ok(|chunk| chunk.to_vec()).try_concat().await
}

pub async fn get_file(&self, path: &str) -> Result<Vec<u8>, Error> {
let stream = self.open_stream(path);
self.consume_stream(stream).await
}

pub fn delete_me(&self) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's delete_me? Is that some kind of destructor method?

// let some_unix_fs = self.client.object_new(Some(ObjectTemplate::UnixFsDir));
}
}
4 changes: 2 additions & 2 deletions lib/proc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ uuid = { version = "1.10.0", features = [
"fast-rng", # Use a faster (but still sufficiently random) RNG
"macro-diagnostics", # Enable better diagnostics for compile-time UUIDs
] }
wasmer = { version = "4.x" }
wasmer-wasix = { version = "0.26" }
wasmer = { version = "5.0.0-rc.1", features = ["sys"] }
wasmer-wasix = { version = "0.29" }
tracing = "0.1.37"
15 changes: 12 additions & 3 deletions lib/proc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::marker::{Send, Sync};

use uuid::Uuid;
use wasmer::{self};
use wasmer_wasix::{WasiEnv, WasiFunctionEnv};
use wasmer_wasix::{virtual_fs, WasiEnv, WasiFunctionEnv};

const START_FUNCTION: &str = "_start";

pub struct WasmProcess {
function: wasmer::Function,
Expand Down Expand Up @@ -40,10 +44,15 @@ impl WasmRuntime {
&mut self.store
}

pub fn build(&mut self, bytecode: Vec<u8>) -> Result<WasmProcess, Box<dyn std::error::Error>> {
pub fn build(
&mut self,
bytecode: Vec<u8>,
fs: Box<dyn virtual_fs::FileSystem + Send + Sync>,
) -> Result<WasmProcess, Box<dyn std::error::Error>> {
let module = wasmer::Module::new(&self.store, bytecode).expect("couldn't load WASM module");
let uuid = Uuid::new_v4();
let mut wasi_env = WasiEnv::builder(uuid)
.fs(fs)
// .args(&["arg1", "arg2"])
// .env("KEY", "VALUE")
.finalize(self.store_mut())?;
Expand All @@ -56,7 +65,7 @@ impl WasmRuntime {

wasi_env.initialize(&mut self.store, instance.clone())?;

let function = instance.exports.get_function("_start")?;
let function = instance.exports.get_function(START_FUNCTION)?;

Ok(WasmProcess::new(wasi_env, function.to_owned()))
}
Expand Down
Loading
Loading