diff --git a/.github/workflows/ec2-deployment.yml b/.github/workflows/ec2-deployment.yml index 793b5c01..51168c2a 100644 --- a/.github/workflows/ec2-deployment.yml +++ b/.github/workflows/ec2-deployment.yml @@ -81,6 +81,5 @@ jobs: cd /home/ec2-user/enclave git pull - - docker stack deploy -c docker-compose.yml ciphernode-stack + ./deploy/deploy.sh enclave $IMAGE_NAME:$IMAGE_TAG diff --git a/deploy/.env.example b/deploy/.env.example new file mode 100644 index 00000000..14723a66 --- /dev/null +++ b/deploy/.env.example @@ -0,0 +1,4 @@ +RPC_URL=wss://eth-sepolia.g.alchemy.com/v2/API_KEY +SEPOLIA_ENCLAVE_ADDRESS=0xCe087F31e20E2F76b6544A2E4A74D4557C8fDf77 +SEPOLIA_CIPHERNODE_REGISTRY_ADDRESS=0x0952388f6028a9Eda93a5041a3B216Ea331d97Ab +SEPOLIA_FILTER_REGISTRY=0xcBaCE7C360b606bb554345b20884A28e41436934 diff --git a/deploy/.gitignore b/deploy/.gitignore new file mode 100644 index 00000000..0aef23bb --- /dev/null +++ b/deploy/.gitignore @@ -0,0 +1,3 @@ +*.secrets.json +.env +!example.secrets.json diff --git a/deploy/agg.yaml b/deploy/agg.yaml new file mode 100644 index 00000000..8321ce76 --- /dev/null +++ b/deploy/agg.yaml @@ -0,0 +1,15 @@ +address: "${ADDRESS}" +quic_port: ${QUIC_PORT} +enable_mdns: false +peers: + - "/dns4/cn1/udp/9091/quic-v1" + - "/dns4/cn2/udp/9092/quic-v1" + - "/dns4/cn3/udp/9093/quic-v1" + - "/dns4/aggregator/udp/9094/quic-v1" +chains: + - name: "sepolia" + rpc_url: "${RPC_URL}" + contracts: + enclave: "${SEPOLIA_ENCLAVE_ADDRESS}" + ciphernode_registry: "${SEPOLIA_CIPHERNODE_REGISTRY_ADDRESS}" + filter_registry: "${SEPOLIA_FILTER_REGISTRY}" diff --git a/deploy/build.sh b/deploy/build.sh new file mode 100755 index 00000000..a68aa2b2 --- /dev/null +++ b/deploy/build.sh @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +# Enable BuildKit +export DOCKER_BUILDKIT=1 + +mkdir -p /tmp/docker-cache + +time docker buildx build \ + --cache-from=type=local,src=/tmp/docker-cache \ + --cache-to=type=local,dest=/tmp/docker-cache \ + --load \ + -t ${1:-ghcr.io/gnosisguild/ciphernode} -f ./packages/ciphernode/Dockerfile . diff --git a/deploy/cn1.yaml b/deploy/cn1.yaml new file mode 100644 index 00000000..aefdc431 --- /dev/null +++ b/deploy/cn1.yaml @@ -0,0 +1,15 @@ +address: "${ADDRESS}" +quic_port: ${QUIC_PORT} +enable_mdns: false +peers: + - "/dns4/cn1/udp/9091/quic-v1" + - "/dns4/cn1/udp/9092/quic-v1" + - "/dns4/cn1/udp/9093/quic-v1" + - "/dns4/cn1/udp/9094/quic-v1" +chains: + - name: "sepolia" + rpc_url: "${RPC_URL}" + contracts: + enclave: "${SEPOLIA_ENCLAVE_ADDRESS}" + ciphernode_registry: "${SEPOLIA_CIPHERNODE_REGISTRY_ADDRESS}" + filter_registry: "${SEPOLIA_FILTER_REGISTRY}" diff --git a/deploy/cn2.yaml b/deploy/cn2.yaml new file mode 100644 index 00000000..8321ce76 --- /dev/null +++ b/deploy/cn2.yaml @@ -0,0 +1,15 @@ +address: "${ADDRESS}" +quic_port: ${QUIC_PORT} +enable_mdns: false +peers: + - "/dns4/cn1/udp/9091/quic-v1" + - "/dns4/cn2/udp/9092/quic-v1" + - "/dns4/cn3/udp/9093/quic-v1" + - "/dns4/aggregator/udp/9094/quic-v1" +chains: + - name: "sepolia" + rpc_url: "${RPC_URL}" + contracts: + enclave: "${SEPOLIA_ENCLAVE_ADDRESS}" + ciphernode_registry: "${SEPOLIA_CIPHERNODE_REGISTRY_ADDRESS}" + filter_registry: "${SEPOLIA_FILTER_REGISTRY}" diff --git a/deploy/cn3.yaml b/deploy/cn3.yaml new file mode 100644 index 00000000..8321ce76 --- /dev/null +++ b/deploy/cn3.yaml @@ -0,0 +1,15 @@ +address: "${ADDRESS}" +quic_port: ${QUIC_PORT} +enable_mdns: false +peers: + - "/dns4/cn1/udp/9091/quic-v1" + - "/dns4/cn2/udp/9092/quic-v1" + - "/dns4/cn3/udp/9093/quic-v1" + - "/dns4/aggregator/udp/9094/quic-v1" +chains: + - name: "sepolia" + rpc_url: "${RPC_URL}" + contracts: + enclave: "${SEPOLIA_ENCLAVE_ADDRESS}" + ciphernode_registry: "${SEPOLIA_CIPHERNODE_REGISTRY_ADDRESS}" + filter_registry: "${SEPOLIA_FILTER_REGISTRY}" diff --git a/deploy/copy-secrets.sh b/deploy/copy-secrets.sh new file mode 100755 index 00000000..2bfc6132 --- /dev/null +++ b/deploy/copy-secrets.sh @@ -0,0 +1,69 @@ +#!/usr/bin/env bash + +set_network_private_key() { + echo "Setting network private key for $1" + jq --arg key "$2" '.network_private_key = $key' "$1.secrets.json" > "$1.secrets.json.tmp" && mv "$1.secrets.json.tmp" "$1.secrets.json" +} + +# Set working directory to script location +cd "$(dirname "$0")" || exit 1 + +# Source file path (in current directory) +SOURCE="example.secrets.json" + +# Color codes +RED='\033[0;31m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# List of target files +TARGETS=("cn1" "cn2" "cn3" "agg") + +# Sample network private keys +NETWORK_KEY_CN1="0x11a1e500a548b70d88184a1e042900c0ed6c57f8710bcc35dc8c85fa33d3f580" +NETWORK_KEY_CN2="0x21a1e500a548b70d88184a1e042900c0ed6c57f8710bcc35dc8c85fa33d3f580" +NETWORK_KEY_CN3="0x31a1e500a548b70d88184a1e042900c0ed6c57f8710bcc35dc8c85fa33d3f580" +NETWORK_KEY_AGG="0x41a1e500a548b70d88184a1e042900c0ed6c57f8710bcc35dc8c85fa33d3f580" +NET_KEYS=($NETWORK_KEY_CN1 $NETWORK_KEY_CN2 $NETWORK_KEY_CN3 $NETWORK_KEY_AGG) + +# Check if source file exists +if [ ! -f "$SOURCE" ]; then + echo "Error: Source file $SOURCE not found!" + exit 1 +fi + +i=0 +# Copy file to each target, skipping if exists +for target in "${TARGETS[@]}"; do + if [ -f "${target}.secrets.json" ]; then + echo "Skipping ${target}.secrets.json - file already exists" + else + cp "$SOURCE" "${target}.secrets.json" + set_network_private_key "${target}" "${NET_KEYS[${i:-0}]}" + ((i++)) + echo "Created ${target}.secrets.json" + fi +done + +echo "Copy operation completed!" + +# Check for unchanged files +echo -e "\nChecking for unchanged secret files..." +UNCHANGED_FILES=() + +for target in "${TARGETS[@]}"; do + if [ -f "${target}.secrets.json" ]; then + if cmp -s "$SOURCE" "${target}.secrets.json"; then + UNCHANGED_FILES+=("${target}.secrets.json") + fi + fi +done + +# Display warning if unchanged files found +if [ ${#UNCHANGED_FILES[@]} -gt 0 ]; then + echo -e "${RED}WARNING: The following files are identical to example.secrets.json:${NC}" + for file in "${UNCHANGED_FILES[@]}"; do + echo -e "${YELLOW}==> ${NC}${file}${YELLOW} <==${NC}" + done + echo -e "${RED}These files should be modified before use in production!${NC}" +fi diff --git a/deploy/deploy.sh b/deploy/deploy.sh new file mode 100755 index 00000000..0825ecda --- /dev/null +++ b/deploy/deploy.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash + +TIMESTAMP=$(date +%s) +RUN_FILE="./deploy/tmp.docker-compose.${TIMESTAMP}.yml" +TEMPLATE_FILE="./deploy/docker-compose.yml" + +wait_ready() { + local STACK_NAME="$1" + until [ "$(docker stack services $STACK_NAME --format '{{.Replicas}}' | awk -F'/' '$1 != $2')" = "" ]; do + printf "." + sleep 1 + done + echo -ne "\r\033[K" + echo "Stack $STACK_NAME is ready!" +} + +wait_removed() { + local STACK_NAME="$1" + while docker stack ps $STACK_NAME >/dev/null 2>&1; do + printf "." + sleep 1 + done + echo -ne "\r\033[K" + echo "Stack $STACK_NAME is removed" +} + + +if [ -z "$1" ]; then + echo "Error: Please provide a stack name as an argument" + echo "Usage: $0 " + exit 1 +fi + +if [ -z "$2" ]; then + echo "Error: Please provide an image name as an argument" + echo "Usage: $0 " + exit 1 +fi + +# Check if docker-compose.yml exists +if [ ! -f "$TEMPLATE_FILE" ]; then + echo "Error: $TEMPLATE_FILE not found" + exit 1 +fi + +sed "s|{{IMAGE}}|$2|g" $TEMPLATE_FILE > "${RUN_FILE}" + +STACK_NAME=$1 +docker stack rm $STACK_NAME +wait_removed $STACK_NAME +docker stack deploy -c $RUN_FILE $STACK_NAME +wait_ready $STACK_NAME +rm ./deploy/tmp.*.* diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml new file mode 100644 index 00000000..ef076133 --- /dev/null +++ b/deploy/docker-compose.yml @@ -0,0 +1,102 @@ +services: + cn1: + image: {{IMAGE}} + volumes: + - ./cn1.yaml:/home/ciphernode/.config/enclave/config.yaml:ro + - cn1-data:/home/ciphernode/.local/share/enclave + secrets: + - source: secrets_cn1 + target: secrets.json + env_file: .env + environment: + RUST_LOG: "info" + AGGREGATOR: "false" + ADDRESS: "0xbDA5747bFD65F08deb54cb465eB87D40e51B197E" + QUIC_PORT: 9091 + deploy: + replicas: 1 + endpoint_mode: dnsrr + networks: + - global-network + + cn2: + image: {{IMAGE}} + volumes: + - ./cn2.yaml:/home/ciphernode/.config/enclave/config.yaml:ro + - cn2-data:/home/ciphernode/.local/share/enclave + secrets: + - source: secrets_cn2 + target: secrets.json + env_file: .env + environment: + RUST_LOG: "info" + AGGREGATOR: "false" + ADDRESS: "0xdD2FD4581271e230360230F9337D5c0430Bf44C0" + QUIC_PORT: 9092 + deploy: + replicas: 1 + endpoint_mode: dnsrr + networks: + - global-network + + cn3: + image: {{IMAGE}} + volumes: + - ./cn3.yaml:/home/ciphernode/.config/enclave/config.yaml:ro + - cn3-data:/home/ciphernode/.local/share/enclave + secrets: + - source: secrets_cn3 + target: secrets.json + env_file: .env + environment: + RUST_LOG: "info" + AGGREGATOR: "false" + ADDRESS: "0x2546BcD3c84621e976D8185a91A922aE77ECEc30" + QUIC_PORT: 9093 + deploy: + replicas: 1 + endpoint_mode: dnsrr + networks: + - global-network + + aggregator: + image: {{IMAGE}} + depends_on: + - cn1 + volumes: + - ./agg.yaml:/home/ciphernode/.config/enclave/config.yaml:ro + - agg-data:/home/ciphernode/.local/share/enclave + secrets: + - source: secrets_agg + target: secrets.json + env_file: .env + environment: + RUST_LOG: "info" + AGGREGATOR: "true" + ADDRESS: "0x8626a6940E2eb28930eFb4CeF49B2d1F2C9C1199" + QUIC_PORT: 9094 + deploy: + replicas: 1 + endpoint_mode: dnsrr + networks: + - global-network + +secrets: + secrets_cn1: + file: cn1.secrets.json + secrets_cn2: + file: cn2.secrets.json + secrets_cn3: + file: cn3.secrets.json + secrets_agg: + file: agg.secrets.json + +volumes: + cn1-data: + cn2-data: + cn3-data: + agg-data: + +networks: + global-network: + driver: overlay diff --git a/deploy/example.secrets.json b/deploy/example.secrets.json new file mode 100644 index 00000000..72bb3ec1 --- /dev/null +++ b/deploy/example.secrets.json @@ -0,0 +1,5 @@ +{ + "password": "changeme", + "private_key": "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80", + "network_private_key": "0x11a1e500a548b70d88184a1e042900c0ed6c57f8710bcc35dc8c85fa33d3f580" +} \ No newline at end of file diff --git a/deploy/inspect.sh b/deploy/inspect.sh new file mode 100755 index 00000000..20959b5a --- /dev/null +++ b/deploy/inspect.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash + +get_logs_by_version() { + local SERVICE_NAME=$1 + + # Get current version number + CURRENT_VERSION=$(docker service inspect --format '{{.Version.Index}}' $SERVICE_NAME) + + # Get all tasks with this version + TASK_IDS=$(docker service ps --filter "desired-state=running" \ + --format '{{.ID}}' $SERVICE_NAME) + + # Get logs from these specific tasks + for TASK_ID in $TASK_IDS; do + docker service logs --raw "$TASK_ID" + done +} + +echo "" +echo "=================================" +echo " CN1 " +echo "=================================" + +get_logs_by_version enclave_cn1 + + +echo "" +echo "=================================" +echo " CN2 " +echo "=================================" + +get_logs_by_version enclave_cn2 + + +echo "" +echo "=================================" +echo " CN3 " +echo "=================================" + +get_logs_by_version enclave_cn3 + + +echo "" +echo "=================================" +echo " AGG " +echo "=================================" + +get_logs_by_version enclave_aggregator diff --git a/deploy/swarm_deployment.md b/deploy/swarm_deployment.md new file mode 100644 index 00000000..6ea63779 --- /dev/null +++ b/deploy/swarm_deployment.md @@ -0,0 +1,156 @@ +# Setup a remote server + +Install docker + +``` +sudo apt-get update +sudo apt-get install -y ca-certificates curl gnupg lsb-release +sudo mkdir -m 0755 -p /etc/apt/keyrings +curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg +echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null +sudo apt-get update +sudo apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin +sudo docker run hello-world +``` + + +Initialize swarm + +``` +docker swarm init +``` + +NOTE: If you get an error about not being able to choose between IP addresses choose the more private IP address. + + +``` +docker swarm init --advertise-addr 10.49.0.5 +``` + +## Setting up Buildkit + +NOTE: You may need to setup buildkit: + +``` +echo '{ + "builder": { + "gc": { + "enabled": true, + "defaultKeepStorage": "20GB" + } + }, + "features": { + "buildkit": true, + "containerd-snapshotter": true + } +}' | sudo tee /etc/docker/daemon.json +``` + +and then restart the docker daemon + +``` +sudo systemctl restart docker +``` + +## Setup the repo + +Clone the repo + +``` +git clone https://github.com/gnosisguild/enclave.git +``` + +Move to the new folder: + +``` +cd enclave/ +``` + +Build the app + +``` +./deploy/build.sh +``` + +# Setup `.env` vars + +Copy the `.env.example` file to `.env` + +``` +cp ./deploy/.env.example ./deploy/.env +``` + +Alter the variables to reflect the correct values required for the stack: + +``` +export RPC_URL=wss://eth-sepolia.g.alchemy.com/v2/ +export SEPOLIA_ENCLAVE_ADDRESS=0xCe087F31e20E2F76b6544A2E4A74D4557C8fDf77 +export SEPOLIA_CIPHERNODE_REGISTRY_ADDRESS=0x0952388f6028a9Eda93a5041a3B216Ea331d97Ab +export SEPOLIA_FILTER_REGISTRY=0xcBaCE7C360b606bb554345b20884A28e41436934 +``` + +Pay special attention to the `RPC_URL` vars as here we use a standin API key value. + +You can peruse the yaml config files for the nodes to see how the vars are used within the config. + +# Secrets Setup Utils Script + +We have created a secrets setup utility to aid setting up the secrets for each node. + +To deploy with swarm we need to set up the secrets file for our cluster. + +## Run +```bash +./deploy/copy-secrets.sh +``` + +## What it does +- Copies `example.secrets.json` to create `cn1/2/3` and `agg.secrets.json` files +- Skips existing files +- Warns with yellow arrows (==>) if any files are identical to the example + +## Example output +```bash +Created cn1.secrets.json +Skipping cn2.secrets.json - file already exists + +==> cn1.secrets.json <== # Yellow arrows indicate files that need customization +``` + +Remember to modify any highlighted files before use with unique secrets. + +# Deploy a version to the stack + +To deploy + +``` +./deploy/deploy.sh enclave ghcr.io/gnosisguild/ciphernode:latest +``` + +This will deploy the following services: + +``` +❯ docker service ls +ID NAME MODE REPLICAS IMAGE PORTS +tr44go8vevh1 enclave_aggregator replicated 1/1 ghcr.io/gnosisguild/ciphernode:latest +kdqktv85xcuv enclave_cn1 replicated 1/1 ghcr.io/gnosisguild/ciphernode:latest +nguul381w6mu enclave_cn2 replicated 1/1 ghcr.io/gnosisguild/ciphernode:latest +zgmwmv7cd63j enclave_cn3 replicated 1/1 ghcr.io/gnosisguild/ciphernode:latest +``` + +# Get the logs + +You can get the logs: + +``` +docker service logs enclave_cn1 +``` + +Notice the line: + +``` +enclave_cn2.1.zom4r645ophf@nixos | 2024-12-19T23:47:08.582536Z INFO enclave: COMPILATION ID: 'painfully_fluent_crane' +``` + +This can help you identify which compilation you are looking at. This works by generating a unique ID based on the complication time. + diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml deleted file mode 100644 index c43f4d29..00000000 --- a/docker-compose.dev.yml +++ /dev/null @@ -1,16 +0,0 @@ -services: - cn1: - networks: - - cn-network - cn2: - networks: - - cn-network - cn3: - networks: - - cn-network - aggregator: - networks: - - cn-network - -networks: - cn-network: diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 5573cce5..00000000 --- a/docker-compose.yml +++ /dev/null @@ -1,102 +0,0 @@ -services: - cn1: - image: ghcr.io/gnosisguild/ciphernode:latest - volumes: - - ./configs/cn1.yaml:/home/ciphernode/.config/enclave/config.yaml:ro - - cn1-data:/home/ciphernode/.local/share/enclave - secrets: - - secrets.json - environment: - RUST_LOG: "info" - AGGREGATOR: "false" - ports: - - target: 9091 - published: 9091 - protocol: udp - mode: host - deploy: - replicas: 1 - networks: - - global-network - - - cn2: - image: ghcr.io/gnosisguild/ciphernode:latest - depends_on: - - cn1 - volumes: - - ./configs/cn2.yaml:/home/ciphernode/.config/enclave/config.yaml:ro - - cn2-data:/home/ciphernode/.local/share/enclave - secrets: - - secrets.json - environment: - RUST_LOG: "info" - AGGREGATOR: "false" - ports: - - target: 9092 - published: 9092 - protocol: udp - mode: host - deploy: - replicas: 1 - networks: - - global-network - - cn3: - image: ghcr.io/gnosisguild/ciphernode:latest - depends_on: - - cn1 - volumes: - - ./configs/cn3.yaml:/home/ciphernode/.config/enclave/config.yaml:ro - - cn3-data:/home/ciphernode/.local/share/enclave - secrets: - - secrets.json - environment: - RUST_LOG: "info" - AGGREGATOR: "false" - ports: - - target: 9093 - published: 9093 - protocol: udp - mode: host - deploy: - replicas: 1 - networks: - - global-network - - - aggregator: - image: ghcr.io/gnosisguild/ciphernode:latest - depends_on: - - cn1 - volumes: - - ./configs/agg.yaml:/home/ciphernode/.config/enclave/config.yaml:ro - - agg-data:/home/ciphernode/.local/share/enclave - secrets: - - secrets.json - environment: - RUST_LOG: "info" - AGGREGATOR: "true" - ports: - - target: 9094 - published: 9094 - protocol: udp - mode: host - deploy: - replicas: 1 - networks: - - global-network - -secrets: - secrets.json: - file: ./configs/secrets.json - -volumes: - cn1-data: - cn2-data: - cn3-data: - agg-data: - -networks: - global-network: - driver: overlay diff --git a/packages/ciphernode/.dockerignore b/packages/ciphernode/.dockerignore index 7d50aa6c..521131cf 100644 --- a/packages/ciphernode/.dockerignore +++ b/packages/ciphernode/.dockerignore @@ -6,18 +6,7 @@ !Cargo.toml !Cargo.lock -# Allow core crate -!core/ -!core/Cargo.toml -!core/src/**/*.rs - -# net -!net/ -!net/Cargo.toml -!net/src/**/*.rs - - -# Allow all other workspace members (adjust paths as needed) +# Allow all other workspace members !*/Cargo.toml !*/src/**/*.rs diff --git a/packages/ciphernode/Cargo.lock b/packages/ciphernode/Cargo.lock index 0684a617..63966fe4 100644 --- a/packages/ciphernode/Cargo.lock +++ b/packages/ciphernode/Cargo.lock @@ -1676,6 +1676,20 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +[[package]] +name = "compile-time" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e55ede5279d4d7c528906853743abeb26353ae1e6c440fcd6d18316c2c2dd903" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "rustc_version 0.4.0", + "semver 1.0.23", + "time", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -2164,6 +2178,7 @@ dependencies = [ "anyhow", "cipher 0.1.0", "clap", + "compile-time", "config", "data", "dialoguer", @@ -2171,8 +2186,11 @@ dependencies = [ "enclave-core", "enclave_node", "hex", + "libp2p", "once_cell", + "petname", "phf", + "rand", "router", "serde", "serde_json", @@ -4583,6 +4601,20 @@ dependencies = [ "indexmap", ] +[[package]] +name = "petname" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cd31dcfdbbd7431a807ef4df6edd6473228e94d5c805e8cf671227a21bad068" +dependencies = [ + "anyhow", + "clap", + "itertools 0.13.0", + "proc-macro2", + "quote", + "rand", +] + [[package]] name = "pharos" version = "0.5.3" diff --git a/packages/ciphernode/Cargo.toml b/packages/ciphernode/Cargo.toml index 8ce8f55a..ae75b191 100644 --- a/packages/ciphernode/Cargo.toml +++ b/packages/ciphernode/Cargo.toml @@ -37,6 +37,7 @@ bs58 = "0.5.1" base64 = "0.22.1" clap = { version = "4.5.17", features = ["derive"] } cipher = { path = "./cipher" } +compile-time = "0.2.0" dirs = "5.0.1" data = { path = "./data" } shellexpand = "3.1.0" diff --git a/packages/ciphernode/Dockerfile b/packages/ciphernode/Dockerfile index 3f559d95..cb100076 100644 --- a/packages/ciphernode/Dockerfile +++ b/packages/ciphernode/Dockerfile @@ -7,18 +7,42 @@ RUN yarn install && yarn compile # Build stage FROM rust:1.81 AS ciphernode-builder +# Force incremental +ENV CARGO_INCREMENTAL=1 +ENV RUSTC_FORCE_INCREMENTAL=1 +ENV CARGO_BUILD_JOBS=8 + # Create build directory WORKDIR /build/packages/ciphernode -COPY ./packages/ciphernode ./ COPY --from=evm-builder /build/packages/evm/artifacts ../evm/artifacts COPY --from=evm-builder /build/packages/evm/deployments ../evm/deployments + +# Copy workpace Cargo.toml +COPY ./packages/ciphernode/Cargo.toml ./Cargo.toml +COPY ./packages/ciphernode/Cargo.lock ./Cargo.lock +COPY ./packages/ciphernode/*/Cargo.toml ./ + +# Build all dependencies and add them to the build cache +RUN mkdir -p src && \ + echo "fn main() {}" > src/main.rs && \ + for d in ./*/ ; do \ + if [ -f "$d/Cargo.toml" ]; then \ + mkdir -p "$d/src" && \ + echo "fn main() {}" > "$d/src/lib.rs"; \ + fi \ + done + +RUN cargo build --release + +COPY ./packages/ciphernode . + RUN cargo build --release # Runtime stage FROM debian:stable-slim # Install runtime dependencies -RUN apt-get update && apt-get install -y --no-install-recommends iptables ca-certificates jq && \ +RUN apt-get update && apt-get install -y --no-install-recommends iptables dnsutils iputils-ping ca-certificates jq && \ apt-get clean && rm -rf /var/lib/apt/lists/* # Create non-root user @@ -44,5 +68,4 @@ ENV DATA_DIR=/home/ciphernode/.local/share/enclave ENV RUST_LOG=info # Add entrypoint script - -ENTRYPOINT ["ciphernode-entrypoint.sh"] \ No newline at end of file +ENTRYPOINT ["ciphernode-entrypoint.sh"] diff --git a/packages/ciphernode/ciphernode-entrypoint.sh b/packages/ciphernode/ciphernode-entrypoint.sh index 3fa42fdd..917b8411 100644 --- a/packages/ciphernode/ciphernode-entrypoint.sh +++ b/packages/ciphernode/ciphernode-entrypoint.sh @@ -20,9 +20,10 @@ fi # Read secrets from the JSON file PRIVATE_KEY=$(jq -r '.private_key' "$SECRETS_FILE") PASSWORD=$(jq -r '.password' "$SECRETS_FILE") +NETWORK_PRIVATE_KEY=$(jq -r '.network_private_key' "$SECRETS_FILE") -if [ -z "$PRIVATE_KEY" ] || [ -z "$PASSWORD" ]; then - echo "Error: Missing 'private_key' or 'password' in secrets file!" +if [ -z "$PRIVATE_KEY" ] || [ -z "$PASSWORD" ] || [ -z "$NETWORK_PRIVATE_KEY" ]; then + echo "Error: Missing 'private_key', 'password' or 'network_private_key' in secrets file!" exit 1 fi @@ -30,6 +31,10 @@ fi echo "Setting password" enclave password create --config "$CONFIG_FILE" --password "$PASSWORD" +# Set network private key +echo "Setting network private key" +enclave net set-key --config "$CONFIG_FILE" --net-keypair "$NETWORK_PRIVATE_KEY" + if [ "$AGGREGATOR" = "true" ]; then echo "Setting private key" enclave wallet set --config "$CONFIG_FILE" --private-key "$PRIVATE_KEY" diff --git a/packages/ciphernode/enclave/Cargo.toml b/packages/ciphernode/enclave/Cargo.toml index d38cf579..c2595c4b 100644 --- a/packages/ciphernode/enclave/Cargo.toml +++ b/packages/ciphernode/enclave/Cargo.toml @@ -19,6 +19,7 @@ dialoguer = "0.11.0" enclave-core = { path = "../core" } enclave_node = { path = "../enclave_node" } hex = { workspace = true } +libp2p = { workspace = true } once_cell = "1.20.2" router = { path = "../router" } tokio = { workspace = true } @@ -28,6 +29,9 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } zeroize = { workspace = true } phf = { version = "0.11", features = ["macros"] } +compile-time = { workspace = true } +rand = { workspace = true } +petname = "2.0.2" [build-dependencies] serde_json = { workspace = true } diff --git a/packages/ciphernode/enclave/src/commands/init.rs b/packages/ciphernode/enclave/src/commands/init.rs index 65b16658..dbfe9952 100644 --- a/packages/ciphernode/enclave/src/commands/init.rs +++ b/packages/ciphernode/enclave/src/commands/init.rs @@ -1,7 +1,9 @@ -use crate::commands::password::{self, PasswordCommands}; -use anyhow::anyhow; -use anyhow::bail; -use anyhow::Result; +use crate::commands::{ + net, + password::{self, PasswordCommands}, +}; +use alloy::primitives::Address; +use anyhow::{anyhow, bail, Result}; use config::load_config; use config::RPC; use dialoguer::{theme::ColorfulTheme, Input}; @@ -27,21 +29,10 @@ fn validate_rpc_url(url: &String) -> Result<()> { } fn validate_eth_address(address: &String) -> Result<()> { - if address.is_empty() { - return Ok(()); + match Address::parse_checksummed(address, None) { + Ok(_) => Ok(()), + Err(e) => bail!("Invalid Ethereum address: {}", e), } - if !address.starts_with("0x") { - bail!("Address must start with '0x'") - } - if address.len() != 42 { - bail!("Address must be 42 characters long (including '0x')") - } - for c in address[2..].chars() { - if !c.is_ascii_hexdigit() { - bail!("Address must contain only hexadecimal characters") - } - } - Ok(()) } #[instrument(name = "app", skip_all, fields(id = get_tag()))] @@ -50,6 +41,8 @@ pub async fn execute( eth_address: Option, password: Option, skip_eth: bool, + net_keypair: Option, + generate_net_keypair: bool, ) -> Result<()> { let rpc_url = match rpc_url { Some(url) => { @@ -133,10 +126,22 @@ chains: password, overwrite: true, }, - config, + &config, ) .await?; + if generate_net_keypair { + net::execute(net::NetCommands::GenerateKey, &config).await?; + } else { + net::execute( + net::NetCommands::SetKey { + net_keypair: net_keypair, + }, + &config, + ) + .await?; + } + println!("Enclave configuration successfully created!"); println!("You can start your node using `enclave start`"); diff --git a/packages/ciphernode/enclave/src/commands/mod.rs b/packages/ciphernode/enclave/src/commands/mod.rs index ec26de49..92cc422b 100644 --- a/packages/ciphernode/enclave/src/commands/mod.rs +++ b/packages/ciphernode/enclave/src/commands/mod.rs @@ -56,5 +56,13 @@ pub enum Commands { /// Skip asking for eth #[arg(long = "skip-eth")] skip_eth: bool, + + /// The network private key (ed25519) + #[arg(long = "net-keypair")] + net_keypair: Option, + + /// Generate a new network keypair + #[arg(long = "generate-net-keypair")] + generate_net_keypair: bool, }, } diff --git a/packages/ciphernode/enclave/src/commands/net/generate.rs b/packages/ciphernode/enclave/src/commands/net/generate.rs new file mode 100644 index 00000000..1e747379 --- /dev/null +++ b/packages/ciphernode/enclave/src/commands/net/generate.rs @@ -0,0 +1,37 @@ +use actix::Actor; +use anyhow::{bail, Result}; +use cipher::Cipher; +use config::AppConfig; +use enclave_core::{EventBus, GetErrors}; +use enclave_node::get_repositories; +use libp2p::identity::Keypair; +use zeroize::Zeroize; + +pub async fn execute(config: &AppConfig) -> Result<()> { + let kp = Keypair::generate_ed25519(); + println!( + "Generated new keypair with peer ID: {}", + kp.public().to_peer_id() + ); + let mut bytes = kp.try_into_ed25519()?.to_bytes().to_vec(); + let cipher = Cipher::from_config(config).await?; + let encrypted = cipher.encrypt_data(&mut bytes.clone())?; + let bus = EventBus::new(true).start(); + let repositories = get_repositories(&config, &bus)?; + bytes.zeroize(); + + // NOTE: We are writing an encrypted string here + repositories.libp2p_keypair().write(&encrypted); + + let errors = bus.send(GetErrors).await?; + if errors.len() > 0 { + for error in errors.iter() { + println!("{error}"); + } + bail!("There were errors generating the network keypair") + } + + println!("Network keypair has been successfully generated and encrypted."); + + Ok(()) +} diff --git a/packages/ciphernode/enclave/src/commands/net/mod.rs b/packages/ciphernode/enclave/src/commands/net/mod.rs index 913cf69e..37b6b11e 100644 --- a/packages/ciphernode/enclave/src/commands/net/mod.rs +++ b/packages/ciphernode/enclave/src/commands/net/mod.rs @@ -1,4 +1,6 @@ +mod generate; mod purge; +mod set; use anyhow::*; use clap::Subcommand; use config::AppConfig; @@ -7,11 +9,22 @@ use config::AppConfig; pub enum NetCommands { /// Purge the current peer ID from the database. PurgeId, + + /// Generate a new network keypair + GenerateKey, + + /// Set the network private key + SetKey { + #[arg(long = "net-keypair")] + net_keypair: Option, + }, } -pub async fn execute(command: NetCommands, config: AppConfig) -> Result<()> { +pub async fn execute(command: NetCommands, config: &AppConfig) -> Result<()> { match command { NetCommands::PurgeId => purge::execute(&config).await?, + NetCommands::GenerateKey => generate::execute(&config).await?, + NetCommands::SetKey { net_keypair } => set::execute(&config, net_keypair).await?, }; Ok(()) diff --git a/packages/ciphernode/enclave/src/commands/net/purge.rs b/packages/ciphernode/enclave/src/commands/net/purge.rs index 3c2b3aae..520a75d5 100644 --- a/packages/ciphernode/enclave/src/commands/net/purge.rs +++ b/packages/ciphernode/enclave/src/commands/net/purge.rs @@ -7,7 +7,7 @@ use enclave_node::get_repositories; pub async fn execute(config: &AppConfig) -> Result<()> { let bus = EventBus::new(true).start(); let repositories = get_repositories(&config, &bus)?; - repositories.libp2pid().clear(); + repositories.libp2p_keypair().clear(); println!("Peer ID has been purged. A new Peer ID will be generated upon restart."); Ok(()) } diff --git a/packages/ciphernode/enclave/src/commands/net/set.rs b/packages/ciphernode/enclave/src/commands/net/set.rs new file mode 100644 index 00000000..853ecde0 --- /dev/null +++ b/packages/ciphernode/enclave/src/commands/net/set.rs @@ -0,0 +1,59 @@ +use actix::Actor; +use alloy::primitives::hex; +use anyhow::{bail, Result}; +use cipher::Cipher; +use config::AppConfig; +use dialoguer::{theme::ColorfulTheme, Password}; +use enclave_core::{EventBus, GetErrors}; +use enclave_node::get_repositories; +use libp2p::identity::Keypair; + +pub fn create_keypair(input: &String) -> Result { + match hex::check(input) { + Ok(()) => match Keypair::ed25519_from_bytes(hex::decode(input)?) { + Ok(kp) => Ok(kp), + Err(e) => bail!("Invalid network keypair: {}", e), + }, + Err(e) => bail!("Error decoding network keypair: {}", e), + } +} + +fn validate_keypair_input(input: &String) -> Result<()> { + create_keypair(input).map(|_| ()) +} + +pub async fn execute(config: &AppConfig, net_keypair: Option) -> Result<()> { + let input = if let Some(net_keypair) = net_keypair { + let kp = create_keypair(&net_keypair)?; + kp.try_into_ed25519()?.to_bytes().to_vec() + } else { + let kp = Password::with_theme(&ColorfulTheme::default()) + .with_prompt("Enter your network private key") + .validate_with(validate_keypair_input) + .interact()? + .trim() + .to_string(); + let kp = create_keypair(&kp)?; + kp.try_into_ed25519()?.to_bytes().to_vec() + }; + + let cipher = Cipher::from_config(config).await?; + let encrypted = cipher.encrypt_data(&mut input.clone())?; + let bus = EventBus::new(true).start(); + let repositories = get_repositories(&config, &bus)?; + + // NOTE: We are writing an encrypted string here + repositories.libp2p_keypair().write(&encrypted); + + let errors = bus.send(GetErrors).await?; + if errors.len() > 0 { + for error in errors.iter() { + println!("{error}"); + } + bail!("There were errors setting the network keypair") + } + + println!("Network keypair has been successfully encrypted."); + + Ok(()) +} diff --git a/packages/ciphernode/enclave/src/commands/password/mod.rs b/packages/ciphernode/enclave/src/commands/password/mod.rs index 5ebaf984..fe6259d1 100644 --- a/packages/ciphernode/enclave/src/commands/password/mod.rs +++ b/packages/ciphernode/enclave/src/commands/password/mod.rs @@ -30,7 +30,7 @@ pub enum PasswordCommands { }, } -pub async fn execute(command: PasswordCommands, config: AppConfig) -> Result<()> { +pub async fn execute(command: PasswordCommands, config: &AppConfig) -> Result<()> { match command { PasswordCommands::Create { password, diff --git a/packages/ciphernode/enclave/src/commands/wallet/set.rs b/packages/ciphernode/enclave/src/commands/wallet/set.rs index 0e1faac5..fcafd38e 100644 --- a/packages/ciphernode/enclave/src/commands/wallet/set.rs +++ b/packages/ciphernode/enclave/src/commands/wallet/set.rs @@ -1,4 +1,5 @@ use actix::Actor; +use alloy::{hex::FromHex, primitives::FixedBytes, signers::local::PrivateKeySigner}; use anyhow::{anyhow, bail, Result}; use cipher::Cipher; use config::AppConfig; @@ -7,33 +8,13 @@ use enclave_core::{EventBus, GetErrors}; use enclave_node::get_repositories; pub fn validate_private_key(input: &String) -> Result<()> { - // Require 0x prefix - if !input.starts_with("0x") { - return Err(anyhow!( - "Invalid private key format: must start with '0x' prefix" - )); - } - - // Remove 0x prefix - let key = &input[2..]; - - // Check length - if key.len() != 64 { - return Err(anyhow!( - "Invalid private key length: {}. Expected 64 characters after '0x' prefix", - key.len() - )); - } - - // Validate hex characters and convert to bytes - let _ = (0..key.len()) - .step_by(2) - .map(|i| u8::from_str_radix(&key[i..i + 2], 16)) - .collect::, _>>() - .map_err(|e| anyhow!("Invalid hex character: {}", e))?; - + let bytes = + FixedBytes::<32>::from_hex(input).map_err(|e| anyhow!("Invalid private key: {}", e))?; + let _ = + PrivateKeySigner::from_bytes(&bytes).map_err(|e| anyhow!("Invalid private key: {}", e))?; Ok(()) } + pub async fn execute(config: &AppConfig, private_key: Option) -> Result<()> { let input = if let Some(private_key) = private_key { validate_private_key(&private_key)?; diff --git a/packages/ciphernode/enclave/src/compile_id.rs b/packages/ciphernode/enclave/src/compile_id.rs new file mode 100644 index 00000000..38b4b9d4 --- /dev/null +++ b/packages/ciphernode/enclave/src/compile_id.rs @@ -0,0 +1,13 @@ +use petname::{Generator, Petnames}; +use rand::rngs::StdRng; +use rand::SeedableRng; + +static COMPILE_ID: u64 = compile_time::unix!(); + +/// Generate a unique compilation ID for the build based on the time of compilation +pub fn generate_id() -> String { + let mut rng = StdRng::seed_from_u64(COMPILE_ID); + Petnames::small() + .generate(&mut rng, 3, "_") + .unwrap_or("default-name".to_owned()) +} diff --git a/packages/ciphernode/enclave/src/main.rs b/packages/ciphernode/enclave/src/main.rs index 808b7629..e655fcc6 100644 --- a/packages/ciphernode/enclave/src/main.rs +++ b/packages/ciphernode/enclave/src/main.rs @@ -3,9 +3,11 @@ use clap::Parser; use commands::{aggregator, init, net, password, start, wallet, Commands}; use config::load_config; use enclave_core::{get_tag, set_tag}; -use tracing::instrument; +use tracing::{info, instrument}; use tracing_subscriber::EnvFilter; + pub mod commands; +mod compile_id; const OWO: &str = r#" ___ ___ ___ ___ ___ @@ -55,11 +57,23 @@ impl Cli { eth_address, password, skip_eth, - } => init::execute(rpc_url, eth_address, password, skip_eth).await?, - Commands::Password { command } => password::execute(command, config).await?, + net_keypair, + generate_net_keypair, + } => { + init::execute( + rpc_url, + eth_address, + password, + skip_eth, + net_keypair, + generate_net_keypair, + ) + .await? + } + Commands::Password { command } => password::execute(command, &config).await?, Commands::Aggregator { command } => aggregator::execute(command, config).await?, Commands::Wallet { command } => wallet::execute(command, config).await?, - Commands::Net { command } => net::execute(command, config).await?, + Commands::Net { command } => net::execute(command, &config).await?, } Ok(()) @@ -85,6 +99,9 @@ pub async fn main() { // .with_env_filter("[app{id=cn4}]=info") // .with_env_filter("[app{id=ag}]=info") .init(); + + info!("COMPILATION ID: '{}'", compile_id::generate_id()); + let cli = Cli::parse(); // Set the tag for all future traces diff --git a/packages/ciphernode/enclave_node/src/aggregator.rs b/packages/ciphernode/enclave_node/src/aggregator.rs index b215ce8f..e9fbbf6a 100644 --- a/packages/ciphernode/enclave_node/src/aggregator.rs +++ b/packages/ciphernode/enclave_node/src/aggregator.rs @@ -83,7 +83,7 @@ pub async fn setup_aggregator( &cipher, config.quic_port(), config.enable_mdns(), - repositories.libp2pid(), + repositories.libp2p_keypair(), ) .await?; diff --git a/packages/ciphernode/enclave_node/src/ciphernode.rs b/packages/ciphernode/enclave_node/src/ciphernode.rs index 5205d9be..bf71c73c 100644 --- a/packages/ciphernode/enclave_node/src/ciphernode.rs +++ b/packages/ciphernode/enclave_node/src/ciphernode.rs @@ -72,7 +72,7 @@ pub async fn setup_ciphernode( &cipher, config.quic_port(), config.enable_mdns(), - repositories.libp2pid(), + repositories.libp2p_keypair(), ) .await?; diff --git a/packages/ciphernode/net/src/bin/p2p_test.rs b/packages/ciphernode/net/src/bin/p2p_test.rs index e44f43b8..b34b6ce9 100644 --- a/packages/ciphernode/net/src/bin/p2p_test.rs +++ b/packages/ciphernode/net/src/bin/p2p_test.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use net::correlation_id::CorrelationId; +use net::events::{NetworkPeerCommand, NetworkPeerEvent}; use net::NetworkPeer; use std::time::Duration; use std::{collections::HashSet, env, process}; @@ -18,8 +20,8 @@ async fn main() -> Result<()> { .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) .with(tracing_subscriber::fmt::layer()) .init(); - let name = env::args().nth(1).expect("need name"); - + let name = env::args().nth(2).expect("need name"); + let topic = "test-topic"; println!("{} starting up", name); let udp_port = env::var("QUIC_PORT") @@ -42,7 +44,7 @@ async fn main() -> Result<()> { // Extract input and outputs let tx = peer.tx(); - let mut rx = peer.rx().unwrap(); + let mut rx = peer.rx(); let router_task = tokio::spawn({ let name = name.clone(); @@ -60,7 +62,12 @@ async fn main() -> Result<()> { // Send our message first println!("{} sending message", name); - tx.send(name.as_bytes().to_vec()).await?; + tx.send(NetworkPeerCommand::GossipPublish { + correlation_id: CorrelationId::new(), + topic: topic.to_string(), + data: name.as_bytes().to_vec(), + }) + .await?; println!("{} message sent", name); let expected: HashSet = vec![ @@ -79,8 +86,8 @@ async fn main() -> Result<()> { // Wrap the message receiving loop in a timeout let receive_result = timeout(Duration::from_secs(10), async { while received != expected { - if let Some(msg) = rx.recv().await { - match String::from_utf8(msg) { + match rx.recv().await? { + NetworkPeerEvent::GossipData(msg) => match String::from_utf8(msg) { Ok(msg) => { if !received.contains(&msg) { println!("{} received '{}'", name, msg); @@ -88,7 +95,8 @@ async fn main() -> Result<()> { } } Err(e) => println!("{} received invalid UTF8: {}", name, e), - } + }, + _ => (), } } Ok::<(), anyhow::Error>(()) diff --git a/packages/ciphernode/net/src/correlation_id.rs b/packages/ciphernode/net/src/correlation_id.rs new file mode 100644 index 00000000..51265b62 --- /dev/null +++ b/packages/ciphernode/net/src/correlation_id.rs @@ -0,0 +1,25 @@ +use std::{ + fmt::Display, + sync::atomic::{AtomicUsize, Ordering}, +}; + +static NEXT_CORRELATION_ID: AtomicUsize = AtomicUsize::new(1); + +/// CorrelationId provides a way to correlate commands and the events they create. +#[derive(Debug, Clone)] +pub struct CorrelationId { + id: usize, +} + +impl CorrelationId { + pub fn new() -> Self { + let id = NEXT_CORRELATION_ID.fetch_add(1, Ordering::SeqCst); + Self { id } + } +} + +impl Display for CorrelationId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.id) + } +} diff --git a/packages/ciphernode/net/src/dialer.rs b/packages/ciphernode/net/src/dialer.rs new file mode 100644 index 00000000..c6933014 --- /dev/null +++ b/packages/ciphernode/net/src/dialer.rs @@ -0,0 +1,210 @@ +use anyhow::Context; +use anyhow::Result; +use futures::future::join_all; +use libp2p::{ + multiaddr::Protocol, + swarm::{dial_opts::DialOpts, ConnectionId, DialError}, + Multiaddr, +}; +use std::net::ToSocketAddrs; +use tokio::select; +use tokio::sync::{broadcast, mpsc}; +use tokio::time::{sleep, timeout, Duration}; +use tracing::error; +use tracing::info; + +use crate::{ + events::{NetworkPeerCommand, NetworkPeerEvent}, + retry::{retry_with_backoff, to_retry, RetryError, BACKOFF_DELAY, BACKOFF_MAX_RETRIES}, +}; + +/// Dial a single Multiaddr with retries and return an error should those retries not work +async fn dial_multiaddr( + cmd_tx: &mpsc::Sender, + event_tx: &broadcast::Sender, + multiaddr_str: &str, +) -> Result<()> { + let multiaddr = &multiaddr_str.parse()?; + info!("Now dialing in to {}", multiaddr); + retry_with_backoff( + || attempt_connection(cmd_tx, event_tx, multiaddr), + BACKOFF_MAX_RETRIES, + BACKOFF_DELAY, + ) + .await?; + Ok(()) +} + +fn trace_error(r: Result<()>) { + if let Err(err) = r { + error!("{}", err); + } +} + +/// Initiates connections to multiple network peers +/// +/// # Arguments +/// * `cmd_tx` - Sender for network peer commands +/// * `event_tx` - Broadcast sender for peer events +/// * `peers` - List of peer addresses to connect to +pub async fn dial_peers( + cmd_tx: &mpsc::Sender, + event_tx: &broadcast::Sender, + peers: &Vec, +) -> Result<()> { + let futures: Vec<_> = peers + .iter() + .map(|addr| dial_multiaddr(cmd_tx, event_tx, addr)) + .collect(); + let results = join_all(futures).await; + results.into_iter().for_each(trace_error); + Ok(()) +} + +/// Attempt a connection with retrys to a multiaddr return an error if the connection could not be resolved after the retries. +async fn attempt_connection( + cmd_tx: &mpsc::Sender, + event_tx: &broadcast::Sender, + multiaddr: &Multiaddr, +) -> Result<(), RetryError> { + let mut event_rx = event_tx.subscribe(); + let multi = get_resolved_multiaddr(multiaddr).map_err(to_retry)?; + let opts: DialOpts = multi.clone().into(); + let dial_connection = opts.connection_id(); + info!("Dialing: '{}' with connection '{}'", multi, dial_connection); + cmd_tx + .send(NetworkPeerCommand::Dial(opts)) + .await + .map_err(to_retry)?; + wait_for_connection(&mut event_rx, dial_connection).await +} + +/// Wait for results of a retry based on a given correlation id and return the correct variant of +/// RetryError depending on the result from the downstream event +async fn wait_for_connection( + event_rx: &mut broadcast::Receiver, + dial_connection: ConnectionId, +) -> Result<(), RetryError> { + loop { + // Create a timeout future that can be reset + select! { + result = event_rx.recv() => { + match result.map_err(to_retry)? { + NetworkPeerEvent::ConnectionEstablished { connection_id } => { + if connection_id == dial_connection { + info!("Connection Established"); + return Ok(()); + } + } + NetworkPeerEvent::DialError { error } => { + info!("DialError!"); + return match error.as_ref() { + // If we are dialing ourself then we should just fail + DialError::NoAddresses { .. } => { + info!("DialError received. Returning RetryError::Failure"); + Err(RetryError::Failure(error.clone().into())) + } + // Try again otherwise + _ => Err(RetryError::Retry(error.clone().into())), + }; + } + NetworkPeerEvent::OutgoingConnectionError { + connection_id, + error, + } => { + info!("OutgoingConnectionError!"); + if connection_id == dial_connection { + info!( + "Connection {} failed because of error {}. Retrying...", + connection_id, error + ); + return match error.as_ref() { + // If we are dialing ourself then we should just fail + DialError::NoAddresses { .. } => { + Err(RetryError::Failure(error.clone().into())) + } + // Try again otherwise + _ => Err(RetryError::Retry(error.clone().into())), + }; + } + } + _ => (), + } + } + _ = sleep(Duration::from_secs(60)) => { + info!("Connection attempt timed out after 60 seconds of no events"); + return Err(RetryError::Retry(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "Connection attempt timed out", + ).into())); + } + } + } +} + +/// Convert a Multiaddr to use a specific ip address with the ip4 or ip6 protocol +fn dns_to_ip_addr(original: &Multiaddr, ip_str: &str) -> Result { + let ip = ip_str.parse()?; + let mut new_addr = Multiaddr::empty(); + let mut skip_next = false; + + for proto in original.iter() { + if skip_next { + skip_next = false; + continue; + } + + match proto { + Protocol::Dns4(_) | Protocol::Dns6(_) => { + new_addr.push(Protocol::Ip4(ip)); + skip_next = false; + } + _ => new_addr.push(proto), + } + } + + Ok(new_addr) +} + +/// Detect the DNS host from a multiaddr +fn extract_dns_host(addr: &Multiaddr) -> Option { + // Iterate through the protocols in the multiaddr + for proto in addr.iter() { + match proto { + // Match on DNS4 or DNS6 protocols + Protocol::Dns4(hostname) | Protocol::Dns6(hostname) => { + return Some(hostname.to_string()) + } + _ => continue, + } + } + None +} + +/// If the Multiaddr uses a DNS domain look it up and return a multiaddr that uses a resolved IP +/// address +fn get_resolved_multiaddr(value: &Multiaddr) -> Result { + if let Some(domain) = extract_dns_host(value) { + let ip = resolve_ipv4(&domain)?; + let multi = dns_to_ip_addr(value, &ip)?; + return Ok(multi); + } else { + Ok(value.clone()) + } +} + +fn resolve_ipv4(domain: &str) -> Result { + let addr = format!("{}:0", domain) + .to_socket_addrs()? + .find(|addr| addr.ip().is_ipv4()) + .context("no IPv4 addresses found")?; + Ok(addr.ip().to_string()) +} + +fn resolve_ipv6(domain: &str) -> Result { + let addr = format!("{}:0", domain) + .to_socket_addrs()? + .find(|addr| addr.ip().is_ipv6()) + .context("no IPv6 addresses found")?; + Ok(addr.ip().to_string()) +} diff --git a/packages/ciphernode/net/src/events.rs b/packages/ciphernode/net/src/events.rs new file mode 100644 index 00000000..196ea48f --- /dev/null +++ b/packages/ciphernode/net/src/events.rs @@ -0,0 +1,46 @@ +use std::sync::Arc; + +use actix::Message; +use libp2p::{ + gossipsub::{MessageId, PublishError}, + swarm::{dial_opts::DialOpts, ConnectionId, DialError}, +}; + +use crate::correlation_id::CorrelationId; + +/// NetworkPeer Commands are sent to the network peer over a mspc channel +pub enum NetworkPeerCommand { + GossipPublish { + topic: String, + data: Vec, + correlation_id: CorrelationId, + }, + Dial(DialOpts), +} + +/// NetworkPeerEvents are broadcast over a broadcast channel to whom ever wishes to listen +#[derive(Message, Clone, Debug)] +#[rtype(result = "anyhow::Result<()>")] +pub enum NetworkPeerEvent { + /// Bytes have been broadcast over the network + GossipData(Vec), + /// There was an Error publishing bytes over the network + GossipPublishError { + correlation_id: CorrelationId, + error: Arc, + }, + /// Data was successfully published over the network as far as we know. + GossipPublished { + correlation_id: CorrelationId, + message_id: MessageId, + }, + /// There was an error Dialing a peer + DialError { error: Arc }, + /// A connection was established to a peer + ConnectionEstablished { connection_id: ConnectionId }, + /// There was an error creating a connection + OutgoingConnectionError { + connection_id: ConnectionId, + error: Arc, + }, +} diff --git a/packages/ciphernode/net/src/lib.rs b/packages/ciphernode/net/src/lib.rs index 42f3da11..35d4f37e 100644 --- a/packages/ciphernode/net/src/lib.rs +++ b/packages/ciphernode/net/src/lib.rs @@ -1,8 +1,12 @@ #![crate_name = "net"] #![crate_type = "lib"] +pub mod correlation_id; +mod dialer; +pub mod events; mod network_manager; mod network_peer; +mod retry; pub use network_manager::*; pub use network_peer::*; diff --git a/packages/ciphernode/net/src/network_manager.rs b/packages/ciphernode/net/src/network_manager.rs index 8969c908..7c26f1e1 100644 --- a/packages/ciphernode/net/src/network_manager.rs +++ b/packages/ciphernode/net/src/network_manager.rs @@ -1,50 +1,58 @@ +use crate::correlation_id::CorrelationId; +use crate::events::NetworkPeerCommand; +use crate::events::NetworkPeerEvent; use crate::NetworkPeer; /// Actor for connecting to an libp2p client via it's mpsc channel interface /// This Actor should be responsible for use actix::prelude::*; -use anyhow::anyhow; -use anyhow::Result; +use anyhow::{bail, Result}; use cipher::Cipher; use data::Repository; use enclave_core::{EnclaveEvent, EventBus, EventId, Subscribe}; use libp2p::identity::ed25519; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::select; +use tokio::sync::broadcast; +use tokio::sync::mpsc; use tracing::{error, info, instrument, trace}; /// NetworkManager Actor converts between EventBus events and Libp2p events forwarding them to a /// NetworkPeer for propagation over the p2p network pub struct NetworkManager { bus: Addr, - tx: Sender>, + tx: mpsc::Sender, sent_events: HashSet, + topic: String, } impl Actor for NetworkManager { type Context = Context; } +/// Libp2pEvent is used to send data to the NetworkPeer from the NetworkManager #[derive(Message, Clone, Debug, PartialEq, Eq)] #[rtype(result = "anyhow::Result<()>")] struct LibP2pEvent(pub Vec); impl NetworkManager { /// Create a new NetworkManager actor - pub fn new(bus: Addr, tx: Sender>) -> Self { + pub fn new(bus: Addr, tx: mpsc::Sender, topic: &str) -> Self { Self { bus, tx, sent_events: HashSet::new(), + topic: topic.to_string(), } } pub fn setup( bus: Addr, - tx: Sender>, - mut rx: Receiver>, + tx: mpsc::Sender, + mut rx: broadcast::Receiver, + topic: &str, ) -> Addr { - let addr = NetworkManager::new(bus.clone(), tx).start(); + let addr = NetworkManager::new(bus.clone(), tx, topic).start(); // Listen on all events bus.do_send(Subscribe { @@ -54,10 +62,18 @@ impl NetworkManager { tokio::spawn({ let addr = addr.clone(); - async move { - while let Some(msg) = rx.recv().await { - addr.do_send(LibP2pEvent(msg)) + loop { + select! { + Ok(event) = rx.recv() => { + match event { + NetworkPeerEvent::GossipData(data) => { + addr.do_send(LibP2pEvent(data)) + }, + _ => () + } + } + } } } }); @@ -75,35 +91,26 @@ impl NetworkManager { enable_mdns: bool, repository: Repository>, ) -> Result<(Addr, tokio::task::JoinHandle>, String)> { - info!("Reading from repository"); - let mut bytes = if let Some(bytes) = repository.read().await? { - let decrypted = cipher.decrypt_data(&bytes)?; - info!("Found keypair in repository"); - decrypted - } else { - let kp = libp2p::identity::Keypair::generate_ed25519(); - info!("Generated new keypair {}", kp.public().to_peer_id()); - let innerkp = kp.try_into_ed25519()?; - let bytes = innerkp.to_bytes().to_vec(); - - // We need to clone here so that returned bytes are not zeroized - repository.write(&cipher.encrypt_data(&mut bytes.clone())?); - info!("Saved new keypair to repository"); - bytes + let topic = "tmp-enclave-gossip-topic"; + // Get existing keypair or generate a new one + let mut bytes = match repository.read().await? { + Some(bytes) => { + info!("Found keypair in repository"); + cipher.decrypt_data(&bytes)? + } + None => bail!("No network keypair found in repository, please generate a new one using `enclave net generate-key`"), }; - let ed25519_keypair = ed25519::Keypair::try_from_bytes(&mut bytes)?; - let keypair: libp2p::identity::Keypair = ed25519_keypair.try_into()?; - let mut peer = NetworkPeer::new( - &keypair, - peers, - Some(quic_port), - "tmp-enclave-gossip-topic", - enable_mdns, - )?; - let rx = peer.rx().ok_or(anyhow!("Peer rx already taken"))?; - let p2p_addr = NetworkManager::setup(bus, peer.tx(), rx); + // Create peer from keypair + let keypair: libp2p::identity::Keypair = + ed25519::Keypair::try_from_bytes(&mut bytes)?.try_into()?; + let mut peer = NetworkPeer::new(&keypair, peers, Some(quic_port), topic, enable_mdns)?; + + // Setup and start network manager + let rx = peer.rx(); + let p2p_addr = NetworkManager::setup(bus, peer.tx(), rx, topic); let handle = tokio::spawn(async move { Ok(peer.start().await?) }); + Ok((p2p_addr, handle, keypair.public().to_peer_id().to_string())) } } @@ -129,6 +136,7 @@ impl Handler for NetworkManager { let sent_events = self.sent_events.clone(); let tx = self.tx.clone(); let evt = event.clone(); + let topic = self.topic.clone(); Box::pin(async move { let id: EventId = evt.clone().into(); @@ -145,8 +153,15 @@ impl Handler for NetworkManager { } match evt.to_bytes() { - Ok(bytes) => { - if let Err(e) = tx.send(bytes).await { + Ok(data) => { + if let Err(e) = tx + .send(NetworkPeerCommand::GossipPublish { + topic, + data, + correlation_id: CorrelationId::new(), + }) + .await + { error!(error=?e, "Error sending bytes to libp2p"); }; } diff --git a/packages/ciphernode/net/src/network_peer.rs b/packages/ciphernode/net/src/network_peer.rs index 17151076..412ae2b1 100644 --- a/packages/ciphernode/net/src/network_peer.rs +++ b/packages/ciphernode/net/src/network_peer.rs @@ -8,15 +8,17 @@ use libp2p::{ kad::{store::MemoryStore, Behaviour as KademliaBehaviour}, mdns, swarm::{behaviour::toggle::Toggle, NetworkBehaviour, SwarmEvent}, - Multiaddr, Swarm, + Swarm, }; use std::hash::{Hash, Hasher}; +use std::sync::Arc; use std::{hash::DefaultHasher, io::Error, time::Duration}; -use tokio::{ - select, - sync::mpsc::{channel, Receiver, Sender}, -}; -use tracing::{debug, error, info, trace, warn}; +use tokio::{select, sync::broadcast, sync::mpsc}; +use tracing::{debug, info, trace, warn}; + +use crate::dialer::dial_peers; +use crate::events::NetworkPeerCommand; +use crate::events::NetworkPeerEvent; #[derive(NetworkBehaviour)] pub struct NodeBehaviour { @@ -27,15 +29,23 @@ pub struct NodeBehaviour { identify: IdentifyBehaviour, } +/// Manage the peer to peer connection. This struct wraps a libp2p Swarm and enables communication +/// with it using channels. pub struct NetworkPeer { + /// The Libp2p Swarm instance swarm: Swarm, + /// A list of peers to automatically dial peers: Vec, + /// The UDP port that the peer listens to over QUIC udp_port: Option, + /// The gossipsub topic that the peer should listen on topic: gossipsub::IdentTopic, - to_bus_tx: Sender>, // to event bus - from_net_rx: Option>>, // from network - to_net_tx: Sender>, // to network - from_bus_rx: Receiver>, // from event bus + /// Broadcast channel to report NetworkPeerEvents to listeners + event_tx: broadcast::Sender, + /// Transmission channel to send NetworkPeerCommands to the NetworkPeer + cmd_tx: mpsc::Sender, + /// Local receiver to process NetworkPeerCommands from + cmd_rx: mpsc::Receiver, } impl NetworkPeer { @@ -46,14 +56,13 @@ impl NetworkPeer { topic: &str, enable_mdns: bool, ) -> Result { - let (to_bus_tx, from_net_rx) = channel(100); // TODO : tune this param - let (to_net_tx, from_bus_rx) = channel(100); // TODO : tune this param + let (event_tx, _) = broadcast::channel(100); // TODO : tune this param + let (cmd_tx, cmd_rx) = mpsc::channel(100); // TODO : tune this param let swarm = libp2p::SwarmBuilder::with_existing_identity(id.clone()) .with_tokio() .with_quic() .with_behaviour(|key| create_mdns_kad_behaviour(enable_mdns, key))? - .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) .build(); // TODO: Use topics to manage network traffic instead of just using a single topic @@ -64,58 +73,91 @@ impl NetworkPeer { peers, udp_port, topic, - to_bus_tx, - from_net_rx: Some(from_net_rx), - to_net_tx, - from_bus_rx, + event_tx, + cmd_tx, + cmd_rx, }) } - pub fn rx(&mut self) -> Option>> { - self.from_net_rx.take() + pub fn rx(&mut self) -> broadcast::Receiver { + self.event_tx.subscribe() } - pub fn tx(&self) -> Sender> { - self.to_net_tx.clone() + pub fn tx(&self) -> mpsc::Sender { + self.cmd_tx.clone() } pub async fn start(&mut self) -> Result<()> { - let addr = match self.udp_port { - Some(port) => format!("/ip4/0.0.0.0/udp/{}/quic-v1", port), - None => "/ip4/0.0.0.0/udp/0/quic-v1".to_string(), - }; - info!("Requesting node.listen_on('{}')", addr); + let event_tx = self.event_tx.clone(); + let cmd_tx = self.cmd_tx.clone(); + let cmd_rx = &mut self.cmd_rx; + // Subscribe to topic self.swarm .behaviour_mut() .gossipsub .subscribe(&self.topic)?; + + // Listen on the quic port + let addr = match self.udp_port { + Some(port) => format!("/ip4/0.0.0.0/udp/{}/quic-v1", port), + None => "/ip4/0.0.0.0/udp/0/quic-v1".to_string(), + }; + + info!("Requesting node.listen_on('{}')", addr); self.swarm.listen_on(addr.parse()?)?; info!("Peers to dial: {:?}", self.peers); - for addr in self.peers.clone() { - let multiaddr: Multiaddr = addr.parse()?; - self.swarm.dial(multiaddr)?; - } + tokio::spawn({ + let event_tx = event_tx.clone(); + let peers = self.peers.clone(); + async move { + dial_peers(&cmd_tx, &event_tx, &peers).await?; + + return anyhow::Ok(()); + } + }); loop { select! { - Some(line) = self.from_bus_rx.recv() => { - if let Err(e) = self.swarm - .behaviour_mut().gossipsub - .publish(self.topic.clone(), line) { - error!(error=?e, "Error publishing line to swarm"); + // Process commands + Some(command) = cmd_rx.recv() => { + match command { + NetworkPeerCommand::GossipPublish { data, topic, correlation_id } => { + let gossipsub_behaviour = &mut self.swarm.behaviour_mut().gossipsub; + match gossipsub_behaviour + .publish(gossipsub::IdentTopic::new(topic), data) { + Ok(message_id) => { + event_tx.send(NetworkPeerEvent::GossipPublished { correlation_id, message_id })?; + }, + Err(e) => { + warn!(error=?e, "Could not publish to swarm. Retrying..."); + event_tx.send(NetworkPeerEvent::GossipPublishError { correlation_id, error: Arc::new(e) })?; + } + } + }, + NetworkPeerCommand::Dial(multi) => { + info!("DIAL: {:?}", multi); + match self.swarm.dial(multi) { + Ok(v) => info!("Dial returned {:?}", v), + Err(error) => { + info!("Dialing error! {}", error); + event_tx.send(NetworkPeerEvent::DialError { error: error.into() })?; + } + } + } } } - + // Process events event = self.swarm.select_next_some() => { - process_swarm_event(&mut self.swarm, &mut self.to_bus_tx, event).await? + process_swarm_event(&mut self.swarm, &event_tx, event).await? } } } } } +/// Create the libp2p behaviour fn create_mdns_kad_behaviour( enable_mdns: bool, key: &Keypair, @@ -165,18 +207,42 @@ fn create_mdns_kad_behaviour( }) } +/// Process all swarm events async fn process_swarm_event( swarm: &mut Swarm, - to_bus_tx: &mut Sender>, + event_tx: &broadcast::Sender, event: SwarmEvent, ) -> Result<()> { match event { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + connection_id, + .. + } => { info!("Connected to {peer_id}"); + let remote_addr = endpoint.get_remote_address().clone(); + swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, remote_addr.clone()); + + info!("Added address to kademlia {}", remote_addr); + swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); + info!("Added peer to gossipsub {}", remote_addr); + event_tx.send(NetworkPeerEvent::ConnectionEstablished { connection_id })?; } - SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - warn!("Failed to dial {peer_id:?}: {error}"); + SwarmEvent::OutgoingConnectionError { + peer_id, + error, + connection_id, + } => { + info!("Failed to dial {peer_id:?}: {error}"); + event_tx.send(NetworkPeerEvent::OutgoingConnectionError { + connection_id, + error: Arc::new(error), + })?; } SwarmEvent::IncomingConnectionError { error, .. } => { @@ -210,8 +276,7 @@ async fn process_swarm_event( message, })) => { trace!("Got message with id: {id} from peer: {peer_id}",); - trace!("{:?}", message); - to_bus_tx.send(message.data).await?; + event_tx.send(NetworkPeerEvent::GossipData(message.data))?; } SwarmEvent::NewListenAddr { address, .. } => { warn!("Local node is listening on {address}"); diff --git a/packages/ciphernode/net/src/retry.rs b/packages/ciphernode/net/src/retry.rs new file mode 100644 index 00000000..a1fdd95e --- /dev/null +++ b/packages/ciphernode/net/src/retry.rs @@ -0,0 +1,70 @@ +use anyhow::Result; +use std::{future::Future, time::Duration}; +use tokio::time::sleep; +use tracing::{error, warn}; + +pub enum RetryError { + Failure(anyhow::Error), + Retry(anyhow::Error), +} + +pub fn to_retry(e: impl Into) -> RetryError { + RetryError::Retry(e.into()) +} + +pub const BACKOFF_DELAY: u64 = 500; +pub const BACKOFF_MAX_RETRIES: u32 = 10; + +/// Retries an async operation with exponential backoff +/// +/// # Arguments +/// * `operation` - Async function to retry +/// * `max_attempts` - Maximum number of retry attempts +/// * `initial_delay_ms` - Initial delay between retries in milliseconds +/// +/// # Returns +/// * `Result<()>` - Ok if the operation succeeded, Err if all retries failed +pub async fn retry_with_backoff( + operation: F, + max_attempts: u32, + initial_delay_ms: u64, +) -> Result<()> +where + F: Fn() -> Fut, + Fut: Future>, +{ + let mut current_attempt = 1; + let mut delay_ms = initial_delay_ms; + + loop { + match operation().await { + Ok(_) => return Ok(()), + Err(re) => { + match re { + RetryError::Retry(e) => { + if current_attempt >= max_attempts { + return Err(anyhow::anyhow!( + "Operation failed after {} attempts. Last error: {}", + max_attempts, + e + )); + } + + warn!( + "Attempt {}/{} failed, retrying in {}ms: {}", + current_attempt, max_attempts, delay_ms, e + ); + + sleep(Duration::from_millis(delay_ms)).await; + current_attempt += 1; + delay_ms *= 2; // Exponential backoff + } + RetryError::Failure(e) => { + error!("FAILURE!: returning to caller."); + return Err(e); + } + } + } + } + } +} diff --git a/packages/ciphernode/net/tests/Dockerfile b/packages/ciphernode/net/tests/Dockerfile index d983a17d..4dd0812f 100644 --- a/packages/ciphernode/net/tests/Dockerfile +++ b/packages/ciphernode/net/tests/Dockerfile @@ -13,8 +13,6 @@ WORKDIR /app RUN apt-get update && apt-get install -y --no-install-recommends iptables ca-certificates && \ apt-get clean && rm -rf /var/lib/apt/lists/* -COPY --from=builder /app/target/release/p2p_test /app/ -COPY net/tests/entrypoint.sh /app/ -RUN chmod +x /app/entrypoint.sh +COPY --from=builder /app/target/release/p2p_test . -ENTRYPOINT ["/app/entrypoint.sh"] +ENTRYPOINT ["/app/p2p_test"] diff --git a/packages/ciphernode/net/tests/docker-compose.yaml b/packages/ciphernode/net/tests/docker-compose.yaml index d54a3046..71a2449c 100644 --- a/packages/ciphernode/net/tests/docker-compose.yaml +++ b/packages/ciphernode/net/tests/docker-compose.yaml @@ -3,44 +3,38 @@ services: build: dockerfile: net/tests/Dockerfile context: ../.. - image: p2p-test-image - networks: - app_net: - ipv4_address: 172.16.238.10 command: ["/app/p2p_test", "alice"] environment: QUIC_PORT: 9091 - DIAL_TO: "/ip4/172.16.238.12/udp/9091/quic-v1" + DIAL_TO: "/dns4/charlie/udp/9091/quic-v1" ENABLE_MDNS: "${ENABLE_MDNS:-true}" - entrypoint: ["/app/entrypoint.sh"] + networks: + - p2p_test_net bob: - image: p2p-test-image - networks: - app_net: - ipv4_address: 172.16.238.11 + build: + dockerfile: net/tests/Dockerfile + context: ../.. command: ["/app/p2p_test", "bob"] environment: QUIC_PORT: 9091 - DIAL_TO: "/ip4/172.16.238.12/udp/9091/quic-v1" + DIAL_TO: "/dns4/charlie/udp/9091/quic-v1" ENABLE_MDNS: "${ENABLE_MDNS:-true}" - entrypoint: ["/app/entrypoint.sh"] + networks: + - p2p_test_net charlie: - image: p2p-test-image - networks: - app_net: - ipv4_address: 172.16.238.12 + build: + dockerfile: net/tests/Dockerfile + context: ../.. command: ["/app/p2p_test", "charlie"] environment: QUIC_PORT: 9091 ENABLE_MDNS: "${ENABLE_MDNS:-true}" - entrypoint: ["/app/entrypoint.sh"] + networks: + - p2p_test_net networks: - app_net: + p2p_test_net: driver: bridge - ipam: - driver: default - config: - - subnet: 172.16.238.0/24 + diff --git a/packages/ciphernode/net/tests/entrypoint.sh b/packages/ciphernode/net/tests/entrypoint.sh deleted file mode 100755 index a6453106..00000000 --- a/packages/ciphernode/net/tests/entrypoint.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash -set -e - -exec "$@" diff --git a/packages/ciphernode/router/src/repositories.rs b/packages/ciphernode/router/src/repositories.rs index bf785d21..d9ab879c 100644 --- a/packages/ciphernode/router/src/repositories.rs +++ b/packages/ciphernode/router/src/repositories.rs @@ -74,8 +74,8 @@ impl Repositories { Repository::new(self.store.scope(format!("//eth_private_key"))) } - pub fn libp2pid(&self) -> Repository> { - Repository::new(self.store.scope(format!("//libp2pid"))) + pub fn libp2p_keypair(&self) -> Repository> { + Repository::new(self.store.scope(format!("//libp2p/keypair"))) } pub fn enclave_sol_reader(&self, chain_id: u64) -> Repository { diff --git a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs index 2464eb50..b2011ed5 100644 --- a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs +++ b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs @@ -8,7 +8,7 @@ use enclave_core::{ }; use fhe::{setup_crp_params, ParamsWithCrp, SharedRng}; use logger::SimpleLogger; -use net::NetworkManager; +use net::{correlation_id::CorrelationId, events::NetworkPeerEvent, NetworkManager}; use router::{ CiphernodeSelector, E3RequestRouter, FheFeature, KeyshareFeature, PlaintextAggregatorFeature, PublicKeyAggregatorFeature, RepositoriesFactory, @@ -27,8 +27,8 @@ use rand::Rng; use rand::SeedableRng; use rand_chacha::ChaCha20Rng; use std::{env, path::Path, sync::Arc, time::Duration}; -use tokio::sync::Mutex; -use tokio::{sync::mpsc::channel, time::sleep}; +use tokio::sync::{broadcast, Mutex}; +use tokio::{sync::mpsc, time::sleep}; // Simulating a local node type LocalCiphernodeTuple = ( @@ -465,24 +465,35 @@ async fn test_stopped_keyshares_retain_state() -> Result<()> { #[actix::test] async fn test_p2p_actor_forwards_events_to_network() -> Result<()> { // Setup elements in test - let (tx, mut output) = channel(100); // Transmit byte events to the network - let (input, rx) = channel(100); // Receive byte events from the network + let (cmd_tx, mut cmd_rx) = mpsc::channel(100); // Transmit byte events to the network + let (event_tx, _) = broadcast::channel(100); // Receive byte events from the network let bus = EventBus::new(true).start(); - NetworkManager::setup(bus.clone(), tx.clone(), rx); + let event_rx = event_tx.subscribe(); + // Pas cmd and event channels to NetworkManager + NetworkManager::setup(bus.clone(), cmd_tx.clone(), event_rx, "my-topic"); // Capture messages from output on msgs vec let msgs: Arc>>> = Arc::new(Mutex::new(Vec::new())); let msgs_loop = msgs.clone(); tokio::spawn(async move { - while let Some(msg) = output.recv().await { - msgs_loop.lock().await.push(msg.clone()); - let _ = input.send(msg).await; - // loopback to simulate a rebroadcast message + // Pull events from command channel + while let Some(cmd) = cmd_rx.recv().await { + // If the command is a GossipPublish then extract it and save it whilst sending it to + // the event bus as if it was gossiped from the network and ended up as an external + // message this simulates a rebroadcast message + if let Some(msg) = match cmd { + net::events::NetworkPeerCommand::GossipPublish { data, .. } => Some(data), + _ => None, + } { + msgs_loop.lock().await.push(msg.clone()); + event_tx.send(NetworkPeerEvent::GossipData(msg))?; + } // if this manages to broadcast an event to the // event bus we will expect to see an extra event on - // the bus + // the bus but we don't because we handle this } + anyhow::Ok(()) }); let evt_1 = EnclaveEvent::from(PlaintextAggregated { @@ -532,10 +543,10 @@ async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { let seed = Seed(ChaCha20Rng::seed_from_u64(123).get_seed()); // Setup elements in test - let (tx, _) = channel(100); // Transmit byte events to the network - let (input, rx) = channel(100); // Receive byte events from the network + let (cmd_tx, _) = mpsc::channel(100); // Transmit byte events to the network + let (event_tx, event_rx) = broadcast::channel(100); // Receive byte events from the network let bus = EventBus::new(true).start(); - NetworkManager::setup(bus.clone(), tx.clone(), rx); + NetworkManager::setup(bus.clone(), cmd_tx.clone(), event_rx, "mytopic"); // Capture messages from output on msgs vec let event = EnclaveEvent::from(E3Requested { @@ -547,7 +558,7 @@ async fn test_p2p_actor_forwards_events_to_bus() -> Result<()> { }); // lets send an event from the network - let _ = input.send(event.to_bytes()?).await; + let _ = event_tx.send(NetworkPeerEvent::GossipData(event.to_bytes()?)); sleep(Duration::from_millis(1)).await; // need to push to next tick diff --git a/tests/basic_integration/base.sh b/tests/basic_integration/base.sh index 8765b7c5..43485526 100755 --- a/tests/basic_integration/base.sh +++ b/tests/basic_integration/base.sh @@ -24,6 +24,13 @@ set_password cn4 "$CIPHERNODE_SECRET" set_password ag "$CIPHERNODE_SECRET" set_private_key ag "$PRIVATE_KEY" +# Set the network private key for all ciphernodes +set_network_private_key cn1 "$NETWORK_PRIVATE_KEY_1" +set_network_private_key cn2 "$NETWORK_PRIVATE_KEY_2" +set_network_private_key cn3 "$NETWORK_PRIVATE_KEY_3" +set_network_private_key cn4 "$NETWORK_PRIVATE_KEY_4" +set_network_private_key ag "$NETWORK_PRIVATE_KEY_AG" + # Launch 4 ciphernodes launch_ciphernode cn1 launch_ciphernode cn2 diff --git a/tests/basic_integration/fns.sh b/tests/basic_integration/fns.sh index 720fcd80..be24e90a 100644 --- a/tests/basic_integration/fns.sh +++ b/tests/basic_integration/fns.sh @@ -16,6 +16,7 @@ fi RPC_URL="ws://localhost:8545" PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" +NETWORK_PRIVATE_KEY_AG="0x51a1e500a548b70d88184a1e042900c0ed6c57f8710bcc35dc8c85fa33d3f580" CIPHERNODE_SECRET="We are the music makers and we are the dreamers of the dreams." # These contracts are based on the deterministic order of hardhat deploy @@ -31,6 +32,12 @@ CIPHERNODE_ADDRESS_2="0xbDA5747bFD65F08deb54cb465eB87D40e51B197E" CIPHERNODE_ADDRESS_3="0xdD2FD4581271e230360230F9337D5c0430Bf44C0" CIPHERNODE_ADDRESS_4="0x8626f6940E2eb28930eFb4CeF49B2d1F2C9C1199" +# These are the network private keys for the ciphernodes +NETWORK_PRIVATE_KEY_1="0x11a1e500a548b70d88184a1e042900c0ed6c57f8710bcc35dc8c85fa33d3f580" +NETWORK_PRIVATE_KEY_2="0x21a1e500a548b70d88184a1e042900c0ed6c57f8710bcc35dc8c85fa33d3f580" +NETWORK_PRIVATE_KEY_3="0x31a1e500a548b70d88184a1e042900c0ed6c57f8710bcc35dc8c85fa33d3f580" +NETWORK_PRIVATE_KEY_4="0x41a1e500a548b70d88184a1e042900c0ed6c57f8710bcc35dc8c85fa33d3f580" + # Function to clean up background processes cleanup() { @@ -102,6 +109,15 @@ set_private_key() { --private-key "$private_key" } +set_network_private_key() { + local name="$1" + local private_key="$2" + + yarn enclave net set-key \ + --config "$SCRIPT_DIR/lib/$name/config.yaml" \ + --net-keypair "$private_key" +} + launch_aggregator() { local name="$1" heading "Launch aggregator $name" diff --git a/tests/basic_integration/persist.sh b/tests/basic_integration/persist.sh index d4f4f489..6372e6ee 100755 --- a/tests/basic_integration/persist.sh +++ b/tests/basic_integration/persist.sh @@ -24,6 +24,13 @@ set_password cn4 "$CIPHERNODE_SECRET" set_password ag "$CIPHERNODE_SECRET" set_private_key ag "$PRIVATE_KEY" +# Set the network private key for all ciphernodes +set_network_private_key cn1 "$NETWORK_PRIVATE_KEY_1" +set_network_private_key cn2 "$NETWORK_PRIVATE_KEY_2" +set_network_private_key cn3 "$NETWORK_PRIVATE_KEY_3" +set_network_private_key cn4 "$NETWORK_PRIVATE_KEY_4" +set_network_private_key ag "$NETWORK_PRIVATE_KEY_AG" + # Launch 4 ciphernodes launch_ciphernode cn1 launch_ciphernode cn2