diff --git a/.env.example b/.env.example index 690c13a..89b9b63 100644 --- a/.env.example +++ b/.env.example @@ -3,14 +3,30 @@ IC_URL=https://icp0.io # gateway to canister polling interval in milliseconds POLLING_INTERVAL=100 +# minimum interval between incoming messages in milliseconds +MIN_INCOMING_INTERVAL=100 # the public port where the gateway will listen LISTEN_PORT=443 # the public domain name of the server -DOMAIN_NAME=example.com -# the telemetry endpoint of the Jaeger agent (leave empty to disable telemetry) -# if you're running the gateway from the docker-compose.yml file, you can use `jaeger` as the hostname -TELEMETRY_JAEGER_AGENT_ENDPOINT=127.0.0.1:6831 +DOMAIN_NAME=icws.io # the log configuration (can also be empty). See README.md for more details RUST_LOG_FILE=ic_websocket_gateway=trace RUST_LOG_STDOUT=ic_websocket_gateway=debug + +## TELEMETRY CONFIG ## +# the following configurations are needed only when using the `telemetry-prod` or `telemetry-local` docker compose profiles +# the telemetry collector endpoint (leave empty to disable telemetry) +# if you're running the gateway from the docker-compose.yml file, you can use `otlp_collector` as the hostname +OPENTELEMETRY_COLLECTOR_ENDPOINT=grpc://otlp_collector:4317 +# configure the telemetry trace level RUST_LOG_TELEMETRY=ic_websocket_gateway=trace +# the info necessary to relay the telemetry traces to Grafana Tempo +# find how to set the following variables at: +# https://medium.com/@rasnaut/the-easiest-way-to-send-traces-from-the-rust-app-to-grafana-cloud-7a66baf2e45b +# when using the `telemetry-prod` docker compose profile, set this to the Grafana Tempo endpoint +# when using the `telemetry-local` docker compose profile, set this to `tempo:4318` +GRAFANA_TEMPO_ENDPOINT=tempo:4318 +# when using the `telemetry-prod` docker compose profile, set this to the access token obtained from Grafana Tempo, otherwise leave it empty or comment it out +GRAFANA_TEMPO_ACCESS_TOKEN=your_tempo_basic_auth_token +# when using the `telemetry-local` docker compose profile, set this to true, otherwise false +GRAFANA_TEMPO_LOCAL=false \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 7d62908..6bc8433 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,6 +112,51 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes 1.5.0", + "futures-util", + "http", + "http-body", + "hyper 0.14.27", + "itoa", + "matchit", + "memchr", + "mime 0.3.17", + "percent-encoding 2.3.0", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes 1.5.0", + "futures-util", + "http", + "http-body", + "mime 0.3.17", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backoff" version = "0.4.0" @@ -349,6 +394,18 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "canister-utils" +version = "1.0.0" +dependencies = [ + "candid", + "ic-agent", + "reqwest", + "serde", + "serde_bytes", + "tracing", +] + [[package]] name = "cc" version = "1.0.83" @@ -848,6 +905,17 @@ dependencies = [ "slab", ] +[[package]] +name = "gateway-state" +version = "1.0.0" +dependencies = [ + "canister-utils", + "dashmap", + "ic-agent", + "tokio", + "tracing", +] + [[package]] name = "generic-array" version = "0.12.4" @@ -885,6 +953,12 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "group" version = "0.12.1" @@ -1078,6 +1152,18 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper 0.14.27", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1257,24 +1343,22 @@ checksum = "576c539151d4769fb4d1a0c25c4108dd18facd04c5695b02cf2d226ab4e43aa5" [[package]] name = "ic_websocket_gateway" -version = "1.2.3" +version = "1.3.0" dependencies = [ "async-trait", "candid", - "dashmap", + "canister-utils", "futures-util", + "gateway-state", "ic-agent", - "ic-cdk", - "ic-cdk-macros", "ic-identity", "lazy_static", "mockito", "native-tls", "opentelemetry", - "opentelemetry-jaeger", + "opentelemetry-otlp", "opentelemetry_sdk", "rand 0.8.5", - "reqwest", "serde", "serde_bytes", "serde_cbor", @@ -1340,12 +1424,6 @@ dependencies = [ "cfg-if 1.0.0", ] -[[package]] -name = "integer-encoding" -version = "3.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" - [[package]] name = "iovec" version = "0.1.4" @@ -1372,6 +1450,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -1490,6 +1577,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "maybe-uninit" version = "2.0.0" @@ -1761,71 +1854,77 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" dependencies = [ - "opentelemetry_api", - "opentelemetry_sdk", + "futures-core", + "futures-sink", + "indexmap 2.0.2", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", ] [[package]] -name = "opentelemetry-jaeger" -version = "0.19.0" +name = "opentelemetry-otlp" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "876958ba9084f390f913fcf04ddf7bbbb822898867bb0a51cc28f2b9e5c1b515" +checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930" dependencies = [ "async-trait", "futures-core", - "futures-util", + "http", "opentelemetry", + "opentelemetry-proto", "opentelemetry-semantic-conventions", - "thrift", + "opentelemetry_sdk", + "prost", + "thiserror", "tokio", + "tonic", ] [[package]] -name = "opentelemetry-semantic-conventions" -version = "0.12.0" +name = "opentelemetry-proto" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269" +checksum = "a2e155ce5cc812ea3d1dffbd1539aed653de4bf4882d60e6e04dcf0901d674e1" dependencies = [ "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", ] [[package]] -name = "opentelemetry_api" -version = "0.20.0" +name = "opentelemetry-semantic-conventions" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b" +checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" dependencies = [ - "futures-channel", - "futures-util", - "indexmap 1.9.3", - "js-sys", - "once_cell", - "pin-project-lite", - "thiserror", - "urlencoding", + "opentelemetry", ] [[package]] name = "opentelemetry_sdk" -version = "0.20.0" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026" +checksum = "2f16aec8a98a457a52664d69e0091bac3a0abd18ead9b641cb00202ba4e0efe4" dependencies = [ "async-trait", "crossbeam-channel", "futures-channel", "futures-executor", "futures-util", + "glob", "once_cell", - "opentelemetry_api", - "ordered-float 3.9.2", + "opentelemetry", + "ordered-float", "percent-encoding 2.3.0", "rand 0.8.5", - "regex", "thiserror", "tokio", "tokio-stream", @@ -1833,18 +1932,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "2.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" -dependencies = [ - "num-traits", -] - -[[package]] -name = "ordered-float" -version = "3.9.2" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" dependencies = [ "num-traits", ] @@ -1950,6 +2040,26 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -2044,6 +2154,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes 1.5.0", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "psm" version = "0.1.21" @@ -2826,6 +2959,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "system-configuration" version = "0.5.1" @@ -2919,28 +3058,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "threadpool" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" -dependencies = [ - "num_cpus", -] - -[[package]] -name = "thrift" -version = "0.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" -dependencies = [ - "byteorder", - "integer-encoding", - "log 0.4.20", - "ordered-float 2.10.1", - "threadpool", -] - [[package]] name = "time" version = "0.1.45" @@ -3047,6 +3164,16 @@ dependencies = [ "log 0.4.20", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -3186,6 +3313,60 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +dependencies = [ + "async-trait", + "axum", + "base64 0.21.4", + "bytes 1.5.0", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper 0.14.27", + "hyper-timeout", + "percent-encoding 2.3.0", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -3246,18 +3427,33 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log 0.4.20", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-opentelemetry" -version = "0.20.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc09e402904a5261e42cf27aea09ccb7d5318c6717a9eec3d8e2e65c56b18f19" +checksum = "c67ac25c5407e7b961fafc6f7e9aa5958fd297aada2d20fa2ae1737357e55596" dependencies = [ + "js-sys", "once_cell", "opentelemetry", + "opentelemetry_sdk", + "smallvec 1.11.1", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.2.0", "tracing-subscriber", + "web-time", ] [[package]] @@ -3287,7 +3483,7 @@ dependencies = [ "thread_local", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.1.3", "tracing-serde", ] @@ -3562,6 +3758,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa30049b1c872b72c89866d458eae9f20380ab280ffd1b1e18df2d3e2d98cfe0" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.2" diff --git a/Cargo.toml b/Cargo.toml index 0c69efb..c553ac7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "src/ic-websocket-gateway", "src/ic-identity", "src/scripts", + "src/gateway-state", "tests/src/test_canister_rs", ] default-members = ["src/ic-websocket-gateway"] @@ -23,3 +24,6 @@ ic-cdk = "0.10.0" ic-cdk-macros = "0.7.1" serde = "1.0.176" reqwest = "0.11.7" +tokio = { version = "1.29.1", features = ["full"] } +tracing = "0.1.40" + diff --git a/README.md b/README.md index b334548..6832c3c 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ There are some command line arguments that you can set when running the gateway: | `--polling-interval` | The interval (in **milliseconds**) at which the gateway will poll the canisters for new messages. | `100` | | `--tls-certificate-pem-path` | The path to the TLS certificate file. See [Obtain a TLS certificate](#obtain-a-tls-certificate) for more details. | _empty_ | | `--tls-certificate-key-pem-path` | The path to the TLS private key file. See [Obtain a TLS certificate](#obtain-a-tls-certificate) for more details. | _empty_ | -| `--telemetry-jaeger-agent-endpoint` | Jaeger agent endpoint for the telemetry in the format :. See [Tracing telemetry](#tracing-telemetry) for more details. | _empty_ | +| `--opentelemetry-collector-endpoint` | Jaeger agent endpoint for the telemetry in the format :. See [Tracing telemetry](#tracing-telemetry) for more details. | _empty_ | ## Docker @@ -95,23 +95,44 @@ RUST_LOG_FILE=ic_websocket_gateway=debug RUST_LOG_STDOUT=ic_websocket_gateway=de ## Tracing telemetry -The gateway uses the [opentelemetry](https://docs.rs/opentelemetry) crate and [Jaeger](https://www.jaegertracing.io/) for tracing telemetry. To enable tracing telemetry, you have to: +The gateway uses the [opentelemetry](https://docs.rs/opentelemetry) crate and [Grafana](https://www.grafana.com/) for tracing telemetry. To enable tracing telemetry, you have to: -- set the `--telemetry-jaeger-agent-endpoint` argument to point to the Jaeger agent endpoint (leaving it empty or unset will disable tracing telemetry); +- set the `--opentelemetry-collector-endpoint` argument to point to the opentelemetry collector endpoint (leaving it empty or unset will disable tracing telemetry); - optionally set the `RUST_LOG_TELEMETRY` environment variable, which defaults to `trace`, following the same principles described in the [Configure logging](#configure-logging) section. -If you're running the gateway using from the [docker-compose.yml](./docker-compose.yml) file, you can run a Jaeger agent together with the gateway by simply running: +If you're deploying the gateway locally for testing from the [docker-compose.yml](./docker-compose.yml) file, you can run both an opentelemetry collector and grafana together with the gateway by: ``` -docker compose --profile jaeger up -d +docker compose --profile telemetry-local up -d ``` -making sure that you've set the `TELEMETRY_JAEGER_AGENT_ENDPOINT` variable in the `.env` file to: +Before you do so, make sure you set the following varibales in the `.env` file: ``` -TELEMETRY_JAEGER_AGENT_ENDPOINT=jaeger:6831 +OPENTELEMETRY_COLLECTOR_ENDPOINT=grpc://otlp_collector:4317 +GRAFANA_TEMPO_ENDPOINT=tempo:4318 +GRAFANA_TEMPO_LOCAL=true ``` +If you are deploying the gateway in production and want to send the telemetry traces to Grafana Cloud, you only need to deploy the OTLP collector. To do so, run: + +``` +docker compose --profile telemetry-prod up -d +``` + +Before you do so, make sure you set the following varibales in the `.env` file: + +``` +OPENTELEMETRY_COLLECTOR_ENDPOINT=grpc://otlp_collector:4317 +GRAFANA_TEMPO_ENDPOINT=your-grafana-cloud-tempo-endpoint +GRAFANA_TEMPO_ACCESS_TOKEN=your-grafana-cloud-tempo-basic-auth-token +GRAFANA_TEMPO_LOCAL=false +``` + +You can find the Tempo endpoint and create a token, by following [this](https://grafana.com/blog/2021/04/13/how-to-send-traces-to-grafana-clouds-tempo-service-with-opentelemetry-collector/) guide. + +For more information about how to configure the env variables properly, checkout the [.env.example](./.env.example). + # Development ## Testing diff --git a/docker-compose.yml b/docker-compose.yml index a5df060..cd6fec1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,8 +17,8 @@ services: "/ic-ws-gateway/data/certs/live/${DOMAIN_NAME}/fullchain.pem", "--tls-certificate-key-pem-path", "/ic-ws-gateway/data/certs/live/${DOMAIN_NAME}/privkey.pem", - "--telemetry-jaeger-agent-endpoint", - "${TELEMETRY_JAEGER_AGENT_ENDPOINT}", + "--opentelemetry-collector-endpoint", + "${OPENTELEMETRY_COLLECTOR_ENDPOINT}", ] environment: - RUST_LOG_FILE=${RUST_LOG_FILE} @@ -29,19 +29,67 @@ services: networks: - ic-ws-gateway-network - jaeger: - image: jaegertracing/all-in-one:latest - container_name: jaeger + otlp_collector: + image: otel/opentelemetry-collector:0.92.0 + container_name: otlp_collector + restart: unless-stopped profiles: - - jaeger + - telemetry-prod + - telemetry-local ports: - - 6831:6831/udp - - 6832:6832/udp - - 16686:16686 - - 14268:14268 + - 4317:4317 # otlp grpc receiver + environment: + - GRAFANA_TEMPO_ENDPOINT=${GRAFANA_TEMPO_ENDPOINT} + - GRAFANA_TEMPO_ACCESS_TOKEN=${GRAFANA_TEMPO_ACCESS_TOKEN:-} + - GRAFANA_TEMPO_LOCAL=${GRAFANA_TEMPO_LOCAL:-false} + volumes: + - ./telemetry/otel-config.yaml:/etc/otelcol/config.yaml + networks: + - ic-ws-gateway-network + + grafana: + image: grafana/grafana-enterprise + container_name: grafana + restart: unless-stopped + profiles: + - telemetry-local + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin + volumes: + - grafana-storage:/var/lib/grafana + - ./telemetry/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml + - ./telemetry/grafana-dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml + - ./telemetry/dashboards:/etc/dashboards + networks: + - ic-ws-gateway-network + depends_on: + - tempo + + tempo: + image: grafana/tempo:latest + container_name: tempo + restart: unless-stopped + profiles: + - telemetry-local + ports: + - "3200:3200" # tempo server + - "4318:4318" # otlp grpc receiver + command: ["-config.file=/etc/tempo.yaml"] + volumes: + - ./telemetry/tempo-config.yaml:/etc/tempo.yaml + - tempo-data:/tmp/tempo networks: - ic-ws-gateway-network networks: ic-ws-gateway-network: name: ic-ws-gateway-network + +volumes: + tempo-data: + name: tempo-data + grafana-storage: + name: grafana-storage diff --git a/src/canister-utils/Cargo.toml b/src/canister-utils/Cargo.toml new file mode 100644 index 0000000..9254ae4 --- /dev/null +++ b/src/canister-utils/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "canister-utils" +version = "1.0.0" +edition.workspace = true +rust-version.workspace = true +repository.workspace = true +license.workspace = true + +[dependencies] +ic-agent = { workspace = true } +candid = { workspace = true } +tracing = { workspace = true } +serde = { workspace = true } +reqwest = { workspace = true } +serde_bytes = "0.11.12" + +[features] +mock-server = [] \ No newline at end of file diff --git a/src/ic-websocket-gateway/src/canister_methods.rs b/src/canister-utils/src/lib.rs similarity index 93% rename from src/ic-websocket-gateway/src/canister_methods.rs rename to src/canister-utils/src/lib.rs index 68b069d..c83dd9e 100644 --- a/src/ic-websocket-gateway/src/canister_methods.rs +++ b/src/canister-utils/src/lib.rs @@ -3,6 +3,7 @@ use ic_agent::AgentError; use ic_agent::{agent::http_transport::ReqwestTransport, identity::BasicIdentity, Agent}; use serde::{Deserialize, Serialize}; use std::fmt; +use tracing::Span; static IC_MAINNET_URLS: [&str; 2] = ["https://icp0.io", "https://icp-api.io"]; @@ -139,6 +140,9 @@ pub struct CanisterOutputCertifiedMessages { pub is_end_of_queue: Option, } +/// Canister message to be relayed to the client, together with its span +pub type IcWsCanisterMessage = (CanisterToClientMessage, Span); + pub fn is_mainnet(ic_network_url: &str) -> bool { IC_MAINNET_URLS.contains(&ic_network_url) } @@ -177,7 +181,7 @@ pub async fn ws_close( Decode!(&res, _CanisterWsCloseResult).map_err(|e| e.to_string())? } -#[cfg(not(test))] +#[cfg(not(feature = "mock-server"))] pub async fn ws_get_messages( agent: &Agent, canister_id: &Principal, @@ -196,7 +200,10 @@ pub async fn ws_get_messages( res.map_err(|e| IcError::Cdk(e)) } -#[cfg(test)] +/// In order to call the mock server during testing, make sure that the 'mock-server' +/// feature is enabled when importing this crate as a dev-dependecy +/// e.g. canister-utils = { path = "../canister-utils", features = ["mock-server"] } +#[cfg(feature = "mock-server")] pub async fn ws_get_messages( _agent: &Agent, _canister_id: &Principal, diff --git a/src/gateway-state/Cargo.toml b/src/gateway-state/Cargo.toml new file mode 100644 index 0000000..72bec7a --- /dev/null +++ b/src/gateway-state/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "gateway-state" +version = "1.0.0" +edition.workspace = true +rust-version.workspace = true +repository.workspace = true +license.workspace = true + +[dependencies] +dashmap = "5.5.3" +ic-agent = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +canister-utils = { version = "1.0.0", path = "../canister-utils" } \ No newline at end of file diff --git a/src/gateway-state/src/lib.rs b/src/gateway-state/src/lib.rs new file mode 100644 index 0000000..4df4557 --- /dev/null +++ b/src/gateway-state/src/lib.rs @@ -0,0 +1,507 @@ +use canister_utils::{ClientKey, IcWsCanisterMessage}; +use dashmap::{mapref::entry::Entry, DashMap}; +use ic_agent::export::Principal; +use std::sync::Arc; +use tokio::sync::mpsc::Sender; +use tracing::Span; + +/// State of the WS Gateway that can be shared between threads +#[derive(Clone)] +pub struct GatewayState { + inner: Arc, +} + +impl GatewayState { + pub fn new() -> Self { + Self { + inner: Arc::new(GatewayStateInner::new()), + } + } + + /// SAFETY: + /// + /// The [Dashmap::entry](https://docs.rs/dashmap/5.5.3/src/dashmap/lib.rs.html#1147-1163) method gets a write lock on the whole shard in which the entry is. + /// + /// The lock is moved into either the [OccupiedEntry](https://docs.rs/dashmap/5.5.3/src/dashmap/mapref/entry.rs.html#175-179) or the + /// [VacantEntry](https://docs.rs/dashmap/5.5.3/src/dashmap/mapref/entry.rs.html#117-120) when they are instantiated and + /// is released when they go out of scope at the end of this function. + /// + /// Therefore, this function is executed atomically. + /// + /// Holding a lock accross an '.await' may cause a deadlock. + /// To prevent deadlocks, this function shall NEVER be async. + /// This is sufficient to prevent the function from yielding while holding the lock. + /// + /// In order to not starve other tasks, make sure to keep the critical section as short as possible. + pub fn insert_client_channel_and_get_new_poller_state( + &self, + canister_id: CanisterPrincipal, + client_key: ClientKey, + client_channel_tx: Sender, + client_session_span: Span, + ) -> Option { + // START OF THE CRITICAL SECTION + match self.inner.data.entry(canister_id) { + Entry::Occupied(mut entry) => { + // the poller has already been started + // if the poller is active, add client key and sender end of the channel to the poller state + let poller_state = entry.get_mut(); + poller_state.insert( + client_key, + ClientSender { + sender: client_channel_tx.clone(), + span: client_session_span, + }, + ); + // the poller shall not be started again + None + }, + Entry::Vacant(entry) => { + // the poller has not been started yet + // initialize the poller state and add client key and sender end of the channel + let poller_state = Arc::new(DashMap::with_capacity_and_shard_amount(1024, 1024)); + poller_state.insert( + client_key, + ClientSender { + sender: client_channel_tx.clone(), + span: client_session_span, + }, + ); + entry.insert(Arc::clone(&poller_state)); + // the poller shall be started + Some(poller_state) + }, + } + // END OF THE CRITICAL SECTION + } + + /// SAFETY: + /// + /// The [Dashmap::entry](https://docs.rs/dashmap/5.5.3/src/dashmap/lib.rs.html#1147-1163) method gets a write lock on the whole shard in which the entry is. + /// + /// The lock is moved into the [OccupiedEntry](https://docs.rs/dashmap/5.5.3/src/dashmap/mapref/entry.rs.html#175-179) when it is instantiated and + /// is released when it goes out of scope at the end of this function. + /// + /// Therefore, this function is executed atomically. + /// + /// Holding a lock accross an '.await' may cause a deadlock. + /// To prevent deadlocks, this function shall NEVER be async. + /// This is sufficient to prevent the function from yielding while holding the lock. + /// + /// In order to not starve other tasks, make sure to keep the critical section as short as possible. + pub fn remove_client(&self, canister_id: CanisterPrincipal, client_key: ClientKey) { + // START OF THE CRITICAL SECTION + if let Entry::Occupied(mut entry) = self.inner.data.entry(canister_id) { + let poller_state = entry.get_mut(); + if poller_state.remove(&client_key).is_none() { + // as the client was connected, the poller state must contain an entry for 'client_key' + // if this is encountered it might indicate a race condition + unreachable!("Client key not found in poller state"); + } + // even if this is the last client session for the canister, do not remove the canister from the gateway state + // this will be done by the poller task + } + // END OF THE CRITICAL SECTION + + // this can happen when the poller has failed and the poller state has already been removed + // indeed, a client session might enter the Close state before the poller side of the channel has been dropped - but after the poller state has been removed - + // in such a case, the client state as already been removed by the poller, together with the whole poller state + // therefore there is no need to do anything else here + } + + /// SAFETY: + /// + /// The [Dashmap::entry](https://docs.rs/dashmap/5.5.3/src/dashmap/lib.rs.html#1147-1163) method gets a write lock on the whole shard in which the entry is. + /// + /// The lock is moved into the [OccupiedEntry](https://docs.rs/dashmap/5.5.3/src/dashmap/mapref/entry.rs.html#175-179) when it is instantiated and + /// is released when it goes out of scope at the end of this function. + /// + /// Therefore, this function is executed atomically. + /// + /// Holding a lock accross an '.await' may cause a deadlock. + /// To prevent deadlocks, this function shall NEVER be async. + /// This is sufficient to prevent the function from yielding while holding the lock. + /// + /// In order to not starve other tasks, make sure to keep the critical section as short as possible. + pub fn remove_client_if_exists( + &self, + canister_id: CanisterPrincipal, + client_key: ClientKey, + ) -> ClientEntry { + // START OF THE CRITICAL SECTION + if let Entry::Occupied(mut entry) = self.inner.data.entry(canister_id) { + let poller_state = entry.get_mut(); + + // even if this is the last client session for the canister, do not remove the canister from the gateway state + // this will be done by the poller task + // returns 'ClientEntry::Removed' if the client was removed, 'ClientEntry::Vacant' if there was no such client + return { + match poller_state.remove(&client_key) { + Some(_) => ClientEntry::Removed(client_key), + None => ClientEntry::Vacant, + } + }; + } + // END OF THE CRITICAL SECTION + + // this can happen when the poller has failed and the poller state has already been removed + // indeed, a client session might get an error before the poller side of the channel has been dropped - but after the poller state has been removed - + // in such a case, the client state has already been removed by the poller, together with the whole poller state + // therefore there is no need to do anything else here and we pretend that there is no such entry + ClientEntry::Vacant + } + + /// SAFETY: + /// + /// The [Dashmap::remove_if](https://docs.rs/dashmap/5.5.3/src/dashmap/lib.rs.html#944-969) method gets a write lock on the whole shard in which the entry is. + /// + /// The lock is held while checking the condition and removing the entry if the condition is met. + /// + /// Therefore, this function is executed atomically. + /// + /// This function shall be called only if it is guaranteed that the canister entry exists in the gateway state. + pub fn remove_canister_if_empty(&self, canister_id: CanisterPrincipal) -> CanisterEntry { + // remove_if returns None if the condition is not met, otherwise it returns the Some() + // if Some, the poller state is empty and therefore the poller shall terminate - return 'CanisterEntry::RemovedEmpty' + // if None, the poller state is not empty and therefore there are still clients connected and the poller shall not terminate - return 'CanisterEntry::NotEmpty' + match self + .inner + .data + .remove_if(&canister_id, |_, poller_state| poller_state.is_empty()) + { + Some(_) => CanisterEntry::RemovedEmpty, + None => CanisterEntry::NotEmpty, + } + } + + /// SAFETY: + /// + /// The [Dashmap::remove](https://docs.rs/dashmap/5.5.3/src/dashmap/lib.rs.html#930-942) method gets a write lock on the whole shard in which the entry is. + /// + /// The lock is held while removing the entry. + /// + /// Therefore, this function is executed atomically. + /// + /// This function shall be called only if it is guaranteed that the canister entry exists in the gateway state. + pub fn remove_failed_canister(&self, canister_id: CanisterPrincipal) { + if let None = self.inner.data.remove(&canister_id) { + unreachable!("failed canister not found in gateway state"); + } + } +} + +/// State of the WS Gateway consisting of the principal of each canister being polled +/// and the state of each poller +struct GatewayStateInner { + // the guard returned when locking a dashmap is 'Send', therefore it is critical + // that it is not held accross .await points + // more info: https://draft.ryhl.io/blog/shared-mutable-state/ + data: DashMap, +} + +impl GatewayStateInner { + fn new() -> Self { + Self { + data: DashMap::with_capacity_and_shard_amount(32, 32), + } + } +} + +/// State of each poller consisting of the keys of the clients connected to the poller +/// and the state associated to each client +pub type PollerState = Arc>; + +pub enum ClientEntry { + Removed(ClientKey), + Vacant, +} + +pub enum CanisterEntry { + RemovedEmpty, + NotEmpty, +} + +/// State of each client consisting of the sender side of the channel used to send canister updates to the client +/// and the span associated to the client session +#[derive(Debug)] +pub struct ClientSender { + pub sender: Sender, + pub span: ClientSessionSpan, +} + +pub type ClientSessionSpan = Span; + +pub type CanisterPrincipal = Principal; + +#[cfg(test)] +mod tests { + use tokio::sync::mpsc::{self, Receiver}; + + use super::*; + use std::{ + thread, + time::{Duration, Instant}, + }; + + #[tokio::test] + async fn should_insert_new_client_channels_and_get_new_poller_state_once() { + let clients_count = 1000; + let gateway_state = GatewayState::new(); + let canister_id = Principal::from_text("aaaaa-aa").unwrap(); + thread::scope(|s| { + let mut handles = Vec::new(); + for i in 0..clients_count { + let client_key = ClientKey::new(Principal::anonymous(), i); + let handle = s.spawn(|| { + let (client_channel_tx, _): ( + Sender, + Receiver, + ) = mpsc::channel(100); + + gateway_state.insert_client_channel_and_get_new_poller_state( + canister_id, + client_key, + client_channel_tx, + Span::current(), + ) + }); + handles.push(handle); + } + let mut count = 0; + let mut poller_state: Option>> = None; + for h in handles.into_iter() { + if let Some(state) = h.join().unwrap() { + poller_state = Some(state.clone()); + count += 1; + } + } + assert_eq!(count, 1); + assert_eq!( + poller_state.expect("must be some").len(), + clients_count as usize + ); + }); + } + + #[tokio::test] + async fn benchmark_insertions_only() { + let clients_count = 1000; + let gateway_state = GatewayState::new(); + let canister_id = Principal::from_text("aaaaa-aa").unwrap(); + thread::scope(|s| { + let mut handles = Vec::new(); + for i in 0..clients_count { + let client_key = ClientKey::new(Principal::anonymous(), i); + let handle = s.spawn(|| { + let (client_channel_tx, _): ( + Sender, + Receiver, + ) = mpsc::channel(100); + + let start = Instant::now(); + gateway_state.insert_client_channel_and_get_new_poller_state( + canister_id, + client_key, + client_channel_tx, + Span::current(), + ); + Instant::now() - start + }); + handles.push(handle); + } + let mut tot = Duration::from_secs(0); + for h in handles { + tot += h.join().unwrap(); + } + println!( + "Average for 'insert_client_channel_and_get_new_poller_state' from {} different threads: {:?}", + clients_count, + tot / clients_count as u32 + ); + }); + } + + #[tokio::test] + async fn benchmark_insertions_while_check_if_empty() { + let iterations = 10_000; + let gateway_state = GatewayState::new(); + let canister_id = Principal::from_text("aaaaa-aa").unwrap(); + + let start = Instant::now(); + let mut tot = Duration::from_secs(0); + for i in 0..iterations { + let client_key = ClientKey::new(Principal::anonymous(), i); + let (client_channel_tx, _): ( + Sender, + Receiver, + ) = mpsc::channel(100); + + let start = Instant::now(); + gateway_state.insert_client_channel_and_get_new_poller_state( + canister_id, + client_key, + client_channel_tx, + Span::current(), + ); + tot += Instant::now() - start; + } + let average_idle = tot / iterations as u32; + let elapsed_idle = Instant::now() - start; + + { + let gateway_state = gateway_state.clone(); + let canister_id = canister_id.clone(); + thread::spawn(move || loop { + gateway_state.remove_canister_if_empty(canister_id); + }); + } + + let start = Instant::now(); + let mut tot = Duration::from_secs(0); + for i in 0..iterations { + let client_key = ClientKey::new(Principal::anonymous(), i); + let (client_channel_tx, _): ( + Sender, + Receiver, + ) = mpsc::channel(100); + + let start = Instant::now(); + gateway_state.insert_client_channel_and_get_new_poller_state( + canister_id, + client_key, + client_channel_tx, + Span::current(), + ); + tot += Instant::now() - start; + } + let average_busy = tot / iterations as u32; + let elapsed_busy = Instant::now() - start; + println!( + "Run {} iterations of 'insert_client_channel_and_get_new_poller_state' on the same thread\nElapsed while: + idle: {:?} + busy: {:?} + deterioration: {:?}\nAverage while: + idle: {:?} + busy: {:?} + deterioration: {:?}", + iterations, + elapsed_idle, + elapsed_busy, + elapsed_busy.as_secs_f64() / elapsed_idle.as_secs_f64(), + average_idle, + average_busy, + average_busy.as_secs_f64() / average_idle.as_secs_f64(), + ); + } + + #[tokio::test] + async fn benchmark_check_if_empty_while_insertions() { + let iterations = 10_000; + let gateway_state = GatewayState::new(); + let canister_id = Principal::from_text("aaaaa-aa").unwrap(); + + let start = Instant::now(); + let mut tot = Duration::from_secs(0); + for _ in 0..iterations { + let start = Instant::now(); + gateway_state.remove_canister_if_empty(canister_id); + tot += Instant::now() - start; + } + let average_idle = tot / iterations as u32; + let elapsed_idle = Instant::now() - start; + + { + let gateway_state = gateway_state.clone(); + let canister_id = canister_id.clone(); + thread::spawn(move || { + for i in 0.. { + let client_key = ClientKey::new(Principal::anonymous(), i); + let (client_channel_tx, _): ( + Sender, + Receiver, + ) = mpsc::channel(100); + + gateway_state.insert_client_channel_and_get_new_poller_state( + canister_id, + client_key, + client_channel_tx, + Span::current(), + ); + } + }); + } + + let start = Instant::now(); + let mut tot = Duration::from_secs(0); + for _ in 0..iterations { + let start = Instant::now(); + gateway_state.remove_canister_if_empty(canister_id); + tot += Instant::now() - start; + } + let average_busy = tot / iterations as u32; + let elapsed_busy = Instant::now() - start; + + println!( + "Run {} iterations of 'remove_canister_if_empty' on the same thread\nElapsed while: + idle: {:?} + busy: {:?} + deterioration: {:?}\nAverage while: + idle: {:?} + busy: {:?} + deterioration: {:?}", + iterations, + elapsed_idle, + elapsed_busy, + elapsed_busy.as_secs_f64() / elapsed_idle.as_secs_f64(), + average_idle, + average_busy, + average_busy.as_secs_f64() / average_idle.as_secs_f64(), + ); + } + + #[tokio::test] + async fn simulate_ic_ws() { + let iterations = 100; + let gateway_state = GatewayState::new(); + let canister_id = Principal::from_text("aaaaa-aa").unwrap(); + + { + let gateway_state = gateway_state.clone(); + let canister_id = canister_id.clone(); + thread::spawn(move || { + for i in 0.. { + let client_key = ClientKey::new(Principal::anonymous(), i); + let (client_channel_tx, _): ( + Sender, + Receiver, + ) = mpsc::channel(100); + + gateway_state.insert_client_channel_and_get_new_poller_state( + canister_id, + client_key, + client_channel_tx, + Span::current(), + ); + // simulates 100 clients connecting each second + thread::sleep(Duration::from_millis(10)); + } + }); + } + + let mut tot = Duration::from_secs(0); + for _ in 0..iterations { + let start = Instant::now(); + gateway_state.remove_canister_if_empty(canister_id); + tot += Instant::now() - start; + // simulates a polling iteration of 1 ms + thread::sleep(Duration::from_millis(1)); + } + let average = tot / iterations as u32; + + println!( + "Run {} iterations of 'remove_canister_if_empty' on the same thread while simulating 100 clients connecting each second. + Average time: {:?}", + iterations, average, + ); + } +} diff --git a/src/ic-websocket-gateway/Cargo.toml b/src/ic-websocket-gateway/Cargo.toml index ba316ea..ffde276 100644 --- a/src/ic-websocket-gateway/Cargo.toml +++ b/src/ic-websocket-gateway/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ic_websocket_gateway" -version = "1.2.3" +version = "1.3.0" edition.workspace = true rust-version.workspace = true repository.workspace = true @@ -10,11 +10,9 @@ license.workspace = true async-trait = "0.1.72" candid = { workspace = true } ic-agent = { workspace = true } -ic-cdk = { workspace = true } -ic-cdk-macros = { workspace = true } serde = { workspace = true } serde_cbor = "0.11.2" -tokio = { version = "1.29.1", features = ["full"] } +tokio = { workspace = true } tokio-native-tls = "0.3.1" native-tls = "0.2.11" tokio-util = "0.7.8" @@ -22,24 +20,26 @@ serde_bytes = "0.11.12" tokio-tungstenite = "0.20.0" futures-util = "0.3.28" structopt = "0.3.21" -tracing = "0.1.40" +tracing = { workspace = true } tracing-subscriber = { version = "0.3.17", features = [ "fmt", "std", "json", "env-filter", + "registry", ] } tracing-appender = "0.2" -ic-identity = { version = "0.1.0", path = "../ic-identity" } +tracing-opentelemetry = "0.22.0" +opentelemetry = { version = "0.21" } +opentelemetry-otlp = { version = "0.14.0" } +opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] } +ic-identity = { path = "../ic-identity" } +gateway-state = { path = "../gateway-state" } +canister-utils = { path = "../canister-utils" } rand = "0.8" -reqwest = { workspace = true } -opentelemetry = { version = "0.20" } -tracing-opentelemetry = "0.20" -opentelemetry-jaeger = { version = "0.19", features = ["rt-tokio"] } -opentelemetry_sdk = { version = "0.20", features = ["rt-tokio"] } -dashmap = "5.5.3" [dev-dependencies] websocket = "0.26.5" mockito = "1.2.0" lazy_static = "1.4.0" +canister-utils = { path = "../canister-utils", features = ["mock-server"] } diff --git a/src/ic-websocket-gateway/src/canister_poller.rs b/src/ic-websocket-gateway/src/canister_poller.rs index 5ed007a..04df3ed 100644 --- a/src/ic-websocket-gateway/src/canister_poller.rs +++ b/src/ic-websocket-gateway/src/canister_poller.rs @@ -1,11 +1,9 @@ -use crate::{ - canister_methods::{ - self, CanisterOutputCertifiedMessages, CanisterToClientMessage, - CanisterWsGetMessagesArguments, IcError, - }, - manager::{CanisterPrincipal, ClientSender, GatewaySharedState, PollerState}, -}; use candid::Principal; +use canister_utils::{ + ws_get_messages, CanisterOutputCertifiedMessages, CanisterToClientMessage, + CanisterWsGetMessagesArguments, IcError, IcWsCanisterMessage, +}; +use gateway_state::{CanisterEntry, CanisterPrincipal, ClientSender, GatewayState, PollerState}; use ic_agent::{Agent, AgentError}; use std::{sync::Arc, time::Duration}; use tokio::sync::mpsc::Sender; @@ -16,9 +14,6 @@ enum PollingStatus { MessagesPolled(CanisterOutputCertifiedMessages), } -/// Canister message to be relayed to the client, together with its span -pub type IcWsCanisterMessage = (CanisterToClientMessage, Span); - /// Poller which periodically queries a canister for new messages and relays them to the client pub struct CanisterPoller { /// Agent used to communicate with the IC @@ -28,7 +23,7 @@ pub struct CanisterPoller { /// State of the poller poller_state: PollerState, /// State of the gateway - gateway_shared_state: GatewaySharedState, + gateway_state: GatewayState, /// Nonce specified by the gateway during the query call to ws_get_messages, /// used by the CDK to determine which messages to respond with next_message_nonce: u64, @@ -44,14 +39,14 @@ impl CanisterPoller { agent: Arc, canister_id: Principal, poller_state: PollerState, - gateway_shared_state: GatewaySharedState, + gateway_state: GatewayState, polling_interval_ms: u64, ) -> Self { Self { agent, canister_id, poller_state, - gateway_shared_state, + gateway_state, next_message_nonce: 0, polling_iteration: 0, polling_interval_ms, @@ -81,8 +76,7 @@ impl CanisterPoller { // as the poller state contains all the state of the clients sessions opened to the failed poller, removing the poller state // will also remove all the corresponding clients' states // therefore, there is no need to wait for the clients to remove their state before terminating the poller - self.gateway_shared_state - .remove_failed_canister(self.canister_id); + self.gateway_state.remove_failed_canister(self.canister_id); // TODO: notify the canister that it cannot be polled anymore return Err(e); } @@ -145,7 +139,7 @@ impl CanisterPoller { trace!("Started polling iteration"); // get messages to be relayed to clients from canister (starting from 'message_nonce') - match canister_methods::ws_get_messages( + match ws_get_messages( &self.agent, &self.canister_id, CanisterWsGetMessagesArguments { @@ -211,7 +205,7 @@ impl CanisterPoller { trace!("Start relaying message",); (canister_to_client_message, Span::current()) }); - relay_message(canister_message, client_channel_tx) + relay_message(canister_message, &client_channel_tx) .instrument(canister_message_span) .await; relayed_messages_count += 1; @@ -293,9 +287,13 @@ impl CanisterPoller { // this is not acceptable as it results in messages being lost // therefore, check if the poller should be terminated after each polling iteration // TODO: find a more efficient way to do this, e.g. when the last client disconnects - return self - .gateway_shared_state - .remove_canister_if_empty(self.canister_id); + match self + .gateway_state + .remove_canister_if_empty(self.canister_id) + { + CanisterEntry::RemovedEmpty => true, + CanisterEntry::NotEmpty => false, + } } } diff --git a/src/ic-websocket-gateway/src/client_session.rs b/src/ic-websocket-gateway/src/client_session.rs index ee66b4c..ce3f569 100644 --- a/src/ic-websocket-gateway/src/client_session.rs +++ b/src/ic-websocket-gateway/src/client_session.rs @@ -1,13 +1,12 @@ -use crate::{ - canister_methods::{CanisterToClientMessage, CanisterWsOpenArguments, ClientKey}, - canister_poller::IcWsCanisterMessage, - manager::CanisterPrincipal, -}; use candid::{decode_args, Principal}; +use canister_utils::{ + CanisterToClientMessage, CanisterWsOpenArguments, ClientKey, IcWsCanisterMessage, +}; use futures_util::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, }; +use gateway_state::CanisterPrincipal; use ic_agent::{ agent::{Envelope, EnvelopeContent}, Agent, AgentError, @@ -59,7 +58,7 @@ pub enum IcWsError { Poller(String), } -/// IC WebSocket session +/// Actor for an IC WebSocket session pub struct ClientSession { /// Identifier of the client connection _client_id: u64, diff --git a/src/ic-websocket-gateway/src/client_session_handler.rs b/src/ic-websocket-gateway/src/client_session_handler.rs index 11baab8..6d1b5c9 100644 --- a/src/ic-websocket-gateway/src/client_session_handler.rs +++ b/src/ic-websocket-gateway/src/client_session_handler.rs @@ -1,11 +1,11 @@ use crate::{ - canister_methods::{self, CanisterWsCloseArguments, ClientKey}, - canister_poller::{CanisterPoller, IcWsCanisterMessage}, + canister_poller::CanisterPoller, client_session::{ClientSession, IcWsError, IcWsSessionState}, - manager::{CanisterPrincipal, GatewaySharedState, PollerState}, ws_listener::ClientId, }; +use canister_utils::{ws_close, CanisterWsCloseArguments, ClientKey, IcWsCanisterMessage}; use futures_util::StreamExt; +use gateway_state::{CanisterPrincipal, ClientEntry, GatewayState, PollerState}; use ic_agent::Agent; use std::sync::Arc; use tokio::{ @@ -22,7 +22,7 @@ pub struct ClientSessionHandler { /// Agent used to interact with the IC agent: Arc, /// State of the gateway - gateway_shared_state: GatewaySharedState, + gateway_state: GatewayState, /// Polling interval in milliseconds polling_interval_ms: u64, } @@ -31,13 +31,13 @@ impl ClientSessionHandler { pub fn new( id: ClientId, agent: Arc, - gateway_shared_state: GatewaySharedState, + gateway_state: GatewayState, polling_interval_ms: u64, ) -> Self { Self { id, agent, - gateway_shared_state, + gateway_state, polling_interval_ms, } } @@ -133,7 +133,7 @@ impl ClientSessionHandler { let canister_id = self.get_canister_id(&client_session); let client_key = self.get_client_key(&client_session); let new_poller_state = self - .gateway_shared_state + .gateway_state .insert_client_channel_and_get_new_poller_state( canister_id, client_key, @@ -180,7 +180,7 @@ impl ClientSessionHandler { let canister_id = self.get_canister_id(&client_session); let client_key = self.get_client_key(&client_session); // remove client from poller state - self.gateway_shared_state + self.gateway_state .remove_client(canister_id, client_key.clone()); self.call_ws_close(&canister_id, client_key).await; @@ -203,9 +203,9 @@ impl ClientSessionHandler { // remove client from poller state, if it is present // error might have happened before the client session was Setup // if so, there is no need to remove the client as it is not yet in the poller state - if self - .gateway_shared_state - .remove_client_if_exists(canister_id, client_key.clone()) + if let ClientEntry::Removed(client_key) = self + .gateway_state + .remove_client_if_exists(canister_id, client_key) { self.call_ws_close(&canister_id, client_key).await; @@ -239,7 +239,7 @@ impl ClientSessionHandler { async fn call_ws_close(&self, canister_id: &CanisterPrincipal, client_key: ClientKey) { // call ws_close so that the client is removed from the canister - if let Err(e) = canister_methods::ws_close( + if let Err(e) = ws_close( &self.agent, &canister_id, CanisterWsCloseArguments { client_key }, @@ -256,11 +256,11 @@ impl ClientSessionHandler { /// Starts a new canister poller fn start_poller(&self, canister_id: CanisterPrincipal, poller_state: PollerState) { - info!("Starting poller"); + info!("Starting poller for canister: {}", canister_id); // spawn new canister poller task let agent = Arc::clone(&self.agent); - let gateway_shared_state = Arc::clone(&self.gateway_shared_state); + let gateway_state = self.gateway_state.clone(); let polling_interval_ms = self.polling_interval_ms; tokio::spawn(async move { // we pass both the whole gateway state and the poller state for the specific canister @@ -273,13 +273,16 @@ impl ClientSessionHandler { agent, canister_id, poller_state, - gateway_shared_state, + gateway_state, polling_interval_ms, ); if let Err(e) = poller.run_polling().await { - warn!("Poller terminated with error: {:?}", e); + warn!( + "Poller for canister {} terminated with error: {:?}", + canister_id, e + ); } else { - info!("Poller terminated"); + info!("Poller for canister {} terminated", canister_id); } // the poller takes care of notifying the session handlers when an error is detected // and removing its corresponding entry from the gateway state diff --git a/src/ic-websocket-gateway/src/gateway_tracing.rs b/src/ic-websocket-gateway/src/gateway_tracing.rs index 1e55a66..e114853 100644 --- a/src/ic-websocket-gateway/src/gateway_tracing.rs +++ b/src/ic-websocket-gateway/src/gateway_tracing.rs @@ -1,5 +1,7 @@ use candid::Principal; -use opentelemetry_sdk::trace; +use opentelemetry::KeyValue; +use opentelemetry_otlp::{Protocol, WithExportConfig}; +use opentelemetry_sdk::Resource; use std::{ fs::{self, File}, path::Path, @@ -15,7 +17,7 @@ pub struct InitTracingResult { } pub fn init_tracing( - telemetry_jaeger_agent_endpoint: Option, + opentelemetry_collector_endpoint: Option, gateway_principal: Principal, ) -> Result { if !Path::new("./data/traces").is_dir() { @@ -54,32 +56,37 @@ pub fn init_tracing( .with_filter(env_filter_stdout); let is_telemetry_enabled = - match telemetry_jaeger_agent_endpoint + match opentelemetry_collector_endpoint .and_then(|s| if s.is_empty() { None } else { Some(s) }) { - Some(telemetry_jaeger_agent_endpoint) => { - opentelemetry::global::set_text_map_propagator( - opentelemetry_jaeger::Propagator::new(), - ); - - let tracer = opentelemetry_jaeger::new_agent_pipeline() - .with_service_name( - "ic-ws-gw-".to_string() + &gateway_principal.to_string()[..5], - ) - .with_max_packet_size(9216) // on MacOS 9216 is the max amount of bytes that can be sent in a single UDP packet - .with_endpoint(telemetry_jaeger_agent_endpoint) - .with_auto_split_batch(true) - .with_trace_config( - trace::config().with_sampler(trace::Sampler::TraceIdRatioBased(1.0)), - ) + Some(opentelemetry_collector_endpoint) => { + let otlp_exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(opentelemetry_collector_endpoint) + .with_protocol(Protocol::Grpc); + + let otlp_config = + opentelemetry_sdk::trace::config().with_resource(Resource::new(vec![ + KeyValue::new( + "service.name", + "ic-ws-gw-".to_string() + &gateway_principal.to_string()[..5], + ), + ])); + + let otlp_tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(otlp_exporter) + .with_trace_config(otlp_config) .install_batch(opentelemetry_sdk::runtime::Tokio) - .expect("should set up machinery to export data"); + .expect("failed to install"); + let env_filter_telemetry = EnvFilter::builder() .with_env_var("RUST_LOG_TELEMETRY") .try_from_env() .unwrap_or_else(|_| EnvFilter::new("ic_websocket_gateway=trace")); + let opentelemetry = tracing_opentelemetry::layer() - .with_tracer(tracer) + .with_tracer(otlp_tracer) .with_filter(env_filter_telemetry); let subscriber = tracing_subscriber::registry() diff --git a/src/ic-websocket-gateway/src/main.rs b/src/ic-websocket-gateway/src/main.rs index ee51651..c54c8e8 100644 --- a/src/ic-websocket-gateway/src/main.rs +++ b/src/ic-websocket-gateway/src/main.rs @@ -1,12 +1,13 @@ -use crate::gateway_tracing::{init_tracing, InitTracingResult}; -use crate::manager::Manager; -use crate::ws_listener::TlsConfig; +use crate::{ + gateway_tracing::{init_tracing, InitTracingResult}, + manager::Manager, + ws_listener::TlsConfig, +}; use ic_identity::{get_identity_from_key_pair, load_key_pair}; use std::{fs, path::Path}; use structopt::StructOpt; use tracing::info; -mod canister_methods; mod canister_poller; mod client_session; mod client_session_handler; @@ -46,7 +47,7 @@ struct DeploymentInfo { /// ```bash /// docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 jaegertracing/all-in-one:latest /// ``` - telemetry_jaeger_agent_endpoint: Option, + opentelemetry_collector_endpoint: Option, } fn create_data_dir() -> Result<(), String> { @@ -76,7 +77,7 @@ async fn main() -> Result<(), String> { guards: _guards, is_telemetry_enabled, } = init_tracing( - deployment_info.telemetry_jaeger_agent_endpoint.to_owned(), + deployment_info.opentelemetry_collector_endpoint.to_owned(), gateway_principal, ) .expect("could not init tracing"); diff --git a/src/ic-websocket-gateway/src/manager.rs b/src/ic-websocket-gateway/src/manager.rs index 845b704..1e7f9cc 100644 --- a/src/ic-websocket-gateway/src/manager.rs +++ b/src/ic-websocket-gateway/src/manager.rs @@ -1,141 +1,10 @@ -use dashmap::{mapref::entry::Entry, DashMap}; +use crate::ws_listener::{TlsConfig, WsListener}; +use canister_utils::get_new_agent; +use gateway_state::GatewayState; use ic_agent::{export::Principal, identity::BasicIdentity, Agent}; use std::sync::Arc; -use tokio::{sync::mpsc::Sender, task::JoinHandle}; -use tracing::{info, Span}; - -use crate::{ - canister_methods::{self, ClientKey}, - canister_poller::IcWsCanisterMessage, - ws_listener::{TlsConfig, WsListener}, -}; - -/// State of the WS Gateway that can be shared between threads -pub type GatewaySharedState = Arc; - -/// State of the WS Gateway consisting of the principal of each canister being polled -/// and the state of each poller -pub struct GatewayState(DashMap); - -impl GatewayState { - pub fn new() -> Self { - Self(DashMap::with_capacity_and_shard_amount(32, 32)) - } -} - -impl GatewayState { - pub fn insert_client_channel_and_get_new_poller_state( - &self, - canister_id: CanisterPrincipal, - client_key: ClientKey, - client_channel_tx: Sender, - client_session_span: Span, - ) -> Option { - // TODO: figure out if this is actually atomic - match self.0.entry(canister_id) { - Entry::Occupied(mut entry) => { - // the poller has already been started - // if the poller is active, add client key and sender end of the channel to the poller state - let poller_state = entry.get_mut(); - poller_state.insert( - client_key, - ClientSender { - sender: client_channel_tx.clone(), - span: client_session_span, - }, - ); - // the poller shall not be started again - None - }, - Entry::Vacant(entry) => { - // the poller has not been started yet - // initialize the poller state and add client key and sender end of the channel - let poller_state = Arc::new(DashMap::with_capacity_and_shard_amount(1024, 1024)); - poller_state.insert( - client_key, - ClientSender { - sender: client_channel_tx.clone(), - span: client_session_span, - }, - ); - entry.insert(Arc::clone(&poller_state)); - // the poller shall be started - Some(Arc::clone(&poller_state)) - }, - } - } - - pub fn remove_client(&self, canister_id: CanisterPrincipal, client_key: ClientKey) { - // TODO: figure out if this is actually atomic - if let Entry::Occupied(mut entry) = self.0.entry(canister_id) { - let poller_state = entry.get_mut(); - if poller_state.remove(&client_key).is_none() { - // as the client was connected, the poller state must contain an entry for 'client_key' - // if this is encountered it might indicate a race condition - unreachable!("Client key not found in poller state"); - } - // even if this is the last client session for the canister, do not remove the canister from the gateway state - // this will be done by the poller task - } - // this can happen when the poller has failed and the poller state has already been removed - // indeed, a client session might enter the Close state before the poller side of the channel has been dropped - but after the poller state has been removed - - // in such a case, the client state as already been removed by the poller, together with the whole poller state - // therefore there is no need to do anything else here - } - - pub fn remove_client_if_exists( - &self, - canister_id: CanisterPrincipal, - client_key: ClientKey, - ) -> bool { - // TODO: figure out if this is actually atomic - if let Entry::Occupied(mut entry) = self.0.entry(canister_id) { - let poller_state = entry.get_mut(); - - // even if this is the last client session for the canister, do not remove the canister from the gateway state - // this will be done by the poller task - // returns true if the client was removed, false if there was no such client - return poller_state.remove(&client_key).is_some(); - } - // this can happen when the poller has failed and the poller state has already been removed - // indeed, a client session might get an error before the poller side of the channel has been dropped - but after the poller state has been removed - - // in such a case, the client state as already been removed by the poller, together with the whole poller state - // therefore there is no need to do anything else here - false - } - - pub fn remove_canister_if_empty(&self, canister_id: CanisterPrincipal) -> bool { - // SAFETY: - // remove_if returns None if the condition is not met, otherwise it returns the Some() - // if None is returned, the poller state is not empty and therefore there are still clients connected and the poller shall not terminate - // if Some is returned, the poller state is empty and therefore the poller shall terminate - self.0 - .remove_if(&canister_id, |_, poller_state| poller_state.is_empty()) - .is_some() - } - - pub fn remove_failed_canister(&self, canister_id: CanisterPrincipal) { - if let None = self.0.remove(&canister_id) { - unreachable!("failed canister not found in gateway state"); - } - } -} - -/// State of each poller consisting of the keys of the clients connected to the poller -/// and the state associated to each client -pub type PollerState = Arc>; - -/// State of each client consisting of the sender side of the channel used to send canister updates to the client -/// and the span associated to the client session -#[derive(Debug)] -pub struct ClientSender { - pub sender: Sender, - pub span: ClientSessionSpan, -} - -pub type CanisterPrincipal = Principal; - -pub type ClientSessionSpan = Span; +use tokio::task::JoinHandle; +use tracing::info; /// Manager of the WS Gateway maintaining its state pub struct Manager { @@ -144,7 +13,7 @@ pub struct Manager { /// Gateway address address: String, /// State of the WS Gateway - state: GatewaySharedState, + state: GatewayState, } impl Manager { @@ -153,13 +22,13 @@ impl Manager { ic_network_url: String, identity: BasicIdentity, ) -> Self { - let agent = canister_methods::get_new_agent(&ic_network_url, identity) + let agent = get_new_agent(&ic_network_url, identity) .await .expect("could not get new agent"); let agent = Arc::new(agent); // creates a concurrent hashmap with capacity of 32 divided in shards so that each entry can be accessed concurrently without locking the whole state - let state: GatewaySharedState = Arc::new(GatewayState::new()); + let state: GatewayState = GatewayState::new(); return Self { agent, @@ -181,12 +50,12 @@ impl Manager { // spawn a task which keeps listening for incoming client connections let gateway_address = self.address.clone(); let agent = Arc::clone(&self.agent); - let gateway_shared_state = Arc::clone(&self.state); + let gateway_state = self.state.clone(); tokio::spawn(async move { let mut ws_listener = WsListener::new( &gateway_address, agent, - gateway_shared_state, + gateway_state, polling_interval, tls_config, ) diff --git a/src/ic-websocket-gateway/src/tests/canister_poller.rs b/src/ic-websocket-gateway/src/tests/canister_poller.rs index e0c382f..f9f821d 100644 --- a/src/ic-websocket-gateway/src/tests/canister_poller.rs +++ b/src/ic-websocket-gateway/src/tests/canister_poller.rs @@ -1,15 +1,12 @@ #[cfg(test)] mod test { - use crate::{ - canister_methods::{ - self, CanisterOutputCertifiedMessages, CanisterOutputMessage, - CanisterWsGetMessagesArguments, ClientKey, - }, - canister_poller::{get_nonce_from_message, CanisterPoller, IcWsCanisterMessage}, - manager::{GatewaySharedState, GatewayState}, - }; use candid::Principal; + use canister_utils::{ + ws_get_messages, CanisterOutputCertifiedMessages, CanisterOutputMessage, + CanisterWsGetMessagesArguments, ClientKey, IcWsCanisterMessage, + }; use futures_util::join; + use gateway_state::GatewayState; use ic_agent::{agent::http_transport::ReqwestTransport, Agent}; use lazy_static::lazy_static; use std::{ @@ -19,16 +16,16 @@ mod test { use tokio::sync::mpsc::{self, Receiver, Sender}; use tracing::Span; - impl CanisterOutputCertifiedMessages { - fn serialize(&self) -> Vec { - candid::encode_one(self).unwrap() - } + use crate::canister_poller::{get_nonce_from_message, CanisterPoller}; - fn mock_n(n: usize, base_nonce: usize) -> Self { + struct MockCanisterOutputCertifiedMessages(CanisterOutputCertifiedMessages); + + impl MockCanisterOutputCertifiedMessages { + fn mock_n(n: usize, base_nonce: usize) -> CanisterOutputCertifiedMessages { let messages = (0..n) - .map(|off_nonce| CanisterOutputMessage::mock(base_nonce + off_nonce)) + .map(|off_nonce| MockCanisterOutputMessage::mock(base_nonce + off_nonce)) .collect(); - Self { + CanisterOutputCertifiedMessages { messages, cert: Vec::default(), tree: Vec::default(), @@ -36,42 +33,49 @@ mod test { } } - fn mock_n_with_key_error(n: usize, base_nonce: usize) -> Self { - let mut canister_msgs = CanisterOutputCertifiedMessages::mock_n(n - 1, base_nonce); + fn mock_n_with_key_error(n: usize, base_nonce: usize) -> CanisterOutputCertifiedMessages { + let mut canister_msgs = MockCanisterOutputCertifiedMessages::mock_n(n - 1, base_nonce); canister_msgs .messages - .push(CanisterOutputMessage::mock_with_key_error()); + .push(MockCanisterOutputMessage::mock_with_key_error()); canister_msgs } - fn mock_n_with_not_end_of_queue(n: usize, base_nonce: usize) -> Self { - let mut canister_msgs = CanisterOutputCertifiedMessages::mock_n(n, base_nonce); + fn mock_n_with_not_end_of_queue( + n: usize, + base_nonce: usize, + ) -> CanisterOutputCertifiedMessages { + let mut canister_msgs = MockCanisterOutputCertifiedMessages::mock_n(n, base_nonce); canister_msgs.is_end_of_queue = Some(false); canister_msgs } } - impl CanisterOutputMessage { - fn mock(nonce: usize) -> Self { - Self { - client_key: ClientKey::mock(), + struct MockCanisterOutputMessage(CanisterOutputMessage); + + impl MockCanisterOutputMessage { + fn mock(nonce: usize) -> CanisterOutputMessage { + CanisterOutputMessage { + client_key: MockClientKey::mock(), key: format!("_{}", nonce), content: Vec::default(), } } - fn mock_with_key_error() -> Self { - Self { - client_key: ClientKey::mock(), + fn mock_with_key_error() -> CanisterOutputMessage { + CanisterOutputMessage { + client_key: MockClientKey::mock(), key: "_not-a-u64".to_string(), content: Vec::default(), } } } - impl ClientKey { - fn mock() -> Self { - Self { + struct MockClientKey(ClientKey); + + impl MockClientKey { + fn mock() -> ClientKey { + ClientKey { client_principal: Principal::anonymous(), client_nonce: 0, } @@ -93,12 +97,12 @@ mod test { polling_interval_ms: u64, client_channel_tx: Sender, ) -> CanisterPoller { - let gateway_shared_state: GatewaySharedState = Arc::new(GatewayState::new()); + let gateway_state: GatewayState = GatewayState::new(); - let poller_state = gateway_shared_state + let poller_state = gateway_state .insert_client_channel_and_get_new_poller_state( Principal::anonymous(), - ClientKey::mock(), + MockClientKey::mock(), client_channel_tx, Span::current(), ) @@ -113,16 +117,20 @@ mod test { ), Principal::anonymous(), poller_state, - gateway_shared_state, + gateway_state, polling_interval_ms, ) } + fn serialize(body: CanisterOutputCertifiedMessages) -> Vec { + candid::encode_one(body).unwrap() + } + #[tokio::test] async fn should_poll_and_validate_nonces() { let server = &*MOCK_SERVER; let msg_count = 10; - let body = CanisterOutputCertifiedMessages::mock_n(msg_count, 0).serialize(); + let body = serialize(MockCanisterOutputCertifiedMessages::mock_n(msg_count, 0)); let path = "/ws_get_messages"; let mut guard = server.lock().unwrap(); // do not drop the guard until the end of this test to make sure that no other test interleaves and overwrites the mock response @@ -139,8 +147,7 @@ mod test { .unwrap(); let args = CanisterWsGetMessagesArguments { nonce: 0 }; - match canister_methods::ws_get_messages(&agent, &Principal::anonymous(), args.clone()).await - { + match ws_get_messages(&agent, &Principal::anonymous(), args.clone()).await { Ok(res) => { assert_eq!(res.messages.len(), msg_count); for (i, msg) in res.messages.iter().enumerate() { @@ -162,7 +169,9 @@ mod test { async fn should_poll_and_fail_to_validate_last_nonce() { let server = &*MOCK_SERVER; let msg_count = 10; - let body = CanisterOutputCertifiedMessages::mock_n_with_key_error(msg_count, 0).serialize(); + let body = serialize(MockCanisterOutputCertifiedMessages::mock_n_with_key_error( + msg_count, 0, + )); let path = "/ws_get_messages"; let mut guard = server.lock().unwrap(); // do not drop the guard until the end of this test to make sure that no other test interleaves and overwrites the mock response @@ -179,7 +188,7 @@ mod test { .unwrap(); let args = CanisterWsGetMessagesArguments { nonce: 0 }; - match canister_methods::ws_get_messages(&agent, &Principal::anonymous(), args).await { + match ws_get_messages(&agent, &Principal::anonymous(), args).await { Ok(res) => { for (i, msg) in res.messages.iter().enumerate() { if i == msg_count - 1 { @@ -204,7 +213,7 @@ mod test { async fn should_sleep_after_relaying() { let server = &*MOCK_SERVER; let msg_count = 10; - let body = CanisterOutputCertifiedMessages::mock_n(msg_count, 0).serialize(); + let body = serialize(MockCanisterOutputCertifiedMessages::mock_n(msg_count, 0)); let path = "/ws_get_messages"; let mut guard = server.lock().unwrap(); // do not drop the guard until the end of this test to make sure that no other test interleaves and overwrites the mock response @@ -260,8 +269,9 @@ mod test { async fn should_not_sleep_after_relaying() { let server = &*MOCK_SERVER; let msg_count = 10; - let body = - CanisterOutputCertifiedMessages::mock_n_with_not_end_of_queue(msg_count, 0).serialize(); + let body = serialize( + MockCanisterOutputCertifiedMessages::mock_n_with_not_end_of_queue(msg_count, 0), + ); let path = "/ws_get_messages"; let mut guard = server.lock().unwrap(); // do not drop the guard until the end of this test to make sure that no other test interleaves and overwrites the mock response @@ -313,7 +323,7 @@ mod test { async fn should_terminate_polling_with_error() { let server = &*MOCK_SERVER; let msg_count = 10; - let body = CanisterOutputCertifiedMessages::mock_n(msg_count, 0).serialize(); + let body = serialize(MockCanisterOutputCertifiedMessages::mock_n(msg_count, 0)); let path = "/ws_get_messages"; let mut guard = server.lock().unwrap(); // do not drop the guard until the end of this test to make sure that no other test interleaves and overwrites the mock response @@ -347,6 +357,8 @@ mod test { // as the poller processed only the messages of the first polling iteration assert_eq!(i as usize, msg_count); + // needed to make sure that the test fails in case the task panics + let res = join!(handle).0.expect("task panicked"); // the poller should return an error as in the first iteration it polls the messages from 0 to 'msg_count - 1' // in the second iteration it polls the messages which start again from 0 (as the mock server retruns the same messages) // and therefore it returns an error as the poller was expecting the nonce to be equal to 'msg_count' @@ -355,7 +367,7 @@ mod test { "Non consecutive nonce: expected {}, got 0", msg_count )), - join!(handle).0.unwrap() + res ); mock.assert_async().await; diff --git a/src/ic-websocket-gateway/src/ws_listener.rs b/src/ic-websocket-gateway/src/ws_listener.rs index 76d11c2..6a59287 100644 --- a/src/ic-websocket-gateway/src/ws_listener.rs +++ b/src/ic-websocket-gateway/src/ws_listener.rs @@ -1,4 +1,5 @@ -use crate::{client_session_handler::ClientSessionHandler, manager::GatewaySharedState}; +use crate::client_session_handler::ClientSessionHandler; +use gateway_state::GatewayState; use ic_agent::Agent; use native_tls::Identity; use std::{fs, net::SocketAddr, sync::Arc, time::Duration}; @@ -49,7 +50,7 @@ pub struct WsListener { /// Agent used to interact with the IC agent: Arc, /// State of the gateway - gateway_shared_state: GatewaySharedState, + gateway_state: GatewayState, // Polling interval in milliseconds polling_interval_ms: u64, // Client ID assigned to the next client connection @@ -60,7 +61,7 @@ impl WsListener { pub async fn new( gateway_address: &str, agent: Arc, - gateway_shared_state: GatewaySharedState, + gateway_state: GatewayState, polling_interval_ms: u64, tls_config: Option, ) -> Self { @@ -91,7 +92,7 @@ impl WsListener { listener, tls_acceptor, agent, - gateway_shared_state, + gateway_state, polling_interval_ms, next_client_id: 0, } @@ -196,17 +197,13 @@ impl WsListener { span!(parent: &Span::current(),Level::DEBUG, "Client Session Handler", client_id); let agent = Arc::clone(&self.agent); - let gateway_shared_state = Arc::clone(&self.gateway_shared_state); + let gateway_state = self.gateway_state.clone(); let polling_interval_ms = self.polling_interval_ms; // spawn a session handler task for each incoming client connection tokio::spawn( async move { - let mut client_session_handler = ClientSessionHandler::new( - client_id, - agent, - gateway_shared_state, - polling_interval_ms, - ); + let mut client_session_handler = + ClientSessionHandler::new(client_id, agent, gateway_state, polling_interval_ms); debug!("Started client session handler task"); if let Err(e) = { diff --git a/telemetry/dashboards/duration_by_thread_id.json b/telemetry/dashboards/duration_by_thread_id.json new file mode 100644 index 0000000..460c7c6 --- /dev/null +++ b/telemetry/dashboards/duration_by_thread_id.json @@ -0,0 +1,1047 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 1, + "links": [], + "liveNow": false, + "panels": [ + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 2, + "panels": [], + "title": "Duration by Thread ID", + "type": "row" + }, + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 1 + }, + "id": 1, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "auto", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xField": "Start time", + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "targets": [ + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "filters": [ + { + "id": "d4ede8da", + "operator": "=", + "scope": "span", + "tag": "thread.id", + "value": [ + "2" + ], + "valueType": "int" + }, + { + "id": "service-name", + "operator": "=", + "scope": "resource", + "tag": "service.name", + "value": [ + "ic-ws-gw-ipc77" + ], + "valueType": "string" + }, + { + "id": "span-name", + "operator": "=", + "scope": "span", + "tag": "name", + "value": [ + "Accept Connection" + ], + "valueType": "string" + } + ], + "limit": 20, + "queryType": "traceqlSearch", + "refId": "A", + "tableType": "traces" + } + ], + "title": "Thread ID 2", + "type": "barchart" + }, + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 1 + }, + "id": 6, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "auto", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xField": "Start time", + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "targets": [ + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "filters": [ + { + "id": "d4ede8da", + "operator": "=", + "scope": "span", + "tag": "thread.id", + "value": [ + "3" + ], + "valueType": "int" + }, + { + "id": "service-name", + "operator": "=", + "scope": "resource", + "tag": "service.name", + "value": [ + "ic-ws-gw-ipc77" + ], + "valueType": "string" + }, + { + "id": "span-name", + "operator": "=", + "scope": "span", + "tag": "name", + "value": [ + "Accept Connection" + ], + "valueType": "string" + } + ], + "limit": 20, + "queryType": "traceqlSearch", + "refId": "A", + "tableType": "traces" + } + ], + "title": "Thread ID 3", + "type": "barchart" + }, + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 9 + }, + "id": 5, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "auto", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xField": "Start time", + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "targets": [ + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "filters": [ + { + "id": "d4ede8da", + "operator": "=", + "scope": "span", + "tag": "thread.id", + "value": [ + "4" + ], + "valueType": "int" + }, + { + "id": "service-name", + "operator": "=", + "scope": "resource", + "tag": "service.name", + "value": [ + "ic-ws-gw-ipc77" + ], + "valueType": "string" + }, + { + "id": "span-name", + "operator": "=", + "scope": "span", + "tag": "name", + "value": [ + "Accept Connection" + ], + "valueType": "string" + } + ], + "limit": 20, + "queryType": "traceqlSearch", + "refId": "A", + "tableType": "traces" + } + ], + "title": "Thread ID 4", + "type": "barchart" + }, + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 9 + }, + "id": 7, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "auto", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xField": "Start time", + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "targets": [ + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "filters": [ + { + "id": "d4ede8da", + "operator": "=", + "scope": "span", + "tag": "thread.id", + "value": [ + "5" + ], + "valueType": "int" + }, + { + "id": "service-name", + "operator": "=", + "scope": "resource", + "tag": "service.name", + "value": [ + "ic-ws-gw-ipc77" + ], + "valueType": "string" + }, + { + "id": "span-name", + "operator": "=", + "scope": "span", + "tag": "name", + "value": [ + "Accept Connection" + ], + "valueType": "string" + } + ], + "limit": 20, + "queryType": "traceqlSearch", + "refId": "A", + "tableType": "traces" + } + ], + "title": "Thread ID 5", + "type": "barchart" + }, + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 17 + }, + "id": 3, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "auto", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xField": "Start time", + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "targets": [ + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "filters": [ + { + "id": "d4ede8da", + "operator": "=", + "scope": "span", + "tag": "thread.id", + "value": [], + "valueType": "int" + }, + { + "id": "service-name", + "operator": "=", + "scope": "resource", + "tag": "service.name", + "value": [ + "ic-ws-gw-ipc77" + ], + "valueType": "string" + }, + { + "id": "span-name", + "operator": "=", + "scope": "span", + "tag": "name", + "value": [ + "Accept Connection" + ], + "valueType": "string" + } + ], + "limit": 20, + "queryType": "traceqlSearch", + "refId": "A", + "tableType": "traces" + } + ], + "title": "Thread ID 6", + "type": "barchart" + }, + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 17 + }, + "id": 8, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "auto", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xField": "Start time", + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "targets": [ + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "filters": [ + { + "id": "d4ede8da", + "operator": "=", + "scope": "span", + "tag": "thread.id", + "value": [ + "7" + ], + "valueType": "int" + }, + { + "id": "service-name", + "operator": "=", + "scope": "resource", + "tag": "service.name", + "value": [ + "ic-ws-gw-ipc77" + ], + "valueType": "string" + }, + { + "id": "span-name", + "operator": "=", + "scope": "span", + "tag": "name", + "value": [ + "Accept Connection" + ], + "valueType": "string" + } + ], + "limit": 20, + "queryType": "traceqlSearch", + "refId": "A", + "tableType": "traces" + } + ], + "title": "Thread ID 7", + "type": "barchart" + }, + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 25 + }, + "id": 4, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "auto", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xField": "Start time", + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "targets": [ + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "filters": [ + { + "id": "d4ede8da", + "operator": "=", + "scope": "span", + "tag": "thread.id", + "value": [ + "8" + ], + "valueType": "int" + }, + { + "id": "service-name", + "operator": "=", + "scope": "resource", + "tag": "service.name", + "value": [ + "ic-ws-gw-ipc77" + ], + "valueType": "string" + }, + { + "id": "span-name", + "operator": "=", + "scope": "span", + "tag": "name", + "value": [ + "Accept Connection" + ], + "valueType": "string" + } + ], + "limit": 20, + "queryType": "traceqlSearch", + "refId": "A", + "tableType": "traces" + } + ], + "title": "Thread ID 8", + "type": "barchart" + }, + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 25 + }, + "id": 9, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "auto", + "stacking": "none", + "tooltip": { + "mode": "single", + "sort": "none" + }, + "xField": "Start time", + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "targets": [ + { + "datasource": { + "type": "tempo", + "uid": "P214B5B846CF3925F" + }, + "filters": [ + { + "id": "d4ede8da", + "operator": "=", + "scope": "span", + "tag": "thread.id", + "value": [ + "9" + ], + "valueType": "int" + }, + { + "id": "service-name", + "operator": "=", + "scope": "resource", + "tag": "service.name", + "value": [ + "ic-ws-gw-ipc77" + ], + "valueType": "string" + }, + { + "id": "span-name", + "operator": "=", + "scope": "span", + "tag": "name", + "value": [ + "Accept Connection" + ], + "valueType": "string" + } + ], + "limit": 20, + "queryType": "traceqlSearch", + "refId": "A", + "tableType": "traces" + } + ], + "title": "Thread ID 9", + "type": "barchart" + } + ], + "refresh": false, + "schemaVersion": 39, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-3h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Client Connections by Thread ID", + "uid": "a360ec48-ff96-448b-9711-e3e80d339722", + "version": 3, + "weekStart": "" +} + + \ No newline at end of file diff --git a/telemetry/grafana-dashboards.yaml b/telemetry/grafana-dashboards.yaml new file mode 100644 index 0000000..9e2bb46 --- /dev/null +++ b/telemetry/grafana-dashboards.yaml @@ -0,0 +1,9 @@ +apiVersion: 1 + +providers: + - name: dashboards + type: file + updateIntervalSeconds: 30 + options: + path: /etc/dashboards + foldersFromFilesStructure: true \ No newline at end of file diff --git a/telemetry/grafana-datasources.yaml b/telemetry/grafana-datasources.yaml new file mode 100644 index 0000000..2e89a61 --- /dev/null +++ b/telemetry/grafana-datasources.yaml @@ -0,0 +1,7 @@ +apiVersion: 1 + +datasources: + - name: Tempo + type: tempo + url: http://tempo:3200 # tempo server + basicAuth: false \ No newline at end of file diff --git a/telemetry/otel-config.yaml b/telemetry/otel-config.yaml new file mode 100644 index 0000000..b87c34f --- /dev/null +++ b/telemetry/otel-config.yaml @@ -0,0 +1,22 @@ +receivers: + otlp: + protocols: + grpc: + +processors: + batch: + +exporters: + otlp: + endpoint: ${GRAFANA_TEMPO_ENDPOINT} + headers: + authorization: Basic ${GRAFANA_TEMPO_ACCESS_TOKEN} + tls: + insecure: ${GRAFANA_TEMPO_LOCAL} + +service: + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [otlp] diff --git a/telemetry/tempo-config.yaml b/telemetry/tempo-config.yaml new file mode 100644 index 0000000..0244794 --- /dev/null +++ b/telemetry/tempo-config.yaml @@ -0,0 +1,15 @@ +server: + http_listen_port: 3200 + +distributor: + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4318 + +storage: + trace: + backend: local + local: + path: /tmp/tempo/blocks \ No newline at end of file