From 072c06212a43e1ec3f56d39ec45548ad01db7872 Mon Sep 17 00:00:00 2001 From: Joey Kraut Date: Tue, 3 Sep 2024 18:06:52 -0700 Subject: [PATCH] price-reporter: Make misc reliability improvements --- Cargo.lock | 221 ++++++++++++----------- price-reporter/src/http_server/routes.rs | 17 +- price-reporter/src/main.rs | 81 +++++++++ price-reporter/src/utils.rs | 14 +- price-reporter/src/ws_server.rs | 91 ++++------ 5 files changed, 245 insertions(+), 179 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7847b11..11e7288 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,7 +135,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -147,11 +147,11 @@ dependencies = [ "alloy-sol-macro-input", "const-hex", "heck 0.5.0", - "indexmap 2.4.0", + "indexmap 2.5.0", "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", "syn-solidity", "tiny-keccak", ] @@ -167,7 +167,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", "syn-solidity", ] @@ -274,7 +274,7 @@ dependencies = [ [[package]] name = "arbitrum-client" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -456,7 +456,7 @@ dependencies = [ "num-traits", "paste", "rayon", - "rustc_version 0.4.0", + "rustc_version 0.4.1", "zeroize", ] @@ -662,13 +662,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.81" +version = "0.1.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" +checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -679,7 +679,7 @@ checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" dependencies = [ "futures", "pharos", - "rustc_version 0.4.0", + "rustc_version 0.4.1", ] [[package]] @@ -705,9 +705,9 @@ checksum = "62af46d040ba9df09edc6528dae9d8e49f5f3e82f55b7d2ec31a733c38dbc49d" [[package]] name = "atomic_float" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c4b08ed8a30ff7320117c190eb4d73d47f0ac0c930ab853b8224cef7cd9a5e7" +checksum = "628d228f918ac3b82fe590352cc719d30664a0c13ca3a60266fe02c7132d480a" [[package]] name = "auto_impl" @@ -717,7 +717,7 @@ checksum = "3c87f3f15e7794432337fc718554eaa4dc8f04c9677a950ffe366f20a162ae42" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -758,9 +758,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16838e6c9e12125face1c1eff1343c75e3ff540de98ff7ebd61874a89bcfeb9" +checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -770,14 +770,15 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.0" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f42c2d4218de4dcd890a109461e2f799a1a2ba3bcd2cde9af88360f5df9266c6" +checksum = "2424565416eef55906f9f8cece2072b6b6a76075e3ff81483ebe938a89a4c05f" dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-async", "aws-smithy-http", + "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", @@ -794,9 +795,9 @@ dependencies = [ [[package]] name = "aws-sdk-secretsmanager" -version = "1.43.0" +version = "1.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f468d566c05086b1b6a08e9de12dca141071a717580dc075f180d0fe11b6190f" +checksum = "2039325a02aa048e510442bb006129ec64c0c775666db905c315d2e714aeb2c0" dependencies = [ "aws-credential-types", "aws-runtime", @@ -817,9 +818,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.39.0" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11822090cf501c316c6f75711d77b96fba30658e3867a7762e5e2f5d32d31e81" +checksum = "e5879bec6e74b648ce12f6085e7245417bc5f6d672781028384d2e494be3eb6d" dependencies = [ "aws-credential-types", "aws-runtime", @@ -839,9 +840,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.40.0" +version = "1.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78a2a06ff89176123945d1bbe865603c4d7101bea216a550bb4d2e4e9ba74d74" +checksum = "4ef4cd9362f638c22a3b959fd8df292e7e47fdf170270f86246b97109b5f2f7d" dependencies = [ "aws-credential-types", "aws-runtime", @@ -861,9 +862,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.39.0" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a20a91795850826a6f456f4a48eff1dfa59a0e69bdbf5b8c50518fd372106574" +checksum = "0b1e2735d2ab28b35ecbb5496c9d41857f52a0d6a0075bbf6a8af306045ea6f6" dependencies = [ "aws-credential-types", "aws-runtime", @@ -918,9 +919,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.9" +version = "0.60.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9cd0ae3d97daa0a2bf377a4d8e8e1362cae590c4a1aad0d40058ebca18eb91e" +checksum = "01dbcb6e2588fd64cfb6d7529661b06466419e4c54ed1c62d6510d2d0350a728" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -957,9 +958,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.6.3" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0abbf454960d0db2ad12684a1640120e7557294b0ff8e2f11236290a1b293225" +checksum = "d1ce695746394772e7000b39fe073095db6d45a862d0767dd5ad0ac0d7f8eb87" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1001,9 +1002,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.2" +version = "1.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6cee7cadb433c781d3299b916fbf620fea813bf38f49db282fb6858141a05cc8" +checksum = "273dcdfd762fae3e1650b8024624e7cd50e484e37abdab73a7a706188ad34543" dependencies = [ "base64-simd", "bytes", @@ -1044,7 +1045,7 @@ dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "rustc_version 0.4.0", + "rustc_version 0.4.1", "tracing", ] @@ -1348,7 +1349,7 @@ dependencies = [ "proc-macro-crate 3.2.0", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", "syn_derive", ] @@ -1599,7 +1600,7 @@ dependencies = [ [[package]] name = "circuit-macros" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "itertools 0.10.5", "proc-macro2", @@ -1610,7 +1611,7 @@ dependencies = [ [[package]] name = "circuit-types" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "ark-bn254", "ark-ec", @@ -1641,7 +1642,7 @@ dependencies = [ [[package]] name = "circuits" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "ark-crypto-primitives", "ark-ec", @@ -1699,7 +1700,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -1785,7 +1786,7 @@ dependencies = [ [[package]] name = "common" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "ark-mpc", "async-trait", @@ -1800,7 +1801,7 @@ dependencies = [ "ed25519-dalek 1.0.1", "ethers", "hmac", - "indexmap 2.4.0", + "indexmap 2.5.0", "itertools 0.10.5", "k256", "lazy_static", @@ -1850,7 +1851,7 @@ dependencies = [ [[package]] name = "config" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "arbitrum-client", "base64 0.13.1", @@ -1859,6 +1860,7 @@ dependencies = [ "clap", "colored", "common", + "constants", "ed25519-dalek 1.0.1", "ethers", "json", @@ -1901,7 +1903,7 @@ checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" [[package]] name = "constants" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "ark-bn254", "ark-ec", @@ -1912,7 +1914,7 @@ dependencies = [ [[package]] name = "contracts-common" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade-contracts.git#c133f9c903d88b653e961de1bea63402b8cb9ac6" +source = "git+https://github.com/renegade-fi/renegade-contracts.git#d0f21eef0ab13b943def05438e5a3cc489c4c7e5" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -2125,7 +2127,7 @@ dependencies = [ "curve25519-dalek-derive", "digest 0.10.7", "fiat-crypto", - "rustc_version 0.4.0", + "rustc_version 0.4.1", "subtle", "zeroize", ] @@ -2138,7 +2140,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2162,7 +2164,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2173,7 +2175,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2242,8 +2244,8 @@ dependencies = [ "convert_case", "proc-macro2", "quote", - "rustc_version 0.4.0", - "syn 2.0.76", + "rustc_version 0.4.1", + "syn 2.0.77", ] [[package]] @@ -2263,7 +2265,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", "unicode-xid", ] @@ -2310,7 +2312,7 @@ dependencies = [ "diesel_table_macro_syntax", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2319,7 +2321,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc5557efc453706fed5e4fa85006fe9817c224c3f480a34c7e5959fd700921c5" dependencies = [ - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2402,7 +2404,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2771,7 +2773,7 @@ dependencies = [ "reqwest 0.11.27", "serde", "serde_json", - "syn 2.0.76", + "syn 2.0.77", "toml 0.8.19", "walkdir", ] @@ -2789,7 +2791,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -2815,7 +2817,7 @@ dependencies = [ "serde", "serde_json", "strum", - "syn 2.0.76", + "syn 2.0.77", "tempfile", "thiserror", "tiny-keccak", @@ -2956,7 +2958,7 @@ dependencies = [ [[package]] name = "external-api" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "circuit-types", "common", @@ -3262,7 +3264,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -3385,7 +3387,7 @@ dependencies = [ [[package]] name = "gossip-api" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "bincode", "circuit-types", @@ -3424,7 +3426,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.4.0", + "indexmap 2.5.0", "slab", "tokio", "tokio-util 0.7.11", @@ -3443,7 +3445,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.4.0", + "indexmap 2.5.0", "slab", "tokio", "tokio-util 0.7.11", @@ -3528,7 +3530,7 @@ checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f" dependencies = [ "atomic-polyfill", "hash32", - "rustc_version 0.4.0", + "rustc_version 0.4.1", "serde", "spin 0.9.8", "stable_deref_trait", @@ -3925,9 +3927,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c" +checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -4069,7 +4071,7 @@ dependencies = [ [[package]] name = "job-types" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "ark-mpc", "circuit-types", @@ -4500,7 +4502,7 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb791d015f8947acf5a7f62bd28d00f289bb7ea98cfbe3ffec1d061eee12df12" dependencies = [ - "indexmap 2.4.0", + "indexmap 2.5.0", "itoa", "lockfree-object-pool", "metrics", @@ -4521,7 +4523,7 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.14.5", - "indexmap 2.4.0", + "indexmap 2.5.0", "metrics", "num_cpus", "ordered-float", @@ -4877,14 +4879,14 @@ dependencies = [ "proc-macro-crate 3.2.0", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] name = "object" -version = "0.36.3" +version = "0.36.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" +checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" dependencies = [ "memchr", ] @@ -4934,9 +4936,9 @@ dependencies = [ [[package]] name = "openraft" -version = "0.9.14" +version = "0.9.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c3d679c9cd2dfdfea765c6922d4173cf89ca06bdc568928d86ea65c32ca524f" +checksum = "0f633802689f2dcce7c4be2419b657aeb3d579f92f93a99cb92985aa8f21d64c" dependencies = [ "anyerror", "byte-unit", @@ -4956,15 +4958,15 @@ dependencies = [ [[package]] name = "openraft-macros" -version = "0.9.14" +version = "0.9.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31d91f73f5ba304006ead327a028ad759452f05879efc04b55ab7405089576cd" +checksum = "798585f0b7bbd3790680f9ab4097d9f10513947b619a7d8dcb6e88fd11e4dbba" dependencies = [ "chrono", "proc-macro2", "quote", "semver 1.0.23", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -4990,7 +4992,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -5019,7 +5021,7 @@ checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" dependencies = [ "futures-core", "futures-sink", - "indexmap 2.4.0", + "indexmap 2.5.0", "js-sys", "once_cell", "pin-project-lite", @@ -5035,7 +5037,7 @@ checksum = "3e09667367cb509f10d7cf5960a83f9c4d96e93715f750b164b4b98d46c3cbf4" dependencies = [ "futures-core", "http 0.2.12", - "indexmap 2.4.0", + "indexmap 2.5.0", "itertools 0.11.0", "once_cell", "opentelemetry", @@ -5311,7 +5313,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap 2.4.0", + "indexmap 2.5.0", ] [[package]] @@ -5321,7 +5323,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" dependencies = [ "futures", - "rustc_version 0.4.0", + "rustc_version 0.4.1", ] [[package]] @@ -5354,7 +5356,7 @@ dependencies = [ "phf_shared 0.11.2", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -5392,7 +5394,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -5532,17 +5534,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba" dependencies = [ "proc-macro2", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] name = "price-reporter" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "async-trait", "atomic_float 0.1.0", "common", + "constants", "create2", "external-api", "futures", @@ -6059,7 +6062,7 @@ dependencies = [ [[package]] name = "renegade-crypto" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "ark-ec", "ark-ff 0.4.2", @@ -6123,9 +6126,9 @@ dependencies = [ [[package]] name = "renegade-metrics" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ - "atomic_float 1.0.0", + "atomic_float 1.1.0", "circuit-types", "common", "lazy_static", @@ -6432,9 +6435,9 @@ dependencies = [ [[package]] name = "rustc_version" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" dependencies = [ "semver 1.0.23", ] @@ -6821,7 +6824,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -6867,7 +6870,7 @@ dependencies = [ "chrono", "hex 0.4.3", "indexmap 1.9.3", - "indexmap 2.4.0", + "indexmap 2.5.0", "serde", "serde_derive", "serde_json", @@ -6884,7 +6887,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -7269,7 +7272,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -7311,9 +7314,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.76" +version = "2.0.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525" +checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" dependencies = [ "proc-macro2", "quote", @@ -7329,7 +7332,7 @@ dependencies = [ "paste", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -7341,7 +7344,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -7374,7 +7377,7 @@ dependencies = [ [[package]] name = "system-bus" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "bus", "common", @@ -7498,7 +7501,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -7568,9 +7571,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.3" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", @@ -7602,7 +7605,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -7781,7 +7784,7 @@ version = "0.22.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" dependencies = [ - "indexmap 2.4.0", + "indexmap 2.5.0", "serde", "serde_spanned", "toml_datetime", @@ -7868,7 +7871,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -8163,7 +8166,7 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "util" version = "0.1.0" -source = "git+https://github.com/renegade-fi/renegade.git#41712fbb77818687271cf71352e11b6f6691f717" +source = "git+https://github.com/renegade-fi/renegade.git#1032a31ca8d983cf12b52bdd35203680083620be" dependencies = [ "ark-ec", "ark-serialize 0.4.2", @@ -8356,7 +8359,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", "wasm-bindgen-shared", ] @@ -8390,7 +8393,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -8811,7 +8814,7 @@ dependencies = [ "js-sys", "log", "pharos", - "rustc_version 0.4.0", + "rustc_version 0.4.1", "send_wrapper 0.6.0", "thiserror", "wasm-bindgen", @@ -8873,7 +8876,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] @@ -8893,7 +8896,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.77", ] [[package]] diff --git a/price-reporter/src/http_server/routes.rs b/price-reporter/src/http_server/routes.rs index 496a1e8..3e39e92 100644 --- a/price-reporter/src/http_server/routes.rs +++ b/price-reporter/src/http_server/routes.rs @@ -1,7 +1,6 @@ //! The routes for the HTTP server use async_trait::async_trait; -use futures_util::StreamExt; use hyper::{Body, Request, Response, StatusCode}; use renegade_common::types::Price; use renegade_price_reporter::worker::ExchangeConnectionsConfig; @@ -72,23 +71,13 @@ impl PriceHandler { let mut self_clone = self.clone(); let pair_info = parse_pair_info_from_topic(topic)?; - let mut price_stream = self_clone + let price_rx = self_clone .price_streams .get_or_create_price_stream(pair_info, self_clone.config.clone()) .await?; - // Loop until we get a price from the stream - loop { - match price_stream.next().await { - None => return Err(ServerError::HttpServer("Price stream closed".to_string())), - Some(Ok(price)) => { - return Ok(price); - }, - // This error case is only thrown when the stream is lagging, meaning we should just - // continue looping - Some(Err(_)) => {}, - } - } + let price = *price_rx.borrow(); + Ok(price) } } diff --git a/price-reporter/src/main.rs b/price-reporter/src/main.rs index db9f1f8..0129239 100644 --- a/price-reporter/src/main.rs +++ b/price-reporter/src/main.rs @@ -11,7 +11,12 @@ use std::net::SocketAddr; use errors::ServerError; use http_server::HttpServer; +use renegade_common::types::{ + exchange::Exchange, + token::{Token, TOKEN_REMAPS, USDC_TICKER, USDT_TICKER, USD_TICKER}, +}; use renegade_config::setup_token_remaps; +use renegade_price_reporter::{manager::get_listing_exchanges, worker::ExchangeConnectionsConfig}; use renegade_util::err_str; use tokio::{net::TcpListener, sync::mpsc::unbounded_channel}; use tracing::{error, info}; @@ -23,6 +28,9 @@ mod http_server; mod utils; mod ws_server; +/// The default stable to initiate price streams on +const DEFAULT_STABLE: &str = USDT_TICKER; + #[tokio::main] async fn main() -> Result<(), ServerError> { // Set up logging @@ -46,6 +54,7 @@ async fn main() -> Result<(), ServerError> { let (closure_tx, mut closure_rx) = unbounded_channel(); let global_price_streams = GlobalPriceStreams::new(closure_tx); + init_default_price_streams(&global_price_streams, exchange_conn_config.clone()).await?; // Bind the server to the given port let addr: SocketAddr = format!("0.0.0.0:{:?}", ws_port).parse().unwrap(); @@ -80,3 +89,75 @@ async fn main() -> Result<(), ServerError> { } } } + +/// Initialize price streams for all default token mapped pairs +async fn init_default_price_streams( + global_price_streams: &GlobalPriceStreams, + config: ExchangeConnectionsConfig, +) -> Result<(), ServerError> { + info!("Initializing default price streams"); + + // Get the default token remap + let quote_token = Token::from_ticker(DEFAULT_STABLE); + let remap = TOKEN_REMAPS.get().unwrap(); + for (addr, ticker) in remap.clone().into_iter() { + // Skip stables + if [USD_TICKER, USDC_TICKER, USDT_TICKER].contains(&ticker.as_str()) { + continue; + } + + let base_token = Token::from_addr(&addr); + let supported_exchanges = get_supported_exchanges(&base_token, "e_token, &config); + for exchange in supported_exchanges.into_iter() { + init_price_stream( + base_token.clone(), + quote_token.clone(), + exchange, + global_price_streams, + config.clone(), + )?; + } + } + + Ok(()) +} + +/// Spawn a task to initialize a price stream for a given token pair +#[allow(clippy::needless_pass_by_value)] +fn init_price_stream( + base_token: Token, + quote_token: Token, + exchange: Exchange, + global_price_streams: &GlobalPriceStreams, + config: ExchangeConnectionsConfig, +) -> Result<(), ServerError> { + let pair_info = (exchange, base_token.clone(), quote_token.clone()); + let mut streams = global_price_streams.clone(); + tokio::spawn(async move { + if let Err(e) = streams.get_or_create_price_stream(pair_info.clone(), config.clone()).await + { + let ticker = base_token.get_ticker().expect("Failed to get ticker"); + error!("Error initializing price stream for {ticker}: {e}"); + } + }); + + Ok(()) +} + +/// Get the listing exchanges for a given pair +fn get_supported_exchanges( + base_token: &Token, + quote_token: &Token, + config: &ExchangeConnectionsConfig, +) -> Vec { + let mut supported_exchanges = get_listing_exchanges(base_token, quote_token); + if config.coinbase_api_key.is_none() || config.coinbase_api_secret.is_none() { + supported_exchanges.retain(|e| e != &Exchange::Coinbase); + } + + if config.eth_websocket_addr.is_none() { + supported_exchanges.retain(|e| e != &Exchange::UniswapV3); + } + + supported_exchanges +} diff --git a/price-reporter/src/utils.rs b/price-reporter/src/utils.rs index 876d010..f0ecdc2 100644 --- a/price-reporter/src/utils.rs +++ b/price-reporter/src/utils.rs @@ -11,9 +11,10 @@ use renegade_util::err_str; use serde::{Deserialize, Serialize}; use tokio::{ net::TcpStream, - sync::{broadcast::Sender, mpsc::UnboundedSender, RwLock}, + sync::watch::{Receiver as WatchReceiver, Sender as WatchSender}, + sync::{mpsc::UnboundedSender, RwLock}, }; -use tokio_stream::{wrappers::BroadcastStream, StreamMap}; +use tokio_stream::{wrappers::WatchStream, StreamMap}; use tokio_tungstenite::WebSocketStream; use tracing_subscriber::{ filter::{EnvFilter, LevelFilter}, @@ -77,14 +78,17 @@ const ETH_WS_ADDR_ENV_VAR: &str = "ETH_WS_ADDR"; pub type PairInfo = (Exchange, Token, Token); /// A type alias for the sender end of a price channel -pub type PriceSender = Sender; +pub type PriceSender = WatchSender; + +/// A type alias for a price receiver +pub type PriceReceiver = WatchReceiver; /// A type alias for a shareable map of price streams, indexed by the (source, /// base, quote) tuple -pub type SharedPriceStreams = Arc>>; +pub type SharedPriceStreams = Arc>>; /// A type alias for a price stream -pub type PriceStream = BroadcastStream; +pub type PriceStream = WatchStream; /// A type alias for a mapped stream prices, indexed by the (source, base, /// quote) tuple diff --git a/price-reporter/src/ws_server.rs b/price-reporter/src/ws_server.rs index 29a85a7..c0f5bcd 100644 --- a/price-reporter/src/ws_server.rs +++ b/price-reporter/src/ws_server.rs @@ -5,17 +5,14 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use futures_util::{SinkExt, StreamExt}; use renegade_api::websocket::{SubscriptionResponse, WebsocketMessage}; +use renegade_common::types::Price; use renegade_price_reporter::{ errors::ExchangeConnectionError, exchange::{connect_exchange, ExchangeConnection}, worker::ExchangeConnectionsConfig, }; use renegade_util::err_str; -use tokio::{ - net::TcpStream, - sync::{broadcast::channel, RwLock}, - time::Instant, -}; +use tokio::{net::TcpStream, sync::watch::channel, sync::RwLock, time::Instant}; use tokio_stream::StreamMap; use tokio_tungstenite::accept_async; use tracing::{debug, error, info, warn}; @@ -25,8 +22,8 @@ use crate::{ errors::ServerError, utils::{ get_pair_info_topic, get_subscribed_topics, parse_pair_info_from_topic, - validate_subscription, ClosureSender, PairInfo, PriceMessage, PriceSender, PriceStream, - PriceStreamMap, SharedPriceStreams, WsWriteStream, CONN_RETRY_DELAY_MS, + validate_subscription, ClosureSender, PairInfo, PriceMessage, PriceReceiver, PriceSender, + PriceStream, PriceStreamMap, SharedPriceStreams, WsWriteStream, CONN_RETRY_DELAY_MS, KEEPALIVE_INTERVAL_MS, MAX_CONN_RETRIES, MAX_CONN_RETRY_WINDOW_MS, }, }; @@ -52,44 +49,47 @@ impl GlobalPriceStreams { Self { price_streams: Arc::new(RwLock::new(HashMap::new())), closure_channel } } + /// Add a price stream to the global map + pub async fn add_price_stream(&mut self, pair_info: PairInfo, price_rx: PriceReceiver) { + self.price_streams.write().await.insert(pair_info, price_rx); + } + + /// Remove a price stream from the global map + pub async fn remove_price_stream(&mut self, pair_info: PairInfo) { + self.price_streams.write().await.remove(&pair_info); + } + /// Initialize a price stream for the given pair info pub async fn init_price_stream( &mut self, pair_info: PairInfo, config: ExchangeConnectionsConfig, - ) -> Result { + ) -> Result { validate_subscription(&pair_info).await?; info!("Initializing price stream for {}", get_pair_info_topic(&pair_info)); // Create a shared channel into which we forward streamed prices - let (price_tx, price_rx) = channel(32 /* capacity */); - - // Clone the global map of price streams for the task to have access to it - let global_price_streams = self.clone(); + let (price_tx, price_rx) = channel(Price::default()); + self.add_price_stream(pair_info.clone(), price_rx.clone()).await; // Spawn a task responsible for forwarding prices into the broadcast channel & // sending keepalive messages to the exchange + let mut global_price_streams = self.clone(); tokio::spawn(async move { - let res = Self::price_stream_task( - config, - pair_info, - &global_price_streams.price_streams, - price_tx, - ) - .await; + let res = Self::price_stream_task(config, pair_info.clone(), price_tx).await; + global_price_streams.remove_price_stream(pair_info).await; global_price_streams.closure_channel.send(res).unwrap() }); // Return a handle to the broadcast channel stream - Ok(PriceStream::new(price_rx)) + Ok(price_rx) } /// The task responsible for streaming prices from the exchange async fn price_stream_task( config: ExchangeConnectionsConfig, pair_info: PairInfo, - price_streams: &SharedPriceStreams, price_tx: PriceSender, ) -> Result<(), ServerError> { let mut retry_timestamps = Vec::new(); @@ -98,11 +98,6 @@ impl GlobalPriceStreams { let mut conn = Self::connect_with_retries(&pair_info, &config, &mut retry_timestamps).await?; - // Add the channel to the map of price streams - { - price_streams.write().await.insert(pair_info.clone(), price_tx.clone()); - } - let delay = tokio::time::sleep(Duration::from_millis(KEEPALIVE_INTERVAL_MS)); tokio::pin!(delay); @@ -119,13 +114,9 @@ impl GlobalPriceStreams { match price_res.map_err(ServerError::ExchangeConnection) { Ok(price) => { // `send` only errors if there are no more receivers, meaning no more - // clientz are subscribed to this price stream. In this case, we remove + // clients are subscribed to this price stream. In this case, we remove // the stream from the global map, and complete the task. - if price_tx.send(price).is_err() { - info!("No more subscribers for {}, closing price stream", get_pair_info_topic(&pair_info)); - price_streams.write().await.remove(&pair_info); - return Ok(()); - } + let _ = price_tx.send(price); } Err(e) => { // We failed to stream a price, attempt to @@ -219,18 +210,18 @@ impl GlobalPriceStreams { &mut self, pair_info: PairInfo, config: ExchangeConnectionsConfig, - ) -> Result { - let maybe_stream_tx = { + ) -> Result { + let maybe_stream_rx = { let price_streams = self.price_streams.read().await; price_streams.get(&pair_info).cloned() }; - let price_stream = if let Some(stream_tx) = maybe_stream_tx { - PriceStream::new(stream_tx.subscribe()) - } else { - self.init_price_stream(pair_info, config).await? + + let recv = match maybe_stream_rx { + Some(stream_rx) => stream_rx, + None => self.init_price_stream(pair_info, config).await?, }; - Ok(price_stream) + Ok(recv) } } @@ -258,18 +249,16 @@ pub async fn handle_connection( loop { tokio::select! { // Send the next price to the client - Some((pair_info, price_res)) = subscriptions.next() => { + Some((pair_info, price)) = subscriptions.next() => { // The potential error in `price_res` here is a `BroadcastStreamRecvError::Lagged`, // meaning the stream lagged receiving price updates. We can safely ignore this. - if let Ok(price) = price_res { - let topic = get_pair_info_topic(&pair_info); - let message = PriceMessage { topic, price }; - let message_ser = serde_json::to_string(&message).map_err(ServerError::Serde)?; - write_stream - .send(Message::Text(message_ser)) - .await - .map_err(err_str!(ServerError::WebsocketSend))?; - } + let topic = get_pair_info_topic(&pair_info); + let message = PriceMessage { topic, price }; + let message_ser = serde_json::to_string(&message).map_err(ServerError::Serde)?; + write_stream + .send(Message::Text(message_ser)) + .await + .map_err(err_str!(ServerError::WebsocketSend))?; } // Handle incoming websocket messages @@ -361,9 +350,9 @@ async fn handle_subscription_message( info!("Subscribing {} to {}", peer_addr, &topic); - let price_stream = + let price_rx = global_price_streams.get_or_create_price_stream(pair_info.clone(), config).await?; - subscriptions.insert(pair_info, price_stream); + subscriptions.insert(pair_info, PriceStream::new(price_rx)); }, WebsocketMessage::Unsubscribe { topic } => { info!("Unsubscribing {} from {}", peer_addr, &topic);