diff --git a/Cargo.toml b/Cargo.toml index ed59530..caaaa4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [workspace] -members = ["lib/net", "lib/proc"] +members = ["lib/fs", "lib/net", "lib/proc"] [dependencies] anyhow = "1" @@ -18,5 +18,6 @@ rand = "0.8" tokio = { version = "1.x", features = ["full"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +fs = { path = "lib/fs" } net = { path = "lib/net" } proc = { path = "lib/proc" } diff --git a/lib/fs/Cargo.toml b/lib/fs/Cargo.toml new file mode 100644 index 0000000..25e4bc6 --- /dev/null +++ b/lib/fs/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "fs" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1" +bytes = "1.7.1" +futures = "0.3.29" +ipfs-api-backend-hyper = "0.6.0" +ipfs-api-prelude = "0.6.0" +libp2p = { version = "0.54.0", features = ["full"] } +rand = "0.8" +tokio = { version = "1.36", features = ["full"] } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +wasmer = { version = "5.0.0-rc.1", features = ["sys"] } +wasmer-wasix = { version = "0.29" } +net = { path = "../../lib/net" } diff --git a/lib/fs/src/lib.rs b/lib/fs/src/lib.rs new file mode 100644 index 0000000..044a252 --- /dev/null +++ b/lib/fs/src/lib.rs @@ -0,0 +1,271 @@ +use futures::executor::block_on; +use futures::future::BoxFuture; +use std::fmt; +use std::io::{self, Cursor, SeekFrom}; +use std::marker::{Send, Sync}; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; +use tracing::instrument; + +use wasmer_wasix::{virtual_fs, FsError}; + +use net::ipfs::Client; + +pub struct IpfsFs { + client: Client, +} + +impl IpfsFs { + pub fn new(client: Client) -> IpfsFs { + return IpfsFs { client: client }; + } +} + +// We need to implement Debug to ble able to implement the other traits. +impl fmt::Debug for IpfsFs { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + Ok(()) + } +} + +impl virtual_fs::FileSystem for IpfsFs { + #[instrument(level = "trace", skip_all, fields(?path), ret)] + fn readlink(&self, path: &Path) -> virtual_fs::Result { + Err(FsError::Unsupported) + } + + #[instrument(level = "trace", skip_all, fields(?path), ret)] + fn read_dir(&self, path: &Path) -> virtual_fs::Result { + Err(FsError::Unsupported) + } + + #[instrument(level = "trace", skip_all, fields(?path), ret)] + fn create_dir(&self, path: &Path) -> virtual_fs::Result<()> { + Err(FsError::Unsupported) + } + + #[instrument(level = "trace", skip_all, fields(?path), ret)] + fn remove_dir(&self, path: &Path) -> virtual_fs::Result<()> { + Err(FsError::Unsupported) + } + + #[instrument(level = "trace", skip_all, fields(?from, ?to), ret)] + fn rename<'a>(&'a self, from: &'a Path, to: &'a Path) -> BoxFuture<'a, virtual_fs::Result<()>> { + Box::pin(async { Err(FsError::Unsupported) }) + } + + #[instrument(level = "trace", skip_all, fields(?path), ret)] + fn metadata(&self, path: &Path) -> virtual_fs::Result { + Ok(virtual_fs::Metadata::default()) + } + + #[instrument(level = "trace", skip_all, fields(?path), ret)] + fn symlink_metadata(&self, path: &Path) -> virtual_fs::Result { + Err(FsError::Unsupported) + } + + #[instrument(level = "trace", skip_all, fields(?path), ret)] + fn remove_file(&self, path: &Path) -> virtual_fs::Result<()> { + Err(FsError::Unsupported) + } + + #[instrument(level = "trace", skip_all, fields(), ret)] + fn new_open_options(&self) -> virtual_fs::OpenOptions { + let mut file_opener = virtual_fs::OpenOptions::new(self); + file_opener.read(true); + file_opener + } + + #[instrument(level = "trace", skip_all, fields(?_name, ?_path, ?_fs), ret)] + fn mount( + &self, + _name: String, + _path: &Path, + _fs: Box, + ) -> virtual_fs::Result<()> { + Err(FsError::Unsupported) + } +} + +impl virtual_fs::FileOpener for IpfsFs { + #[instrument(level = "trace", skip_all, fields(?path, ?conf), ret)] + fn open( + &self, + path: &Path, + conf: &virtual_fs::OpenOptionsConfig, + ) -> virtual_fs::Result> { + let path_str = path.to_str().ok_or(FsError::EntryNotFound)?; + let bytes_future = self.client.get_file(path_str); + + let bytes = block_on(bytes_future); + + let ipfs_file = match bytes { + Ok(b) => IpfsFile::new(path_str.to_owned(), b), + Err(_) => return Err(FsError::IOError), // TODO: use a proper error. + }; + Ok(Box::new(ipfs_file)) + } +} + +// unsafe impl Send for IpfsFs {} + +// unsafe impl Sync for IpfsFs {} + +pub struct IpfsFile { + // bytes: Vec, + path: String, + size: usize, + cursor: Cursor>, +} + +impl IpfsFile { + #[instrument(level = "trace", skip_all, fields(?bytes), ret)] + pub fn new(path: String, bytes: Vec) -> IpfsFile { + IpfsFile { + path: path, + size: bytes.len(), + cursor: Cursor::new(bytes), + } + } +} + +impl fmt::Debug for IpfsFile { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + Ok(()) + } +} + +impl AsyncRead for IpfsFile { + #[instrument(level = "trace", skip_all, fields(?cx, ?buf), ret)] + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + +// TODO +impl AsyncSeek for IpfsFile { + #[instrument(level = "trace", skip_all, fields(?position), ret)] + fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { + Ok(()) + } + + #[instrument(level = "trace", skip_all, fields(?cx), ret)] + fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(0)) + } +} + +impl AsyncWrite for IpfsFile { + #[instrument(level = "trace", skip_all, fields(?cx, ?buf), ret)] + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Poll::Ready(Err(io::Error::new( + io::ErrorKind::Unsupported, + FsError::Unsupported, + ))) + } + + #[instrument(level = "trace", skip_all, fields(?cx), ret)] + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Err(io::Error::new( + io::ErrorKind::Unsupported, + FsError::Unsupported, + ))) + } + + #[instrument(level = "trace", skip_all, fields(?cx), ret)] + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Err(io::Error::new( + io::ErrorKind::Unsupported, + FsError::Unsupported, + ))) + } +} + +// unsafe impl Send for IpfsFile {} + +// impl Unpin for IpfsFile {} + +impl virtual_fs::VirtualFile for IpfsFile { + #[instrument(level = "trace", skip_all, fields(), ret)] + fn last_accessed(&self) -> u64 { + 0 + } + + #[instrument(level = "trace", skip_all, fields(), ret)] + fn last_modified(&self) -> u64 { + 0 + } + + #[instrument(level = "trace", skip_all, fields(), ret)] + fn created_time(&self) -> u64 { + 0 + } + + #[allow(unused_variables)] + #[instrument(level = "trace", skip_all, fields(?atime, ?mtime), ret)] + fn set_times(&mut self, atime: Option, mtime: Option) -> virtual_fs::Result<()> { + Err(FsError::Unsupported) + } + + #[instrument(level = "trace", skip_all, fields(), ret)] + fn size(&self) -> u64 { + self.size as u64 + } + + #[instrument(level = "trace", skip_all, fields(?new_size), ret)] + fn set_len(&mut self, new_size: u64) -> virtual_fs::Result<()> { + Err(FsError::Unsupported) + } + + #[instrument(level = "trace", skip_all, fields(), ret)] + fn unlink(&mut self) -> virtual_fs::Result<()> { + Err(FsError::Unsupported) + } + + #[instrument(level = "trace", skip_all, fields(), ret)] + fn is_open(&self) -> bool { + true + } + + #[instrument(level = "trace", skip_all, fields(), ret)] + fn get_special_fd(&self) -> Option { + None + } + + #[instrument(level = "trace", skip_all, fields(?_offset, ?_len), ret)] + fn write_from_mmap(&mut self, _offset: u64, _len: u64) -> std::io::Result<()> { + Err(std::io::ErrorKind::Unsupported.into()) + } + + #[instrument(level = "trace", skip_all, fields(?src), ret)] + fn copy_reference( + &mut self, + mut src: Box, + ) -> BoxFuture<'_, std::io::Result<()>> { + Box::pin(async move { + let bytes_written = tokio::io::copy(&mut src, self).await?; + tracing::trace!(bytes_written, "Copying file into host filesystem",); + Ok(()) + }) + } + + #[instrument(level = "trace", skip_all, fields(?cx), ret)] + fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(0)) + } + + #[instrument(level = "trace", skip_all, fields(?cx), ret)] + fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(0)) + } +} diff --git a/lib/net/src/ipfs.rs b/lib/net/src/ipfs.rs index 033535e..1e78554 100644 --- a/lib/net/src/ipfs.rs +++ b/lib/net/src/ipfs.rs @@ -1,4 +1,5 @@ use bytes::Bytes; +use futures::TryStreamExt; use ipfs_api_backend_hyper::{Error, IpfsApi, IpfsClient, TryFromUri}; use ipfs_api_prelude::BoxStream; use libp2p::Multiaddr; @@ -16,7 +17,20 @@ impl Client { } } - pub fn get_file(&self, path: String) -> BoxStream { - self.client.cat(path.as_str()) + pub fn open_stream(&self, path: &str) -> BoxStream { + self.client.cat(path) + } + + pub async fn consume_stream(&self, stream: BoxStream) -> Result, Error> { + stream.map_ok(|chunk| chunk.to_vec()).try_concat().await + } + + pub async fn get_file(&self, path: &str) -> Result, Error> { + let stream = self.open_stream(path); + self.consume_stream(stream).await + } + + pub fn delete_me(&self) { + // let some_unix_fs = self.client.object_new(Some(ObjectTemplate::UnixFsDir)); } } diff --git a/lib/proc/Cargo.toml b/lib/proc/Cargo.toml index b2f87a1..77aefa4 100644 --- a/lib/proc/Cargo.toml +++ b/lib/proc/Cargo.toml @@ -11,6 +11,6 @@ uuid = { version = "1.10.0", features = [ "fast-rng", # Use a faster (but still sufficiently random) RNG "macro-diagnostics", # Enable better diagnostics for compile-time UUIDs ] } -wasmer = { version = "4.x" } -wasmer-wasix = { version = "0.26" } +wasmer = { version = "5.0.0-rc.1", features = ["sys"] } +wasmer-wasix = { version = "0.29" } tracing = "0.1.37" diff --git a/lib/proc/src/lib.rs b/lib/proc/src/lib.rs index d7088c4..2a975d1 100644 --- a/lib/proc/src/lib.rs +++ b/lib/proc/src/lib.rs @@ -1,6 +1,10 @@ +use std::marker::{Send, Sync}; + use uuid::Uuid; use wasmer::{self}; -use wasmer_wasix::{WasiEnv, WasiFunctionEnv}; +use wasmer_wasix::{virtual_fs, WasiEnv, WasiFunctionEnv}; + +const START_FUNCTION: &str = "_start"; pub struct WasmProcess { function: wasmer::Function, @@ -40,10 +44,15 @@ impl WasmRuntime { &mut self.store } - pub fn build(&mut self, bytecode: Vec) -> Result> { + pub fn build( + &mut self, + bytecode: Vec, + fs: Box, + ) -> Result> { let module = wasmer::Module::new(&self.store, bytecode).expect("couldn't load WASM module"); let uuid = Uuid::new_v4(); let mut wasi_env = WasiEnv::builder(uuid) + .fs(fs) // .args(&["arg1", "arg2"]) // .env("KEY", "VALUE") .finalize(self.store_mut())?; @@ -56,7 +65,7 @@ impl WasmRuntime { wasi_env.initialize(&mut self.store, instance.clone())?; - let function = instance.exports.get_function("_start")?; + let function = instance.exports.get_function(START_FUNCTION)?; Ok(WasmProcess::new(wasi_env, function.to_owned())) } diff --git a/src/cfg.rs b/src/cfg.rs index f7a7228..46d7a8f 100644 --- a/src/cfg.rs +++ b/src/cfg.rs @@ -42,6 +42,12 @@ pub struct DefaultCfg { listen_addr: Multiaddr, } +impl Default for DefaultCfg { + fn default() -> Self { + Self::new() + } +} + impl DefaultCfg { // Default node configuration. pub fn new() -> Self { @@ -56,7 +62,7 @@ impl DefaultCfg { // Check if the node is a Kademlia client from the command-line arguments. fn is_kad_client(&self) -> bool { - return self.args.kad_client; + self.args.kad_client } } diff --git a/src/main.rs b/src/main.rs index de4e705..0bc596b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,11 @@ use std::{error::Error, time::Duration}; use anyhow::Result; +pub use fs::IpfsFs; use futures::TryStreamExt; use libp2p::{identify, kad, mdns, noise, ping, swarm, tcp, yamux}; -use net::{DefaultBehaviour, DefaultBehaviourEvent, DefaultSwarm}; -use proc::{self, WasmRuntime}; +pub use net::{DefaultBehaviour, DefaultBehaviourEvent, DefaultSwarm}; +pub use proc::{self, WasmRuntime}; use tracing_subscriber::EnvFilter; pub mod cfg; @@ -14,16 +15,18 @@ async fn main() -> Result<(), Box> { // Use the default configuration. let config: &dyn cfg::Cfg = &cfg::DefaultCfg::new(); - // Start configuring a `fmt` subscriber + std::env::set_var("RUST_LOG", "info,wasmer_wasix=trace,fs=trace"); + + // Start configuring a `fmt` subscriber. let subscriber = tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .compact() // use abbreviated log format // .with_file(true) // .with_thread_ids(true) - .with_max_level(tracing::Level::INFO) + // .with_max_level(tracing::Level::TRACE) .finish(); - // Set the subscriber as global default + // Set the subscriber as global default. tracing::subscriber::set_global_default(subscriber).unwrap(); // Create a MDNS network behaviour. @@ -106,7 +109,7 @@ async fn main() -> Result<(), Box> { tracing::info!("Fetch bytecode from {}...", config.load()); let bytecode = ipfs_client - .get_file(config.load()) + .open_stream(config.load().as_str()) .map_ok(|chunk| chunk.to_vec()) .try_concat() .await?; @@ -116,7 +119,8 @@ async fn main() -> Result<(), Box> { let mut wasm_runtime = WasmRuntime::new(); tracing::info!("Initialize WASM module instance..."); - let mut wasm_process = wasm_runtime.build(bytecode)?; + let ipfs_fs = IpfsFs::new(ipfs_client); + let mut wasm_process = wasm_runtime.build(bytecode, Box::new(ipfs_fs))?; wasm_process.run(wasm_runtime.store_mut())?; Ok(()) } diff --git a/tests/wasm/print_loop/Makefile b/tests/wasm/print_loop/Makefile index 7c242aa..55e1560 100644 --- a/tests/wasm/print_loop/Makefile +++ b/tests/wasm/print_loop/Makefile @@ -9,5 +9,7 @@ build: hash: @ipfs add --only-hash --quieter $(TARGET_WASM) -ipfs: - @ipfs add $(TARGET_WASM) +publish: + @ipfs add $(TARGET_WASM) # TODO replace with ipfs add -r, as in the Go implementation + +# TODO look for something similar to go generate in Rust diff --git a/tests/wasm/read_from_ipfs/Makefile b/tests/wasm/read_from_ipfs/Makefile new file mode 100644 index 0000000..280c79b --- /dev/null +++ b/tests/wasm/read_from_ipfs/Makefile @@ -0,0 +1,15 @@ +TARGET_WASM := target/read_from_ipfs.wasm + +all: build ipfs + +build: + mkdir -p target + rustc src/main.rs -o $(TARGET_WASM) --target wasm32-wasip1 + +hash: + @ipfs add --only-hash --quieter $(TARGET_WASM) + +publish: + @ipfs add $(TARGET_WASM) # TODO replace with ipfs add -r, as in the Go implementation + +# TODO look for something similar to go generate in Rust diff --git a/tests/wasm/read_from_ipfs/README.md b/tests/wasm/read_from_ipfs/README.md new file mode 100644 index 0000000..a10d047 --- /dev/null +++ b/tests/wasm/read_from_ipfs/README.md @@ -0,0 +1,6 @@ +# Print Loop + +Prints a new line with the text `nop` every second, forever. + +Compile with: +```rustc main.rs --target wasm32-wasip1``` diff --git a/tests/wasm/read_from_ipfs/src/main.rs b/tests/wasm/read_from_ipfs/src/main.rs new file mode 100644 index 0000000..4bb50f9 --- /dev/null +++ b/tests/wasm/read_from_ipfs/src/main.rs @@ -0,0 +1,12 @@ +use std::fs; + +fn main() { + loop { + let file_content = + fs::read_to_string("/ipfs/QmeeD4LBwMxMkboCNvsoJ2aDwJhtsjFyoS1B9iMXiDcEqH"); + match file_content { + Ok(s) => println!("{}", s), + Err(e) => println!("Error reading from file: {}", e), + } + } +}