diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0b1d4ea..90bb9c1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,11 +8,13 @@ on: # Source files in each member - "compute/src/**" - "p2p/src/**" + - "monitor/src/**" - "workflows/src/**" # Cargo in each member - "compute/Cargo.toml" - "p2p/Cargo.toml" - "workflows/Cargo.toml" + - "monitor/Cargo.toml" # root-level Cargo - "Cargo.lock" # workflow itself diff --git a/Cargo.lock b/Cargo.lock index 69fb50a..dee226b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,6 +161,18 @@ name = "arrayvec" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +dependencies = [ + "serde", +] + +[[package]] +name = "ash" +version = "0.38.0+1.3.281" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bb44936d800fea8f016d7f2311c6a4f97aebd5dc86f09906139ec848cf3a46f" +dependencies = [ + "libloading", +] [[package]] name = "asn1-rs" @@ -374,7 +386,16 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" dependencies = [ - "bit-vec", + "bit-vec 0.6.3", +] + +[[package]] +name = "bit-set" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3" +dependencies = [ + "bit-vec 0.8.0", ] [[package]] @@ -383,6 +404,12 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bit-vec" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" + [[package]] name = "bitflags" version = "1.3.2" @@ -404,6 +431,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "block" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d8c1fef690941d3e7788d328517591fecc684c084084702d6ff1641e993699a" + [[package]] name = "block-buffer" version = "0.9.0" @@ -437,6 +470,12 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "bytemuck" +version = "1.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3" + [[package]] name = "byteorder" version = "1.5.0" @@ -464,6 +503,15 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b02b629252fe8ef6460461409564e2c21d0c8e77e0944f3d189ff06c4e932ad" +[[package]] +name = "cbor4ii" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "472931dd4dfcc785075b09be910147f9c6258883fc4591d0dac6116392b2daa6" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.2.4" @@ -485,6 +533,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "cfg_aliases" version = "0.2.1" @@ -540,6 +594,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "codespan-reporting" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" +dependencies = [ + "termcolor", + "unicode-width", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -593,6 +657,17 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core-graphics-types" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45390e6114f68f718cc7a830514a96f903cccd70d02a8f6d9f643ac4ba45afaf" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "libc", +] + [[package]] name = "core2" version = "0.4.0" @@ -940,6 +1015,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "directories" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a49173b84e034382284f27f1af4dcbbd231ffa358c0fe316541a7337f376a35" +dependencies = [ + "dirs-sys", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -950,6 +1034,18 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -974,7 +1070,7 @@ dependencies = [ [[package]] name = "dkn-compute" -version = "0.2.31" +version = "0.2.32" dependencies = [ "async-trait", "base64 0.22.1", @@ -992,22 +1088,25 @@ dependencies = [ "log", "openssl", "port_check", + "public-ip-address", "rand 0.8.5", "reqwest 0.12.9", "serde", "serde_json", "sha2 0.10.8", "sha3", + "sysinfo 0.33.1", "tokio 1.42.0", "tokio-util 0.7.13", "url", "urlencoding", "uuid", + "wgpu", ] [[package]] name = "dkn-monitor" -version = "0.2.31" +version = "0.2.32" dependencies = [ "async-trait", "dkn-compute", @@ -1027,7 +1126,7 @@ dependencies = [ [[package]] name = "dkn-p2p" -version = "0.2.31" +version = "0.2.32" dependencies = [ "dkn-utils", "env_logger 0.11.5", @@ -1035,17 +1134,19 @@ dependencies = [ "libp2p", "libp2p-identity", "log", + "serde", + "serde_json", "tokio 1.42.0", "tokio-util 0.7.13", ] [[package]] name = "dkn-utils" -version = "0.2.31" +version = "0.2.32" [[package]] name = "dkn-workflows" -version = "0.2.31" +version = "0.2.32" dependencies = [ "dkn-utils", "dotenvy", @@ -1058,11 +1159,20 @@ dependencies = [ "reqwest 0.12.9", "serde", "serde_json", - "sysinfo", + "sysinfo 0.32.1", "tokio 1.42.0", "tokio-util 0.7.13", ] +[[package]] +name = "document-features" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb6969eaabd2421f8a2775cfd2471a2b634372b4a25d41e3bd647b79912850a0" +dependencies = [ + "litrs", +] + [[package]] name = "dotenv" version = "0.15.0" @@ -1316,7 +1426,28 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" dependencies = [ - "foreign-types-shared", + "foreign-types-shared 0.1.1", +] + +[[package]] +name = "foreign-types" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" +dependencies = [ + "foreign-types-macros", + "foreign-types-shared 0.3.1", +] + +[[package]] +name = "foreign-types-macros" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a5c6c585bc94aaf2c7b51dd4c2ba22680844aba4c687be581871a6f518c5742" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", ] [[package]] @@ -1325,6 +1456,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +[[package]] +name = "foreign-types-shared" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1585,6 +1722,89 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "gl_generator" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a95dfc23a2b4a9a2f5ab41d194f8bfda3cabec42af4e39f08c339eb2a0c124d" +dependencies = [ + "khronos_api", + "log", + "xml-rs", +] + +[[package]] +name = "glow" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d51fa363f025f5c111e03f13eda21162faeacb6911fe8caa0c0349f9cf0c4483" +dependencies = [ + "js-sys", + "slotmap", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "glutin_wgl_sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a4e1951bbd9434a81aa496fe59ccc2235af3820d27b85f9314e279609211e2c" +dependencies = [ + "gl_generator", +] + +[[package]] +name = "gpu-alloc" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbcd2dba93594b227a1f57ee09b8b9da8892c34d55aa332e034a228d0fe6a171" +dependencies = [ + "bitflags 2.6.0", + "gpu-alloc-types", +] + +[[package]] +name = "gpu-alloc-types" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98ff03b468aa837d70984d55f5d3f846f6ec31fe34bbb97c4f85219caeee1ca4" +dependencies = [ + "bitflags 2.6.0", +] + +[[package]] +name = "gpu-allocator" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c151a2a5ef800297b4e79efa4f4bec035c5f51d5ae587287c9b952bdf734cacd" +dependencies = [ + "log", + "presser", + "thiserror 1.0.69", + "windows 0.53.0", +] + +[[package]] +name = "gpu-descriptor" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcf29e94d6d243368b7a56caa16bc213e4f9f8ed38c4d9557069527b5d5281ca" +dependencies = [ + "bitflags 2.6.0", + "gpu-descriptor-types", + "hashbrown 0.15.2", +] + +[[package]] +name = "gpu-descriptor-types" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdf242682df893b86f33a73828fb09ca4b2d3bb6cc95249707fc684d27484b91" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "h2" version = "0.2.7" @@ -1711,6 +1931,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" +[[package]] +name = "hexf-parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfa686283ad6dd069f105e5ab091b04c62850d3e4cf5d67debad1933f55023df" + [[package]] name = "hickory-proto" version = "0.24.2" @@ -2401,6 +2627,12 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "js-sys" version = "0.3.76" @@ -2430,6 +2662,23 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "khronos-egl" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aae1df220ece3c0ada96b8153459b67eebe9ae9212258bb0134ae60416fdf76" +dependencies = [ + "libc", + "libloading", + "pkg-config", +] + +[[package]] +name = "khronos_api" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2db585e1d738fc771bf08a151420d3ed193d9d895a36df7f6f8a9456b911ddc" + [[package]] name = "lazy_static" version = "1.5.0" @@ -2442,6 +2691,16 @@ version = "0.2.168" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5aaeb2981e0606ca11d79718f8bb01164f1d6ed75080182d3abf017e6d244b6d" +[[package]] +name = "libloading" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" +dependencies = [ + "cfg-if 1.0.0", + "windows-targets 0.52.6", +] + [[package]] name = "libp2p" version = "0.54.1" @@ -2468,6 +2727,7 @@ dependencies = [ "libp2p-ping", "libp2p-quic", "libp2p-relay", + "libp2p-request-response", "libp2p-swarm", "libp2p-tcp", "libp2p-upnp", @@ -2826,6 +3086,7 @@ version = "0.27.0" source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=7ce9f9e#7ce9f9e65ddbe1fdac3913f0f3c1d94edc1de25e" dependencies = [ "async-trait", + "cbor4ii", "futures", "futures-bounded", "futures-timer", @@ -2833,6 +3094,7 @@ dependencies = [ "libp2p-identity", "libp2p-swarm", "rand 0.8.5", + "serde", "smallvec", "tracing", "void", @@ -3012,6 +3274,12 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" +[[package]] +name = "litrs" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5" + [[package]] name = "lock_api" version = "0.4.12" @@ -3058,6 +3326,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" +[[package]] +name = "malloc_buf" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62bb907fe88d54d8d9ce32a3cceab4218ed2f6b7d35617cafe9adf84e43919cb" +dependencies = [ + "libc", +] + [[package]] name = "markup5ever" version = "0.10.1" @@ -3110,12 +3387,38 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "maybe-async" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "memchr" version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "metal" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ecfd3296f8c56b7c1f6fbac3c71cefa9d78ce009850c45000015f206dc7fa21" +dependencies = [ + "bitflags 2.6.0", + "block", + "core-graphics-types", + "foreign-types 0.5.0", + "log", + "objc", + "paste", +] + [[package]] name = "mime" version = "0.3.17" @@ -3242,6 +3545,27 @@ dependencies = [ "unsigned-varint", ] +[[package]] +name = "naga" +version = "23.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "364f94bc34f61332abebe8cad6f6cd82a5b65cff22c828d05d0968911462ca4f" +dependencies = [ + "arrayvec", + "bit-set 0.8.0", + "bitflags 2.6.0", + "cfg_aliases 0.1.1", + "codespan-reporting", + "hexf-parse", + "indexmap 2.7.0", + "log", + "rustc-hash 1.1.0", + "spirv", + "termcolor", + "thiserror 1.0.69", + "unicode-xid", +] + [[package]] name = "native-tls" version = "0.2.12" @@ -3259,6 +3583,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ndk-sys" +version = "0.5.0+25.2.9519653" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c196769dd60fd4f363e11d948139556a344e79d451aeb2fa2fd040738ef7691" +dependencies = [ + "jni-sys", +] + [[package]] name = "net2" version = "0.2.39" @@ -3427,6 +3760,15 @@ dependencies = [ "libc", ] +[[package]] +name = "objc" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915b1b472bc21c53464d6c8461c9d3af805ba1ef837e1cac254428f4a77177b1" +dependencies = [ + "malloc_buf", +] + [[package]] name = "object" version = "0.36.5" @@ -3526,7 +3868,7 @@ checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" dependencies = [ "bitflags 2.6.0", "cfg-if 1.0.0", - "foreign-types", + "foreign-types 0.3.2", "libc", "once_cell", "openssl-macros", @@ -3572,6 +3914,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "parking" version = "2.2.1" @@ -3886,6 +4234,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" +[[package]] +name = "presser" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8cf8e6a8aa66ce33f63993ffc4ea4271eb5b0530a9002db8455ea6050c77bfa" + [[package]] name = "pretty_env_logger" version = "0.5.0" @@ -3925,6 +4279,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "profiling" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afbdc74edc00b6f6a218ca6a5364d6226a259d4b8ea1af4a0ea063f27e179f4d" + [[package]] name = "prometheus-client" version = "0.22.3" @@ -3948,6 +4308,21 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "public-ip-address" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "761cf3bcffbc326e841fcbaf0849759dc2e30876b89c454e0991f20ceca40f4c" +dependencies = [ + "directories", + "log", + "maybe-async", + "reqwest 0.12.9", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -3986,7 +4361,7 @@ dependencies = [ "pin-project-lite 0.2.15", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 2.1.0", "rustls", "socket2 0.5.8", "thiserror 2.0.8", @@ -4004,7 +4379,7 @@ dependencies = [ "getrandom 0.2.15", "rand 0.8.5", "ring 0.17.8", - "rustc-hash", + "rustc-hash 2.1.0", "rustls", "rustls-pki-types", "slab", @@ -4020,7 +4395,7 @@ version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c40286217b4ba3a71d644d752e6a0b71f13f1b6a2c5311acfcbe0c2418ed904" dependencies = [ - "cfg_aliases", + "cfg_aliases 0.2.1", "libc", "once_cell", "socket2 0.5.8", @@ -4118,6 +4493,18 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "range-alloc" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8a99fddc9f0ba0a85884b8d14e3592853e787d581ca1816c91349b10e4eeab" + +[[package]] +name = "raw-window-handle" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20675572f6f24e9e76ef639bc5552774ed45f1c30e2951e1e99c59888861c539" + [[package]] name = "rayon" version = "1.10.0" @@ -4199,6 +4586,12 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "renderdoc-sys" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b30a45b0cd0bcca8037f3d0dc3421eaf95327a17cad11964fb8179b4fc4832" + [[package]] name = "reqwest" version = "0.10.10" @@ -4366,6 +4759,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.1.0" @@ -4567,7 +4966,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ee061f90afcc8678bef7a78d0d121683f0ba753f740ff7005f833ec445876b7" dependencies = [ - "bit-set", + "bit-set 0.5.3", "html5ever 0.25.2", "markup5ever_rcdom", ] @@ -4775,6 +5174,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "slotmap" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbff4acf519f630b3a3ddcfaea6c06b42174d9a44bc70c620e9ed1649d58b82a" +dependencies = [ + "version_check", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -4831,6 +5239,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spirv" +version = "0.3.0+sdk-1.3.268.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eda41003dc44290527a59b13432d4a0379379fa074b70174882adfbdfd917844" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "spki" version = "0.7.3" @@ -4969,6 +5386,20 @@ dependencies = [ "windows 0.57.0", ] +[[package]] +name = "sysinfo" +version = "0.33.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fc858248ea01b66f19d8e8a6d55f41deaf91e9d495246fd01368d99935c6c01" +dependencies = [ + "core-foundation-sys", + "libc", + "memchr", + "ntapi", + "rayon", + "windows 0.57.0", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -5365,6 +5796,12 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "universal-hash" version = "0.5.1" @@ -5599,6 +6036,115 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "wgpu" +version = "23.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80f70000db37c469ea9d67defdc13024ddf9a5f1b89cb2941b812ad7cde1735a" +dependencies = [ + "arrayvec", + "cfg_aliases 0.1.1", + "document-features", + "js-sys", + "log", + "naga", + "parking_lot", + "profiling", + "raw-window-handle", + "serde", + "smallvec", + "static_assertions", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "wgpu-core", + "wgpu-hal", + "wgpu-types", +] + +[[package]] +name = "wgpu-core" +version = "23.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d63c3c478de8e7e01786479919c8769f62a22eec16788d8c2ac77ce2c132778a" +dependencies = [ + "arrayvec", + "bit-vec 0.8.0", + "bitflags 2.6.0", + "cfg_aliases 0.1.1", + "document-features", + "indexmap 2.7.0", + "log", + "naga", + "once_cell", + "parking_lot", + "profiling", + "raw-window-handle", + "rustc-hash 1.1.0", + "serde", + "smallvec", + "thiserror 1.0.69", + "wgpu-hal", + "wgpu-types", +] + +[[package]] +name = "wgpu-hal" +version = "23.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89364b8a0b211adc7b16aeaf1bd5ad4a919c1154b44c9ce27838213ba05fd821" +dependencies = [ + "android_system_properties", + "arrayvec", + "ash", + "bit-set 0.8.0", + "bitflags 2.6.0", + "block", + "bytemuck", + "cfg_aliases 0.1.1", + "core-graphics-types", + "glow", + "glutin_wgl_sys", + "gpu-alloc", + "gpu-allocator", + "gpu-descriptor", + "js-sys", + "khronos-egl", + "libc", + "libloading", + "log", + "metal", + "naga", + "ndk-sys", + "objc", + "once_cell", + "parking_lot", + "profiling", + "range-alloc", + "raw-window-handle", + "renderdoc-sys", + "rustc-hash 1.1.0", + "smallvec", + "thiserror 1.0.69", + "wasm-bindgen", + "web-sys", + "wgpu-types", + "windows 0.58.0", + "windows-core 0.58.0", +] + +[[package]] +name = "wgpu-types" +version = "23.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "610f6ff27778148c31093f3b03abc4840f9636d58d597ca2f5977433acfe0068" +dependencies = [ + "bitflags 2.6.0", + "js-sys", + "serde", + "web-sys", +] + [[package]] name = "widestring" version = "1.1.0" @@ -5668,6 +6214,16 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" +dependencies = [ + "windows-core 0.58.0", + "windows-targets 0.52.6", +] + [[package]] name = "windows-core" version = "0.52.0" @@ -5693,12 +6249,25 @@ version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" dependencies = [ - "windows-implement", - "windows-interface", + "windows-implement 0.57.0", + "windows-interface 0.57.0", "windows-result 0.1.2", "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" +dependencies = [ + "windows-implement 0.58.0", + "windows-interface 0.58.0", + "windows-result 0.2.0", + "windows-strings", + "windows-targets 0.52.6", +] + [[package]] name = "windows-implement" version = "0.57.0" @@ -5710,6 +6279,17 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "windows-implement" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "windows-interface" version = "0.57.0" @@ -5721,6 +6301,17 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "windows-interface" +version = "0.58.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "windows-registry" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 2d986d4..ba3c621 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ default-members = ["compute"] [workspace.package] edition = "2021" -version = "0.2.31" +version = "0.2.32" license = "Apache-2.0" readme = "README.md" diff --git a/compute/Cargo.toml b/compute/Cargo.toml index 14cbfd2..08c4e73 100644 --- a/compute/Cargo.toml +++ b/compute/Cargo.toml @@ -42,11 +42,20 @@ sha2 = "0.10.8" sha3 = "0.10.8" fastbloom-rs = "0.5.9" +# machine diagnostics +# system info +sysinfo = "0.33.1" +# gpu info +wgpu = { version = "23.0.1", features = ["serde"] } +# public ip +public-ip-address = "0.3.2" + # dria subcrates dkn-p2p = { path = "../p2p" } dkn-utils = { path = "../utils" } dkn-workflows = { path = "../workflows" } + # vendor OpenSSL so that its easier to build cross-platform packages [dependencies.openssl] version = "*" diff --git a/compute/src/config.rs b/compute/src/config.rs index 23fa8c0..a178f77 100644 --- a/compute/src/config.rs +++ b/compute/src/config.rs @@ -1,16 +1,16 @@ -use crate::utils::{ - address_in_use, - crypto::{secret_to_keypair, to_address}, -}; use dkn_p2p::{libp2p::Multiaddr, DriaNetworkType}; use dkn_workflows::DriaWorkflowsConfig; use eyre::{eyre, Result}; use libsecp256k1::{PublicKey, SecretKey}; - use std::{env, str::FromStr}; -// TODO: make this configurable later +use crate::utils::{ + address_in_use, + crypto::{secret_to_keypair, to_address}, +}; + const DEFAULT_WORKFLOW_BATCH_SIZE: usize = 5; +const DEFAULT_P2P_LISTEN_ADDR: &str = "/ip4/0.0.0.0/tcp/4001"; #[derive(Debug, Clone)] pub struct DriaComputeNodeConfig { @@ -35,9 +35,6 @@ pub struct DriaComputeNodeConfig { pub batch_size: usize, } -/// The default P2P network listen address. -pub(crate) const DEFAULT_P2P_LISTEN_ADDR: &str = "/ip4/0.0.0.0/tcp/4001"; - #[allow(clippy::new_without_default)] impl DriaComputeNodeConfig { /// Creates new config from environment variables. diff --git a/compute/src/handlers/pingpong.rs b/compute/src/handlers/pingpong.rs index 1326c37..34cbf09 100644 --- a/compute/src/handlers/pingpong.rs +++ b/compute/src/handlers/pingpong.rs @@ -1,9 +1,11 @@ -use crate::{utils::DriaMessage, DriaComputeNode}; use dkn_p2p::libp2p::gossipsub::MessageAcceptance; use dkn_utils::get_current_time_nanos; use dkn_workflows::{Model, ModelProvider}; use eyre::{Context, Result}; use serde::{Deserialize, Serialize}; +use tokio::time::Instant; + +use crate::{utils::DriaMessage, DriaComputeNode}; pub struct PingpongHandler; @@ -60,6 +62,10 @@ impl PingpongHandler { return Ok(MessageAcceptance::Ignore); } + log::info!("Received a ping for: {}", pingpong.uuid); + // record ping moment + node.last_pinged_at = Instant::now(); + // respond let response_body = PingpongResponse { uuid: pingpong.uuid.clone(), diff --git a/compute/src/handlers/workflow.rs b/compute/src/handlers/workflow.rs index ae651e9..e13810c 100644 --- a/compute/src/handlers/workflow.rs +++ b/compute/src/handlers/workflow.rs @@ -55,12 +55,14 @@ impl WorkflowHandler { // check task inclusion via the bloom filter if !task.filter.contains(&node.config.address)? { - log::info!("Task {} ignored due to filter.", task.task_id); + log::debug!("Task {} ignored due to filter.", task.task_id); // accept the message, someone else may be included in filter return Ok(Either::Left(MessageAcceptance::Accept)); } + log::info!("Received a task with id: {}", task.task_id); + // obtain public key from the payload // do this early to avoid unnecessary processing let task_public_key_bytes = diff --git a/compute/src/lib.rs b/compute/src/lib.rs index c399688..51d5161 100644 --- a/compute/src/lib.rs +++ b/compute/src/lib.rs @@ -1,5 +1,11 @@ pub mod config; + +/// Gossipsub message handlers. pub mod handlers; + +// Request-response handlers. +pub mod responders; + pub mod node; pub mod payloads; pub mod utils; diff --git a/compute/src/node.rs b/compute/src/node.rs index 872e5ad..58dc23c 100644 --- a/compute/src/node.rs +++ b/compute/src/node.rs @@ -1,19 +1,24 @@ use dkn_p2p::{ libp2p::{ gossipsub::{Message, MessageAcceptance, MessageId}, + request_response::ResponseChannel, PeerId, }, DriaNodes, DriaP2PClient, DriaP2PCommander, DriaP2PProtocol, }; use eyre::Result; use std::collections::HashSet; -use tokio::{sync::mpsc, time::Duration}; +use tokio::{ + sync::mpsc, + time::{Duration, Instant}, +}; use tokio_util::{either::Either, sync::CancellationToken}; use crate::{ config::*, handlers::*, - utils::{crypto::secret_to_keypair, refresh_dria_nodes, DriaMessage}, + responders::{IsResponder, SpecResponder, WorkflowResponder}, + utils::{crypto::secret_to_keypair, refresh_dria_nodes, DriaMessage, SpecCollector}, workers::workflow::{WorkflowsWorker, WorkflowsWorkerInput, WorkflowsWorkerOutput}, DRIA_COMPUTE_NODE_VERSION, }; @@ -22,6 +27,8 @@ use crate::{ const DIAGNOSTIC_REFRESH_INTERVAL_SECS: u64 = 30; /// Number of seconds between refreshing the available nodes. const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: u64 = 30 * 60; // 30 minutes +/// Number of seconds such that if the last ping is older than this, the node is considered unreachable. +const PING_LIVENESS_SECS: u64 = 150; /// Buffer size for message publishes. const PUBLISH_CHANNEL_BUFSIZE: usize = 1024; @@ -31,8 +38,13 @@ pub struct DriaComputeNode { pub dria_nodes: DriaNodes, /// Peer-to-peer client commander to interact with the network. pub p2p: DriaP2PCommander, + /// The last time the node was pinged by the network. + /// If this is too much, we can say that the node is not reachable by RPC. + pub last_pinged_at: Instant, /// Gossipsub message receiver, used by peer-to-peer client in a separate thread. message_rx: mpsc::Receiver<(PeerId, MessageId, Message)>, + /// Request-response request receiver. + request_rx: mpsc::Receiver<(PeerId, Vec, ResponseChannel>)>, /// Publish receiver to receive messages to be published, publish_rx: mpsc::Receiver, /// Workflow transmitter to send batchable tasks. @@ -47,6 +59,8 @@ pub struct DriaComputeNode { completed_tasks_single: usize, /// Completed batch tasks count completed_tasks_batch: usize, + /// Spec collector for the node. + spec_collector: SpecCollector, } impl DriaComputeNode { @@ -78,7 +92,7 @@ impl DriaComputeNode { log::info!("Using identity: {}", protocol); // create p2p client - let (p2p_client, p2p_commander, message_rx) = DriaP2PClient::new( + let (p2p_client, p2p_commander, message_rx, request_rx) = DriaP2PClient::new( keypair, config.p2p_listen_addr.clone(), &available_nodes, @@ -111,14 +125,17 @@ impl DriaComputeNode { config, p2p: p2p_commander, dria_nodes: available_nodes, - message_rx, publish_rx, + message_rx, + request_rx, workflow_batch_tx, workflow_single_tx, pending_tasks_single: HashSet::new(), pending_tasks_batch: HashSet::new(), completed_tasks_single: 0, completed_tasks_batch: 0, + spec_collector: SpecCollector::new(), + last_pinged_at: Instant::now(), }, p2p_client, workflows_batch_worker, @@ -127,6 +144,9 @@ impl DriaComputeNode { } /// Subscribe to a certain task with its topic. + /// + /// These are likely to be called once, so can be inlined. + #[inline] pub async fn subscribe(&mut self, topic: &str) -> Result<()> { let ok = self.p2p.subscribe(topic).await?; if ok { @@ -138,6 +158,9 @@ impl DriaComputeNode { } /// Unsubscribe from a certain task with its topic. + /// + /// These are likely to be called once, so can be inlined. + #[inline] pub async fn unsubscribe(&mut self, topic: &str) -> Result<()> { let ok = self.p2p.unsubscribe(topic).await?; if ok { @@ -149,6 +172,7 @@ impl DriaComputeNode { } /// Returns the task count within the channels, `single` and `batch`. + #[inline] pub fn get_pending_task_count(&self) -> [usize; 2] { [ self.pending_tasks_single.len(), @@ -194,14 +218,6 @@ impl DriaComputeNode { return MessageAcceptance::Ignore; }; - // log the received message - log::info!( - "Received {} message ({}) from {}", - gossipsub_message.topic, - message_id, - peer_id, - ); - // ensure that message is from the known RPCs if !self.dria_nodes.rpc_peerids.contains(&source_peer_id) { log::warn!( @@ -227,6 +243,15 @@ impl DriaComputeNode { } }; + // debug-log the received message + log::debug!( + "Received {} message ({}) from {}\n{}", + gossipsub_message.topic, + message_id, + peer_id, + message + ); + // check signature match message.is_signed(&self.config.admin_public_key) { Ok(true) => { /* message is signed correctly, nothing to do here */ } @@ -240,8 +265,6 @@ impl DriaComputeNode { } } - log::debug!("Parsed: {}", message); - // handle the DKN message with respect to the topic let handler_result = match message.topic.as_str() { WorkflowHandler::LISTEN_TOPIC => { @@ -312,13 +335,57 @@ impl DriaComputeNode { } } + /// Handles a request-response request received from the network. + /// + /// Internally, the data is expected to be some JSON serialized data that is expected to be parsed and handled. + async fn handle_request( + &mut self, + (peer_id, data, channel): (PeerId, Vec, ResponseChannel>), + ) -> Result<()> { + // ensure that message is from the known RPCs + if !self.dria_nodes.rpc_peerids.contains(&peer_id) { + log::warn!("Received request from unauthorized source: {}", peer_id); + log::debug!("Allowed sources: {:#?}", self.dria_nodes.rpc_peerids); + return Err(eyre::eyre!( + "Received unauthorized request from {}", + peer_id + )); + } + + // respond w.r.t data + let response_data = if let Ok(req) = SpecResponder::try_parse_request(&data) { + log::info!( + "Got a spec request from peer {} with id {}", + peer_id, + req.request_id + ); + + let response = SpecResponder::respond(req, self.spec_collector.collect().await); + serde_json::to_vec(&response)? + } else if let Ok(req) = WorkflowResponder::try_parse_request(&data) { + log::info!("Received a task request with id: {}", req.task_id); + return Err(eyre::eyre!( + "REQUEST RESPONSE FOR TASKS ARE NOT IMPLEMENTED YET" + )); + } else { + return Err(eyre::eyre!( + "Received unknown request from {}: {:?}", + peer_id, + data, + )); + }; + + log::info!("Responding to peer {}", peer_id); + self.p2p.respond(response_data, channel).await + } + /// Runs the main loop of the compute node. /// This method is not expected to return until cancellation occurs for the given token. pub async fn run(&mut self, cancellation: CancellationToken) -> Result<()> { // prepare durations for sleeps - let mut peer_refresh_interval = + let mut diagnostic_refresh_interval = tokio::time::interval(Duration::from_secs(DIAGNOSTIC_REFRESH_INTERVAL_SECS)); - peer_refresh_interval.tick().await; // move one tick + diagnostic_refresh_interval.tick().await; // move one tick let mut available_node_refresh_interval = tokio::time::interval(Duration::from_secs(AVAILABLE_NODES_REFRESH_INTERVAL_SECS)); available_node_refresh_interval.tick().await; // move one tick @@ -331,8 +398,6 @@ impl DriaComputeNode { loop { tokio::select! { - // prioritize the branches in the order below - biased; // a Workflow message to be published is received from the channel // this is expected to be sent by the workflow worker @@ -359,7 +424,7 @@ impl DriaComputeNode { }, // check peer count every now and then - _ = peer_refresh_interval.tick() => self.handle_diagnostic_refresh().await, + _ = diagnostic_refresh_interval.tick() => self.handle_diagnostic_refresh().await, // available nodes are refreshed every now and then _ = available_node_refresh_interval.tick() => self.handle_available_nodes_refresh().await, // a GossipSub message is received from the channel @@ -375,7 +440,19 @@ impl DriaComputeNode { log::error!("Error validating message {}: {:?}", message_id, e); } } else { - log::error!("Message channel closed unexpectedly."); + log::error!("message_rx channel closed unexpectedly."); + break; + }; + }, + // a Response message is received from the channel + // this is expected to be sent by the p2p client + request_msg_opt = self.request_rx.recv() => { + if let Some((peer_id, data, channel)) = request_msg_opt { + if let Err(e) = self.handle_request((peer_id, data, channel)).await { + log::error!("Error handling request: {:?}", e); + } + } else { + log::error!("request_rx channel closed unexpectedly."); break; }; }, @@ -417,6 +494,7 @@ impl DriaComputeNode { /// Peer refresh simply reports the peer count to the user. async fn handle_diagnostic_refresh(&self) { let mut diagnostics = Vec::new(); + // print peer counts match self.p2p.peer_counts().await { Ok((mesh, all)) => { @@ -444,6 +522,13 @@ impl DriaComputeNode { diagnostics.push(format!("Version: v{}", DRIA_COMPUTE_NODE_VERSION)); log::info!("{}", diagnostics.join(" | ")); + + if self.last_pinged_at < Instant::now() - Duration::from_secs(PING_LIVENESS_SECS) { + log::error!( + "Node has not received any pings for at least {} seconds & it may be unreachable!\nPlease restart your node!", + PING_LIVENESS_SECS + ); + } } /// Updates the local list of available nodes by refreshing it. diff --git a/compute/src/responders/mod.rs b/compute/src/responders/mod.rs new file mode 100644 index 0000000..a398cda --- /dev/null +++ b/compute/src/responders/mod.rs @@ -0,0 +1,20 @@ +use eyre::Context; +use serde::{de::DeserializeOwned, Serialize}; + +mod specs; +pub use specs::SpecResponder; + +mod workflow; +pub use workflow::WorkflowResponder; + +/// A responder should implement a request & response type, both serializable. +/// +/// The `try_parse_request` is automatically implemented using `serde-json` for a byte slice. +pub trait IsResponder { + type Request: DeserializeOwned; + type Response: Serialize + DeserializeOwned; + + fn try_parse_request(data: &[u8]) -> eyre::Result { + serde_json::from_slice(data).wrap_err("could not parse request") + } +} diff --git a/compute/src/responders/specs.rs b/compute/src/responders/specs.rs new file mode 100644 index 0000000..179573b --- /dev/null +++ b/compute/src/responders/specs.rs @@ -0,0 +1,33 @@ +use crate::utils::Specs; + +use super::IsResponder; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct Request { + /// UUID of the specs request, prevents replay attacks. + pub request_id: String, +} + +#[derive(Serialize, Deserialize)] +pub struct Response { + request_id: String, + #[serde(flatten)] + specs: Specs, +} + +pub struct SpecResponder; + +impl IsResponder for SpecResponder { + type Request = Request; + type Response = Response; +} + +impl SpecResponder { + pub fn respond(request: Request, specs: Specs) -> Response { + Response { + request_id: request.request_id, + specs, + } + } +} diff --git a/compute/src/responders/workflow.rs b/compute/src/responders/workflow.rs new file mode 100644 index 0000000..5ab2472 --- /dev/null +++ b/compute/src/responders/workflow.rs @@ -0,0 +1,155 @@ +#![allow(unused)] + +use dkn_utils::get_current_time_nanos; +use dkn_workflows::{Entry, Executor, ModelProvider, Workflow}; +use eyre::{Context, Result}; +use libsecp256k1::PublicKey; +use serde::Deserialize; + +use crate::payloads::*; +use crate::utils::DriaMessage; +use crate::workers::workflow::*; +use crate::DriaComputeNode; + +use super::IsResponder; + +pub struct WorkflowResponder; + +impl IsResponder for WorkflowResponder { + type Request = TaskRequestPayload; + type Response = TaskResponsePayload; +} + +#[derive(Debug, Deserialize)] +pub struct WorkflowPayload { + /// [Workflow](https://github.com/andthattoo/ollama-workflows/blob/main/src/program/workflow.rs) object to be parsed. + pub(crate) workflow: Workflow, + /// A lıst of model (that can be parsed into `Model`) or model provider names. + /// If model provider is given, the first matching model in the node config is used for that. + /// From the given list, a random choice will be made for the task. + pub(crate) model: Vec, + /// Prompts can be provided within the workflow itself, in which case this is `None`. + /// Otherwise, the prompt is expected to be `Some` here. + pub(crate) prompt: Option, +} + +impl WorkflowResponder { + pub(crate) async fn handle_compute( + node: &mut DriaComputeNode, + compute_message: &DriaMessage, + ) -> Result> { + let stats = TaskStats::new().record_received_at(); + + // parse payload + let task = compute_message + .parse_payload::>(true) + .wrap_err("could not parse workflow task")?; + + // check if deadline is past or not + if get_current_time_nanos() >= task.deadline { + log::debug!("Task {} is past the deadline, ignoring", task.task_id,); + return Ok(None); + } + + // TODO: we dont check the filter at all, because this was a request to the given peer + + log::info!("Received a task with id: {}", task.task_id); + + // obtain public key from the payload + // do this early to avoid unnecessary processing + let task_public_key_bytes = + hex::decode(&task.public_key).wrap_err("could not decode public key")?; + let task_public_key = PublicKey::parse_slice(&task_public_key_bytes, None)?; + + // read model / provider from the task + let (model_provider, model) = node + .config + .workflows + .get_any_matching_model(task.input.model)?; + let model_name = model.to_string(); // get model name, we will pass it in payload + log::info!("Using model {} for task {}", model_name, task.task_id); + + // prepare workflow executor + let (executor, batchable) = if model_provider == ModelProvider::Ollama { + ( + Executor::new_at( + model, + &node.config.workflows.ollama.host, + node.config.workflows.ollama.port, + ), + false, + ) + } else { + (Executor::new(model), true) + }; + + // prepare entry from prompt + let entry: Option = task + .input + .prompt + .map(|prompt| Entry::try_value_or_str(&prompt)); + + // get workflow as well + let workflow = task.input.workflow; + + Ok(Some(WorkflowsWorkerInput { + entry, + executor, + workflow, + model_name, + task_id: task.task_id, + public_key: task_public_key, + stats, + batchable, + })) + } + + /// Handles the result of a workflow task. + pub(crate) async fn handle_respond( + node: &mut DriaComputeNode, + task: WorkflowsWorkerOutput, + ) -> Result<()> { + // TODO: handle response + let _response = match task.result { + Ok(result) => { + // prepare signed and encrypted payload + let payload = TaskResponsePayload::new( + result, + &task.task_id, + &task.public_key, + &node.config.secret_key, + task.model_name, + task.stats.record_published_at(), + )?; + + // convert payload to message + let payload_str = serde_json::json!(payload).to_string(); + log::info!("Publishing result for task {}", task.task_id); + + DriaMessage::new(payload_str, "response") + } + Err(err) => { + // use pretty display string for error logging with causes + let err_string = format!("{:#}", err); + log::error!("Task {} failed: {}", task.task_id, err_string); + + // prepare error payload + let error_payload = TaskErrorPayload { + task_id: task.task_id.clone(), + error: err_string, + model: task.model_name, + stats: task.stats.record_published_at(), + }; + let error_payload_str = serde_json::json!(error_payload).to_string(); + + // prepare signed message + DriaMessage::new_signed(error_payload_str, "response", &node.config.secret_key) + } + }; + + // respond through the channel + // TODO: !!! + + Ok(()) + } +} diff --git a/compute/src/utils/message.rs b/compute/src/utils/message.rs index 9eec768..144cdea 100644 --- a/compute/src/utils/message.rs +++ b/compute/src/utils/message.rs @@ -1,5 +1,3 @@ -use crate::utils::crypto::{sha256hash, sign_bytes_recoverable}; -use crate::DRIA_COMPUTE_NODE_VERSION; use base64::{prelude::BASE64_STANDARD, Engine}; use core::fmt; use dkn_utils::get_current_time_nanos; @@ -8,6 +6,9 @@ use eyre::{Context, Result}; use libsecp256k1::{verify, Message, SecretKey, Signature}; use serde::{Deserialize, Serialize}; +use crate::utils::crypto::{sha256hash, sign_bytes_recoverable}; +use crate::DRIA_COMPUTE_NODE_VERSION; + /// A message within Dria Knowledge Network. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct DriaMessage { diff --git a/compute/src/utils/mod.rs b/compute/src/utils/mod.rs index 9485dad..c7b4bae 100644 --- a/compute/src/utils/mod.rs +++ b/compute/src/utils/mod.rs @@ -9,3 +9,6 @@ pub use misc::*; mod nodes; pub use nodes::*; + +mod specs; +pub use specs::*; diff --git a/compute/src/utils/specs.rs b/compute/src/utils/specs.rs new file mode 100644 index 0000000..64e4e3c --- /dev/null +++ b/compute/src/utils/specs.rs @@ -0,0 +1,88 @@ +use public_ip_address::response::LookupResponse; +use serde::{Deserialize, Serialize}; +use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind}; +use wgpu::AdapterInfo; + +/// Machine info & location. +#[derive(Debug, Serialize, Deserialize)] +pub struct Specs { + /// Total memory in bytes + total_mem: u64, + /// Free memory in bytes + free_mem: u64, + /// Number of physical CPU cores. + num_cpus: Option, + /// Global CPU usage, in percentage. + cpu_usage: f32, + /// Operating system name, e.g. `linux`, `macos`, `windows`. + os: String, + /// CPU architecture, e.g. `x86_64`, `aarch64`. + arch: String, + /// GPU adapter infos, showing information about the available GPUs. + gpus: Vec, + /// Public IP lookup response. + lookup: Option, +} + +pub struct SpecCollector { + /// System information object, this is expected to be created only once + /// as per the [docs](https://github.com/GuillaumeGomez/sysinfo?tab=readme-ov-file#good-practice--performance-tips). + system: sysinfo::System, + /// GPU adapter infos, showing information about the available GPUs. + gpus: Vec, +} + +impl Default for SpecCollector { + fn default() -> Self { + Self::new() + } +} + +impl SpecCollector { + pub fn new() -> Self { + SpecCollector { + system: sysinfo::System::new_with_specifics(Self::get_refresh_specifics()), + gpus: wgpu::Instance::default() + .enumerate_adapters(wgpu::Backends::all()) + .into_iter() + .map(|a| a.get_info()) + .collect(), + } + } + + /// Returns the selected refresh kinds. It is important to ignore + /// process values here because it will consume a lot of file-descriptors. + #[inline(always)] + fn get_refresh_specifics() -> RefreshKind { + RefreshKind::nothing() + .with_cpu(CpuRefreshKind::everything()) + .with_memory(MemoryRefreshKind::everything()) + } + + pub async fn collect(&mut self) -> Specs { + self.system.refresh_specifics(Self::get_refresh_specifics()); + + Specs { + total_mem: self.system.total_memory(), + free_mem: self.system.free_memory(), + num_cpus: self.system.physical_core_count(), + cpu_usage: self.system.global_cpu_usage(), + os: std::env::consts::OS.to_string(), + arch: std::env::consts::ARCH.to_string(), + gpus: self.gpus.clone(), + lookup: public_ip_address::perform_lookup(None).await.ok(), + } + } +} +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + #[ignore = "run manually"] + async fn test_print_specs() { + let mut spec_collector = SpecCollector::new(); + let specs = spec_collector.collect().await; + println!("{}", serde_json::to_string_pretty(&specs).unwrap()); + } +} diff --git a/compute/src/workers/workflow.rs b/compute/src/workers/workflow.rs index ad763da..82e4e2a 100644 --- a/compute/src/workers/workflow.rs +++ b/compute/src/workers/workflow.rs @@ -106,7 +106,11 @@ impl WorkflowsWorker { // (1) there are no tasks, or, // (2) there are tasks less than the batch size and the channel is not empty while tasks.is_empty() || (tasks.len() < batch_size && !self.workflow_rx.is_empty()) { - log::info!("Waiting for more workflows to process ({})", tasks.len()); + log::info!( + "Worker is waiting for tasks ({} < {})", + tasks.len(), + batch_size + ); let limit = batch_size - tasks.len(); match self.workflow_rx.recv_many(&mut tasks, limit).await { // 0 tasks returned means that the channel is closed @@ -246,7 +250,13 @@ mod tests { use libsecp256k1::{PublicKey, SecretKey}; use tokio::sync::mpsc; - // cargo test --package dkn-compute --lib --all-features -- workers::workflow::tests::test_workflows_worker --exact --show-output --nocapture --ignored + /// Tests the workflows worker with a single task sent within a batch. + /// + /// ## Run command + /// + /// ```sh + /// cargo test --package dkn-compute --lib --all-features -- workers::workflow::tests::test_workflows_worker --exact --show-output --nocapture --ignored + /// ``` #[tokio::test] #[ignore = "run manually"] async fn test_workflows_worker() { diff --git a/monitor/src/main.rs b/monitor/src/main.rs index bf10c3d..f68e530 100644 --- a/monitor/src/main.rs +++ b/monitor/src/main.rs @@ -30,7 +30,7 @@ async fn main() -> eyre::Result<()> { log::info!("Listen Address: {}", listen_addr); let keypair = Keypair::generate_secp256k1(); log::info!("PeerID: {}", keypair.public().to_peer_id()); - let (client, commander, msg_rx) = DriaP2PClient::new( + let (client, commander, msg_rx, _) = DriaP2PClient::new( keypair, listen_addr, &nodes, diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index caae8d5..594079e 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -22,6 +22,8 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "7ce9f9 "mdns", "noise", "macros", + "request-response", + "cbor", "tcp", "yamux", "quic", @@ -30,6 +32,8 @@ libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "7ce9f9 libp2p-identity = { version = "0.2.9", features = ["secp256k1"] } log.workspace = true eyre.workspace = true +serde.workspace = true +serde_json.workspace = true tokio-util.workspace = true tokio.workspace = true diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index e9694b7..49f8f0e 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -6,7 +6,9 @@ use eyre::{eyre, Context, Result}; use libp2p::identity::{Keypair, PeerId, PublicKey}; use libp2p::kad::store::MemoryStore; use libp2p::StreamProtocol; -use libp2p::{autonat, connection_limits, dcutr, gossipsub, identify, kad, relay}; +use libp2p::{ + autonat, connection_limits, dcutr, gossipsub, identify, kad, relay, request_response, +}; #[derive(libp2p::swarm::NetworkBehaviour)] pub struct DriaBehaviour { @@ -17,6 +19,7 @@ pub struct DriaBehaviour { pub autonat: autonat::Behaviour, pub dcutr: dcutr::Behaviour, pub connection_limits: connection_limits::Behaviour, + pub request_response: request_response::cbor::Behaviour, Vec>, } impl DriaBehaviour { @@ -25,23 +28,36 @@ impl DriaBehaviour { relay_behaviour: relay::client::Behaviour, identity_protocol: String, kademlia_protocol: StreamProtocol, + reqres_protocol: StreamProtocol, ) -> Result { let public_key = key.public(); let peer_id = public_key.to_peer_id(); Ok(Self { + connection_limits: create_connection_limits_behaviour(), relay: relay_behaviour, - gossipsub: create_gossipsub_behaviour(peer_id) - .wrap_err("could not create Gossipsub behaviour")?, - kademlia: create_kademlia_behaviour(peer_id, kademlia_protocol), - autonat: create_autonat_behaviour(peer_id), dcutr: create_dcutr_behaviour(peer_id), + autonat: create_autonat_behaviour(peer_id), identify: create_identify_behaviour(public_key, identity_protocol), - connection_limits: create_connection_limits_behaviour(), + kademlia: create_kademlia_behaviour(peer_id, kademlia_protocol), + gossipsub: create_gossipsub_behaviour(peer_id)?, + request_response: create_request_response_behaviour(reqres_protocol), }) } } +/// Configures the request-response behaviour for the node. +/// +/// The protocol supports bytes only, +#[inline] +fn create_request_response_behaviour( + protocol_name: StreamProtocol, +) -> request_response::cbor::Behaviour, Vec> { + use request_response::{Behaviour, Config, ProtocolSupport}; + + Behaviour::new([(protocol_name, ProtocolSupport::Full)], Config::default()) +} + /// Configures the connection limits. #[inline] fn create_connection_limits_behaviour() -> connection_limits::Behaviour { diff --git a/p2p/src/client.rs b/p2p/src/client.rs index 492a842..01e44aa 100644 --- a/p2p/src/client.rs +++ b/p2p/src/client.rs @@ -2,6 +2,7 @@ use eyre::Result; use libp2p::futures::StreamExt; use libp2p::gossipsub::{Message, MessageId}; use libp2p::kad::{GetClosestPeersError, GetClosestPeersOk, QueryResult}; +use libp2p::request_response::{self, ResponseChannel}; use libp2p::swarm::SwarmEvent; use libp2p::{autonat, gossipsub, identify, kad, multiaddr::Protocol, noise, tcp, yamux}; use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder}; @@ -17,12 +18,16 @@ use super::DriaP2PCommander; /// Peer-to-peer client for Dria Knowledge Network. pub struct DriaP2PClient { + /// Your peer id. + pub peer_id: PeerId, /// `Swarm` instance, everything p2p-related are accessed through this instace. swarm: Swarm, /// Dria protocol, used for identifying the client. protocol: DriaP2PProtocol, - /// Gossipsub message sender. + /// Gossipsub protoocol, gossip message sender. msg_tx: mpsc::Sender<(PeerId, MessageId, Message)>, + /// Request-response protocol, request sender. + req_tx: mpsc::Sender<(PeerId, Vec, ResponseChannel>)>, /// Command receiver. cmd_rx: mpsc::Receiver, } @@ -52,10 +57,11 @@ impl DriaP2PClient { DriaP2PClient, DriaP2PCommander, mpsc::Receiver<(PeerId, MessageId, Message)>, + mpsc::Receiver<(PeerId, Vec, ResponseChannel>)>, )> { // this is our peerId - let node_peerid = keypair.public().to_peer_id(); - log::info!("Compute node peer address: {}", node_peerid); + let peer_id = keypair.public().to_peer_id(); + log::info!("Compute node peer address: {}", peer_id); let mut swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() @@ -72,6 +78,7 @@ impl DriaP2PClient { relay_behaviour, protocol.identity(), protocol.kademlia(), + protocol.request_response(), ) .map_err(Into::into) })? @@ -137,14 +144,17 @@ impl DriaP2PClient { // create p2p client itself let (msg_tx, msg_rx) = mpsc::channel(MSG_CHANNEL_BUFSIZE); + let (req_tx, req_rx) = mpsc::channel(MSG_CHANNEL_BUFSIZE); let client = Self { + peer_id, swarm, protocol, msg_tx, + req_tx, cmd_rx, }; - Ok((client, commander, msg_rx)) + Ok((client, commander, msg_rx, req_rx)) } /// Waits for swarm events and Node commands at the same time. @@ -206,6 +216,31 @@ impl DriaP2PClient { .publish(gossipsub::IdentTopic::new(topic), data), ); } + DriaP2PCommand::Respond { + data, + channel, + sender, + } => { + let _ = sender.send( + self.swarm + .behaviour_mut() + .request_response + .send_response(channel, data) + .map_err(|_| eyre::eyre!("could not send response, channel is closed?")), + ); + } + DriaP2PCommand::Request { + data, + peer_id, + sender, + } => { + let _ = sender.send( + self.swarm + .behaviour_mut() + .request_response + .send_request(&peer_id, data), + ); + } DriaP2PCommand::ValidateMessage { msg_id, propagation_source, @@ -276,8 +311,72 @@ impl DriaP2PClient { message, })) => { if let Err(e) = self.msg_tx.send((peer_id, message_id, message)).await { - log::error!("Error sending message: {:?}", e); + log::error!("Could not send Gossipsub message: {:?}", e); + } + } + + // request-response events + SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( + request_response::Event::Message { message, peer }, + )) => match message { + // a request has been made with us as the target, and we should respond + // using the created `channel`; we simply forward this to the request channel + request_response::Message::Request { + request, channel, .. + } => { + if let Err(e) = self.req_tx.send((peer, request, channel)).await { + log::error!("Could not send request-response request: {:?}", e); + } + } + request_response::Message::Response { + request_id, + response, + } => { + // while we support the protocol, we dont really make any requests + // TODO: should p2p crate support this? + log::warn!( + "Unexpected response message with request_id {}: {:?}", + request_id, + response + ); } + }, + SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( + request_response::Event::ResponseSent { peer, request_id }, + )) => { + log::debug!( + "Request-Response: Response sent to peer {} with request_id {}", + peer, + request_id + ) + } + SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( + request_response::Event::OutboundFailure { + peer, + request_id, + error, + }, + )) => { + log::error!( + "Request-Response: Outbound failure to peer {} with request_id {}: {:?}", + peer, + request_id, + error + ); + } + SwarmEvent::Behaviour(DriaBehaviourEvent::RequestResponse( + request_response::Event::InboundFailure { + peer, + request_id, + error, + }, + )) => { + log::error!( + "Request-Response: Inbound failure to peer {} with request_id {}: {:?}", + peer, + request_id, + error + ); } // kademlia events diff --git a/p2p/src/commands.rs b/p2p/src/commands.rs index d6c8869..79d871b 100644 --- a/p2p/src/commands.rs +++ b/p2p/src/commands.rs @@ -1,5 +1,5 @@ use eyre::{Context, Result}; -use libp2p::{gossipsub, kad, swarm, Multiaddr, PeerId}; +use libp2p::{gossipsub, kad, request_response, swarm, Multiaddr, PeerId}; use tokio::sync::{mpsc, oneshot}; use crate::DriaP2PProtocol; @@ -43,6 +43,20 @@ pub enum DriaP2PCommand { data: Vec, sender: oneshot::Sender>, }, + /// Respond to a request-response message. + Respond { + data: Vec, + channel: request_response::ResponseChannel>, + sender: oneshot::Sender>, + }, + /// Request a request-response message. + /// Note that you are likely to be caught by the RPC peer id check, + /// and your messages will be ignored. + Request { + peer_id: PeerId, + data: Vec, + sender: oneshot::Sender, + }, /// Validates a GossipSub message for propagation, returns whether the message existed in cache. /// /// - `Accept`: Accept the message and propagate it. @@ -153,6 +167,47 @@ impl DriaP2PCommander { .wrap_err("could not publish") } + pub async fn respond( + &mut self, + data: Vec, + channel: request_response::ResponseChannel>, + ) -> Result<()> { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(DriaP2PCommand::Respond { + data, + channel, + sender, + }) + .await + .wrap_err("could not send")?; + + receiver + .await + .wrap_err("could not receive")? + .wrap_err("could not publish") + } + + pub async fn request( + &mut self, + peer_id: PeerId, + data: Vec, + ) -> Result { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(DriaP2PCommand::Request { + data, + peer_id, + sender, + }) + .await + .wrap_err("could not send")?; + + receiver.await.wrap_err("could not receive") + } + /// Dials a given peer. pub async fn dial(&mut self, peer_id: Multiaddr) -> Result<()> { let (sender, receiver) = oneshot::channel(); diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 6315dd8..bd824b5 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -18,6 +18,12 @@ pub struct DriaP2PProtocol { /// which is mandatory for a `StreamProtocol`. /// pub kademlia: StreamProtocol, + /// Request-response protocol, must match with other peers in the network. + /// + /// This is usually `/{name}/rr/{version}`, notice the `/` at the start + /// which is mandatory for a `StreamProtocol`. + /// + pub request_response: StreamProtocol, } impl std::fmt::Display for DriaP2PProtocol { @@ -35,15 +41,22 @@ impl Default for DriaP2PProtocol { impl DriaP2PProtocol { /// Creates a new instance of the protocol with the given `name` and `version`. - pub fn new(name: &str, version: &str) -> Self { + pub fn new(name: impl ToString, version: impl ToString) -> Self { + let name = name.to_string(); + let version = version.to_string(); + let identity = format!("{}/{}", name, version); - let kademlia = format!("/{}/kad/{}", name, version); + let kademlia = + StreamProtocol::try_from_owned(format!("/{}/kad/{}", name, version)).unwrap(); + let request_response = + StreamProtocol::try_from_owned(format!("/{}/rr/{}", name, version)).unwrap(); Self { - name: name.to_string(), - version: version.to_string(), + name, + version, identity, - kademlia: StreamProtocol::try_from_owned(kademlia).unwrap(), // guaranteed to unwrap + kademlia, + request_response, } } @@ -69,6 +82,11 @@ impl DriaP2PProtocol { self.kademlia.clone() } + /// Returns the request-response protocol, e.g. `/dria/rr/0.2`. + pub fn request_response(&self) -> StreamProtocol { + self.request_response.clone() + } + /// Returns `true` if the given protocol has a matching prefix with out Kademlia protocol. /// Otherwise, returns `false`. pub fn is_common_kademlia(&self, protocol: &StreamProtocol) -> bool { diff --git a/p2p/tests/listen_test.rs b/p2p/tests/gossipsub_test.rs similarity index 68% rename from p2p/tests/listen_test.rs rename to p2p/tests/gossipsub_test.rs index e4e70ef..0d8f61c 100644 --- a/p2p/tests/listen_test.rs +++ b/p2p/tests/gossipsub_test.rs @@ -2,14 +2,22 @@ use dkn_p2p::{DriaNodes, DriaP2PClient, DriaP2PProtocol}; use eyre::Result; use libp2p_identity::Keypair; +/// A gossipsub test that listens for a single message on a given topic. +/// Terminates when a message is received. +/// +/// ## Run command +/// +/// ```sh +/// cargo test --package dkn-p2p --test gossipsub_test --all-features -- test_gossipsub --exact --show-output --ignored +/// ``` #[tokio::test] #[ignore = "run this manually"] -async fn test_listen_topic_once() -> Result<()> { +async fn test_gossipsub() -> Result<()> { const TOPIC: &str = "pong"; let _ = env_logger::builder() .filter_level(log::LevelFilter::Off) - .filter_module("listen_test", log::LevelFilter::Debug) + .filter_module("gossipsub_test", log::LevelFilter::Debug) .filter_module("dkn_p2p", log::LevelFilter::Debug) .is_test(true) .try_init(); @@ -22,24 +30,16 @@ async fn test_listen_topic_once() -> Result<()> { .with_relay_nodes(["/ip4/34.201.33.141/tcp/4001/p2p/16Uiu2HAkuXiV2CQkC9eJgU6cMnJ9SMARa85FZ6miTkvn5fuHNufa".parse()?]); // spawn P2P client in another task - let (client, mut commander, mut msg_rx) = DriaP2PClient::new( + let (client, mut commander, mut msg_rx, _) = DriaP2PClient::new( Keypair::generate_secp256k1(), listen_addr, &nodes, DriaP2PProtocol::default(), - ) - .expect("could not create p2p client"); - - // spawn task + )?; let task_handle = tokio::spawn(async move { client.run().await }); - // subscribe to the given topic - commander - .subscribe(TOPIC) - .await - .expect("could not subscribe"); - // wait for a single gossipsub message on this topic + commander.subscribe(TOPIC).await?; log::info!("Waiting for messages..."); let message = msg_rx.recv().await; match message { @@ -50,20 +50,13 @@ async fn test_listen_topic_once() -> Result<()> { log::warn!("No message received for topic: {}", TOPIC); } } + commander.unsubscribe(TOPIC).await?; - // unsubscribe to the given topic - commander - .unsubscribe(TOPIC) - .await - .expect("could not unsubscribe"); - - // close command channel - commander.shutdown().await.expect("could not shutdown"); - - // close message channel + // close everything + commander.shutdown().await?; msg_rx.close(); - log::info!("Waiting for p2p task to finish..."); + // wait for handle to return task_handle.await?; log::info!("Done!"); diff --git a/p2p/tests/request_test.rs b/p2p/tests/request_test.rs new file mode 100644 index 0000000..485fc39 --- /dev/null +++ b/p2p/tests/request_test.rs @@ -0,0 +1,70 @@ +use std::str::FromStr; + +use dkn_p2p::DriaNetworkType::Community; +use dkn_p2p::{DriaNodes, DriaP2PClient, DriaP2PProtocol}; +use eyre::Result; +use libp2p::PeerId; +use libp2p_identity::Keypair; + +/// Makes a dummy request to some peer hardcoded within the test. +/// +/// ## Run command +/// +/// ```sh +/// cargo test --package dkn-p2p --test request_test --all-features -- test_request_message --exact --show-output --ignored +/// ``` +#[tokio::test] +#[ignore = "run this manually"] +async fn test_request_message() -> Result<()> { + let _ = env_logger::builder() + .filter_level(log::LevelFilter::Off) + .filter_module("request_test", log::LevelFilter::Debug) + .filter_module("dkn_p2p", log::LevelFilter::Debug) + .is_test(true) + .try_init(); + + let listen_addr = "/ip4/0.0.0.0/tcp/4001".parse()?; + + // prepare nodes + let nodes = DriaNodes::new(Community) + .with_bootstrap_nodes(Community.get_static_bootstrap_nodes()) + .with_relay_nodes(Community.get_static_relay_nodes()); + + // spawn P2P client in another task + let (client, mut commander, mut msg_rx, mut req_rx) = DriaP2PClient::new( + Keypair::generate_secp256k1(), + listen_addr, + &nodes, + DriaP2PProtocol::default(), + ) + .expect("could not create p2p client"); + + // spawn task + let task_handle = tokio::spawn(async move { client.run().await }); + + log::info!("Waiting a bit until we have enough peers"); + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + + let peer_id = + PeerId::from_str("16Uiu2HAmB5HGdwLNHX81u7ey1fvDx5Mr4ofa2PdSSVxFKrrcErAN").unwrap(); + log::info!("Making a request to peer: {}", peer_id); + commander + .request(peer_id, b"here is some data".into()) + .await?; + + log::info!("Waiting for response logs for a few moments..."); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // close command channel + commander.shutdown().await.expect("could not shutdown"); + + // close other channels + msg_rx.close(); + req_rx.close(); + + log::info!("Waiting for p2p task to finish..."); + task_handle.await?; + + log::info!("Done!"); + Ok(()) +} diff --git a/workflows/src/bin/tps.rs b/workflows/src/bin/tps.rs index 8284278..1c0d2d5 100644 --- a/workflows/src/bin/tps.rs +++ b/workflows/src/bin/tps.rs @@ -1,3 +1,11 @@ +#[cfg(feature = "profiling")] +use dkn_workflows::{Model, OllamaConfig}; +#[cfg(feature = "profiling")] +use ollama_workflows::ollama_rs::{ + generation::completion::{request::GenerationRequest, GenerationResponse}, + Ollama, +}; + #[cfg(not(feature = "profiling"))] fn main() { unimplemented!("this binary requires the 'profiling' feature to be enabled"); @@ -6,14 +14,12 @@ fn main() { #[cfg(feature = "profiling")] #[tokio::main] async fn main() { - use dkn_workflows::{DriaWorkflowsConfig, OllamaConfig}; - use ollama_workflows::ollama_rs::{generation::completion::request::GenerationRequest, Ollama}; - use ollama_workflows::Model; - use prettytable::{Cell, Row, Table}; - use sysinfo::{CpuRefreshKind, RefreshKind, System, MINIMUM_CPU_UPDATE_INTERVAL}; - - // initialize logger - env_logger::init(); + env_logger::builder() + .filter_level(log::LevelFilter::Off) + .filter_module("tps", log::LevelFilter::Info) + .filter_module("dkn_workflows", log::LevelFilter::Debug) + .parse_default_env() + .init(); let models = vec![ Model::NousTheta, @@ -21,8 +27,6 @@ async fn main() { Model::Phi3Medium128k, Model::Phi3_5Mini, Model::Phi3_5MiniFp16, - Model::Gemma2_9B, - Model::Gemma2_9BFp16, Model::Llama3_1_8B, Model::Llama3_1_8Bq8, Model::Llama3_1_8Bf16, @@ -43,53 +47,54 @@ async fn main() { Model::Qwen2_5coder7Bf16, Model::DeepSeekCoder6_7B, Model::Mixtral8_7b, - Model::GPT4Turbo, - Model::GPT4o, - Model::GPT4oMini, - Model::O1Preview, - Model::O1Mini, - Model::Gemini15ProExp0827, - Model::Gemini15Pro, - Model::Gemini15Flash, - Model::Gemini10Pro, - Model::Gemma2_2bIt, - Model::Gemma2_27bIt, + Model::Gemma2_9B, + Model::Gemma2_9BFp16, ]; - let cfg = DriaWorkflowsConfig::new(models); let config = OllamaConfig::default(); let ollama = Ollama::new(config.host, config.port); - log::debug!("Starting..."); - // ensure that all lists of CPUs and processes are filled - let mut system = System::new_all(); - // update all information of the system - system.refresh_all(); + + run_benchmark(ollama, models).await; +} + +#[cfg(feature = "profiling")] +async fn run_benchmark(ollama: Ollama, models: Vec) { + use dkn_workflows::ModelProvider; + use prettytable::{Cell, Row, Table}; + use sysinfo::{ + CpuRefreshKind, MemoryRefreshKind, RefreshKind, System, MINIMUM_CPU_UPDATE_INTERVAL, + }; + + // create & update system info + let mut system = System::new_with_specifics( + RefreshKind::new() + .with_cpu(CpuRefreshKind::everything()) + .with_memory(MemoryRefreshKind::everything()), + ); + system.refresh_cpu_usage(); + system.refresh_memory(); log::debug!("Getting system information..."); let brand = system.cpus()[0].brand().to_string(); let os_name = System::name().unwrap_or_else(|| "Unknown".to_string()); let os_version = System::long_os_version().unwrap_or_else(|| "Unknown".to_string()); - let cpu_usage = system.global_cpu_usage(); - let total_memory = system.total_memory(); - let used_memory = system.used_memory(); - let mut tps: f64; - let mut table = Table::new(); + log::info!("{} {} ({})", brand, os_name, os_version); - // Add a row with the headers + let mut table = Table::new(); table.add_row(Row::new(vec![ Cell::new("Model"), Cell::new("TPS"), - Cell::new("OS"), - Cell::new("Version"), Cell::new("CPU Usage (%)"), Cell::new("Total Memory (KB)"), Cell::new("Used Memory (KB)"), ])); - for (_, model) in cfg.models { + // iterate over Ollama models + for model in models + .into_iter() + .filter(|m| ModelProvider::from(m.clone()) == ModelProvider::Ollama) + { log::debug!("Pulling model: {}", model); - - // pull model match ollama.pull_model(model.to_string(), false).await { Ok(status) => log::debug!("Status: {}", status.message), Err(err) => { @@ -97,47 +102,68 @@ async fn main() { } } - log::debug!("Creating request..."); - // create dummy request as a warm-up - let generation_request = - GenerationRequest::new(model.to_string(), "compute 6780 * 1200".to_string()); - - // generate response - match ollama.generate(generation_request).await { + match ollama + .generate(GenerationRequest::new( + model.to_string(), + "Write a poem about Julius Caesar.".to_string(), + )) + .await + { Ok(response) => { log::debug!("Got response for model {}", model); - // compute TPS - tps = (response.eval_count.unwrap_or_default() as f64) - / (response.eval_duration.unwrap_or(1) as f64) - * 1_000_000_000f64; - - // add row to table + system.refresh_cpu_usage(); + system.refresh_memory(); table.add_row(Row::new(vec![ Cell::new(&model.to_string()), - Cell::new(&tps.to_string()), - Cell::new(&format!("{} {}", brand, os_name)), - Cell::new(&os_version), - Cell::new(&cpu_usage.to_string()), - Cell::new(&total_memory.to_string()), - Cell::new(&used_memory.to_string()), + Cell::new(&get_response_tps(&response).to_string()), + Cell::new(&system.global_cpu_usage().to_string()), + Cell::new(&(system.total_memory() / 1000).to_string()), + Cell::new(&(system.used_memory() / 1000).to_string()), ])); + // TODO: should add GPU usage here as well } Err(e) => { log::warn!("Ignoring model {}: Workflow failed with error {}", model, e); } } - table.printstd(); - // print system info - // refresh CPU usage (https://docs.rs/sysinfo/latest/sysinfo/struct.Cpu.html#method.cpu_usage) - system = - System::new_with_specifics(RefreshKind::new().with_cpu(CpuRefreshKind::everything())); + // wait a bit because CPU usage is based on diff std::thread::sleep(MINIMUM_CPU_UPDATE_INTERVAL); - // refresh CPUs again to get actual value - system.refresh_cpu_usage(); } - // print system info + + // print the final result table.printstd(); - log::debug!("Finished"); +} + +/// Computes the TPS. +#[cfg(feature = "profiling")] +#[inline(always)] +fn get_response_tps(res: &GenerationResponse) -> f64 { + (res.eval_count.unwrap_or_default() as f64) / (res.eval_duration.unwrap_or(1) as f64) + * 1_000_000_000f64 +} + +#[cfg(feature = "profiling")] +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_single() { + env_logger::builder() + .filter_level(log::LevelFilter::Off) + .filter_module("tps", log::LevelFilter::Debug) + .filter_module("dkn_workflows", log::LevelFilter::Debug) + .parse_default_env() + .is_test(true) + .init(); + + let models = vec![Model::Llama3_2_3B, Model::Llama3_2_1B]; + + let config = OllamaConfig::default(); + let ollama = Ollama::new(config.host, config.port); + + run_benchmark(ollama, models).await; + } }