Skip to content

Commit

Permalink
Replace Rocket with Axum (#358)
Browse files Browse the repository at this point in the history
* first route working

* user migration

* migrate validator endpoints

* signing minus subscribe to me

* migrate unsafe

* subscribe to me

* add tracing

* heatlhz tests

* unsafe tests

* validator tests

* signing_cli tests

* signing tests

* add cors

* unhardcode endpoint

* add warn unsafe

* refactor

* refactor tests

* bump ed25519

* lint

* remove rocket

* fix unwrap issue

* fix scripts

---------

Co-authored-by: jesse <jesse@entropy.wxy>
  • Loading branch information
JesseAbram and jesse authored Jun 23, 2023
1 parent ba2261e commit ee0493c
Show file tree
Hide file tree
Showing 33 changed files with 920 additions and 1,165 deletions.
792 changes: 249 additions & 543 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 0 additions & 28 deletions Rocket.toml

This file was deleted.

2 changes: 1 addition & 1 deletion crypto/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Constraints have now moved to [entropyxyz/constraints](https://github.com/entrop
<!-- - eventually: after implementing Partitions, the communication manager will already have this information from on-chain. -->
<!-- 3. CM chooses a signing party (`communication_manager::handle_signing`) -->
<!-- 4. CM broadcasts the party information, calling `new_party` on each selected signer -->
5. Each signer calls `subscribe_to_me` on each other signer, subscribing to all party-related messages, creating a `rocket::EventStream` of signing-related messages
5. Each signer calls `subscribe_to_me` on each other signer, subscribing to all party-related messages, creating a an axum SSE of signing-related messages
6. After each signer has received subscription from each other signer, the nodes proceed to pass signing-protocol related messages until the protocol completes.
7. If the signing protocol fails, the nodes broadcast information about the faulty signer, to be included in the next block. A subsequent block will designate a replacement signer (TODO: substrate).

Expand Down
14 changes: 8 additions & 6 deletions crypto/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ futures="0.3"
tokio ={ version="1.16", features=["macros", "fs", "rt-multi-thread", "io-util"] }

# HTTP
rocket ={ version="0.5.0-rc.2", default-features=false, features=["json"] }
reqwest={ version="0.11", features=["json", "stream"] }

reqwest ={ version="0.11", features=["json", "stream"] }
axum ="0.6.18"
axum-macros="0.3.7"
# Substrate
subxt ={ package="subxt", git="https://github.com/entropyxyz/subxt.git", branch="polkadot-v0.9.31" }
parity-scale-codec="3.0.0"
Expand All @@ -44,9 +44,10 @@ entropy-constraints={ path="../constraints" }
ethers-core ="0.13.0"

# Logging
log ="0.4.17" # todo: remove, overlaps tracing
tracing ="0.1"
tracing-subscriber={ version="0.3", features=["env-filter", "fmt"] }
log ="0.4.17" # todo: remove, overlaps tracing
tracing ="0.1.37"
tracing-subscriber={ version="0.3.16", features=["env-filter", "json"] }
tower-http ={ version="0.3.4", features=["trace", "cors"] }

# Misc
bip39 ={ git="https://github.com/infincia/bip39-rs.git", tag="v0.6.0-beta.1" }
Expand All @@ -56,6 +57,7 @@ base64 ="0.13.0"
clap ={ version="4.0.29", features=["derive"] }
node-primitives={ version="2.0.0", default-features=false, git='https://github.com/paritytech/substrate.git', branch="polkadot-v0.9.31" }
num ="0.4.0"
async-stream ="0.3.5"

[dev-dependencies]
testing-utils={ path="../testing-utils" }
Expand Down
5 changes: 2 additions & 3 deletions crypto/server/src/health/api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use rocket::http::Status;
use axum::http::StatusCode;

#[get("/healthz")]
pub fn healthz() -> Status { Status::Ok }
pub async fn healthz() -> StatusCode { StatusCode::OK }
11 changes: 6 additions & 5 deletions crypto/server/src/health/tests.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use axum::http::StatusCode;
use kvdb::clean_tests;
use rocket::http::Status;
use serial_test::serial;

use crate::helpers::tests::setup_client;
#[rocket::async_test]
#[tokio::test]
#[serial]
async fn health() {
clean_tests();
let client = setup_client().await;
let response = client.get("/healthz").dispatch().await;
assert_eq!(response.status(), Status::Ok);
setup_client().await;
let client = reqwest::Client::new();
let response = client.get("http://127.0.0.1:3001/healthz").send().await.unwrap();
assert_eq!(response.status(), StatusCode::OK);
clean_tests();
}
10 changes: 4 additions & 6 deletions crypto/server/src/helpers/launch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ pub const DEFAULT_ALICE_MNEMONIC: &str =
#[cfg(test)]
pub const DEFAULT_ENDPOINT: &str = "ws://localhost:9944";

pub fn init_tracing() {
let filter = tracing_subscriber::filter::LevelFilter::INFO.into();
tracing_subscriber::filter::EnvFilter::builder()
.with_default_directive(filter)
.from_env_lossy();
}
pub fn init_tracing() { tracing_subscriber::fmt().with_target(false).json().init(); }

#[derive(Deserialize, Debug, Clone)]
pub struct Configuration {
Expand Down Expand Up @@ -83,6 +78,9 @@ pub struct StartupArgs {
default_value = "ws://localhost:9944"
)]
pub chain_endpoint: String,
/// Url to host threshold (axum) server on.
#[arg(short = 'u', long = "threshold-url", required = false, default_value = "127.0.0.1:3001")]
pub threshold_url: String,

/// Wether to allow a validator key to be null.
#[arg(short = 'd', long = "dev")]
Expand Down
27 changes: 17 additions & 10 deletions crypto/server/src/helpers/signing.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::{collections::HashMap, sync::Mutex};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use axum::http::StatusCode;
use bip39::{Language, Mnemonic};
use kvdb::kv_manager::{KvManager, PartyId};
use rocket::{http::Status, State};
use sp_core::crypto::AccountId32;
use synedrion::k256::ecdsa::{RecoveryId, Signature};

Expand Down Expand Up @@ -38,14 +41,14 @@ impl RecoverableSignature {

// TODO: JA Remove all below, temporary
/// The state used to temporarily store completed signatures
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SignatureState {
pub signatures: Mutex<HashMap<Box<[u8]>, RecoverableSignature>>,
pub signatures: Arc<Mutex<HashMap<Box<[u8]>, RecoverableSignature>>>,
}

impl SignatureState {
pub fn new() -> SignatureState {
let signatures = Mutex::new(HashMap::new());
let signatures = Arc::new(Mutex::new(HashMap::new()));
SignatureState { signatures }
}

Expand All @@ -65,14 +68,18 @@ impl SignatureState {
}
}

impl Default for SignatureState {
fn default() -> Self { Self::new() }
}

/// Start the signing protocol for a given message
pub async fn do_signing(
message: entropy_shared::Message,
state: &State<SignerState>,
kv_manager: &State<KvManager>,
signatures: &State<SignatureState>,
state: &SignerState,
kv_manager: &KvManager,
signatures: &SignatureState,
tx_id: String,
) -> Result<Status, SigningErr> {
) -> Result<StatusCode, SigningErr> {
let info = SignInit::new(message.clone(), tx_id)?;
let signing_service = ThresholdSigningService::new(state, kv_manager);
let signer =
Expand Down Expand Up @@ -122,7 +129,7 @@ pub async fn do_signing(

signing_service.handle_result(&result, message.sig_request.sig_hash.as_slice(), signatures);

Ok(Status::Ok)
Ok(StatusCode::OK)
}

/// Creates a unique tx Id by concatenating the user's signing key and message digest
Expand Down
88 changes: 43 additions & 45 deletions crypto/server/src/helpers/tests.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
// only compile when testing
#![cfg(test)]

use std::{net::TcpListener, time::Duration};

use axum::{routing::IntoMakeService, Router};
use entropy_shared::Constraints;
use futures::future::join_all;
use kvdb::{
clean_tests,
encrypted_sled::PasswordMethod,
get_db_path,
kv_manager::{KvManager, PartyId},
};
use rand_core::OsRng;
use rocket::{local::asynchronous::Client, tokio::time::Duration, Ignite, Rocket};
use serial_test::serial;
use sp_core::{sr25519, Bytes, Pair};
use sp_keyring::Sr25519Keyring;
Expand All @@ -19,6 +22,7 @@ use testing_utils::{constants::X25519_PUBLIC_KEYS, substrate_context::testing_co
use x25519_dalek::PublicKey;

use crate::{
app,
chain_api::{entropy, get_api, EntropyConfig},
get_signer,
helpers::{
Expand All @@ -28,32 +32,36 @@ use crate::{
signing::SignatureState,
substrate::{get_subgroup, make_register},
},
new_user,
r#unsafe::api::{delete, get, put, remove_keys},
sign_tx,
signing_client::{
api::{drain, get_signature, new_party, subscribe_to_me},
SignerState,
},
store_tx,
signing_client::SignerState,
validation::SignedMessage,
validator::api::sync_kvdb,
AppState,
};

pub async fn setup_client() -> Client {
Client::tracked(crate::rocket().await).await.expect("valid `Rocket`")
pub async fn setup_client() {
let kv_store =
KvManager::new(get_db_path(true).into(), PasswordMethod::NoPassword.execute().unwrap())
.unwrap();
let _ = setup_mnemonic(&kv_store, true, false).await;

let signer_state = SignerState::default();
let configuration = Configuration::new(DEFAULT_ENDPOINT.to_string());
let signature_state = SignatureState::new();
let app_state = AppState { signer_state, configuration, kv_store, signature_state };
let app = app(app_state).into_make_service();
let listener = TcpListener::bind("0.0.0.0:3001").unwrap();

tokio::spawn(async move {
axum::Server::from_tcp(listener).unwrap().serve(app).await.unwrap();
});
}

pub async fn create_clients(
port: i64,
key_number: String,
values: Vec<Vec<u8>>,
keys: Vec<String>,
is_alice: bool,
is_bob: bool,
) -> (Rocket<Ignite>, KvManager) {
let config = rocket::Config::figment().merge(("port", port));

) -> (IntoMakeService<Router>, KvManager) {
let signer_state = SignerState::default();
let configuration = Configuration::new(DEFAULT_ENDPOINT.to_string());
let signature_state = SignatureState::new();
Expand All @@ -70,46 +78,36 @@ pub async fn create_clients(
let _ = kv_store.clone().kv().put(reservation, value).await;
}

// Unsafe routes are for testing purposes only
// they are unsafe as they can expose vulnerabilites
// should they be used in production. Unsafe routes
// are disabled by default.
// To enable unsafe routes compile with --feature unsafe.
let mut unsafe_routes = routes![];
if cfg!(feature = "unsafe") || cfg!(test) {
unsafe_routes = routes![remove_keys, get, put, delete];
}
let app_state =
AppState { signer_state, configuration, kv_store: kv_store.clone(), signature_state };

let result = rocket::custom(config)
.mount("/validator", routes![sync_kvdb])
.mount("/signer", routes![new_party, subscribe_to_me, get_signature, drain])
.mount("/user", routes![store_tx, new_user, sign_tx])
.mount("/unsafe", unsafe_routes)
.manage(signer_state)
.manage(configuration)
.manage(kv_store.clone())
.manage(signature_state)
.ignite()
.await
.unwrap();
let app = app(app_state).into_make_service();

(result, kv_store)
(app, kv_store)
}

pub async fn spawn_testing_validators() -> (Vec<String>, Vec<PartyId>) {
// spawn threshold servers
let ports = vec![3001i64, 3002];

let (alice_rocket, alice_kv) =
create_clients(ports[0], "validator1".to_string(), vec![], vec![], true, false).await;
let (alice_axum, alice_kv) =
create_clients("validator1".to_string(), vec![], vec![], true, false).await;
let alice_id = PartyId::new(get_signer(&alice_kv).await.unwrap().account_id().clone());

let (bob_rocket, bob_kv) =
create_clients(ports[1], "validator2".to_string(), vec![], vec![], false, true).await;
let (bob_axum, bob_kv) =
create_clients("validator2".to_string(), vec![], vec![], false, true).await;
let bob_id = PartyId::new(get_signer(&bob_kv).await.unwrap().account_id().clone());
let listener_alice = TcpListener::bind(format!("0.0.0.0:{}", ports[0])).unwrap();
let listener_bob = TcpListener::bind(format!("0.0.0.0:{}", ports[1])).unwrap();

tokio::spawn(async move {
axum::Server::from_tcp(listener_alice).unwrap().serve(alice_axum).await.unwrap();
});

tokio::spawn(async move {
axum::Server::from_tcp(listener_bob).unwrap().serve(bob_axum).await.unwrap();
});

tokio::spawn(async move { alice_rocket.launch().await.unwrap() });
tokio::spawn(async move { bob_rocket.launch().await.unwrap() });
tokio::time::sleep(Duration::from_secs(1)).await;

let ips = ports.iter().map(|port| format!("127.0.0.1:{}", port)).collect();
Expand Down Expand Up @@ -235,7 +233,7 @@ pub async fn check_registered_status(api: &OnlineClient<EntropyConfig>, key: &Sr
api.storage().fetch(&registered_query, None).await.unwrap();
}

#[rocket::async_test]
#[tokio::test]
#[serial]
async fn test_get_signing_group() {
clean_tests();
Expand Down
Loading

0 comments on commit ee0493c

Please sign in to comment.