From 9a139db2d54bda110905bb0491b02f3980c7748f Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Mon, 18 Mar 2024 03:19:05 +0100 Subject: [PATCH] wip --- Cargo.lock | 411 +++++++++++++++++++++----------- Cargo.toml | 20 +- bindings/Cargo.toml | 6 +- bindings/src/lib.rs | 41 +++- cli/src/main.rs | 6 +- coordinator/Cargo.toml | 7 +- coordinator/src/main.rs | 45 ++-- rust-example/Cargo.lock | 483 ++++++++++++++++++++++++++++++++------ rust-example/src/lib.rs | 19 +- utils/Cargo.toml | 1 + utils/src/lib.rs | 3 - utils/src/services/mod.rs | 18 +- wasm/Cargo.toml | 8 +- wasm/src/lib.rs | 92 ++++++-- wasm/src/main.rs | 29 ++- worker/Cargo.toml | 8 +- worker/src/main.rs | 188 +++++++-------- 17 files changed, 993 insertions(+), 392 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef19ed6..fd4d1e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,7 +34,7 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "once_cell", "version_check", ] @@ -45,6 +45,12 @@ version = "0.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9d4ee0d472d1cd2e28c97dfa124b3d8d992e10eb0a035f33f5d12e3a177ba3b" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -142,7 +148,7 @@ checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" dependencies = [ "addr2line", "cc", - "cfg-if", + "cfg-if 1.0.0", "libc", "miniz_oxide", "object", @@ -185,30 +191,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "borsh" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f58b559fd6448c6e2fd0adb5720cd98a2506594cafa4737ff98c396f3e82f667" -dependencies = [ - "borsh-derive", - "cfg_aliases", -] - -[[package]] -name = "borsh-derive" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7aadb5b6ccbd078890f6d7003694e33816e6b784358f18e15e7e6d9f065a57cd" -dependencies = [ - "once_cell", - "proc-macro-crate", - "proc-macro2", - "quote", - "syn 2.0.37", - "syn_derive", -] - [[package]] name = "bumpalo" version = "3.14.0" @@ -314,6 +296,12 @@ dependencies = [ "libc", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -321,10 +309,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] -name = "cfg_aliases" -version = "0.1.1" +name = "chrono" +version = "0.4.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" +checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "serde", + "windows-targets 0.52.4", +] [[package]] name = "clap" @@ -403,7 +398,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e8227005286ec39567949b33df9896bcadfa6051bccca2488129f108ca23119" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -520,7 +515,7 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -529,9 +524,9 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-utils 0.8.16", ] [[package]] @@ -541,26 +536,41 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" dependencies = [ "autocfg", - "cfg-if", - "crossbeam-utils", + "cfg-if 1.0.0", + "crossbeam-utils 0.8.16", "memoffset", "scopeguard", ] +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static", +] + [[package]] name = "crossbeam-utils" version = "0.8.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] name = "crows-bindings" version = "0.1.0" dependencies = [ - "borsh", + "crows-macros", + "crows-shared", + "serde", + "serde_json", + "serde_with", ] [[package]] @@ -570,7 +580,7 @@ dependencies = [ "clap", "crows-service", "crows-utils", - "futures", + "futures 0.3.28", "serde", "serde_json", "tokio", @@ -584,8 +594,10 @@ name = "crows-coordinator" version = "0.1.0" dependencies = [ "crows-service", + "crows-shared", "crows-utils", - "futures", + "crows-wasm", + "futures 0.3.28", "serde", "serde_json", "tokio", @@ -594,6 +606,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "crows-macros" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "crows-service" version = "0.1.0" @@ -604,6 +625,14 @@ dependencies = [ "syn 1.0.99", ] +[[package]] +name = "crows-shared" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "crows-utils" version = "0.1.0" @@ -611,7 +640,8 @@ dependencies = [ "anyhow", "byteorder", "crows-service", - "futures", + "crows-shared", + "futures 0.3.28", "num-rational", "serde", "serde_json", @@ -630,12 +660,14 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "borsh", "bytes", "crows-bindings", + "crows-shared", "crows-utils", - "futures", + "futures 0.3.28", "reqwest", + "serde", + "serde_json", "slab", "thiserror", "tokio", @@ -652,9 +684,10 @@ dependencies = [ "anyhow", "byteorder", "crows-service", + "crows-shared", "crows-utils", "crows-wasm", - "futures", + "futures 0.3.28", "num-rational", "num_cpus", "serde", @@ -662,6 +695,7 @@ dependencies = [ "thiserror", "tokio", "tokio-serde", + "tokio-timer", "tokio-util", "uuid", "wasmtime", @@ -677,6 +711,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54e36fcd13ed84ffdfda6f5be89b31287cbb80c439841fe69e04841435464391" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c2cf1c23a687a1feeb728783b993c4e1ad83d99f351801977dd809b48d0a70f" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.37", +] + +[[package]] +name = "darling_macro" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.37", +] + [[package]] name = "debugid" version = "0.8.0" @@ -686,6 +755,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", + "serde", +] + [[package]] name = "digest" version = "0.10.7" @@ -702,7 +781,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "339ee130d97a610ea5a5872d2bbb130fdf68884ff09d3028b81bec8a1ac23bbc" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "dirs-sys-next", ] @@ -761,7 +840,7 @@ version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -818,7 +897,7 @@ version = "4.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b0377f1edc77dbd1118507bc7a66e4ab64d2b90c66f90726dc801e73a8c68f9" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "rustix", "windows-sys 0.48.0", ] @@ -864,6 +943,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "futures" version = "0.3.28" @@ -991,7 +1076,7 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi", ] @@ -1003,7 +1088,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" dependencies = [ "fallible-iterator", - "indexmap", + "indexmap 2.0.2", "stable_deref_trait", ] @@ -1019,7 +1104,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 2.0.2", "slab", "tokio", "tokio-util", @@ -1171,6 +1256,12 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005" +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.4.0" @@ -1181,6 +1272,17 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", + "serde", +] + [[package]] name = "indexmap" version = "2.0.2" @@ -1401,6 +1503,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.45" @@ -1451,7 +1559,7 @@ checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" dependencies = [ "crc32fast", "hashbrown 0.14.1", - "indexmap", + "indexmap 2.0.2", "memchr", ] @@ -1468,7 +1576,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" dependencies = [ "bitflags 2.4.2", - "cfg-if", + "cfg-if 1.0.0", "foreign-types", "libc", "once_cell", @@ -1521,7 +1629,7 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "redox_syscall 0.3.5", "smallvec", @@ -1579,42 +1687,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" [[package]] -name = "ppv-lite86" -version = "0.2.17" +name = "powerfmt" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" - -[[package]] -name = "proc-macro-crate" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" -dependencies = [ - "toml_edit 0.21.1", -] +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" +name = "ppv-lite86" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" @@ -1690,7 +1772,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" dependencies = [ "crossbeam-deque", - "crossbeam-utils", + "crossbeam-utils 0.8.16", ] [[package]] @@ -1913,13 +1995,43 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee80b0e361bbf88fd2f6e242ccd19cfda072cb0faa6ae694ecee08199938569a" +dependencies = [ + "base64", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.0.2", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6561dc161a9224638a31d876ccdfefbc1df91d3f3a8342eddb35f055d48c7655" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "sha2" version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", ] @@ -2026,18 +2138,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "syn_derive" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1329189c02ff984e9736652b1631330da25eaa6bc639089ed4915d25446cbe7b" -dependencies = [ - "proc-macro-error", - "proc-macro2", - "quote", - "syn 2.0.37", -] - [[package]] name = "sync_wrapper" version = "0.1.2" @@ -2093,7 +2193,7 @@ version = "3.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "fastrand", "rustix", "windows-sys 0.52.0", @@ -2119,6 +2219,37 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "time" +version = "0.3.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -2153,6 +2284,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-executor" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671" +dependencies = [ + "crossbeam-utils 0.7.2", + "futures 0.1.31", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -2189,6 +2330,18 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tokio-timer" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296" +dependencies = [ + "crossbeam-utils 0.7.2", + "futures 0.1.31", + "slab", + "tokio-executor", +] + [[package]] name = "tokio-util" version = "0.7.3" @@ -2216,7 +2369,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.22.6", + "toml_edit", ] [[package]] @@ -2228,28 +2381,17 @@ dependencies = [ "serde", ] -[[package]] -name = "toml_edit" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" -dependencies = [ - "indexmap", - "toml_datetime", - "winnow 0.5.40", -] - [[package]] name = "toml_edit" version = "0.22.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c1b5fd4128cc8d3e0cb74d4ed9a9cc7c7284becd4df68f5f940e1ad123606f6" dependencies = [ - "indexmap", + "indexmap 2.0.2", "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.5", + "winnow", ] [[package]] @@ -2264,7 +2406,7 @@ version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "log", "pin-project-lite", "tracing-attributes", @@ -2427,7 +2569,7 @@ version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] @@ -2452,7 +2594,7 @@ version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "wasm-bindgen", "web-sys", @@ -2503,7 +2645,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84e5df6dba6c0d7fafc63a450f1738451ed7a0b52295d83e868218fa286bf708" dependencies = [ "bitflags 2.4.2", - "indexmap", + "indexmap 2.0.2", "semver", ] @@ -2527,11 +2669,11 @@ dependencies = [ "async-trait", "bincode", "bumpalo", - "cfg-if", + "cfg-if 1.0.0", "encoding_rs", "fxprof-processed-profile", "gimli", - "indexmap", + "indexmap 2.0.2", "ittapi", "libc", "log", @@ -2567,7 +2709,7 @@ name = "wasmtime-asm-macros" version = "19.0.0" source = "git+https://github.com/bytecodealliance/wasmtime.git#0f7b1759d64382b57dc7f886a9862e3a6f6b97ca" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -2614,7 +2756,7 @@ version = "19.0.0" source = "git+https://github.com/bytecodealliance/wasmtime.git#0f7b1759d64382b57dc7f886a9862e3a6f6b97ca" dependencies = [ "anyhow", - "cfg-if", + "cfg-if 1.0.0", "cranelift-codegen", "cranelift-control", "cranelift-entity", @@ -2657,7 +2799,7 @@ dependencies = [ "cpp_demangle", "cranelift-entity", "gimli", - "indexmap", + "indexmap 2.0.2", "log", "object", "rustc-demangle", @@ -2679,7 +2821,7 @@ source = "git+https://github.com/bytecodealliance/wasmtime.git#0f7b1759d64382b57 dependencies = [ "anyhow", "cc", - "cfg-if", + "cfg-if 1.0.0", "rustix", "wasmtime-asm-macros", "wasmtime-versioned-export-macros", @@ -2702,7 +2844,7 @@ name = "wasmtime-jit-icache-coherence" version = "19.0.0" source = "git+https://github.com/bytecodealliance/wasmtime.git#0f7b1759d64382b57dc7f886a9862e3a6f6b97ca" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "windows-sys 0.52.0", ] @@ -2714,9 +2856,9 @@ source = "git+https://github.com/bytecodealliance/wasmtime.git#0f7b1759d64382b57 dependencies = [ "anyhow", "cc", - "cfg-if", + "cfg-if 1.0.0", "encoding_rs", - "indexmap", + "indexmap 2.0.2", "libc", "log", "mach", @@ -2778,7 +2920,7 @@ dependencies = [ "cap-std", "cap-time-ext", "fs-set-times", - "futures", + "futures 0.3.28", "io-extras", "io-lifetimes", "once_cell", @@ -2816,7 +2958,7 @@ source = "git+https://github.com/bytecodealliance/wasmtime.git#0f7b1759d64382b57 dependencies = [ "anyhow", "heck", - "indexmap", + "indexmap 2.0.2", "wit-parser", ] @@ -3084,15 +3226,6 @@ version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" -[[package]] -name = "winnow" -version = "0.5.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" -dependencies = [ - "memchr", -] - [[package]] name = "winnow" version = "0.6.5" @@ -3108,7 +3241,7 @@ version = "0.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "windows-sys 0.48.0", ] @@ -3130,7 +3263,7 @@ checksum = "196d3ecfc4b759a8573bf86a9b3f8996b304b3732e4c7de81655f875f6efdca6" dependencies = [ "anyhow", "id-arena", - "indexmap", + "indexmap 2.0.2", "log", "semver", "serde", diff --git a/Cargo.toml b/Cargo.toml index ca6818b..761ec12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,22 +9,30 @@ members = [ "worker", "wasm", "bindings", + "shared", ] [workspace.dependencies] tokio = { version = "1.20", features = ["full"] } anyhow = { version = "1", features = ["backtrace"] } thiserror = "1" -tokio-serde = { version = "0.9.0", features = ["json"] } -tokio-util = { version = "0.7.3", features = ["full"] } +tokio-serde = { version = "0.9", features = ["json"] } +tokio-util = { version = "0.7", features = ["full"] } serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.108" -futures = "0.3.21" -uuid = { version = "1.4.1", features = ["serde", "v4"] } +serde_json = "1.0" +serde_with = "3.7" +futures = "0.3" +uuid = { version = "1.4", features = ["serde", "v4"] } wasmtime = { git = "https://github.com/bytecodealliance/wasmtime.git", features = ["async"] } wasmtime-wasi = { git = "https://github.com/bytecodealliance/wasmtime.git" } wasi-common = { git = "https://github.com/bytecodealliance/wasmtime.git" } wiggle = { git = "https://github.com/bytecodealliance/wasmtime.git" } num-rational = { version = "0.4", features = ["serde"]} -borsh = { version = "1.3", features = ["unstable__schema", "derive"] } bytes = "1.5" +rational = "1.5" +crows-macros = { path = "macros" } +crows-shared = { path = "shared" } +crows-bindings = { path = "bindings" } +crows-utils = { path = "utils" } +crows-service = { path = "service" } +crows-wasm = { path = "wasm" } diff --git a/bindings/Cargo.toml b/bindings/Cargo.toml index e839436..fa0e254 100644 --- a/bindings/Cargo.toml +++ b/bindings/Cargo.toml @@ -6,4 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -borsh.workspace = true +serde.workspace = true +serde_json.workspace = true +serde_with.workspace = true +crows-macros.workspace = true +crows-shared.workspace = true diff --git a/bindings/src/lib.rs b/bindings/src/lib.rs index 99b53af..d26a0aa 100644 --- a/bindings/src/lib.rs +++ b/bindings/src/lib.rs @@ -1,7 +1,12 @@ -use borsh::{from_slice, to_vec, BorshDeserialize, BorshSerialize}; -use std::{cell::RefCell, collections::HashMap, mem::MaybeUninit}; +use std::{cell::RefCell, collections::HashMap}; -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)] +pub use crows_macros::config; +use serde::{Serialize, Deserialize, de::DeserializeOwned}; +use serde_json::{to_vec, from_slice}; +pub use crows_shared::Config as ExecutorConfig; +pub use crows_shared::ConstantArrivalRateConfig; + +#[derive(Serialize, Deserialize, PartialEq, Debug)] pub enum HTTPMethod { HEAD, GET, @@ -11,7 +16,7 @@ pub enum HTTPMethod { OPTIONS, } -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Debug)] pub struct HTTPRequest { // TODO: these should not be public I think, I'd prefer to do a public interface for them pub url: String, @@ -20,12 +25,12 @@ pub struct HTTPRequest { pub body: Option, } -#[derive(Debug, BorshDeserialize, BorshSerialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct HTTPError { pub message: String, } -#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Debug)] pub struct HTTPResponse { // TODO: these should not be public I think, I'd prefer to do a public interface for them pub headers: HashMap, @@ -46,14 +51,21 @@ mod bindings { pub fn log(content: *mut u8, content_len: usize); pub fn http(content: *mut u8, content_len: usize) -> u64; pub fn consume_buffer(index: u32, content: *mut u8, content_len: usize); + pub fn set_config(content: *mut u8, content_len: usize) -> u32; } } fn with_buffer(f: impl FnOnce(&mut Vec) -> R) -> R { - let mut buffer: Vec = Vec::with_capacity(256); + // using a buffer saved in thread_local allows us to share it between function calls + thread_local! { + static BUFFER: RefCell> = RefCell::new(Vec::with_capacity(1024)); + } - buffer.clear(); - f(&mut buffer) + BUFFER.with(|r| { + let mut buf = r.borrow_mut(); + buf.clear(); + f(&mut buf) + }) } pub fn http_request( @@ -77,9 +89,9 @@ pub fn http_request( fn call_host_function(arguments: &T, f: impl FnOnce(&mut Vec) -> u64) -> Result where - T: BorshSerialize, - R: BorshDeserialize, - E: BorshDeserialize, + T: Serialize, + R: DeserializeOwned, + E: DeserializeOwned, { let mut encoded = to_vec(arguments).unwrap(); @@ -105,3 +117,8 @@ where Err(from_slice(&buf).expect("Couldn't decode message from the host")) } } + +pub fn __set_config(config: ExecutorConfig) -> u32 { + let mut encoded = to_vec(&config).unwrap(); + unsafe { bindings::set_config(encoded.as_mut_ptr(), encoded.len()) } +} diff --git a/cli/src/main.rs b/cli/src/main.rs index 1fa72c5..2c18171 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -35,8 +35,6 @@ enum Commands { #[arg(short, long)] name: String, #[arg(short, long)] - concurrency: usize, - #[arg(short, long)] workers_number: usize, }, Workers { @@ -68,9 +66,9 @@ pub async fn main() { .unwrap() .unwrap(); } - Some(Commands::Start {name,concurrency, workers_number }) => { + Some(Commands::Start {name, workers_number }) => { coordinator - .start(name.to_string(), concurrency.clone(), workers_number.clone()) + .start(name.to_string(), workers_number.clone()) .await .unwrap(); } diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 00c7223..eaf048e 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -6,9 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -crows-utils = { path = "../utils" } -crows-service = { path = "../service" } - tokio.workspace = true tokio-serde.workspace = true tokio-util.workspace = true @@ -16,3 +13,7 @@ serde.workspace = true serde_json.workspace = true futures.workspace = true uuid.workspace = true +crows-shared.workspace = true +crows-wasm.workspace = true +crows-utils.workspace = true +crows-service.workspace = true diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 35f6135..0c76ce3 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,10 +1,8 @@ -#![feature(async_fn_in_trait)] -#![feature(return_position_impl_trait_in_trait)] - use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use crows_wasm::fetch_config; use futures::future::join_all; use tokio::sync::Mutex; use tokio::time::sleep; @@ -22,7 +20,7 @@ use uuid::Uuid; // TODO: Client should probably be thread safe for easier handling #[derive(Default)] struct WorkerToCoordinatorService { - scenarios: Arc>>>, + scenarios: Arc>>>, workers: Arc>>, client: Arc>>, } @@ -41,7 +39,7 @@ impl WorkerToCoordinator for WorkerToCoordinatorService { #[derive(Clone, Default)] struct CoordinatorService { - scenarios: Arc>>>, + scenarios: Arc>>>, workers: Arc>>, } @@ -58,10 +56,9 @@ impl Coordinator for CoordinatorService { name: String, content: Vec, ) -> Result<(), CoordinatorError> { - let id = ModuleId::new(name.clone(), &content); - - // TODO: to send bandwidth maybe it will be worth it to gzip the data? we would be gzipping - // once and sending to N clients + // TODO: to send bandwidth maybe it will be worth it to gzip the data? we would be + // gzipping once and sending to N clients + // // send each uploaded scenario to all of the workers for (_, worker_entry) in self.workers.lock().await.iter() { let locked = worker_entry.client.lock(); @@ -69,24 +66,44 @@ impl Coordinator for CoordinatorService { futures.push(async { if let Some(client) = locked.await.as_mut() { // TODO: handle Result - client.upload_scenario(id.clone(), content.clone()).await; + client.upload_scenario(name.clone(), content.clone()).await; } }); join_all(futures).await; } - self.scenarios.lock().await.insert(id, content); + self.scenarios.lock().await.insert(name, content); Ok(()) } - async fn start(&self, name: String, concurrency: usize, workers_number: usize) { + async fn start(&self, name: String, workers_number: usize) -> Result<(), CoordinatorError> { // TODO: we should check if we have enough workers + // TODO: also this way we will always choose the same workers. in the future we should + // either always split between all workers or do some kind of round robin + // TODO: at the moment we split evenly. in the future we could get some kind of diagnostic + // data from workers in order to determine how much traffic can we push to each worker + // TODO: creating a runtime is probably fast enough, but I'd like to measure and see + // if it's not better to keep one around so we don't create it before each test run + let runtime = crows_wasm::Runtime::new().map_err(|err| CoordinatorError::FailedToCreateRuntime(err.to_string()))?; + let mut instance = { + let scenarios = self.scenarios.lock().await; + let scenario = scenarios.get(&name).ok_or(CoordinatorError::NoSuchModule(name.clone()))?; + let (instance, _) = runtime.compile_instance(&scenario).await.map_err(|_| CoordinatorError::FailedToCompileModule)?; + instance + }; + let config = fetch_config(&mut instance).await.map_err(|err| CoordinatorError::CouldNotFetchConfig(err.to_string()))?.split(workers_number); + for (_, worker_entry) in self.workers.lock().await.iter().take(workers_number) { if let Some(client) = worker_entry.client.lock().await.as_mut() { - client.start(name.clone(), concurrency).await; + // TODO: at the moment we split config to split the load between each of the + // workers, which means that if a worker dies, we will not get a full test + // It would be ideal if we had a way to j + client.start(name.clone(), config.clone()).await; } } + + Ok(()) } async fn list_workers(&self) -> Vec { @@ -110,7 +127,7 @@ pub async fn main() { .parse() .unwrap(); - let original_scenarios: Arc>>> = Default::default(); + let original_scenarios: Arc>>> = Default::default(); let original_workers: Arc>> = Default::default(); let scenarios = original_scenarios.clone(); diff --git a/rust-example/Cargo.lock b/rust-example/Cargo.lock index ea7a499..e1d8d78 100644 --- a/rust-example/Cargo.lock +++ b/rust-example/Cargo.lock @@ -3,40 +3,140 @@ version = 3 [[package]] -name = "borsh" -version = "1.3.1" +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f58b559fd6448c6e2fd0adb5720cd98a2506594cafa4737ff98c396f3e82f667" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" dependencies = [ - "borsh-derive", - "cfg_aliases", + "libc", ] [[package]] -name = "borsh-derive" -version = "1.3.1" +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + +[[package]] +name = "bumpalo" +version = "3.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" + +[[package]] +name = "cc" +version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7aadb5b6ccbd078890f6d7003694e33816e6b784358f18e15e7e6d9f065a57cd" +checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "chrono" +version = "0.4.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "serde", + "windows-targets", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + +[[package]] +name = "crows-bindings" +version = "0.1.0" +dependencies = [ + "crows-macros", + "crows-shared", + "serde", + "serde_json", + "serde_with", +] + +[[package]] +name = "crows-macros" +version = "0.1.0" dependencies = [ - "once_cell", - "proc-macro-crate", "proc-macro2", "quote", "syn", - "syn_derive", ] [[package]] -name = "cfg_aliases" -version = "0.1.1" +name = "crows-shared" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "darling" +version = "0.20.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" +checksum = "54e36fcd13ed84ffdfda6f5be89b31287cbb80c439841fe69e04841435464391" +dependencies = [ + "darling_core", + "darling_macro", +] [[package]] -name = "crows-bindings" -version = "0.1.0" +name = "darling_core" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c2cf1c23a687a1feeb728783b993c4e1ad83d99f351801977dd809b48d0a70f" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ - "borsh", + "darling_core", + "quote", + "syn", +] + +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", + "serde", ] [[package]] @@ -45,12 +145,70 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", + "serde", +] + [[package]] name = "indexmap" version = "2.2.5" @@ -58,14 +216,51 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.3", + "serde", +] + +[[package]] +name = "itoa" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" + +[[package]] +name = "js-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +dependencies = [ + "wasm-bindgen", ] [[package]] -name = "memchr" -version = "2.7.1" +name = "libc" +version = "0.2.153" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" + +[[package]] +name = "log" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" + +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + +[[package]] +name = "num-traits" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" +dependencies = [ + "autocfg", +] [[package]] name = "once_cell" @@ -74,60 +269,107 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] -name = "proc-macro-crate" -version = "3.1.0" +name = "powerfmt" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] +name = "proc-macro2" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" dependencies = [ - "toml_edit", + "unicode-ident", ] [[package]] -name = "proc-macro-error" -version = "1.0.4" +name = "quote" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ - "proc-macro-error-attr", "proc-macro2", - "quote", - "version_check", ] [[package]] -name = "proc-macro-error-attr" -version = "1.0.4" +name = "ryu" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" + +[[package]] +name = "serde" +version = "1.0.197" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "version_check", + "syn", ] [[package]] -name = "proc-macro2" -version = "1.0.78" +name = "serde_json" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" dependencies = [ - "unicode-ident", + "itoa", + "ryu", + "serde", ] [[package]] -name = "quote" -version = "1.0.35" +name = "serde_with" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +checksum = "ee80b0e361bbf88fd2f6e242ccd19cfda072cb0faa6ae694ecee08199938569a" +dependencies = [ + "base64", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.2.5", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6561dc161a9224638a31d876ccdfefbc1df91d3f3a8342eddb35f055d48c7655" dependencies = [ + "darling", "proc-macro2", + "quote", + "syn", ] +[[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + [[package]] name = "syn" -version = "2.0.52" +version = "2.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" +checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032" dependencies = [ "proc-macro2", "quote", @@ -135,32 +377,34 @@ dependencies = [ ] [[package]] -name = "syn_derive" -version = "0.1.8" +name = "time" +version = "0.3.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1329189c02ff984e9736652b1631330da25eaa6bc639089ed4915d25446cbe7b" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" dependencies = [ - "proc-macro-error", - "proc-macro2", - "quote", - "syn", + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", ] [[package]] -name = "toml_datetime" -version = "0.6.5" +name = "time-core" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] -name = "toml_edit" -version = "0.21.1" +name = "time-macros" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" dependencies = [ - "indexmap", - "toml_datetime", - "winnow", + "num-conv", + "time-core", ] [[package]] @@ -170,10 +414,58 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] -name = "version_check" -version = "0.9.4" +name = "wasm-bindgen" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" [[package]] name = "wasm_example" @@ -183,10 +475,67 @@ dependencies = [ ] [[package]] -name = "winnow" -version = "0.5.40" +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" +checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" dependencies = [ - "memchr", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8" diff --git a/rust-example/src/lib.rs b/rust-example/src/lib.rs index 29e8f33..cdaaa93 100644 --- a/rust-example/src/lib.rs +++ b/rust-example/src/lib.rs @@ -1,8 +1,23 @@ +use crows_bindings::{ + config, http_request, ConstantArrivalRateConfig, ExecutorConfig, HTTPMethod::*, +}; use std::cell::RefCell; use std::collections::HashMap; -use crows_bindings::{http_request, HTTPMethod::*}; +use std::time::Duration; -#[export_name="test"] +#[config] +fn config() -> ExecutorConfig { + let config = ConstantArrivalRateConfig { + duration: Duration::from_secs(10), + rate: 50, + time_unit: Duration::from_secs(1), + allocated_vus: 50, + ..Default::default() + }; + ExecutorConfig::ConstantArrivalRate(config) +} + +#[export_name = "test"] pub fn test() { let response = http_request("https://example.com".into(), GET, HashMap::new(), "".into()); println!("response: {:?}", response.unwrap().status); diff --git a/utils/Cargo.toml b/utils/Cargo.toml index b5db35f..08b9728 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -21,3 +21,4 @@ tokio-util.workspace = true tokio-serde.workspace = true futures.workspace = true num-rational.workspace = true +crows-shared.workspace = true diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 9afc9f3..b4c82d0 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -1,6 +1,3 @@ -#![feature(async_fn_in_trait)] -#![feature(return_position_impl_trait_in_trait)] - use std::collections::HashMap; use std::pin::Pin; diff --git a/utils/src/services/mod.rs b/utils/src/services/mod.rs index feabc32..89ae3d3 100644 --- a/utils/src/services/mod.rs +++ b/utils/src/services/mod.rs @@ -9,6 +9,14 @@ use num_rational::Rational64; pub enum CoordinatorError { #[error("could not upload a module")] UploadModuleError, + #[error("couldn't find module {0}")] + NoSuchModule(String), + #[error("Failed to create runtime: {0}")] + FailedToCreateRuntime(String), + #[error("Failed to compile module")] + FailedToCompileModule, + #[error("Couldn't fetch config: {0}")] + CouldNotFetchConfig(String) } #[derive(Error, Debug, Serialize, Deserialize, Clone)] @@ -17,6 +25,8 @@ pub enum WorkerError { UploadModuleError, #[error("could not find a requested scenario")] ScenarioNotFound, + #[error("could not create a module from binary")] + CouldNotCreateModule, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] @@ -51,7 +61,7 @@ pub enum WorkerStatus { #[service(variant = "server", other_side = Client)] pub trait Coordinator { async fn upload_scenario(name: String, content: Vec) -> Result<(), CoordinatorError>; - async fn start(name: String, concurrency: usize, workers_number: usize); + async fn start(name: String, workers_number: usize) -> Result<(), CoordinatorError>; async fn list_workers() -> Vec; async fn update_status(&self, status: WorkerStatus, id: Uuid); } @@ -64,10 +74,10 @@ pub struct WorkerData { #[service(variant = "client", other_side = WorkerToCoordinator)] pub trait Worker { - async fn upload_scenario(&mut self, id: ModuleId, content: Vec); + async fn upload_scenario(&mut self, name: String, content: Vec); async fn ping(&self) -> String; - async fn prepare(&mut self, id: ModuleId, concurrency: usize, rate: Rational64) -> Result; - async fn start(&self, id: ModuleId, concurrency: usize) -> Result<(), WorkerError>; + // async fn prepare(&mut self, id: ModuleId, concurrency: usize, rate: Rational64) -> Result; + async fn start(&self, name: String, config: crows_shared::Config) -> Result<(), WorkerError>; async fn get_data(&self) -> WorkerData; } diff --git a/wasm/Cargo.toml b/wasm/Cargo.toml index f79b3d7..b938c68 100644 --- a/wasm/Cargo.toml +++ b/wasm/Cargo.toml @@ -4,9 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -crows-utils = { path = "../utils" } slab = "0.4" -crows-bindings = { path = "../bindings" } async-trait = "0.1" reqwest = "0.11" @@ -17,6 +15,10 @@ wasmtime.workspace = true wasmtime-wasi.workspace = true wasi-common.workspace = true wiggle.workspace = true -borsh.workspace = true futures.workspace = true bytes.workspace = true +serde.workspace = true +serde_json.workspace = true +crows-shared.workspace = true +crows-utils.workspace = true +crows-bindings.workspace = true diff --git a/wasm/src/lib.rs b/wasm/src/lib.rs index 435473d..6825a50 100644 --- a/wasm/src/lib.rs +++ b/wasm/src/lib.rs @@ -1,17 +1,22 @@ use anyhow::anyhow; -use borsh::{from_slice, to_vec, BorshDeserialize, BorshSerialize}; use crows_bindings::{HTTPError, HTTPMethod, HTTPRequest, HTTPResponse}; use crows_utils::services::RunId; use futures::Future; use reqwest::header::{HeaderName, HeaderValue}; use reqwest::{Body, Request, Url}; +use serde::Serialize; +use serde_json::{from_slice, to_vec}; +use std::collections::VecDeque; use std::pin::Pin; use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; use std::{any::Any, collections::HashMap, io::IoSlice}; use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::Mutex; use wasi_common::file::{FdFlags, FileType}; use wasi_common::WasiFile; -use wasmtime::{Caller, Config, Engine, Linker, Memory, MemoryType, Module, Store}; +use wasmtime::{Caller, Engine, Linker, Memory, MemoryType, Module, Store}; use wasmtime_wasi::{StdoutStream, StreamResult}; #[derive(thiserror::Error, Debug)] @@ -20,8 +25,6 @@ pub enum Error { NoSuchRun(RunId), } -// A runtime should be run in a single async runtime. Ideally also in a single -// thread as we want a share-nothing architecture for performance and simplicity pub struct Runtime<'a> { // we index instances with the run id, cause technically we can run // scenarios from multiple modules on a single runtime @@ -29,17 +32,21 @@ pub struct Runtime<'a> { // it. If the overhead is too big I'd probably refactor it to allow only one module // at any point in time. I would like to start with multiple modules, though, to first // see if it's actually problematic, maybe it's not and it seems to give more flexibility - pub instances: HashMap>>, + pub instances: VecDeque>, pub environment: Environment, } impl<'a> Runtime<'a> { pub fn new() -> anyhow::Result { Ok(Self { - instances: HashMap::new(), + instances: VecDeque::new(), environment: Environment::new()?, }) } + + pub async fn compile_instance(&self, content: &Vec) -> anyhow::Result<(Instance<'_>, UnboundedReceiver>)> { + Instance::new(content, &self.environment).await + } // TODO: it looks like the id/module pair should be in a separate data type, might // be worth to extract it in the future // pub fn create_instances( @@ -86,9 +93,12 @@ impl WasiHostCtx { f: F, ) -> anyhow::Result where - F: for<'b> FnOnce(&'b mut Caller<'_, Self>, HTTPRequest) -> Pin> + 'b + Send>>, - U: BorshSerialize, - E: BorshSerialize, + F: for<'b> FnOnce( + &'b mut Caller<'_, Self>, + HTTPRequest, + ) -> Pin> + 'b + Send>>, + U: Serialize, + E: Serialize, { let memory = get_memory(&mut caller)?; @@ -184,6 +194,23 @@ impl WasiHostCtx { }) } + pub fn set_config(mut caller: Caller<'_, Self>, ptr: u32, len: u32) -> anyhow::Result { + let memory = get_memory(&mut caller)?; + + let slice = memory + .data(&caller) + .get(ptr as usize..(ptr + len) as usize) + .ok_or(anyhow!("Could not get memory slice"))? + .to_owned() + .into_boxed_slice(); + + let (_, store) = memory.data_and_store_mut(&mut caller); + + let index = store.buffers.insert(slice); + + Ok(index as u32) + } + pub fn consume_buffer( mut caller: Caller<'_, Self>, index: u32, @@ -227,6 +254,7 @@ impl wasmtime_wasi::preview1::WasiPreview1View for WasiHostCtx { } } +#[derive(Clone)] pub struct Environment { engine: Engine, linker: Linker, @@ -242,7 +270,7 @@ pub struct Instance<'a> { impl Environment { pub fn new() -> anyhow::Result { - let mut config = Config::new(); + let mut config = wasmtime::Config::new(); config.async_support(true); config.consume_fuel(true); @@ -260,6 +288,9 @@ impl Environment { }) }) .unwrap(); + linker + .func_wrap("crows", "set_config", WasiHostCtx::set_config) + .unwrap(); wasmtime_wasi::preview1::add_to_linker_async(&mut linker)?; @@ -305,13 +336,10 @@ impl<'a> Instance<'a> { Ok((store, receiver)) } - pub async fn new( - raw_module: &Vec, - env: &'a Environment, - ) -> anyhow::Result<(Self, UnboundedReceiver>)> { + pub async fn new(raw_module: &Vec, env: &'a Environment) -> anyhow::Result<(Self, UnboundedReceiver>)> { let module = Module::from_binary(&env.engine, raw_module)?; - let (mut store, stdout_receiver) = Instance::new_store(&env.engine)?; + let (mut store, logs_receiver) = Instance::new_store(&env.engine)?; let instance = env.linker.instantiate_async(&mut store, &module).await?; // let func = instance @@ -322,16 +350,14 @@ impl<'a> Instance<'a> { // drop(store); - Ok(( - Self { - engine: &env.engine, - module, - linker: &env.linker, - store, - instance, - }, - stdout_receiver, - )) + let result = Self { + engine: &env.engine, + module, + linker: &env.linker, + store, + instance, + }; + Ok((result, logs_receiver)) } } @@ -345,6 +371,22 @@ pub async fn run_wasm(instance: &mut Instance<'_>) -> anyhow::Result<()> { Ok(()) } +pub async fn fetch_config(instance: &mut Instance<'_>) -> anyhow::Result { + let func = instance + .instance + .get_typed_func::<(), u32>(&mut instance.store, "__config")?; + + let index = func.call_async(&mut instance.store, ()).await?; + let buffer = instance + .store + .data_mut() + .buffers + .try_remove(index as usize) + .ok_or(anyhow!("Couldn't find slab"))?; + + Ok(from_slice(&buffer)?) +} + #[derive(Clone)] struct RemoteStdout { sender: tokio::sync::mpsc::UnboundedSender>, diff --git a/wasm/src/main.rs b/wasm/src/main.rs index 9f36708..44f2459 100644 --- a/wasm/src/main.rs +++ b/wasm/src/main.rs @@ -1,4 +1,4 @@ -// use std::{any::Any, io::IoSlice, sync::Arc}; +// use std:: // use tokio::time::{Duration, Instant}; // use wasi_common::{ // file::{FdFlags, FileType}, @@ -28,15 +28,20 @@ async fn main() -> Result<(), anyhow::Error> { let mut receivers = Vec::new(); let runtime = Runtime::new()?; let instant = Instant::now(); - let mut futures = Vec::new(); - for _ in 0..10 { - let (mut instance, receiver) = Instance::new(&content, &runtime.environment).await.unwrap(); - receivers.push(receiver); - let fut = async move { - run_wasm(&mut instance).await.unwrap(); - }; - futures.push(fut); - } + // let mut futures = Vec::new(); + + let (mut instance, receiver) = Instance::new(&content, &runtime.environment).await.unwrap(); + receivers.push(receiver); + let config = crows_wasm::fetch_config(&mut instance).await.unwrap(); + println!("Config: {config:?}"); + // for _ in 0..10 { + // let (mut instance, receiver) = Instance::new(&content, &runtime.environment).await.unwrap(); + // receivers.push(receiver); + // let fut = async move { + // run_wasm(&mut instance).await.unwrap(); + // }; + // futures.push(fut); + // } tokio::spawn(async move { let mut futures = Vec::new(); @@ -52,9 +57,9 @@ async fn main() -> Result<(), anyhow::Error> { futures::future::join_all(futures).await; }); - futures::future::join_all(futures).await; + // futures::future::join_all(futures).await; - println!("elapsed: {}ms", instant.elapsed().as_millis()); + // println!("elapsed: {}ms", instant.elapsed().as_millis()); Ok(()) } diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 856acac..1036c1c 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -6,11 +6,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -crows-utils = { path = "../utils" } byteorder = "1.4" -crows-service = { path = "../service" } -crows-wasm = { path = "../wasm" } num_cpus = "1.16" +tokio-timer = "0.2" uuid.workspace = true serde.workspace = true @@ -23,3 +21,7 @@ tokio-util.workspace = true futures.workspace = true wasmtime.workspace = true num-rational.workspace = true +crows-utils.workspace = true +crows-wasm.workspace = true +crows-service.workspace = true +crows-shared.workspace = true diff --git a/worker/src/main.rs b/worker/src/main.rs index fd21f95..1fe909e 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -1,19 +1,24 @@ #![feature(async_fn_in_trait)] #![feature(return_position_impl_trait_in_trait)] +use crows_shared::ConstantArrivalRateConfig; use crows_wasm::Runtime; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use std::time::Instant; use std::{collections::HashMap, env::args_os, time::Duration}; -use tokio::sync::{RwLock, Mutex}; -use tokio::time::sleep; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio::sync::{Mutex, RwLock}; +use tokio::time::{sleep, timeout}; +use tokio::prelude::*; use uuid::Uuid; -use crows_utils::services::{connect_to_worker_to_coordinator, Worker, WorkerData, WorkerError, RunId}; -use crows_utils::ModuleId; +use crows_utils::services::{ + connect_to_worker_to_coordinator, RunId, Worker, WorkerData, WorkerError, +}; use num_rational::Rational64; -type ScenariosList = Arc>>>; +type ScenariosList = Arc>>>; // TODO: in the future we should probably share it with the coordinator, ie. // coordinator should prepare the defaults based on the default module settings @@ -23,55 +28,69 @@ struct RunInfo { run_id: RunId, concurrency: usize, rate: Rational64, - module_id: ModuleId, + module_name: String, } impl RunInfo { - fn new(run_id: RunId, concurrency: usize, rate: Rational64, module_id: ModuleId) -> Self { - Self { run_id, concurrency, rate, module_id } + fn new(run_id: RunId, concurrency: usize, rate: Rational64, module_name: String) -> Self { + Self { + run_id, + concurrency, + rate, + module_name, + } } } +enum RuntimeMessage {} + #[derive(Clone)] -struct WorkerService<'a> { +struct WorkerService { scenarios: ScenariosList, hostname: String, - wasm_handles: Arc>>>, - runs: HashMap + wasm_actor: UnboundedSender, + runs: HashMap, + environment: crows_wasm::Environment, } -impl<'a> Worker for WorkerService<'a> { - async fn upload_scenario(&mut self, id: ModuleId, content: Vec) { - self.scenarios.write().await.insert(id, content); +impl Worker for WorkerService { + async fn upload_scenario(&mut self, name: String, content: Vec) { + self.scenarios.write().await.insert(name, content); } async fn ping(&self) -> String { todo!() } - async fn prepare(&mut self, id: ModuleId, concurrency: usize, rate: Rational64) -> Result { - let run_id: RunId = RunId::new(); - - // TODO: we should check if we have a given module available and if not ask coordinator - // to send it. For now let's assume we have the module id - let info = RunInfo::new(run_id.clone(), concurrency, rate, id); - self.runs.insert(run_id.clone(), info); - - Ok(run_id) - } - - async fn start(&self, id: ModuleId, concurrency: usize) -> Result<(), WorkerError> { - let locked = self - .scenarios - .read() - .await; + // async fn prepare( + // &mut self, + // id: ModuleId, + // concurrency: usize, + // rate: Rational64, + // ) -> Result { + // let run_id: RunId = RunId::new(); + // + // // TODO: we should check if we have a given module available and if not ask coordinator + // // to send it. For now let's assume we have the module id + // let info = RunInfo::new(run_id.clone(), concurrency, rate, id); + // self.runs.insert(run_id.clone(), info); + // + // Ok(run_id) + // } + + async fn start(&self, name: String, config: crows_shared::Config) -> Result<(), WorkerError> { + // PLAN + // either pass as an argument or fetch Executor::Config? + let locked = self.scenarios.read().await; let scenario = locked - .get(&id) + .get(&name) .ok_or(WorkerError::ScenarioNotFound)? .clone(); drop(locked); - // spawn modules + let (mut instance, _) = crows_wasm::Instance::new(&scenario, &self.environment).await.map_err(|err| WorkerError::CouldNotCreateModule)?; + + // spawn modules Ok(()) } @@ -84,80 +103,61 @@ impl<'a> Worker for WorkerService<'a> { } } -#[derive(Clone)] -struct RuntimeHandle<'a> { - runtime: Arc>>, -} - -impl<'a> RuntimeHandle<'a> { - pub fn new(runtime: Runtime<'a>) -> Self { - Self { runtime: Arc::new(Mutex::new(runtime)) } - } -} - -fn main() -> Result<(), Box> { +#[tokio::main] +async fn main() -> Result<(), Box> { // TODO: allow to set the number of CPUs let cpus = num_cpus::get(); - let coordinator_address: String = std::env::var("COORDINATOR_ADDRESS").unwrap(); + let coordinator_address: String = std::env::var("COORDINATOR_ADDRESS").unwrap_or("127.0.0.1:8181".into()); let hostname: String = std::env::var("WORKER_NAME").unwrap(); println!("Starting with hostname: {hostname}"); - let handles: Arc>> = Default::default(); + // let handles: Vec = Default::default(); let scenarios: ScenariosList = Default::default(); + let (wasm_sender, wasm_receiver) = unbounded_channel(); + let service = WorkerService { scenarios: scenarios.clone(), hostname, - wasm_handles: handles.clone(), - runs: Default::default() + wasm_actor: wasm_sender, + runs: Default::default(), + environment: crows_wasm::Environment::new().unwrap() }; - std::thread::scope(|s| { - let mut threads = Vec::new(); - - let thread = s.spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - rt.spawn(async move { - println!("Connecting to {coordinator_address}"); - let mut client = connect_to_worker_to_coordinator(coordinator_address, service) - .await - .unwrap(); - - loop { - // TODO: pinging should also work as an indicator of connection being alive - client.ping(); - sleep(Duration::from_secs(1)); - } - }); - }); - - threads.push(thread); - - for _ in (0..cpus).into_iter() { - let scenarios = scenarios.clone(); - let handles = handles.clone(); - let thread = s.spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let wasm_runtime = RuntimeHandle::new(Runtime::new().expect("Couldn't create a Runtime")); - - rt.spawn(async move { - handles.lock().await.push(wasm_runtime.clone()); - }); - }); - threads.push(thread); - } + println!("Connecting to {coordinator_address}"); + let mut client = connect_to_worker_to_coordinator(coordinator_address, service) + .await + .unwrap(); - for thread in threads { - thread.join(); - } - }); + loop { + // TODO: pinging should also work as an indicator of connection being alive + client.ping(); + sleep(Duration::from_secs(1)); + } +} + + +trait Executor<'a> { + async fn run(&mut self, runtime: Runtime<'a>, config: ConstantArrivalRateConfig) -> anyhow::Result<()>; +} - Ok(()) +struct ConstantArrivalRateExecutor { + config: ConstantArrivalRateConfig +} + +impl<'a> Executor<'a> for ConstantArrivalRateExecutor { + async fn run(&mut self, runtime: Runtime<'a>, config: ConstantArrivalRateConfig) -> anyhow::Result<()> { + println!("Start"); + let instant = Instant::now(); + let future = async move { + loop { + println!("elapsed: {}ms", instant.elapsed().as_millis()); + tokio::time::sleep(Duration::from_secs(1)); + } + }; + + tokio::timer::Timeout::new(future, config.duration).await; + + Ok(()) + } }