From b51dcc19efc55844ed79fc5e133ba859808a0b15 Mon Sep 17 00:00:00 2001 From: lambda-0x <0xlambda@protonmail.com> Date: Wed, 16 Oct 2024 15:44:31 +0530 Subject: [PATCH] feat(torii): fetch and process erc721 metadata and image commit-id:274aaa3a --- Cargo.lock | 454 +++++++++++++++++- Cargo.toml | 6 + bin/torii/Cargo.toml | 2 +- bin/torii/src/main.rs | 48 +- crates/dojo-world/Cargo.toml | 6 +- crates/sozo/ops/Cargo.toml | 10 +- crates/torii/core/Cargo.toml | 3 + crates/torii/core/src/engine.rs | 12 +- crates/torii/core/src/executor/erc.rs | 297 ++++++++++++ .../core/src/{executor.rs => executor/mod.rs} | 390 +++++++++------ .../src/processors/erc721_legacy_transfer.rs | 14 +- .../core/src/processors/erc721_transfer.rs | 14 +- .../core/src/processors/metadata_update.rs | 38 +- crates/torii/core/src/sql/erc.rs | 144 ++---- crates/torii/core/src/sql/mod.rs | 6 + crates/torii/core/src/sql/test.rs | 12 +- crates/torii/core/src/utils.rs | 40 ++ .../torii/graphql/src/tests/metadata_test.rs | 16 +- crates/torii/graphql/src/tests/mod.rs | 3 +- .../graphql/src/tests/subscription_test.rs | 31 +- .../grpc/src/server/tests/entities_test.rs | 4 +- crates/torii/libp2p/Cargo.toml | 2 +- crates/torii/libp2p/src/tests.rs | 7 +- .../20241014085532_add_metadata_field.sql | 1 + crates/torii/server/Cargo.toml | 17 +- crates/torii/server/src/artifacts.rs | 336 +++++++++++++ crates/torii/server/src/lib.rs | 1 + crates/torii/server/src/proxy.rs | 36 +- 28 files changed, 1599 insertions(+), 351 deletions(-) create mode 100644 crates/torii/core/src/executor/erc.rs rename crates/torii/core/src/{executor.rs => executor/mod.rs} (61%) create mode 100644 crates/torii/migrations/20241014085532_add_metadata_field.sql create mode 100644 crates/torii/server/src/artifacts.rs diff --git a/Cargo.lock b/Cargo.lock index c0f3194955..bf828ca73f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,6 +68,12 @@ dependencies = [ "gimli", ] +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "adler2" version = "2.0.0" @@ -142,6 +148,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "aligned-vec" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4aa90d7ce82d4be67b64039a3d588d38dbcc6736577de4a847025ce5b0c468d1" + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -978,6 +990,17 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "arg_enum_proc_macro" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ae92a5119aa49cdbcf6b9f893fe4e1d98b04ccbf82ee0584ad948a44a734dea" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "ark-ec" version = "0.4.2" @@ -1643,6 +1666,29 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "av1-grain" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6678909d8c5d46a42abcf571271e15fdbc0a225e3646cf23762cd415046c78bf" +dependencies = [ + "anyhow", + "arrayvec", + "log", + "nom", + "num-rational", + "v_frame", +] + +[[package]] +name = "avif-serialize" +version = "0.8.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "876c75a42f6364451a033496a14c44bffe41f5f4a8236f697391f11024e596d2" +dependencies = [ + "arrayvec", +] + [[package]] name = "aws-lc-rs" version = "1.9.0" @@ -1779,7 +1825,7 @@ dependencies = [ "addr2line", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.8.0", "object", "rustc-demangle", "windows-targets 0.52.6", @@ -1947,6 +1993,12 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bit_field" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc827186963e592360843fb5ba4b973e145841266c1357f7180c43526f2e5b61" + [[package]] name = "bitflags" version = "1.3.2" @@ -1962,6 +2014,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitstream-io" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b81e1519b0d82120d2fd469d5bfb2919a9361c48b02d82d04befc1cdd2002452" + [[package]] name = "bitvec" version = "1.0.1" @@ -2153,6 +2211,12 @@ dependencies = [ "serde", ] +[[package]] +name = "built" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "236e6289eda5a812bc6b53c3b024039382a2895fbbeef2d748b2931546d392c4" + [[package]] name = "bumpalo" version = "3.16.0" @@ -2210,6 +2274,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" +[[package]] +name = "byteorder-lite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" + [[package]] name = "bytes" version = "1.7.2" @@ -3347,7 +3417,7 @@ checksum = "9a95746c5221a74d7b913a415fdbb9e7c90e1b4d818dbbff59bddc034cfce2ec" dependencies = [ "bytes", "flex-error", - "num-derive", + "num-derive 0.3.3", "num-traits 0.2.19", "prost 0.12.6", "prost-types 0.12.6", @@ -3401,6 +3471,16 @@ dependencies = [ "nom", ] +[[package]] +name = "cfg-expr" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02" +dependencies = [ + "smallvec", + "target-lexicon", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -3593,6 +3673,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" +[[package]] +name = "color_quant" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" + [[package]] name = "colorchoice" version = "1.0.2" @@ -4260,6 +4346,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "data-url" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c297a1c74b71ae29df00c3e22dd9534821d60eb9af5a0192823fa2acea70c2a" + [[package]] name = "debugid" version = "0.8.0" @@ -5094,6 +5186,22 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "exr" +version = "1.72.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "887d93f60543e9a9362ef8a21beedd0a833c5d9610e18c67abe15a5963dcb1a4" +dependencies = [ + "bit_field", + "flume", + "half 2.4.1", + "lebe", + "miniz_oxide 0.7.4", + "rayon-core", + "smallvec", + 
"zune-inflate", +] + [[package]] name = "eyre" version = "0.6.12" @@ -5145,6 +5253,15 @@ dependencies = [ "bytes", ] +[[package]] +name = "fdeflate" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8090f921a24b04994d9929e204f50b498a33ea6ba559ffaa05e04f7ee7fb5ab" +dependencies = [ + "simd-adler32", +] + [[package]] name = "ff" version = "0.13.0" @@ -5210,7 +5327,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.8.0", ] [[package]] @@ -5536,6 +5653,16 @@ dependencies = [ "polyval", ] +[[package]] +name = "gif" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb2d69b19215e18bb912fa30f7ce15846e301408695e44e0ef719f1da9e19f2" +dependencies = [ + "color_quant", + "weezl", +] + [[package]] name = "gimli" version = "0.31.0" @@ -7239,6 +7366,39 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "image" +version = "0.25.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99314c8a2152b8ddb211f924cdae532d8c5e4c8bb54728e12fff1b0cd5963a10" +dependencies = [ + "bytemuck", + "byteorder-lite", + "color_quant", + "exr", + "gif", + "image-webp", + "num-traits 0.2.19", + "png", + "qoi", + "ravif", + "rayon", + "rgb", + "tiff", + "zune-core", + "zune-jpeg", +] + +[[package]] +name = "image-webp" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f79afb8cbee2ef20f59ccd477a218c12a93943d075b492015ecb1bb81f8ee904" +dependencies = [ + "byteorder-lite", + "quick-error 2.0.1", +] + [[package]] name = "imara-diff" version = "0.1.7" @@ -7249,6 +7409,12 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "imgref" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44feda355f4159a7c757171a77de25daf6411e217b4cabd03bd6650690468126" + [[package]] name = "impl-codec" version = "0.6.0" @@ -7441,6 +7607,17 @@ dependencies = [ "webrtc-util 0.8.1", ] +[[package]] +name = "interpolate_name" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34819042dc3d3971c46c2190835914dfbe0c3c13f61449b2997f4e9722dfa60" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "io-close" version = "0.3.7" @@ -7691,6 +7868,12 @@ dependencies = [ "libc", ] +[[package]] +name = "jpeg-decoder" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5d4a7da358eff58addd2877a45865158f0d78c911d43a5784ceb7bbf52833b0" + [[package]] name = "js-sys" version = "0.3.70" @@ -8546,6 +8729,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "lebe" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03087c2bad5e1034e8cace5926dec053fb3790248370865f5117a7d0213354c8" + [[package]] name = "leopard-codec" version = "0.1.0" @@ -8563,6 +8752,17 @@ version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +[[package]] +name = "libfuzzer-sys" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"a96cfd5557eb82f2b83fed4955246c988d331975a002961b07c81584d107e7f7" +dependencies = [ + "arbitrary", + "cc", + "once_cell", +] + [[package]] name = "libloading" version = "0.8.5" @@ -9171,6 +9371,15 @@ dependencies = [ "value-bag", ] +[[package]] +name = "loop9" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fae87c125b03c1d2c0150c90365d7d6bcc53fb73a9acaef207d2d065860f062" +dependencies = [ + "imgref", +] + [[package]] name = "lru" version = "0.12.4" @@ -9250,6 +9459,15 @@ dependencies = [ "rawpointer", ] +[[package]] +name = "maybe-rayon" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea1f30cedd69f0a2954655f7188c6a834246d2bcf1e315e2ac40c4b24dc9519" +dependencies = [ + "cfg-if", +] + [[package]] name = "md-5" version = "0.10.6" @@ -9403,6 +9621,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -9410,6 +9637,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" dependencies = [ "adler2", + "simd-adler32", ] [[package]] @@ -9750,6 +9978,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "noop_proc_macro" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0676bb32a98c1a483ce53e500a81ad9c3d5b3f7c920c28c24e9cb0980d0b5bc8" + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -9850,6 +10084,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "num-derive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "num-format" version = "0.4.4" @@ -10535,6 +10780,19 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "png" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52f9d46a34a05a6a57566bc2bfae066ef07585a6e3fa30fbbdff5936380623f0" +dependencies = [ + "bitflags 1.3.2", + "crc32fast", + "fdeflate", + "flate2", + "miniz_oxide 0.8.0", +] + [[package]] name = "polling" version = "2.8.0" @@ -10849,6 +11107,25 @@ dependencies = [ "human_format", ] +[[package]] +name = "profiling" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afbdc74edc00b6f6a218ca6a5364d6226a259d4b8ea1af4a0ea063f27e179f4d" +dependencies = [ + "profiling-procmacros", +] + +[[package]] +name = "profiling-procmacros" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a65f2e60fbf1063868558d69c6beacf412dc755f9fc020f514b7955fc914fe30" +dependencies = [ + "quote", + "syn 2.0.77", +] + [[package]] name = "prometheus-client" version = "0.22.3" @@ -11067,6 +11344,15 @@ dependencies = [ "psl-types", ] +[[package]] +name = "qoi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f6d64c71eb498fe9eae14ce4ec935c555749aef511cca85b5568910d6e48001" +dependencies = [ + "bytemuck", +] + 
[[package]] name = "quanta" version = "0.12.3" @@ -11088,6 +11374,12 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-error" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" + [[package]] name = "quick-protobuf" version = "0.8.1" @@ -11269,6 +11561,55 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rav1e" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd87ce80a7665b1cce111f8a16c1f3929f6547ce91ade6addf4ec86a8dda5ce9" +dependencies = [ + "arbitrary", + "arg_enum_proc_macro", + "arrayvec", + "av1-grain", + "bitstream-io", + "built", + "cfg-if", + "interpolate_name", + "itertools 0.12.1", + "libc", + "libfuzzer-sys", + "log", + "maybe-rayon", + "new_debug_unreachable", + "noop_proc_macro", + "num-derive 0.4.2", + "num-traits 0.2.19", + "once_cell", + "paste", + "profiling", + "rand 0.8.5", + "rand_chacha", + "simd_helpers", + "system-deps", + "thiserror", + "v_frame", + "wasm-bindgen", +] + +[[package]] +name = "ravif" +version = "0.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f0bfd976333248de2078d350bfdf182ff96e168a24d23d2436cef320dd4bdd" +dependencies = [ + "avif-serialize", + "imgref", + "loop9", + "quick-error 2.0.1", + "rav1e", + "rgb", +] + [[package]] name = "raw-cpuid" version = "11.1.0" @@ -11556,7 +11897,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00" dependencies = [ "hostname", - "quick-error", + "quick-error 1.2.3", ] [[package]] @@ -12156,7 +12497,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb3dcc6e454c328bb824492db107ab7c0ae8fcffe4ad210136ef014458c1bc4f" dependencies = [ "fnv", - "quick-error", + "quick-error 1.2.3", "tempfile", "wait-timeout", ] @@ -13019,6 +13360,21 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simd-adler32" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" + +[[package]] +name = "simd_helpers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95890f873bec569a0362c235787f3aca6e1e887302ba4840839bcc6459c42da6" +dependencies = [ + "quote", +] + [[package]] name = "simdutf8" version = "0.1.5" @@ -14289,6 +14645,19 @@ dependencies = [ "libc", ] +[[package]] +name = "system-deps" +version = "6.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e535eb8dded36d55ec13eddacd30dec501792ff23a0b1682c38601b8cf2349" +dependencies = [ + "cfg-expr", + "heck 0.5.0", + "pkg-config", + "toml 0.8.19", + "version-compare", +] + [[package]] name = "tap" version = "1.0.1" @@ -14306,6 +14675,12 @@ dependencies = [ "xattr", ] +[[package]] +name = "target-lexicon" +version = "0.12.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" + [[package]] name = "tempdir" version = "0.3.7" @@ -14427,6 +14802,17 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "tiff" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"ba1310fcea54c6a9a4fd1aad794ecc02c31682f6bfbecdf460bf19533eed1e3e" +dependencies = [ + "flate2", + "jpeg-decoder", + "weezl", +] + [[package]] name = "time" version = "0.3.36" @@ -14912,6 +15298,7 @@ dependencies = [ "camino", "chrono", "crypto-bigint", + "data-url", "dojo-test-utils", "dojo-types", "dojo-utils", @@ -14919,6 +15306,7 @@ dependencies = [ "futures-channel", "futures-util", "hashlink", + "ipfs-api-backend-hyper", "katana-runner", "num-traits 0.2.19", "once_cell", @@ -15062,20 +15450,29 @@ dependencies = [ name = "torii-server" version = "1.0.0-alpha.19" dependencies = [ + "anyhow", "base64 0.21.7", + "camino", + "data-url", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", "hyper-reverse-proxy", + "image", "indexmap 2.5.0", "lazy_static", + "mime_guess", + "reqwest 0.12.7", "serde", "serde_json", + "sqlx", "tokio", "tokio-util", + "torii-core", "tower 0.4.13", "tower-http 0.4.4", "tracing", + "warp", ] [[package]] @@ -15649,6 +16046,17 @@ dependencies = [ "getrandom", ] +[[package]] +name = "v_frame" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6f32aaa24bacd11e488aa9ba66369c7cd514885742c9fe08cfe85884db3e92b" +dependencies = [ + "aligned-vec", + "num-traits 0.2.19", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.0" @@ -15722,6 +16130,12 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "version-compare" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "852e951cb7832cb45cb1169900d19760cfa39b82bc0ea9c0e5a14ae88411c98b" + [[package]] name = "version_check" version = "0.9.5" @@ -16239,6 +16653,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "weezl" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a85b86a771b1c87058196170769dd264f66c0782acf1ae6cc51bfd64b39082" + [[package]] name = "which" version = "4.4.2" @@ -16931,3 +17351,27 @@ dependencies = [ "cc", "pkg-config", ] + +[[package]] +name = "zune-core" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f423a2c17029964870cfaabb1f13dfab7d092a62a29a89264f4d36990ca414a" + +[[package]] +name = "zune-inflate" +version = "0.2.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73ab332fe2f6680068f3582b16a24f90ad7096d5d39b974d1c0aff0125116f02" +dependencies = [ + "simd-adler32", +] + +[[package]] +name = "zune-jpeg" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16099418600b4d8f028622f73ff6e3deaabdff330fb9a2a131dea781ee8b0768" +dependencies = [ + "zune-core", +] diff --git a/Cargo.toml b/Cargo.toml index af0729158d..18a403c818 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,13 +157,16 @@ clap_complete = "4.3" console = "0.15.7" convert_case = "0.6.0" crypto-bigint = { version = "0.5.3", features = [ "serde" ] } +data-url = "0.3" derive_more = "0.99.17" flate2 = "1.0.24" +fluent-uri = "0.3" futures = "0.3.30" futures-util = "0.3.30" hashlink = "0.9.1" hex = "0.4.3" http = "0.2.9" +image = "0.25.2" indexmap = "2.2.5" indoc = "1.0.7" itertools = "0.12.1" @@ -209,6 +212,9 @@ tracing-log = "0.1.3" tracing-subscriber = { version = "0.3.16", features = [ "env-filter", "json" ] } url = { version = "2.4.0", features = [ "serde" ] } walkdir = "2.5.0" +# TODO: see if we still need the git version +ipfs-api-backend-hyper = { git = "https://github.com/ferristseng/rust-ipfs-api", rev = "af2c17f7b19ef5b9898f458d97a90055c3605633", 
features = [ "with-hyper-rustls", "with-send-sync" ] } +mime_guess = "2.0" # server hyper = "0.14.27" diff --git a/bin/torii/Cargo.toml b/bin/torii/Cargo.toml index aadbd390cd..1cd9ceb15c 100644 --- a/bin/torii/Cargo.toml +++ b/bin/torii/Cargo.toml @@ -41,12 +41,12 @@ torii-relay.workspace = true torii-server.workspace = true tower.workspace = true +tempfile.workspace = true tower-http.workspace = true tracing-subscriber.workspace = true tracing.workspace = true url.workspace = true webbrowser = "0.8" -tempfile.workspace = true [dev-dependencies] camino.workspace = true diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index c55da2f464..e09cc639df 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; +use camino::Utf8PathBuf; use clap::{ArgAction, Parser}; use dojo_metrics::exporters::prometheus::PrometheusRecorder; use dojo_utils::parse::{parse_socket_address, parse_url}; @@ -30,7 +31,7 @@ use sqlx::SqlitePool; use starknet::core::types::Felt; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; -use tempfile::NamedTempFile; +use tempfile::{NamedTempFile, TempDir}; use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; @@ -146,6 +147,10 @@ struct Args { /// Configuration file #[arg(long)] config: Option, + + /// Path to a directory to store ERC artifacts + #[arg(long)] + artifacts_path: Option, } #[tokio::main] @@ -213,7 +218,13 @@ async fn main() -> anyhow::Result<()> { let contracts = config.contracts.iter().map(|contract| (contract.address, contract.r#type)).collect(); - let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await?; + let (mut executor, sender) = Executor::new( + pool.clone(), + shutdown_tx.clone(), + provider.clone(), + args.max_concurrent_tasks, + ) + .await?; tokio::spawn(async move { executor.run().await.unwrap(); }); @@ -254,10 +265,24 @@ async fn main() -> anyhow::Result<()> { Arc::new(contracts), ); - let shutdown_rx = shutdown_tx.subscribe(); - let (grpc_addr, grpc_server) = - torii_grpc::server::new(shutdown_rx, &pool, block_rx, world_address, Arc::clone(&provider)) - .await?; + let (grpc_addr, grpc_server) = torii_grpc::server::new( + shutdown_tx.subscribe(), + &pool, + block_rx, + world_address, + Arc::clone(&provider), + ) + .await?; + + let temp_dir = TempDir::new()?; + let artifacts_path = + args.artifacts_path.unwrap_or_else(|| Utf8PathBuf::from(temp_dir.path().to_str().unwrap())); + + tokio::fs::create_dir_all(&artifacts_path).await?; + let absolute_path = artifacts_path.canonicalize_utf8()?; + + let (artifacts_addr, artifacts_server) = + torii_server::artifacts::new(shutdown_tx.subscribe(), &absolute_path, pool.clone()).await?; let mut libp2p_relay_server = torii_relay::server::Relay::new( db, @@ -270,7 +295,13 @@ async fn main() -> anyhow::Result<()> { ) .expect("Failed to start libp2p relay server"); - let proxy_server = Arc::new(Proxy::new(args.addr, args.allowed_origins, Some(grpc_addr), None)); + let proxy_server = Arc::new(Proxy::new( + args.addr, + args.allowed_origins, + Some(grpc_addr), + None, + Some(artifacts_addr), + )); let graphql_server = spawn_rebuilding_graphql_server( shutdown_tx.clone(), @@ -288,6 +319,7 @@ async fn main() -> anyhow::Result<()> { info!(target: LOG_TARGET, endpoint = %endpoint, "Starting torii endpoint."); info!(target: LOG_TARGET, endpoint = %gql_endpoint, "Serving Graphql playground."); info!(target: LOG_TARGET, url = 
%explorer_url, "Serving World Explorer."); + info!(target: LOG_TARGET, path = %artifacts_path, "Serving ERC artifacts at path"); if args.explorer { if let Err(e) = webbrowser::open(&explorer_url) { @@ -308,6 +340,7 @@ async fn main() -> anyhow::Result<()> { let graphql_server_handle = tokio::spawn(graphql_server); let grpc_server_handle = tokio::spawn(grpc_server); let libp2p_relay_server_handle = tokio::spawn(async move { libp2p_relay_server.run().await }); + let artifacts_server_handle = tokio::spawn(artifacts_server); tokio::select! { res = engine_handle => res??, @@ -315,6 +348,7 @@ async fn main() -> anyhow::Result<()> { res = graphql_server_handle => res?, res = grpc_server_handle => res??, res = libp2p_relay_server_handle => res?, + res = artifacts_server_handle => res?, _ = dojo_utils::signal::wait_signals() => {}, }; diff --git a/crates/dojo-world/Cargo.toml b/crates/dojo-world/Cargo.toml index b380b04615..39301853ca 100644 --- a/crates/dojo-world/Cargo.toml +++ b/crates/dojo-world/Cargo.toml @@ -11,8 +11,8 @@ anyhow.workspace = true async-trait.workspace = true cairo-lang-filesystem.workspace = true cairo-lang-project.workspace = true -cairo-lang-starknet.workspace = true cairo-lang-starknet-classes.workspace = true +cairo-lang-starknet.workspace = true camino.workspace = true convert_case.workspace = true dojo-utils = { workspace = true, optional = true } @@ -22,8 +22,8 @@ serde.workspace = true serde_json.workspace = true serde_with.workspace = true smol_str.workspace = true -starknet.workspace = true starknet-crypto.workspace = true +starknet.workspace = true thiserror.workspace = true topological-sort.workspace = true tracing.workspace = true @@ -31,7 +31,7 @@ tracing.workspace = true cainome.workspace = true dojo-types = { path = "../dojo-types", optional = true } http = { workspace = true, optional = true } -ipfs-api-backend-hyper = { git = "https://github.com/ferristseng/rust-ipfs-api", rev = "af2c17f7b19ef5b9898f458d97a90055c3605633", features = [ "with-hyper-rustls" ], optional = true } +ipfs-api-backend-hyper = { workspace = true, optional = true } scarb = { workspace = true, optional = true } tokio = { version = "1.32.0", features = [ "time" ], default-features = false, optional = true } toml.workspace = true diff --git a/crates/sozo/ops/Cargo.toml b/crates/sozo/ops/Cargo.toml index 0204596429..a72f38cc34 100644 --- a/crates/sozo/ops/Cargo.toml +++ b/crates/sozo/ops/Cargo.toml @@ -15,10 +15,10 @@ cairo-lang-defs.workspace = true cairo-lang-filesystem.workspace = true cairo-lang-plugins.workspace = true cairo-lang-project.workspace = true -cairo-lang-sierra.workspace = true cairo-lang-sierra-to-casm.workspace = true -cairo-lang-starknet.workspace = true +cairo-lang-sierra.workspace = true cairo-lang-starknet-classes.workspace = true +cairo-lang-starknet.workspace = true cairo-lang-test-plugin.workspace = true cairo-lang-utils.workspace = true camino.workspace = true @@ -36,16 +36,16 @@ num-bigint = "0.4.6" num-traits.workspace = true reqwest.workspace = true rpassword.workspace = true -scarb.workspace = true scarb-ui.workspace = true +scarb.workspace = true semver.workspace = true serde.workspace = true serde_json.workspace = true serde_with.workspace = true smol_str.workspace = true sozo-walnut = { workspace = true, optional = true } -starknet.workspace = true starknet-crypto.workspace = true +starknet.workspace = true thiserror.workspace = true tokio.workspace = true toml.workspace = true @@ -58,7 +58,7 @@ katana-runner = { workspace = true, optional = true } 
[dev-dependencies] assert_fs.workspace = true dojo-test-utils = { workspace = true, features = [ "build-examples" ] } -ipfs-api-backend-hyper = { git = "https://github.com/ferristseng/rust-ipfs-api", rev = "af2c17f7b19ef5b9898f458d97a90055c3605633", features = [ "with-hyper-rustls" ] } +ipfs-api-backend-hyper.workspace = true katana-runner.workspace = true [features] diff --git a/crates/torii/core/Cargo.toml b/crates/torii/core/Cargo.toml index 30040d528b..e5d037972a 100644 --- a/crates/torii/core/Cargo.toml +++ b/crates/torii/core/Cargo.toml @@ -16,8 +16,10 @@ bitflags = "2.6.0" cainome.workspace = true chrono.workspace = true crypto-bigint.workspace = true +data-url.workspace = true dojo-types.workspace = true dojo-world = { workspace = true, features = [ "contracts", "manifest" ] } +# fluent-uri.workspace = true futures-channel = "0.3.0" futures-util.workspace = true hashlink.workspace = true @@ -33,6 +35,7 @@ starknet.workspace = true thiserror.workspace = true tokio = { version = "1.32.0", features = [ "sync" ], default-features = true } # tokio-stream = "0.1.11" +ipfs-api-backend-hyper.workspace = true tokio-util.workspace = true toml.workspace = true tracing.workspace = true diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index ed51840ae6..9343cc3b05 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -259,8 +259,9 @@ impl Engine
<P>
{ match self.process(fetch_result).await { Ok(_) => { - self.db.execute().await?; + self.db.flush().await?; self.db.apply_cache_diff().await?; + self.db.execute().await?; }, Err(e) => { error!(target: LOG_TARGET, error = %e, "Processing fetched data."); @@ -627,8 +628,7 @@ impl Engine
<P>
{ unique_contracts.insert(event.from_address); - Self::process_event( - self, + self.process_event( block_number, block_timestamp, &event_id, @@ -688,8 +688,7 @@ impl Engine
<P>
{ let event_id = format!("{:#064x}:{:#x}:{:#04x}", block_number, *transaction_hash, event_idx); - Self::process_event( - self, + self.process_event( block_number, block_timestamp, &event_id, @@ -701,8 +700,7 @@ impl Engine
<P>
{ } if self.config.flags.contains(IndexingFlags::TRANSACTIONS) { - Self::process_transaction( - self, + self.process_transaction( block_number, block_timestamp, *transaction_hash, diff --git a/crates/torii/core/src/executor/erc.rs b/crates/torii/core/src/executor/erc.rs new file mode 100644 index 0000000000..eb4a3c19bc --- /dev/null +++ b/crates/torii/core/src/executor/erc.rs @@ -0,0 +1,297 @@ +use std::str::FromStr; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use cainome::cairo_serde::{ByteArray, CairoSerde}; +use data_url::mime::Mime; +use data_url::DataUrl; +use reqwest::Client; +use starknet::core::types::{BlockId, BlockTag, FunctionCall, U256}; +use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; +use starknet::providers::Provider; +use starknet_crypto::Felt; +use tracing::{debug, trace}; + +use super::{ApplyBalanceDiffQuery, Executor}; +use crate::constants::TOKEN_BALANCE_TABLE; +use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256}; +use crate::sql::FELT_DELIMITER; +use crate::types::ContractType; +use crate::utils::{fetch_content_from_ipfs, MAX_RETRY}; + +#[derive(Debug, Clone)] +pub struct RegisterErc721TokenQuery { + pub token_id: String, + pub contract_address: Felt, + pub actual_token_id: U256, +} + +#[derive(Debug, Clone)] +pub struct RegisterErc721TokenMetadata { + pub query: RegisterErc721TokenQuery, + pub name: String, + pub symbol: String, + pub metadata: String, +} + +#[derive(Debug, Clone)] +pub struct RegisterErc20TokenQuery { + pub token_id: String, + pub contract_address: Felt, + pub name: String, + pub symbol: String, + pub decimals: u8, +} + +impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { + pub async fn apply_balance_diff( + &mut self, + apply_balance_diff: ApplyBalanceDiffQuery, + ) -> Result<()> { + let erc_cache = apply_balance_diff.erc_cache; + for ((contract_type, id_str), balance) in erc_cache.iter() { + let id = id_str.split(FELT_DELIMITER).collect::>(); + match contract_type { + ContractType::WORLD => unreachable!(), + ContractType::ERC721 => { + // account_address/contract_address:id => ERC721 + assert!(id.len() == 2); + let account_address = id[0]; + let token_id = id[1]; + let mid = token_id.split(":").collect::>(); + let contract_address = mid[0]; + + self.apply_balance_diff_helper( + id_str, + account_address, + contract_address, + token_id, + balance, + ) + .await + .with_context(|| "Failed to apply balance diff in apply_cache_diff")?; + } + ContractType::ERC20 => { + // account_address/contract_address/ => ERC20 + assert!(id.len() == 3); + let account_address = id[0]; + let contract_address = id[1]; + let token_id = id[1]; + + self.apply_balance_diff_helper( + id_str, + account_address, + contract_address, + token_id, + balance, + ) + .await + .with_context(|| "Failed to apply balance diff in apply_cache_diff")?; + } + } + } + + Ok(()) + } + + pub async fn apply_balance_diff_helper( + &mut self, + id: &str, + account_address: &str, + contract_address: &str, + token_id: &str, + balance_diff: &I256, + ) -> Result<()> { + let tx = &mut self.transaction; + let balance: Option<(String,)> = + sqlx::query_as(&format!("SELECT balance FROM {TOKEN_BALANCE_TABLE} WHERE id = ?")) + .bind(id) + .fetch_optional(&mut **tx) + .await?; + + let mut balance = if let Some(balance) = balance { + sql_string_to_u256(&balance.0) + } else { + U256::from(0u8) + }; + + if balance_diff.is_negative { + if balance < balance_diff.value { + dbg!(&balance_diff, balance, id); + } + balance -= 
balance_diff.value; + } else { + balance += balance_diff.value; + } + + // write the new balance to the database + sqlx::query(&format!( + "INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (id, contract_address, account_address, \ + token_id, balance) VALUES (?, ?, ?, ?, ?)", + )) + .bind(id) + .bind(contract_address) + .bind(account_address) + .bind(token_id) + .bind(u256_to_sql_string(&balance)) + .execute(&mut **tx) + .await?; + + Ok(()) + } + + pub async fn process_register_erc721_token_query( + register_erc721_token: RegisterErc721TokenQuery, + provider: Arc
<P>
, + name: String, + symbol: String, + ) -> Result { + let token_uri = if let Ok(token_uri) = provider + .call( + FunctionCall { + contract_address: register_erc721_token.contract_address, + entry_point_selector: get_selector_from_name("token_uri").unwrap(), + calldata: vec![ + register_erc721_token.actual_token_id.low().into(), + register_erc721_token.actual_token_id.high().into(), + ], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await + { + token_uri + } else if let Ok(token_uri) = provider + .call( + FunctionCall { + contract_address: register_erc721_token.contract_address, + entry_point_selector: get_selector_from_name("tokenURI").unwrap(), + calldata: vec![ + register_erc721_token.actual_token_id.low().into(), + register_erc721_token.actual_token_id.high().into(), + ], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await + { + token_uri + } else { + return Err(anyhow::anyhow!("Failed to fetch token_uri")); + }; + + let token_uri = if let Ok(byte_array) = ByteArray::cairo_deserialize(&token_uri, 0) { + byte_array.to_string().expect("Return value not String") + } else if let Ok(felt_array) = Vec::::cairo_deserialize(&token_uri, 0) { + felt_array + .iter() + .map(parse_cairo_short_string) + .collect::, _>>() + .map(|strings| strings.join("")) + .map_err(|_| anyhow::anyhow!("Failed parsing Array to String"))? + } else { + return Err(anyhow::anyhow!("token_uri is neither ByteArray nor Array")); + }; + + let metadata = Self::fetch_metadata(&token_uri).await.with_context(|| { + format!( + "Failed to fetch metadata for token_id: {}", + register_erc721_token.actual_token_id + ) + })?; + let metadata = serde_json::to_string(&metadata).context("Failed to serialize metadata")?; + Ok(RegisterErc721TokenMetadata { query: register_erc721_token, metadata, name, symbol }) + } + + // given a uri which can be either http/https url or data uri, fetch the metadata erc721 + // metadata json schema + pub async fn fetch_metadata(token_uri: &str) -> Result { + // Parse the token_uri + + match token_uri { + uri if uri.starts_with("http") || uri.starts_with("https") => { + // Fetch metadata from HTTP/HTTPS URL + debug!(token_uri = %token_uri, "Fetching metadata from http/https URL"); + let client = Client::new(); + let response = client + .get(token_uri) + .send() + .await + .context("Failed to fetch metadata from URL")?; + + let bytes = response.bytes().await.context("Failed to read response bytes")?; + let json: serde_json::Value = serde_json::from_slice(&bytes) + .context(format!("Failed to parse metadata JSON from response: {:?}", bytes))?; + + Ok(json) + } + uri if uri.starts_with("ipfs") => { + let cid = uri.strip_prefix("ipfs://").unwrap(); + debug!(cid = %cid, "Fetching metadata from IPFS"); + let response = fetch_content_from_ipfs(cid, MAX_RETRY) + .await + .context("Failed to fetch metadata from IPFS")?; + + let json: serde_json::Value = + serde_json::from_slice(&response).context(format!( + "Failed to parse metadata JSON from IPFS: {:?}, data: {:?}", + cid, &response + ))?; + + Ok(json) + } + uri if uri.starts_with("data") => { + // Parse and decode data URI + debug!("Parsing metadata from data URI"); + trace!(data_uri = %token_uri); + + // HACK: https://github.com/servo/rust-url/issues/908 + let uri = token_uri.replace("#", "%23"); + + let data_url = DataUrl::process(&uri).context("Failed to parse data URI")?; + + // Ensure the MIME type is JSON + if data_url.mime_type() != &Mime::from_str("application/json").unwrap() { + return Err(anyhow::anyhow!("Data URI is not of JSON type")); + } + + let 
decoded = data_url.decode_to_vec().context("Failed to decode data URI")?; + // HACK: Loot Survior NFT metadata contains control characters which makes the json + // DATA invalid so filter them out + let decoded_str = String::from_utf8_lossy(&decoded.0) + .chars() + .filter(|c| !c.is_ascii_control()) + .collect::(); + + let json: serde_json::Value = serde_json::from_str(&decoded_str) + .context(format!("Failed to parse metadata JSON from data URI: {}", &uri))?; + + Ok(json) + } + uri => Err(anyhow::anyhow!("Unsupported URI scheme found in token URI: {}", uri)), + } + } + + pub async fn handle_erc721_token_metadata( + &mut self, + result: RegisterErc721TokenMetadata, + ) -> Result<()> { + let query = sqlx::query( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals, metadata) VALUES \ + (?, ?, ?, ?, ?, ?)", + ) + .bind(&result.query.token_id) + .bind(felt_to_sql_string(&result.query.contract_address)) + .bind(&result.name) + .bind(&result.symbol) + .bind(0) + .bind(&result.metadata); + + query + .execute(&mut *self.transaction) + .await + .with_context(|| format!("Failed to execute721Token query: {:?}", result))?; + + Ok(()) + } +} diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor/mod.rs similarity index 61% rename from crates/torii/core/src/executor.rs rename to crates/torii/core/src/executor/mod.rs index 04d64676b9..5033697442 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor/mod.rs @@ -1,28 +1,33 @@ use std::collections::HashMap; use std::mem; use std::str::FromStr; +use std::sync::Arc; use anyhow::{Context, Result}; +use cainome::cairo_serde::{ByteArray, CairoSerde}; use dojo_types::schema::{Struct, Ty}; -use sqlx::query::Query; -use sqlx::sqlite::SqliteArguments; use sqlx::{FromRow, Pool, Sqlite, Transaction}; -use starknet::core::types::{Felt, U256}; +use starknet::core::types::{BlockId, BlockTag, Felt, FunctionCall}; +use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; +use starknet::providers::Provider; use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, Semaphore}; +use tokio::task::JoinSet; use tokio::time::Instant; use tracing::{debug, error}; use crate::simple_broker::SimpleBroker; -use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256}; -use crate::sql::FELT_DELIMITER; +use crate::sql::utils::{felt_to_sql_string, I256}; use crate::types::{ ContractCursor, ContractType, Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, Model as ModelRegistered, OptimisticEntity, OptimisticEventMessage, }; +pub mod erc; +pub use erc::{RegisterErc20TokenQuery, RegisterErc721TokenMetadata, RegisterErc721TokenQuery}; + pub(crate) const LOG_TARGET: &str = "torii_core::executor"; #[derive(Debug, Clone)] @@ -90,14 +95,19 @@ pub enum QueryType { DeleteEntity(DeleteEntityQuery), EventMessage(Ty), ApplyBalanceDiff(ApplyBalanceDiffQuery), + RegisterErc721Token(RegisterErc721TokenQuery), + RegisterErc20Token(RegisterErc20TokenQuery), + TokenTransfer, RegisterModel, StoreEvent, + // similar to execute but doesn't create a new transaction + Flush, Execute, Other, } #[derive(Debug)] -pub struct Executor<'c> { +pub struct Executor<'c, P: Provider + Sync + Send + 'static> { // Queries should use `transaction` instead of `pool` // This `pool` is only used to create a new `transaction` pool: Pool, @@ -105,6 
+115,16 @@ pub struct Executor<'c> { publish_queue: Vec, rx: UnboundedReceiver, shutdown_rx: Receiver<()>, + // These tasks are spawned to fetch ERC721 token metadata from the chain + // to not block the main loop + register_tasks: JoinSet>, + // Some queries depends on the metadata being registered, so we defer them + // until the metadata is fetched + deferred_query_messages: Vec, + // It is used to make RPC calls to fetch token_uri data for erc721 contracts + provider: Arc
<P>
, + // Used to limit number of tasks that run in parallel to fetch metadata + semaphore: Arc, } #[derive(Debug)] @@ -162,19 +182,48 @@ impl QueryMessage { rx, ) } + + pub fn flush_recv() -> (Self, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel(); + ( + Self { + statement: "".to_string(), + arguments: vec![], + query_type: QueryType::Flush, + tx: Some(tx), + }, + rx, + ) + } } -impl<'c> Executor<'c> { +impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { pub async fn new( pool: Pool, shutdown_tx: Sender<()>, + provider: Arc
<P>
, + max_concurrent_tasks: usize, ) -> Result<(Self, UnboundedSender)> { let (tx, rx) = unbounded_channel(); let transaction = pool.begin().await?; let publish_queue = Vec::new(); let shutdown_rx = shutdown_tx.subscribe(); - - Ok((Executor { pool, transaction, publish_queue, rx, shutdown_rx }, tx)) + let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks)); + + Ok(( + Executor { + pool, + transaction, + publish_queue, + rx, + shutdown_rx, + register_tasks: JoinSet::new(), + deferred_query_messages: Vec::new(), + provider, + semaphore, + }, + tx, + )) } pub async fn run(&mut self) -> Result<()> { @@ -185,41 +234,38 @@ impl<'c> Executor<'c> { break Ok(()); } Some(msg) = self.rx.recv() => { - let QueryMessage { statement, arguments, query_type, tx } = msg; - let mut query = sqlx::query(&statement); - - for arg in &arguments { - query = match arg { - Argument::Null => query.bind(None::), - Argument::Int(integer) => query.bind(integer), - Argument::Bool(bool) => query.bind(bool), - Argument::String(string) => query.bind(string), - Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)), - } - } - - match self.handle_query_type(query, query_type.clone(), &statement, &arguments, tx).await { + let query_type = msg.query_type.clone(); + match self.handle_query_message(msg).await { Ok(()) => {}, Err(e) => { error!(target: LOG_TARGET, r#type = ?query_type, error = %e, "Failed to execute query."); } } } + Some(result) = self.register_tasks.join_next() => { + let result = result??; + self.handle_erc721_token_metadata(result).await?; + } } } } - async fn handle_query_type<'a>( - &mut self, - query: Query<'a, Sqlite, SqliteArguments<'a>>, - query_type: QueryType, - statement: &str, - arguments: &[Argument], - sender: Option>>, - ) -> Result<()> { + async fn handle_query_message(&mut self, query_message: QueryMessage) -> Result<()> { let tx = &mut self.transaction; - match query_type { + let mut query = sqlx::query(&query_message.statement); + + for arg in &query_message.arguments { + query = match arg { + Argument::Null => query.bind(None::), + Argument::Int(integer) => query.bind(integer), + Argument::Bool(bool) => query.bind(bool), + Argument::String(string) => query.bind(string), + Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)), + } + } + + match query_message.query_type { QueryType::SetHead(set_head) => { let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>( "SELECT last_block_timestamp FROM contracts WHERE id = ?", @@ -237,7 +283,10 @@ impl<'c> Executor<'c> { }; query.execute(&mut **tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + format!( + "Failed to execute query: {:?}, args: {:?}", + query_message.statement, query_message.arguments + ) })?; let row = sqlx::query("UPDATE contracts SET tps = ? WHERE id = ? 
RETURNING *") @@ -358,7 +407,10 @@ impl<'c> Executor<'c> { } QueryType::SetEntity(entity) => { let row = query.fetch_one(&mut **tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + format!( + "Failed to execute query: {:?}, args: {:?}", + query_message.statement, query_message.arguments + ) })?; let mut entity_updated = EntityUpdated::from_row(&row)?; entity_updated.updated_model = Some(entity); @@ -381,7 +433,10 @@ impl<'c> Executor<'c> { } QueryType::DeleteEntity(entity) => { let delete_model = query.execute(&mut **tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + format!( + "Failed to execute query: {:?}, args: {:?}", + query_message.statement, query_message.arguments + ) })?; if delete_model.rows_affected() == 0 { return Ok(()); @@ -432,14 +487,20 @@ impl<'c> Executor<'c> { } QueryType::RegisterModel => { let row = query.fetch_one(&mut **tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + format!( + "Failed to execute query: {:?}, args: {:?}", + query_message.statement, query_message.arguments + ) })?; let model_registered = ModelRegistered::from_row(&row)?; self.publish_queue.push(BrokerMessage::ModelRegistered(model_registered)); } QueryType::EventMessage(entity) => { let row = query.fetch_one(&mut **tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + format!( + "Failed to execute query: {:?}, args: {:?}", + query_message.statement, query_message.arguments + ) })?; let mut event_message = EventMessageUpdated::from_row(&row)?; event_message.updated_model = Some(entity); @@ -460,7 +521,10 @@ impl<'c> Executor<'c> { } QueryType::StoreEvent => { let row = query.fetch_one(&mut **tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + format!( + "Failed to execute query: {:?}, args: {:?}", + query_message.statement, query_message.arguments + ) })?; let event = EventEmitted::from_row(&row)?; self.publish_queue.push(BrokerMessage::EventEmitted(event)); @@ -471,13 +535,127 @@ impl<'c> Executor<'c> { self.apply_balance_diff(apply_balance_diff).await?; debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Applied balance diff."); } + QueryType::RegisterErc721Token(register_erc721_token) => { + let semaphore = self.semaphore.clone(); + let provider = self.provider.clone(); + let res = sqlx::query_as::<_, (String, String)>( + "SELECT name, symbol FROM tokens WHERE contract_address = ?", + ) + .bind(felt_to_sql_string(®ister_erc721_token.contract_address)) + .fetch_one(&mut **tx) + .await; + + // If we find a token already registered for this contract_address we dont need to + // refetch the data since its same for all ERC721 tokens + let (name, symbol) = match res { + Ok((name, symbol)) => { + debug!( + contract_address = %felt_to_sql_string(®ister_erc721_token.contract_address), + "Token already registered for contract_address, so reusing fetched data", + ); + (name, symbol) + } + Err(_) => { + // Fetch token information from the chain + let name = provider + .call( + FunctionCall { + contract_address: register_erc721_token.contract_address, + entry_point_selector: get_selector_from_name("name").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + + // len = 1 => return value felt (i.e. legacy erc721 token) + // len > 1 => return value ByteArray (i.e. 
new erc721 token) + let name = if name.len() == 1 { + parse_cairo_short_string(&name[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&name, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + let symbol = provider + .call( + FunctionCall { + contract_address: register_erc721_token.contract_address, + entry_point_selector: get_selector_from_name("symbol").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await?; + let symbol = if symbol.len() == 1 { + parse_cairo_short_string(&symbol[0]).unwrap() + } else { + ByteArray::cairo_deserialize(&symbol, 0) + .expect("Return value not ByteArray") + .to_string() + .expect("Return value not String") + }; + + (name, symbol) + } + }; + + self.register_tasks.spawn(async move { + let permit = semaphore.acquire().await.unwrap(); + + let result = Self::process_register_erc721_token_query( + register_erc721_token, + provider, + name, + symbol, + ) + .await; + + drop(permit); + result + }); + } + QueryType::RegisterErc20Token(register_erc20_token) => { + let query = sqlx::query( + "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, \ + ?, ?, ?, ?)", + ) + .bind(®ister_erc20_token.token_id) + .bind(felt_to_sql_string(®ister_erc20_token.contract_address)) + .bind(®ister_erc20_token.name) + .bind(®ister_erc20_token.symbol) + .bind(register_erc20_token.decimals); + + query.execute(&mut **tx).await.with_context(|| { + format!( + "Failed to execute RegisterErc20Token query: {:?}", + register_erc20_token + ) + })?; + } + QueryType::Flush => { + debug!(target: LOG_TARGET, "Flushing query."); + let instant = Instant::now(); + let res = self.execute(false).await; + debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Flushed query."); + + if let Some(sender) = query_message.tx { + sender + .send(res) + .map_err(|_| anyhow::anyhow!("Failed to send execute result"))?; + } else { + res?; + } + } QueryType::Execute => { debug!(target: LOG_TARGET, "Executing query."); let instant = Instant::now(); - let res = self.execute().await; + let res = self.execute(true).await; debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Executed query."); - if let Some(sender) = sender { + if let Some(sender) = query_message.tx { sender .send(res) .map_err(|_| anyhow::anyhow!("Failed to send execute result"))?; @@ -485,9 +663,16 @@ impl<'c> Executor<'c> { res?; } } + QueryType::TokenTransfer => { + // defer executing these queries since they depend on TokenRegister queries + self.deferred_query_messages.push(query_message); + } QueryType::Other => { query.execute(&mut **tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + format!( + "Failed to execute query: {:?}, args: {:?}", + query_message.statement, query_message.arguments + ) })?; } } @@ -495,109 +680,42 @@ impl<'c> Executor<'c> { Ok(()) } - async fn execute(&mut self) -> Result<()> { - let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); - transaction.commit().await?; + async fn execute(&mut self, new_transaction: bool) -> Result<()> { + if new_transaction { + let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); + transaction.commit().await?; + } for message in self.publish_queue.drain(..) 
{ send_broker_message(message); } - Ok(()) - } - - async fn apply_balance_diff( - &mut self, - apply_balance_diff: ApplyBalanceDiffQuery, - ) -> Result<()> { - let erc_cache = apply_balance_diff.erc_cache; - for ((contract_type, id_str), balance) in erc_cache.iter() { - let id = id_str.split(FELT_DELIMITER).collect::>(); - match contract_type { - ContractType::WORLD => unreachable!(), - ContractType::ERC721 => { - // account_address/contract_address:id => ERC721 - assert!(id.len() == 2); - let account_address = id[0]; - let token_id = id[1]; - let mid = token_id.split(":").collect::>(); - let contract_address = mid[0]; - - self.apply_balance_diff_helper( - id_str, - account_address, - contract_address, - token_id, - balance, - ) - .await - .with_context(|| "Failed to apply balance diff in apply_cache_diff")?; - } - ContractType::ERC20 => { - // account_address/contract_address/ => ERC20 - assert!(id.len() == 3); - let account_address = id[0]; - let contract_address = id[1]; - let token_id = id[1]; - - self.apply_balance_diff_helper( - id_str, - account_address, - contract_address, - token_id, - balance, - ) - .await - .with_context(|| "Failed to apply balance diff in apply_cache_diff")?; - } - } + while let Some(result) = self.register_tasks.join_next().await { + let result = result??; + self.handle_erc721_token_metadata(result).await?; } - Ok(()) - } - - async fn apply_balance_diff_helper( - &mut self, - id: &str, - account_address: &str, - contract_address: &str, - token_id: &str, - balance_diff: &I256, - ) -> Result<()> { - let tx = &mut self.transaction; - let balance: Option<(String,)> = - sqlx::query_as("SELECT balance FROM balances WHERE id = ?") - .bind(id) - .fetch_optional(&mut **tx) - .await?; - - let mut balance = if let Some(balance) = balance { - sql_string_to_u256(&balance.0) - } else { - U256::from(0u8) - }; - - if balance_diff.is_negative { - if balance < balance_diff.value { - dbg!(&balance_diff, balance, id); + let mut deferred_query_messages = mem::take(&mut self.deferred_query_messages); + + for query_message in deferred_query_messages.drain(..) { + let mut query = sqlx::query(&query_message.statement); + for arg in &query_message.arguments { + query = match arg { + Argument::Null => query.bind(None::), + Argument::Int(integer) => query.bind(integer), + Argument::Bool(bool) => query.bind(bool), + Argument::String(string) => query.bind(string), + Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)), + }; } - balance -= balance_diff.value; - } else { - balance += balance_diff.value; - } - // write the new balance to the database - sqlx::query( - "INSERT OR REPLACE INTO balances (id, contract_address, account_address, token_id, \ - balance) VALUES (?, ?, ?, ?, ?)", - ) - .bind(id) - .bind(contract_address) - .bind(account_address) - .bind(token_id) - .bind(u256_to_sql_string(&balance)) - .execute(&mut **tx) - .await?; + query.execute(&mut *self.transaction).await.with_context(|| { + format!( + "Failed to execute query: {:?}, args: {:?}", + query_message.statement, query_message.arguments + ) + })?; + } Ok(()) } diff --git a/crates/torii/core/src/processors/erc721_legacy_transfer.rs b/crates/torii/core/src/processors/erc721_legacy_transfer.rs index 198a1ebbd9..bbf7e3711d 100644 --- a/crates/torii/core/src/processors/erc721_legacy_transfer.rs +++ b/crates/torii/core/src/processors/erc721_legacy_transfer.rs @@ -36,7 +36,7 @@ where async fn process( &self, - world: &WorldContractReader
<P>
, + _world: &WorldContractReader
<P>
, db: &mut Sql, _block_number: u64, block_timestamp: u64, @@ -50,16 +50,8 @@ where let token_id = U256Cainome::cairo_deserialize(&event.data, 2)?; let token_id = U256::from_words(token_id.low, token_id.high); - db.handle_erc721_transfer( - token_address, - from, - to, - token_id, - world.provider(), - block_timestamp, - event_id, - ) - .await?; + db.handle_erc721_transfer(token_address, from, to, token_id, block_timestamp, event_id) + .await?; debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); Ok(()) diff --git a/crates/torii/core/src/processors/erc721_transfer.rs b/crates/torii/core/src/processors/erc721_transfer.rs index 349bdbea24..a7e0ae9eb0 100644 --- a/crates/torii/core/src/processors/erc721_transfer.rs +++ b/crates/torii/core/src/processors/erc721_transfer.rs @@ -36,7 +36,7 @@ where async fn process( &self, - world: &WorldContractReader
<P>
, + _world: &WorldContractReader
<P>
, db: &mut Sql, _block_number: u64, block_timestamp: u64, @@ -50,16 +50,8 @@ where let token_id = U256Cainome::cairo_deserialize(&event.keys, 3)?; let token_id = U256::from_words(token_id.low, token_id.high); - db.handle_erc721_transfer( - token_address, - from, - to, - token_id, - world.provider(), - block_timestamp, - event_id, - ) - .await?; + db.handle_erc721_transfer(token_address, from, to, token_id, block_timestamp, event_id) + .await?; debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer"); Ok(()) diff --git a/crates/torii/core/src/processors/metadata_update.rs b/crates/torii/core/src/processors/metadata_update.rs index 4b17858d89..f09496fd82 100644 --- a/crates/torii/core/src/processors/metadata_update.rs +++ b/crates/torii/core/src/processors/metadata_update.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use anyhow::{Error, Result}; use async_trait::async_trait; use base64::engine::general_purpose; @@ -8,17 +6,13 @@ use cainome::cairo_serde::{ByteArray, CairoSerde, Zeroable}; use dojo_world::contracts::world::WorldContractReader; use dojo_world::metadata::WorldMetadata; use dojo_world::uri::Uri; -use reqwest::Client; use starknet::core::types::{Event, Felt}; use starknet::providers::Provider; -use tokio_util::bytes::Bytes; use tracing::{error, info}; use super::EventProcessor; use crate::sql::Sql; - -const IPFS_URL: &str = "https://cartridge.infura-ipfs.io/ipfs/"; -const MAX_RETRY: u8 = 3; +use crate::utils::{fetch_content_from_ipfs, MAX_RETRY}; pub(crate) const LOG_TARGET: &str = "torii_core::processors::metadata_update"; @@ -106,7 +100,7 @@ async fn metadata(uri_str: String) -> Result<(WorldMetadata, Option, Opt let uri = Uri::Ipfs(uri_str); let cid = uri.cid().ok_or("Uri is malformed").map_err(Error::msg)?; - let bytes = fetch_content(cid, MAX_RETRY).await?; + let bytes = fetch_content_from_ipfs(cid, MAX_RETRY).await?; let metadata: WorldMetadata = serde_json::from_str(std::str::from_utf8(&bytes)?)?; let icon_img = fetch_image(&metadata.icon_uri).await; @@ -117,36 +111,10 @@ async fn metadata(uri_str: String) -> Result<(WorldMetadata, Option, Opt async fn fetch_image(image_uri: &Option) -> Option { if let Some(uri) = image_uri { - let data = fetch_content(uri.cid()?, MAX_RETRY).await.ok()?; + let data = fetch_content_from_ipfs(uri.cid()?, MAX_RETRY).await.ok()?; let encoded = general_purpose::STANDARD.encode(data); return Some(encoded); } None } - -async fn fetch_content(cid: &str, mut retries: u8) -> Result { - while retries > 0 { - let response = Client::new().get(format!("{IPFS_URL}{}", cid)).send().await; - - match response { - Ok(response) => return response.bytes().await.map_err(|e| e.into()), - Err(e) => { - retries -= 1; - if retries > 0 { - info!( - target: LOG_TARGET, - error = %e, - "Fetch uri." 
- ); - tokio::time::sleep(Duration::from_secs(3)).await; - } - } - } - } - - Err(Error::msg(format!( - "Failed to pull data from IPFS after {} attempts, cid: {}", - MAX_RETRY, cid - ))) -} diff --git a/crates/torii/core/src/sql/erc.rs b/crates/torii/core/src/sql/erc.rs index f82eacb1a2..0a7acc643d 100644 --- a/crates/torii/core/src/sql/erc.rs +++ b/crates/torii/core/src/sql/erc.rs @@ -6,11 +6,13 @@ use cainome::cairo_serde::{ByteArray, CairoSerde}; use starknet::core::types::{BlockId, BlockTag, Felt, FunctionCall, U256}; use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; use starknet::providers::Provider; -use tracing::debug; use super::utils::{u256_to_sql_string, I256}; use super::{Sql, FELT_DELIMITER}; -use crate::executor::{ApplyBalanceDiffQuery, Argument, QueryMessage, QueryType}; +use crate::executor::{ + ApplyBalanceDiffQuery, Argument, QueryMessage, QueryType, RegisterErc20TokenQuery, + RegisterErc721TokenQuery, +}; use crate::sql::utils::{felt_and_u256_to_sql_string, felt_to_sql_string, felts_to_sql_string}; use crate::types::ContractType; use crate::utils::utc_dt_string_from_timestamp; @@ -34,7 +36,6 @@ impl Sql { if !token_exists { self.register_erc20_token_metadata(contract_address, &token_id, provider).await?; - self.execute().await.with_context(|| "Failed to execute in handle_erc20_transfer")?; } self.store_erc_transfer_event( @@ -66,6 +67,7 @@ impl Sql { } if self.local_cache.erc_cache.len() >= 100000 { + self.flush().await.with_context(|| "Failed to flush in handle_erc20_transfer")?; self.apply_cache_diff().await?; } @@ -73,23 +75,23 @@ impl Sql { } #[allow(clippy::too_many_arguments)] - pub async fn handle_erc721_transfer( + pub async fn handle_erc721_transfer( &mut self, contract_address: Felt, from_address: Felt, to_address: Felt, token_id: U256, - provider: &P, block_timestamp: u64, event_id: &str, ) -> Result<()> { // contract_address:id + let actual_token_id = token_id; let token_id = felt_and_u256_to_sql_string(&contract_address, &token_id); let token_exists: bool = self.local_cache.contains_token_id(&token_id); if !token_exists { - self.register_erc721_token_metadata(contract_address, &token_id, provider).await?; - self.execute().await?; + self.register_erc721_token_metadata(contract_address, &token_id, actual_token_id) + .await?; } self.store_erc_transfer_event( @@ -126,6 +128,7 @@ impl Sql { } if self.local_cache.erc_cache.len() >= 100000 { + self.flush().await.with_context(|| "Failed to flush in handle_erc721_transfer")?; self.apply_cache_diff().await?; } @@ -193,18 +196,16 @@ impl Sql { .await?; let decimals = u8::cairo_deserialize(&decimals, 0).expect("Return value not u8"); - // Insert the token into the tokens table - self.executor.send(QueryMessage::other( - "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, \ - ?, ?)" - .to_string(), - vec![ - Argument::String(token_id.to_string()), - Argument::FieldElement(contract_address), - Argument::String(name), - Argument::String(symbol), - Argument::Int(decimals.into()), - ], + self.executor.send(QueryMessage::new( + "".to_string(), + vec![], + QueryType::RegisterErc20Token(RegisterErc20TokenQuery { + token_id: token_id.to_string(), + contract_address, + name, + symbol, + decimals, + }), ))?; self.local_cache.register_token_id(token_id.to_string()); @@ -212,100 +213,26 @@ impl Sql { Ok(()) } - async fn register_erc721_token_metadata( + async fn register_erc721_token_metadata( &mut self, contract_address: Felt, token_id: &str, - provider: &P, + 
actual_token_id: U256, ) -> Result<()> { - let res = sqlx::query_as::<_, (String, String, u8)>( - "SELECT name, symbol, decimals FROM tokens WHERE contract_address = ?", - ) - .bind(felt_to_sql_string(&contract_address)) - .fetch_one(&self.pool) - .await; - - // If we find a token already registered for this contract_address we dont need to refetch - // the data since its same for all ERC721 tokens - if let Ok((name, symbol, decimals)) = res { - debug!( - contract_address = %felt_to_sql_string(&contract_address), - "Token already registered for contract_address, so reusing fetched data", - ); - self.executor.send(QueryMessage::other( - "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, \ - ?, ?, ?)" - .to_string(), - vec![ - Argument::String(token_id.to_string()), - Argument::FieldElement(contract_address), - Argument::String(name), - Argument::String(symbol), - Argument::Int(decimals.into()), - ], - ))?; - self.local_cache.register_token_id(token_id.to_string()); - return Ok(()); - } - - // Fetch token information from the chain - let name = provider - .call( - FunctionCall { - contract_address, - entry_point_selector: get_selector_from_name("name").unwrap(), - calldata: vec![], - }, - BlockId::Tag(BlockTag::Pending), - ) - .await?; - - // len = 1 => return value felt (i.e. legacy erc721 token) - // len > 1 => return value ByteArray (i.e. new erc721 token) - let name = if name.len() == 1 { - parse_cairo_short_string(&name[0]).unwrap() - } else { - ByteArray::cairo_deserialize(&name, 0) - .expect("Return value not ByteArray") - .to_string() - .expect("Return value not String") - }; - - let symbol = provider - .call( - FunctionCall { - contract_address, - entry_point_selector: get_selector_from_name("symbol").unwrap(), - calldata: vec![], - }, - BlockId::Tag(BlockTag::Pending), - ) - .await?; - let symbol = if symbol.len() == 1 { - parse_cairo_short_string(&symbol[0]).unwrap() - } else { - ByteArray::cairo_deserialize(&symbol, 0) - .expect("Return value not ByteArray") - .to_string() - .expect("Return value not String") - }; - - let decimals = 0; - - // Insert the token into the tokens table - self.executor.send(QueryMessage::other( - "INSERT INTO tokens (id, contract_address, name, symbol, decimals) VALUES (?, ?, ?, \ - ?, ?)" - .to_string(), - vec![ - Argument::String(token_id.to_string()), - Argument::FieldElement(contract_address), - Argument::String(name), - Argument::String(symbol), - Argument::Int(decimals.into()), - ], + self.executor.send(QueryMessage::new( + "".to_string(), + vec![], + QueryType::RegisterErc721Token(RegisterErc721TokenQuery { + token_id: token_id.to_string(), + contract_address, + actual_token_id, + }), ))?; + // optimistically add the token_id to cache + // this cache is used while applying the cache diff + // so we need to make sure that all RegisterErc*Token queries + // are applied before the cache diff is applied self.local_cache.register_token_id(token_id.to_string()); Ok(()) @@ -326,7 +253,7 @@ impl Sql { to_address, amount, token_id, executed_at) VALUES (?, ?, ?, ?, ?, ?, \ ?)"; - self.executor.send(QueryMessage::other( + self.executor.send(QueryMessage::new( insert_query.to_string(), vec![ Argument::String(event_id.to_string()), @@ -337,6 +264,7 @@ impl Sql { Argument::String(token_id.to_string()), Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ], + QueryType::TokenTransfer, ))?; Ok(()) diff --git a/crates/torii/core/src/sql/mod.rs b/crates/torii/core/src/sql/mod.rs index ad00c34ca6..58b419572f 100644 --- 
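The register_*_token_metadata methods above no longer touch the chain themselves; they only enqueue typed queries that the executor (crate::executor, whose ERC side is not excerpted here) resolves asynchronously before the cache diff is applied. Declarations consistent with how the queries are constructed above would look roughly like this; the field types are inferred from the call sites, so treat this as a sketch rather than the authoritative definitions:

```rust
use starknet::core::types::{Felt, U256};

// Sketch only: the real definitions live in the executor module and may differ.
#[derive(Debug, Clone)]
pub struct RegisterErc20TokenQuery {
    pub token_id: String,        // tokens table key, derived from the contract address
    pub contract_address: Felt,
    pub name: String,
    pub symbol: String,
    pub decimals: u8,
}

#[derive(Debug, Clone)]
pub struct RegisterErc721TokenQuery {
    pub token_id: String,        // "contract_address:token_id" composite key
    pub contract_address: Felt,
    pub actual_token_id: U256,   // raw token id, used later to query token_uri on-chain
}
```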
a/crates/torii/core/src/sql/mod.rs +++ b/crates/torii/core/src/sql/mod.rs @@ -1257,4 +1257,10 @@ impl Sql { self.executor.send(execute)?; recv.await? } + + pub async fn flush(&self) -> Result<()> { + let (flush, recv) = QueryMessage::flush_recv(); + self.executor.send(flush)?; + recv.await? + } } diff --git a/crates/torii/core/src/sql/test.rs b/crates/torii/core/src/sql/test.rs index bd6fe9208a..7bc093e35e 100644 --- a/crates/torii/core/src/sql/test.rs +++ b/crates/torii/core/src/sql/test.rs @@ -131,7 +131,8 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { sqlx::migrate!("../migrations").run(&pool).await.unwrap(); let (shutdown_tx, _) = broadcast::channel(1); - let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); @@ -300,7 +301,8 @@ async fn test_load_from_remote_del(sequencer: &RunnerCtx) { sqlx::migrate!("../migrations").run(&pool).await.unwrap(); let (shutdown_tx, _) = broadcast::channel(1); - let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); @@ -313,7 +315,7 @@ async fn test_load_from_remote_del(sequencer: &RunnerCtx) { .await .unwrap(); - let _ = bootstrap_engine(world_reader, db.clone(), provider).await; + let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap(); assert_eq!(count_table("dojo_examples-PlayerConfig", &pool).await, 0); assert_eq!(count_table("dojo_examples-PlayerConfig$favorite_item", &pool).await, 0); @@ -398,7 +400,9 @@ async fn test_update_with_set_record(sequencer: &RunnerCtx) { sqlx::migrate!("../migrations").run(&pool).await.unwrap(); let (shutdown_tx, _) = broadcast::channel(1); - let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); diff --git a/crates/torii/core/src/utils.rs b/crates/torii/core/src/utils.rs index 55f7ee563e..d66d294509 100644 --- a/crates/torii/core/src/utils.rs +++ b/crates/torii/core/src/utils.rs @@ -1,4 +1,19 @@ +use std::time::Duration; + +use anyhow::Result; use chrono::{DateTime, Utc}; +use futures_util::TryStreamExt; +use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri}; +use tokio_util::bytes::Bytes; +use tracing::info; + +// pub const IPFS_URL: &str = "https://cartridge.infura-ipfs.io/ipfs/"; +pub const IPFS_URL: &str = "https://ipfs.io/ipfs/"; +pub const MAX_RETRY: u8 = 3; + +pub const IPFS_CLIENT_URL: &str = "https://ipfs.infura.io:5001"; +pub const IPFS_USERNAME: &str = "2EBrzr7ZASQZKH32sl2xWauXPSA"; +pub const IPFS_PASSWORD: &str = "12290b883db9138a8ae3363b6739d220"; pub fn must_utc_datetime_from_timestamp(timestamp: u64) -> DateTime { let naive_dt = DateTime::from_timestamp(timestamp as i64, 0) @@ -10,6 +25,31 @@ pub fn utc_dt_string_from_timestamp(timestamp: u64) -> String { must_utc_datetime_from_timestamp(timestamp).to_rfc3339() } +pub async fn fetch_content_from_ipfs(cid: &str, mut retries: u8) -> Result { + let client = + IpfsClient::from_str(IPFS_CLIENT_URL)?.with_credentials(IPFS_USERNAME, 
IPFS_PASSWORD); + while retries > 0 { + let response = client.cat(cid).map_ok(|chunk| chunk.to_vec()).try_concat().await; + match response { + Ok(stream) => return Ok(Bytes::from(stream)), + Err(e) => { + retries -= 1; + if retries > 0 { + info!( + error = %e, + "Fetch uri." + ); + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + } + } + + Err(anyhow::anyhow!(format!( + "Failed to pull data from IPFS after {} attempts, cid: {}", + MAX_RETRY, cid + ))) +} // tests #[cfg(test)] mod tests { diff --git a/crates/torii/graphql/src/tests/metadata_test.rs b/crates/torii/graphql/src/tests/metadata_test.rs index 24224eb6b0..14d15d086e 100644 --- a/crates/torii/graphql/src/tests/metadata_test.rs +++ b/crates/torii/graphql/src/tests/metadata_test.rs @@ -1,15 +1,19 @@ #[cfg(test)] mod tests { use std::collections::HashMap; + use std::sync::Arc; use dojo_world::config::ProfileConfig; use dojo_world::metadata::WorldMetadata; use sqlx::SqlitePool; use starknet::core::types::Felt; + use starknet::providers::jsonrpc::HttpTransport; + use starknet::providers::JsonRpcClient; use tokio::sync::broadcast; use torii_core::executor::Executor; use torii_core::sql::Sql; use torii_core::types::ContractType; + use url::Url; use crate::schema::build_schema; use crate::tests::{run_graphql_query, Connection, Content, Metadata as SqlMetadata, Social}; @@ -54,8 +58,12 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_metadata(pool: SqlitePool) { let (shutdown_tx, _) = broadcast::channel(1); + let url: Url = "https://www.example.com".parse().unwrap(); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(url))); let (mut executor, sender) = - Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100) + .await + .unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); @@ -115,8 +123,12 @@ mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_empty_content(pool: SqlitePool) { let (shutdown_tx, _) = broadcast::channel(1); + let url: Url = "https://www.example.com".parse().unwrap(); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(url))); let (mut executor, sender) = - Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100) + .await + .unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs index d12c4c5e80..0db1aa12e4 100644 --- a/crates/torii/graphql/src/tests/mod.rs +++ b/crates/torii/graphql/src/tests/mod.rs @@ -349,7 +349,8 @@ pub async fn spinup_types_test(path: &str) -> Result { let world = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); let (shutdown_tx, _) = broadcast::channel(1); - let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs index f35b60fcc6..a48a9334ce 100644 --- a/crates/torii/graphql/src/tests/subscription_test.rs +++ b/crates/torii/graphql/src/tests/subscription_test.rs @@ -2,6 +2,7 @@ mod tests { use std::collections::HashMap; use std::str::FromStr; + use std::sync::Arc; use 
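The shared fetch_content_from_ipfs helper above replaces the processor-local reqwest loop that the metadata_update change removed, so both world metadata and token metadata now go through one retrying IPFS path. A minimal sketch of a caller that fetches an IPFS-hosted JSON document with it (the CID is supplied by the caller and is a placeholder here):

```rust
use anyhow::Result;
use torii_core::utils::{fetch_content_from_ipfs, MAX_RETRY};

// Fetch a JSON document from IPFS with the shared retry helper and parse it.
// MAX_RETRY (3) bounds the number of attempts before giving up.
async fn fetch_ipfs_json(cid: &str) -> Result<serde_json::Value> {
    let bytes = fetch_content_from_ipfs(cid, MAX_RETRY).await?;
    Ok(serde_json::from_slice(&bytes)?)
}
```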
std::time::Duration; use async_graphql::value; @@ -12,12 +13,15 @@ mod tests { use serial_test::serial; use sqlx::SqlitePool; use starknet::core::types::Event; + use starknet::providers::jsonrpc::HttpTransport; + use starknet::providers::JsonRpcClient; use starknet_crypto::{poseidon_hash_many, Felt}; use tokio::sync::{broadcast, mpsc}; use torii_core::executor::Executor; use torii_core::sql::utils::felts_to_sql_string; use torii_core::sql::Sql; use torii_core::types::ContractType; + use url::Url; use crate::tests::{model_fixtures, run_graphql_subscription}; use crate::utils; @@ -26,8 +30,11 @@ mod tests { #[serial] async fn test_entity_subscription(pool: SqlitePool) { let (shutdown_tx, _) = broadcast::channel(1); + // used to fetch token_uri data for erc721 tokens so pass dummy for the test + let url: Url = "https://www.example.com".parse().unwrap(); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(url))); let (mut executor, sender) = - Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + Executor::new(pool.clone(), shutdown_tx.clone(), provider, 100).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); @@ -170,8 +177,13 @@ mod tests { #[serial] async fn test_entity_subscription_with_id(pool: SqlitePool) { let (shutdown_tx, _) = broadcast::channel(1); + + // dummy provider since its required to query data for erc721 tokens + let url: Url = "https://www.example.com".parse().unwrap(); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(url))); + let (mut executor, sender) = - Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + Executor::new(pool.clone(), shutdown_tx.clone(), provider, 100).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); @@ -294,8 +306,11 @@ mod tests { #[serial] async fn test_model_subscription(pool: SqlitePool) { let (shutdown_tx, _) = broadcast::channel(1); + + let url: Url = "https://www.example.com".parse().unwrap(); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(url))); let (mut executor, sender) = - Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + Executor::new(pool.clone(), shutdown_tx.clone(), provider, 100).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); @@ -368,8 +383,11 @@ mod tests { #[serial] async fn test_model_subscription_with_id(pool: SqlitePool) { let (shutdown_tx, _) = broadcast::channel(1); + + let url: Url = "https://www.example.com".parse().unwrap(); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(url))); let (mut executor, sender) = - Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + Executor::new(pool.clone(), shutdown_tx.clone(), provider, 100).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); @@ -443,8 +461,11 @@ mod tests { #[serial] async fn test_event_emitted(pool: SqlitePool) { let (shutdown_tx, _) = broadcast::channel(1); + + let url: Url = "https://www.example.com".parse().unwrap(); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(url))); let (mut executor, sender) = - Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + Executor::new(pool.clone(), shutdown_tx.clone(), provider, 100).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs index 0bc8451919..e7af0ae2be 100644 --- a/crates/torii/grpc/src/server/tests/entities_test.rs +++ 
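All of the test call sites above construct the executor the same way; collected in one place, the constructor shape they exercise is roughly the following, where the URL is just a dummy provider endpoint and 100 is simply the batch size the tests pass:

```rust
use std::sync::Arc;

use sqlx::SqlitePool;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use tokio::sync::broadcast;
use torii_core::executor::Executor;
use url::Url;

// Spin up an executor the way the tests above do: a dummy JSON-RPC provider is
// enough when no ERC721 metadata is actually fetched.
async fn spawn_test_executor(pool: SqlitePool) {
    let url: Url = "https://www.example.com".parse().unwrap();
    let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(url)));

    let (shutdown_tx, _) = broadcast::channel(1);
    let (mut executor, _sender) =
        Executor::new(pool, shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap();
    tokio::spawn(async move { executor.run().await.unwrap() });
}
```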
b/crates/torii/grpc/src/server/tests/entities_test.rs @@ -100,7 +100,9 @@ async fn test_entities_queries(sequencer: &RunnerCtx) { TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); let (shutdown_tx, _) = broadcast::channel(1); - let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + + let (mut executor, sender) = + Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100).await.unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); diff --git a/crates/torii/libp2p/Cargo.toml b/crates/torii/libp2p/Cargo.toml index ee7abd13a1..765332c07c 100644 --- a/crates/torii/libp2p/Cargo.toml +++ b/crates/torii/libp2p/Cargo.toml @@ -20,8 +20,8 @@ dojo-types.workspace = true dojo-world = { path = "../../dojo-world", features = [ "contracts" ] } indexmap.workspace = true serde_json.workspace = true -starknet.workspace = true starknet-crypto.workspace = true +starknet.workspace = true thiserror.workspace = true tracing.workspace = true diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index dcc3af889f..e9dc510b4a 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -525,6 +525,7 @@ mod test { #[tokio::test] async fn test_client_messaging() -> Result<(), Box> { use std::collections::HashMap; + use std::sync::Arc; use std::time::Duration; use dojo_types::schema::{Member, Struct, Ty}; @@ -568,13 +569,15 @@ mod test { let sequencer = KatanaRunner::new().expect("Failed to create Katana sequencer"); - let provider = JsonRpcClient::new(HttpTransport::new(sequencer.url())); + let provider = Arc::new(JsonRpcClient::new(HttpTransport::new(sequencer.url()))); let account = sequencer.account_data(0); let (shutdown_tx, _) = broadcast::channel(1); let (mut executor, sender) = - Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + Executor::new(pool.clone(), shutdown_tx.clone(), Arc::clone(&provider), 100) + .await + .unwrap(); tokio::spawn(async move { executor.run().await.unwrap(); }); diff --git a/crates/torii/migrations/20241014085532_add_metadata_field.sql b/crates/torii/migrations/20241014085532_add_metadata_field.sql new file mode 100644 index 0000000000..f6f81432c2 --- /dev/null +++ b/crates/torii/migrations/20241014085532_add_metadata_field.sql @@ -0,0 +1 @@ +ALTER TABLE tokens ADD COLUMN metadata TEXT; \ No newline at end of file diff --git a/crates/torii/server/Cargo.toml b/crates/torii/server/Cargo.toml index 1ca82c911c..14d503748d 100644 --- a/crates/torii/server/Cargo.toml +++ b/crates/torii/server/Cargo.toml @@ -6,17 +6,26 @@ version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow.workspace = true base64.workspace = true -http.workspace = true +camino.workspace = true +data-url.workspace = true http-body = "0.4.5" -hyper.workspace = true +http.workspace = true hyper-reverse-proxy = { git = "https://github.com/tarrencev/hyper-reverse-proxy" } +hyper.workspace = true +image.workspace = true indexmap.workspace = true lazy_static.workspace = true +mime_guess.workspace = true +reqwest.workspace = true serde.workspace = true serde_json.workspace = true -tokio.workspace = true +sqlx.workspace = true tokio-util = "0.7.7" -tower.workspace = true +tokio.workspace = true +torii-core.workspace = true tower-http.workspace = true +tower.workspace = true tracing.workspace = true +warp.workspace = true diff --git a/crates/torii/server/src/artifacts.rs 
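The one-line migration above adds the `metadata` TEXT column on `tokens` that the new artifacts server reads whenever it needs a token's image URI. A small sketch of that lookup, using the same query that fetch_and_process_image issues in the next file; the id is the `contract_address:token_id` composite key:

```rust
use anyhow::{Context, Result};
use sqlx::{Pool, Sqlite};

// Load and parse the JSON metadata stored for a token by the indexer.
async fn token_metadata(pool: &Pool<Sqlite>, token_id: &str) -> Result<serde_json::Value> {
    let (raw,) = sqlx::query_as::<_, (String,)>("SELECT metadata FROM tokens WHERE id = ?")
        .bind(token_id)
        .fetch_one(pool)
        .await
        .with_context(|| format!("No metadata stored for token_id: {}", token_id))?;
    serde_json::from_str(&raw).context("Metadata column is not valid JSON")
}
```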
b/crates/torii/server/src/artifacts.rs new file mode 100644 index 0000000000..260d1e48d0 --- /dev/null +++ b/crates/torii/server/src/artifacts.rs @@ -0,0 +1,336 @@ +use std::future::Future; +use std::io::Cursor; +use std::net::SocketAddr; +use std::str::FromStr; + +use anyhow::{Context, Result}; +use camino::Utf8PathBuf; +use data_url::mime::Mime; +use data_url::DataUrl; +use image::{DynamicImage, ImageFormat}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use sqlx::{Pool, Sqlite}; +use tokio::fs; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::broadcast::Receiver; +use torii_core::utils::{fetch_content_from_ipfs, MAX_RETRY}; +use tracing::{debug, error, trace}; +use warp::http::Response; +use warp::path::Tail; +use warp::{reject, Filter}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ImageQuery { + #[serde(alias = "h")] + height: Option, + #[serde(alias = "w")] + width: Option, +} + +async fn serve_static_file( + path: Tail, + artifacts_dir: Utf8PathBuf, + pool: Pool, + query: ImageQuery, +) -> Result { + let path = path.as_str(); + + // Split the path and validate format + let parts: Vec<&str> = path.split('/').collect(); + + if parts.len() != 3 || parts[2] != "image" { + return Err(reject::not_found()); + } + + // Validate contract_address format + if !parts[0].starts_with("0x") { + return Err(reject::not_found()); + } + + // Validate token_id format + if !parts[1].starts_with("0x") { + return Err(reject::not_found()); + } + + let token_image_dir = artifacts_dir.join(parts[0]).join(parts[1]); + + let token_id = format!("{}:{}", parts[0], parts[1]); + if !token_image_dir.exists() { + match fetch_and_process_image(&artifacts_dir, &token_id, pool) + .await + .context(format!("Failed to fetch and process image for token_id: {}", token_id)) + { + Ok(path) => path, + Err(e) => { + error!(error = %e, "Failed to fetch and process image for token_id: {}", token_id); + return Err(warp::reject::not_found()); + } + }; + } + let file_name = match file_name_from_dir_and_query(token_image_dir, &query) { + Ok(file_name) => file_name, + Err(e) => { + error!(error = %e, "Failed to get file name from directory and query"); + return Err(reject::not_found()); + } + }; + + match File::open(&file_name).await { + Ok(mut file) => { + let mut contents = vec![]; + if file.read_to_end(&mut contents).await.is_ok() { + let mime = mime_guess::from_path(&file_name).first_or_octet_stream().to_string(); + + Ok(Response::builder().header("content-type", mime).body(contents)) + } else { + Err(reject::not_found()) + } + } + Err(_) => Err(reject::not_found()), + } +} + +fn file_name_from_dir_and_query( + token_image_dir: Utf8PathBuf, + query: &ImageQuery, +) -> Result { + let mut entries = std::fs::read_dir(&token_image_dir).ok().into_iter().flatten().flatten(); + + // Find the base image (without @medium or @small) + let base_image = entries + .find(|entry| { + entry + .file_name() + .to_str() + .map(|name| name.starts_with("image") && !name.contains('@')) + .unwrap_or(false) + }) + .with_context(|| "Failed to find base image")?; + + let base_filename = base_image.file_name(); + let base_filename = base_filename.to_str().unwrap(); + let base_ext = base_filename.split('.').last().unwrap(); + + let suffix = match (query.width, query.height) { + // If either dimension is <= 100px, use small version + (Some(w), _) if w <= 100 => "@small", + (_, Some(h)) if h <= 100 => "@small", + // If either dimension is <= 250px, use medium version + (Some(w), _) if w <= 250 => 
"@medium", + (_, Some(h)) if h <= 250 => "@medium", + // If no dimensions specified or larger than 250px, use original + _ => "", + }; + + let target_filename = format!("image{}.{}", suffix, base_ext); + Ok(token_image_dir.join(target_filename)) +} + +pub async fn new( + mut shutdown_rx: Receiver<()>, + static_dir: &Utf8PathBuf, + pool: Pool, +) -> Result<(SocketAddr, impl Future + 'static), std::io::Error> { + let static_dir = static_dir.clone(); + + let routes = warp::get() + .and(warp::path("static")) + .and(warp::path::tail()) + .and(warp::any().map(move || static_dir.clone())) + .and(warp::any().map(move || pool.clone())) + .and(warp::any().and(warp::query::())) + .and_then(serve_static_file); + + Ok(warp::serve(routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move { + shutdown_rx.recv().await.ok(); + })) +} + +async fn fetch_and_process_image( + artifacts_path: &Utf8PathBuf, + token_id: &str, + pool: Pool, +) -> anyhow::Result { + let query = sqlx::query_as::<_, (String,)>("SELECT metadata FROM tokens WHERE id = ?") + .bind(token_id) + .fetch_one(&pool) + .await + .with_context(|| { + format!("Failed to fetch metadata from database for token_id: {}", token_id) + })?; + + let metadata: serde_json::Value = + serde_json::from_str(&query.0).context("Failed to parse metadata")?; + let image_uri = metadata + .get("image") + .with_context(|| format!("Image URL not found in metadata for token_id: {}", token_id))? + .as_str() + .with_context(|| format!("Image field not a string for token_id: {}", token_id))? + .to_string(); + + let image_type = match image_uri { + uri if uri.starts_with("http") || uri.starts_with("https") => { + debug!(image_uri = %uri, "Fetching image from http/https URL"); + // Fetch image from HTTP/HTTPS URL + let client = Client::new(); + let response = client + .get(uri) + .send() + .await + .context("Failed to fetch image from URL")? 
+ .bytes() + .await + .context("Failed to read image bytes from response")?; + + // svg files typically start with { + debug!(image_uri = %uri, "Fetching image from IPFS"); + let cid = uri.strip_prefix("ipfs://").unwrap(); + let response = fetch_content_from_ipfs(cid, MAX_RETRY) + .await + .context("Failed to read image bytes from IPFS response")?; + + if response.starts_with(b" { + debug!("Parsing image from data URI"); + trace!(data_uri = %uri); + // Parse and decode data URI + let data_url = DataUrl::process(&uri).context("Failed to parse data URI")?; + + // Check if it's an SVG + if data_url.mime_type() == &Mime::from_str("image/svg+xml").unwrap() { + let decoded = data_url.decode_to_vec().context("Failed to decode data URI")?; + ErcImageType::Svg(decoded.0) + } else { + let decoded = data_url.decode_to_vec().context("Failed to decode data URI")?; + let format = image::guess_format(&decoded.0) + .with_context(|| format!("Unknown file format for token_id: {}", token_id))?; + ErcImageType::DynamicImage(( + image::load_from_memory(&decoded.0) + .context("Failed to load image from bytes")?, + format, + )) + } + } + uri => { + return Err(anyhow::anyhow!("Unsupported URI scheme: {}", uri)); + } + }; + + // Extract contract_address and token_id from token_id + let parts: Vec<&str> = token_id.split(':').collect(); + if parts.len() != 2 { + return Err(anyhow::anyhow!("token_id must be in format contract_address:token_id")); + } + let contract_address = parts[0]; + let token_id_part = parts[1]; + + let dir_path = artifacts_path.join(contract_address).join(token_id_part); + + // Create directories if they don't exist + fs::create_dir_all(&dir_path) + .await + .context("Failed to create directories for image storage")?; + + // Define base image name + let base_image_name = "image"; + + let relative_path = Utf8PathBuf::new().join(contract_address).join(token_id_part); + + match image_type { + ErcImageType::DynamicImage((img, format)) => { + let format_ext = format.extensions_str()[0]; + + let target_sizes = [("medium", 250, 250), ("small", 100, 100)]; + + // Save original image + let original_file_name = format!("{}.{}", base_image_name, format_ext); + let original_file_path = dir_path.join(&original_file_name); + let mut file = fs::File::create(&original_file_path) + .await + .with_context(|| format!("Failed to create file: {:?}", original_file_path))?; + let encoded_image = encode_image_to_vec(&img, format) + .with_context(|| format!("Failed to encode image: {:?}", original_file_path))?; + file.write_all(&encoded_image).await.with_context(|| { + format!("Failed to write image to file: {:?}", original_file_path) + })?; + + // Save resized images + for (label, max_width, max_height) in &target_sizes { + let resized_image = resize_image_to_fit(&img, *max_width, *max_height); + let file_name = format!("@{}.{}", label, format_ext); + let file_path = dir_path.join(format!("{}{}", base_image_name, file_name)); + let mut file = fs::File::create(&file_path) + .await + .with_context(|| format!("Failed to create file: {:?}", file_path))?; + let encoded_image = encode_image_to_vec(&resized_image, format) + .context("Failed to encode image")?; + file.write_all(&encoded_image) + .await + .with_context(|| format!("Failed to write image to file: {:?}", file_path))?; + } + + Ok(format!("{}/{}", relative_path, base_image_name)) + } + ErcImageType::Svg(svg_data) => { + let file_name = format!("{}.svg", base_image_name); + let file_path = dir_path.join(&file_name); + + // Save the SVG file + let mut file = 
File::create(&file_path) + .await + .with_context(|| format!("Failed to create file: {:?}", file_path))?; + file.write_all(&svg_data) + .await + .with_context(|| format!("Failed to write SVG to file: {:?}", file_path))?; + + Ok(format!("{}/{}", relative_path, file_name)) + } + } +} + +fn resize_image_to_fit(image: &DynamicImage, max_width: u32, max_height: u32) -> DynamicImage { + image.resize_to_fill(max_width, max_height, image::imageops::FilterType::Lanczos3) +} + +fn encode_image_to_vec(image: &DynamicImage, format: ImageFormat) -> Result> { + let mut buf = Vec::new(); + image.write_to(&mut Cursor::new(&mut buf), format).with_context(|| "Failed to encode image")?; + Ok(buf) +} + +#[derive(Debug)] +pub enum ErcImageType { + DynamicImage((DynamicImage, ImageFormat)), + Svg(Vec), +} diff --git a/crates/torii/server/src/lib.rs b/crates/torii/server/src/lib.rs index 44dcc92d61..621f66d155 100644 --- a/crates/torii/server/src/lib.rs +++ b/crates/torii/server/src/lib.rs @@ -1 +1,2 @@ +pub mod artifacts; pub mod proxy; diff --git a/crates/torii/server/src/proxy.rs b/crates/torii/server/src/proxy.rs index 4c759e8b1a..a43fa71c28 100644 --- a/crates/torii/server/src/proxy.rs +++ b/crates/torii/server/src/proxy.rs @@ -58,6 +58,7 @@ pub struct Proxy { addr: SocketAddr, allowed_origins: Option>, grpc_addr: Option, + artifacts_addr: Option, graphql_addr: Arc>>, } @@ -67,8 +68,15 @@ impl Proxy { allowed_origins: Option>, grpc_addr: Option, graphql_addr: Option, + artifacts_addr: Option, ) -> Self { - Self { addr, allowed_origins, grpc_addr, graphql_addr: Arc::new(RwLock::new(graphql_addr)) } + Self { + addr, + allowed_origins, + grpc_addr, + graphql_addr: Arc::new(RwLock::new(graphql_addr)), + artifacts_addr, + } } pub async fn set_graphql_addr(&self, addr: SocketAddr) { @@ -84,6 +92,7 @@ impl Proxy { let allowed_origins = self.allowed_origins.clone(); let grpc_addr = self.grpc_addr; let graphql_addr = self.graphql_addr.clone(); + let artifacts_addr = self.artifacts_addr; let make_svc = make_service_fn(move |conn: &AddrStream| { let remote_addr = conn.remote_addr().ip(); @@ -125,7 +134,7 @@ impl Proxy { let graphql_addr = graphql_addr_clone.clone(); async move { let graphql_addr = graphql_addr.read().await; - handle(remote_addr, grpc_addr, *graphql_addr, req).await + handle(remote_addr, grpc_addr, artifacts_addr, *graphql_addr, req).await } }); @@ -145,9 +154,32 @@ impl Proxy { async fn handle( client_ip: IpAddr, grpc_addr: Option, + artifacts_addr: Option, graphql_addr: Option, req: Request, ) -> Result, Infallible> { + if req.uri().path().starts_with("/static") { + if let Some(artifacts_addr) = artifacts_addr { + let artifacts_addr = format!("http://{}", artifacts_addr); + + return match GRAPHQL_PROXY_CLIENT.call(client_ip, &artifacts_addr, req).await { + Ok(response) => Ok(response), + Err(_error) => { + error!("{:?}", _error); + Ok(Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::empty()) + .unwrap()) + } + }; + } else { + return Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap()); + } + } + if req.uri().path().starts_with("/graphql") { if let Some(graphql_addr) = graphql_addr { let graphql_addr = format!("http://{}", graphql_addr);
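Putting the routing together: the proxy change above forwards anything under /static to the warp-based artifacts server, which expects paths of the form /static/{contract_address}/{token_id}/image, where both path segments are 0x-prefixed hex and the optional w/h query parameters pick the @small (<= 100px) or @medium (<= 250px) variant generated earlier. A hypothetical client call, with placeholder host, port, and token values:

```rust
// Request the small variant of a token image through the torii proxy.
// Host, port, contract address and token id are placeholders.
async fn fetch_token_image() -> anyhow::Result<Vec<u8>> {
    let url = "http://127.0.0.1:8080/static/0x1234/0x1/image?w=100";
    let bytes = reqwest::get(url).await?.bytes().await?;
    Ok(bytes.to_vec())
}
```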
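The hookup between the two servers happens outside this excerpt: the artifacts server is bound to an ephemeral local port and its address is passed to Proxy::new as the new artifacts_addr argument. A rough sketch of that wiring, assuming the pool, shutdown channel, and artifacts directory are already set up by the caller:

```rust
use std::net::SocketAddr;

use camino::Utf8PathBuf;
use sqlx::{Pool, Sqlite};
use tokio::sync::broadcast::Sender;

// Start the warp-based artifacts server and report its address so the proxy
// can forward /static/* requests to it. Sketch only; the real wiring may differ.
async fn start_artifacts_server(
    pool: Pool<Sqlite>,
    shutdown_tx: Sender<()>,
    artifacts_dir: Utf8PathBuf,
) -> anyhow::Result<SocketAddr> {
    let (artifacts_addr, artifacts_server) =
        torii_server::artifacts::new(shutdown_tx.subscribe(), &artifacts_dir, pool).await?;

    // Drive the server in the background; it stops once shutdown_tx fires.
    tokio::spawn(async move {
        artifacts_server.await;
    });

    // artifacts_addr is then handed to Proxy::new(.., Some(artifacts_addr)).
    Ok(artifacts_addr)
}
```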