diff --git a/Cargo.lock b/Cargo.lock index 311d15a..956ea9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -417,7 +417,7 @@ dependencies = [ [[package]] name = "colink-server" -version = "0.1.12" +version = "0.1.13" dependencies = [ "async-trait", "chrono", @@ -425,7 +425,6 @@ dependencies = [ "hex", "jsonwebtoken", "lapin", - "openssl", "passwords", "prost", "prost-build", @@ -434,6 +433,7 @@ dependencies = [ "secp256k1", "serde", "serde_json", + "sha2", "structopt", "tokio", "toml", @@ -506,9 +506,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.10.3" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" +checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" dependencies = [ "block-buffer", "crypto-common", @@ -584,21 +584,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.0.1" @@ -825,28 +810,28 @@ dependencies = [ ] [[package]] -name = "hyper-timeout" -version = "0.4.1" +name = "hyper-rustls" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" dependencies = [ + "http", "hyper", - "pin-project-lite", + "rustls", "tokio", - "tokio-io-timeout", + "tokio-rustls", ] [[package]] -name = "hyper-tls" -version = "0.5.0" +name = "hyper-timeout" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "bytes", "hyper", - "native-tls", + "pin-project-lite", "tokio", - "tokio-native-tls", + "tokio-io-timeout", ] [[package]] @@ -1043,24 +1028,6 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" -[[package]] -name = "native-tls" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "nom" version = "7.1.1" @@ -1117,51 +1084,12 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b10983b38c53aebdf33f542c6275b0f58a238129d00c4ae0e6fb59738d783ca" -[[package]] -name = "openssl" -version = "0.10.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb81a6430ac911acb25fe5ac8f1d2af1b4ea8a4fdfda0f1ee4292af2e2d8eb0e" -dependencies = [ - "bitflags", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-sys" -version = "0.9.73" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5fd19fb3e0a8191c1e34935718976a3e70c112ab9a24af6d7cadccd9d90bc0" -dependencies = [ - "autocfg 1.1.0", - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "p12" version = "0.6.3" @@ -1288,12 +1216,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "pkg-config" -version = "0.3.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" - [[package]] name = "polling" version = "2.2.0" @@ -1679,20 +1601,22 @@ dependencies = [ "http", "http-body", "hyper", - "hyper-tls", + "hyper-rustls", "ipnet", "js-sys", "lazy_static", "log", "mime", - "native-tls", "percent-encoding", "pin-project-lite", + "rustls", + "rustls-native-certs", + "rustls-pemfile 0.3.0", "serde", "serde_json", "serde_urlencoded", "tokio", - "tokio-native-tls", + "tokio-rustls", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -1760,6 +1684,15 @@ dependencies = [ "base64 0.13.0", ] +[[package]] +name = "rustls-pemfile" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee86d63972a7c661d1536fefe8c3c8407321c3df668891286de28abcd087360" +dependencies = [ + "base64 0.13.0", +] + [[package]] name = "rustls-pemfile" version = "1.0.0" @@ -1897,6 +1830,17 @@ dependencies = [ "digest", ] +[[package]] +name = "sha2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -2115,16 +2059,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.23.4" @@ -2434,12 +2368,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "vec_map" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 6c25ba7..72d065e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "colink-server" -version = "0.1.12" +version = "0.1.13" edition = "2021" [dependencies] @@ -10,14 +10,14 @@ futures-lite = "1.12" hex = "0.4" jsonwebtoken = "7.2" lapin = "2.1" -openssl = "0.10" passwords = "3.1" prost = "0.10" rand = { version = "0.8.4", features = ["std_rng"] } -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls-native-roots"] } secp256k1 = { version = "0.21.2", features = ["rand-std"] } serde = "1.0" serde_json = "1.0" +sha2 = "0.10" structopt = "0.3" tokio = { version = "1.18", features = ["macros", "rt-multi-thread", "rt", "fs"] } toml = "0.5" diff --git a/src/server.rs b/src/server.rs index 74b401e..af76b57 100644 --- a/src/server.rs +++ b/src/server.rs @@ -8,6 +8,7 @@ use secp256k1::Secp256k1; use std::io::{Read, Write}; use std::net::SocketAddr; use std::path::PathBuf; +use std::sync::Arc; use tokio::sync::Mutex; use tonic::{ transport::{Certificate, Identity, Server, ServerTlsConfig}, @@ -21,6 +22,7 @@ pub struct MyService { pub mq: Box, // We use this mutex to avoid the TOCTOU race condition in task storage. pub task_storage_mutex: Mutex, + pub pom_fetch_mutex: Mutex, pub public_key: secp256k1::PublicKey, pub secret_key: secp256k1::SecretKey, pub inter_core_ca_certificate: Option, @@ -28,106 +30,112 @@ pub struct MyService { pub core_uri: Option, } +pub struct GrpcService { + pub service: Arc, +} + #[tonic::async_trait] -impl CoLink for MyService { +impl CoLink for GrpcService { async fn generate_token( &self, request: Request, ) -> Result, Status> { - self._generate_token(request).await + self.service._generate_token(request).await } async fn import_user(&self, request: Request) -> Result, Status> { - self._import_user(request).await + self.service + ._import_user(request, self.service.clone()) + .await } async fn create_entry( &self, request: Request, ) -> Result, Status> { - self._create_entry(request).await + self.service._create_entry(request).await } async fn read_entries( &self, request: Request, ) -> Result, Status> { - self._read_entries(request).await + self.service._read_entries(request).await } async fn update_entry( &self, request: Request, ) -> Result, Status> { - self._update_entry(request).await + self.service._update_entry(request).await } async fn delete_entry( &self, request: Request, ) -> Result, Status> { - self._delete_entry(request).await + self.service._delete_entry(request).await } async fn read_keys( &self, request: Request, ) -> Result, Status> { - self._read_keys(request).await + self.service._read_keys(request).await } async fn create_task(&self, request: Request) -> Result, Status> { - self._create_task(request).await + self.service._create_task(request).await } async fn confirm_task( &self, request: Request, ) -> Result, Status> { - self._confirm_task(request).await + self.service._confirm_task(request).await } async fn finish_task(&self, request: Request) -> Result, Status> { - self._finish_task(request).await + self.service._finish_task(request).await } async fn request_core_info( &self, request: Request, ) -> Result, Status> { - self._request_core_info(request).await + self.service._request_core_info(request).await } async fn subscribe( &self, request: Request, ) -> Result, Status> { - self._subscribe(request).await + self.service._subscribe(request).await } async fn unsubscribe(&self, request: Request) -> Result, Status> { - self._unsubscribe(request).await + self.service._unsubscribe(request).await } async fn inter_core_sync_task( &self, request: Request, ) -> Result, Status> { - self._inter_core_sync_task(request).await + self.service._inter_core_sync_task(request).await } async fn start_protocol_operator( &self, request: Request, ) -> Result, Status> { - self._start_protocol_operator(request).await + self.service._start_protocol_operator(request).await } async fn stop_protocol_operator( &self, request: Request, ) -> Result, Status> { - self._stop_protocol_operator(request).await + self.service._stop_protocol_operator(request).await } } @@ -214,7 +222,7 @@ async fn run_server( let core_public_key = secp256k1::PublicKey::from_secret_key(&Secp256k1::new(), &core_secret_key); let host_id = hex::encode(&core_public_key.serialize()); - tokio::spawn(print_host_token(jwt_secret, host_id)); + tokio::spawn(print_host_token(jwt_secret, host_id.clone())); let mut service = MyService { storage: Box::new(StorageWithMQSubscription::new( Box::new(BasicStorage::default()), @@ -223,6 +231,7 @@ async fn run_server( jwt_secret, mq: Box::new(RabbitMQ::new(&mq_amqp, &mq_api, &mq_prefix)), task_storage_mutex: Mutex::new(0), + pom_fetch_mutex: Mutex::new(0), secret_key: core_secret_key, public_key: core_public_key, inter_core_ca_certificate: None, @@ -239,14 +248,21 @@ async fn run_server( ); } service.mq.delete_all_accounts().await?; + let mq_uri = service.mq.create_user_account().await?; + service + ._internal_storage_update(&host_id, "mq_uri", mq_uri.as_bytes()) + .await?; + let grpc_service = GrpcService { + service: Arc::new(service), + }; let check_auth_interceptor = CheckAuthInterceptor { jwt_secret }; - let service = CoLinkServer::with_interceptor(service, check_auth_interceptor); + let grpc_service = CoLinkServer::with_interceptor(grpc_service, check_auth_interceptor); if cert.is_none() || key.is_none() { /* No TLS */ Server::builder() .accept_http1(true) - .add_service(service) + .add_service(grpc_service) .serve(socket_address) .await?; } else { @@ -270,7 +286,7 @@ async fn run_server( Server::builder() .tls_config(tls)? - .add_service(service) + .add_service(grpc_service) .serve(socket_address) .await?; } diff --git a/src/service.rs b/src/service.rs index efba141..e200f43 100644 --- a/src/service.rs +++ b/src/service.rs @@ -4,4 +4,5 @@ pub mod pom; pub mod storage; pub mod subscription; pub mod task; +pub mod user_init; pub mod utils; diff --git a/src/service/auth.rs b/src/service/auth.rs index e79275c..aa61d7b 100644 --- a/src/service/auth.rs +++ b/src/service/auth.rs @@ -1,13 +1,14 @@ -use crate::colink_proto::*; +use super::user_init::user_init; +use crate::{colink_proto::*, server::MyService}; use chrono::TimeZone; use jsonwebtoken::{DecodingKey, Validation}; use prost::Message; use rand::RngCore; use secp256k1::PublicKey; use serde::{Deserialize, Serialize}; -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; use tonic::{metadata::MetadataValue, service::Interceptor, Request, Response, Status}; -use tracing::debug; +use tracing::{debug, error}; #[derive(Clone)] pub struct CheckAuthInterceptor { @@ -31,6 +32,12 @@ impl crate::server::MyService { let token = request.metadata().get("authorization").unwrap().clone(); let token = token.to_str().unwrap(); let body: GenerateTokenRequest = request.into_inner(); + if !["user", "guest"].contains(&body.privilege.as_str()) { + return Err(Status::permission_denied(format!( + "generating token with {} privilege is not allowed.", + body.privilege + ))); + } let token = jsonwebtoken::decode::( token, &jsonwebtoken::DecodingKey::from_secret(&self.jwt_secret), @@ -41,7 +48,7 @@ impl crate::server::MyService { let token = jsonwebtoken::encode( &jsonwebtoken::Header::default(), &AuthContent { - privilege: token.privilege, + privilege: body.privilege, user_id: token.user_id, exp: body.expiration_time, }, @@ -55,6 +62,7 @@ impl crate::server::MyService { pub async fn _import_user( &self, request: Request, + service: Arc, ) -> Result, Status> { Self::check_privilege_in(request.metadata(), &["host"])?; let body: UserConsent = request.into_inner(); @@ -110,6 +118,17 @@ impl crate::server::MyService { .unwrap(); self._host_storage_update(&format!("users:{}:user_jwt", user_id), token.as_bytes()) .await?; + self._internal_storage_update(&user_id, "_is_initialized", &[0]) + .await?; + let init_user_id = user_id.clone(); + let init_user_jwt = token.clone(); + tokio::spawn(async move { + match user_init(service, &init_user_id, &init_user_jwt).await { + Ok(_) => {} + Err(err) => error!("user_init: {}", err.to_string()), + } + Ok::<(), Box>(()) + }); let reply = Jwt { jwt: token }; Ok(Response::new(reply)) } diff --git a/src/service/info.rs b/src/service/info.rs index 7ebd435..5fbf271 100644 --- a/src/service/info.rs +++ b/src/service/info.rs @@ -8,7 +8,7 @@ impl crate::server::MyService { ) -> Result, Status> { let public_key_vec = self.public_key.serialize().to_vec(); Ok(Response::new(CoreInfo { - mq_uri: match Self::check_privilege_in(request.metadata(), &["user"]) { + mq_uri: match Self::check_privilege_in(request.metadata(), &["user", "host"]) { Ok(_i) => { let user_id = Self::get_key_from_metadata(request.metadata(), "user_id"); let mq_uri_bytes = self._internal_storage_read(&user_id, "mq_uri").await?; diff --git a/src/service/pom.rs b/src/service/pom.rs index 5a34fb0..66b23c7 100644 --- a/src/service/pom.rs +++ b/src/service/pom.rs @@ -26,13 +26,7 @@ impl crate::server::MyService { if file_name.is_none() || file_name.unwrap() != protocol_name { return Err(Status::invalid_argument("protocol_name is invalid.")); } - let colink_home = if std::env::var("COLINK_HOME").is_ok() { - std::env::var("COLINK_HOME").unwrap() - } else if std::env::var("HOME").is_ok() { - std::env::var("HOME").unwrap() + "/.colink" - } else { - return Err(Status::not_found("colink home not found.")); - }; + let colink_home = self.get_colink_home()?; if !Path::new(&colink_home).join("protocols").exists() { match std::fs::create_dir_all(Path::new(&colink_home).join("protocols")) { Ok(_) => {} @@ -44,6 +38,7 @@ impl crate::server::MyService { .join(protocol_name) .join("colink.toml"); if std::fs::metadata(&path).is_err() { + let _lock = self.pom_fetch_mutex.lock().await; match fetch_protocol_from_inventory(protocol_name, &colink_home).await { Ok(_) => {} Err(err) => { @@ -152,6 +147,13 @@ async fn fetch_protocol_from_inventory( protocol_name: &str, colink_home: &str, ) -> Result<(), String> { + let path = Path::new(&colink_home) + .join("protocols") + .join(protocol_name) + .join("colink.toml"); + if std::fs::metadata(&path).is_ok() { + return Ok(()); + } let url = &format!("{}/{}.toml", PROTOCOL_INVENTORY, protocol_name); let http_client = reqwest::Client::new(); let resp = http_client.get(url).send().await; diff --git a/src/service/subscription.rs b/src/service/subscription.rs index a940383..fb03b75 100644 --- a/src/service/subscription.rs +++ b/src/service/subscription.rs @@ -6,7 +6,7 @@ impl crate::server::MyService { &self, request: Request, ) -> Result, Status> { - Self::check_privilege_in(request.metadata(), &["user"])?; + Self::check_privilege_in(request.metadata(), &["user", "host"])?; let user_id = Self::get_key_from_metadata(request.metadata(), "user_id"); let queue_name = match self .storage @@ -27,7 +27,7 @@ impl crate::server::MyService { &self, request: Request, ) -> Result, Status> { - Self::check_privilege_in(request.metadata(), &["user"])?; + Self::check_privilege_in(request.metadata(), &["user", "host"])?; let user_id = Self::get_key_from_metadata(request.metadata(), "user_id"); match self .storage diff --git a/src/service/task.rs b/src/service/task.rs index 1305c8c..9db3ea1 100644 --- a/src/service/task.rs +++ b/src/service/task.rs @@ -1,9 +1,9 @@ use super::utils::*; use crate::colink_proto::*; pub use colink_registry_proto::UserRecord; -use openssl::sha::sha256; use prost::Message; use secp256k1::{ecdsa::Signature, PublicKey, Secp256k1}; +use sha2::{Digest, Sha256}; use tonic::{Request, Response, Status}; use uuid::Uuid; @@ -388,7 +388,7 @@ impl crate::server::MyService { Ok(()) } - async fn add_task_new_status(&self, user_id: &str, task: &Task) -> Result<(), Status> { + pub async fn add_task_new_status(&self, user_id: &str, task: &Task) -> Result<(), Status> { let protocol_key = if task.status != "started" { task.protocol_name.clone() } else { @@ -548,7 +548,10 @@ impl crate::server::MyService { .unwrap(); msg.extend_from_slice(&verify_decision_bytes); msg.extend_from_slice(&user_consent_bytes); - let verify_signature = secp256k1::Message::from_slice(&sha256(&msg)).unwrap(); + let mut hasher = Sha256::new(); + hasher.update(&msg); + let sha256 = hasher.finalize(); + let verify_signature = secp256k1::Message::from_slice(&sha256).unwrap(); let secp = Secp256k1::new(); match secp.verify_ecdsa(&verify_signature, &signature, &core_public_key) { Ok(_) => {} @@ -578,7 +581,7 @@ impl crate::server::MyService { } } - async fn generate_decision( + pub async fn generate_decision( &self, is_approved: bool, is_rejected: bool, @@ -602,9 +605,12 @@ impl crate::server::MyService { decision.encode(&mut decision_bytes).unwrap(); msg.extend_from_slice(&decision_bytes); msg.extend_from_slice(&user_consent_bytes); + let mut hasher = Sha256::new(); + hasher.update(&msg); + let sha256 = hasher.finalize(); let secp = Secp256k1::new(); let signature = secp.sign_ecdsa( - &secp256k1::Message::from_slice(&sha256(&msg)).unwrap(), + &secp256k1::Message::from_slice(&sha256).unwrap(), &self.secret_key, ); decision.user_consent = Some(Message::decode(&*user_consent_bytes).unwrap()); diff --git a/src/service/user_init.rs b/src/service/user_init.rs new file mode 100644 index 0000000..14c4f9c --- /dev/null +++ b/src/service/user_init.rs @@ -0,0 +1,121 @@ +use super::utils::*; +use crate::{colink_proto::*, server::MyService}; +use std::{path::Path, sync::Arc}; +use toml::Value; + +pub async fn user_init( + service: Arc, + user_id: &str, + user_jwt: &str, +) -> Result<(), Box> { + let colink_home = service.get_colink_home()?; + let mut path = Path::new(&colink_home).join("user_init_config.toml"); + if std::fs::metadata(&path).is_err() { + path = Path::new("user_init_config.template.toml").to_path_buf(); + } + let toml = match std::fs::read_to_string(&path).unwrap().parse::() { + Ok(toml) => toml, + Err(err) => Err(err.to_string())?, + }; + let protocols = toml.as_table().unwrap().clone(); + let mut handles = vec![]; + for (protocol_name, init_param) in protocols { + if init_param.get("operator_num").is_some() + && init_param["operator_num"].as_integer().unwrap() > 0 + { + let service = service.clone(); + let user_id = user_id.to_string(); + let user_jwt = user_jwt.to_string(); + handles.push(tokio::spawn(async move { + init_protocol(&service, &user_id, &user_jwt, &protocol_name, &init_param).await + })); + } + } + for handle in handles { + handle.await??; + } + service + ._internal_storage_update(user_id, "_is_initialized", &[1]) + .await?; + Ok(()) +} + +async fn init_protocol( + service: &MyService, + user_id: &str, + user_jwt: &str, + protocol_name: &str, + init_param: &Value, +) -> Result<(), Box> { + if init_param.get("start_after").is_some() { + let start_after_list = init_param["start_after"].as_array().unwrap(); + for p in start_after_list { + wait_protocol_initialization(service, user_id, p.as_str().unwrap()).await?; + } + } + if init_param.get("create_entry").is_some() { + let entries = init_param["create_entry"].as_array().unwrap(); + for entry in entries { + service + ._user_storage_update( + user_id, + entry["key_name"].as_str().unwrap(), + entry["value"].as_str().unwrap().as_bytes(), + ) + .await?; + } + } + let is_initialized_key = format!("_internal:protocols:{}:_is_initialized", protocol_name); + service + ._user_storage_update(user_id, &is_initialized_key, &[0]) + .await?; + for _ in 0..init_param["operator_num"].as_integer().unwrap() { + _start_protocol_operator(service, user_id, user_jwt, protocol_name).await?; + } + wait_protocol_initialization(service, user_id, protocol_name).await?; + Ok(()) +} + +async fn wait_protocol_initialization( + service: &MyService, + user_id: &str, + protocol_name: &str, +) -> Result<(), Box> { + let is_initialized_key = format!("_internal:protocols:{}:_is_initialized", protocol_name); + loop { + let res = service + ._user_storage_read(user_id, &is_initialized_key) + .await; + if res.is_ok() && res.unwrap()[0] == 1 { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + Ok(()) +} + +async fn _start_protocol_operator( + service: &MyService, + user_id: &str, + user_jwt: &str, + protocol_name: &str, +) -> Result<(), Box> { + let mut req = generate_request( + user_jwt, + ProtocolOperatorInstance { + protocol_name: protocol_name.to_string(), + user_id: user_id.to_string(), + ..Default::default() + }, + ); + req.metadata_mut().insert( + "privilege", + tonic::metadata::MetadataValue::from_static("user"), + ); + req.metadata_mut().insert( + "user_id", + tonic::metadata::MetadataValue::try_from(user_id).unwrap(), + ); + service._start_protocol_operator(req).await?; + Ok(()) +} diff --git a/src/service/utils.rs b/src/service/utils.rs index e0467f1..cadf581 100644 --- a/src/service/utils.rs +++ b/src/service/utils.rs @@ -1,6 +1,6 @@ use crate::colink_proto::{co_link_client::CoLinkClient, UserConsent}; -use openssl::sha::sha256; use secp256k1::{ecdsa::Signature, PublicKey, Secp256k1}; +use sha2::{Digest, Sha256}; use tonic::{ metadata::{MetadataMap, MetadataValue}, transport::{Certificate, Channel, ClientTlsConfig, Identity}, @@ -147,6 +147,35 @@ impl crate::server::MyService { Ok(payload.to_vec()) } + pub async fn _user_storage_update( + &self, + user_id: &str, + key_name: &str, + payload: &[u8], + ) -> Result { + match self.storage.update(user_id, key_name, payload).await { + Ok(key_path) => Ok(key_path), + Err(e) => Err(Status::internal(e)), + } + } + + pub async fn _user_storage_read( + &self, + user_id: &str, + key_name: &str, + ) -> Result, Status> { + let entries = match self + .storage + .read_from_key_names(user_id, &[key_name.to_owned()]) + .await + { + Ok(entries) => entries, + Err(e) => return Err(Status::internal(e)), + }; + let payload = entries.values().next().unwrap(); + Ok(payload.to_vec()) + } + pub fn check_user_consent( &self, user_consent: &UserConsent, @@ -177,8 +206,10 @@ impl crate::server::MyService { user_public_key_vec.extend_from_slice(&user_consent_signature_timestamp.to_le_bytes()); user_public_key_vec.extend_from_slice(&user_consent_expiration_timestamp.to_le_bytes()); user_public_key_vec.extend_from_slice(core_public_key_vec); - let verify_consent_signature = - secp256k1::Message::from_slice(&sha256(&user_public_key_vec)).unwrap(); + let mut hasher = Sha256::new(); + hasher.update(&user_public_key_vec); + let sha256 = hasher.finalize(); + let verify_consent_signature = secp256k1::Message::from_slice(&sha256).unwrap(); let secp = Secp256k1::new(); match secp.verify_ecdsa( &verify_consent_signature, @@ -199,6 +230,17 @@ impl crate::server::MyService { pub fn get_host_id(&self) -> String { hex::encode(&self.public_key.serialize()) } + + pub fn get_colink_home(&self) -> Result { + let colink_home = if std::env::var("COLINK_HOME").is_ok() { + std::env::var("COLINK_HOME").unwrap() + } else if std::env::var("HOME").is_ok() { + std::env::var("HOME").unwrap() + "/.colink" + } else { + return Err(Status::not_found("colink home not found.")); + }; + Ok(colink_home) + } } pub fn generate_request(jwt: &str, data: T) -> tonic::Request { diff --git a/src/subscription/mq.rs b/src/subscription/mq.rs index 3161e93..d3c90fe 100644 --- a/src/subscription/mq.rs +++ b/src/subscription/mq.rs @@ -2,6 +2,7 @@ use crate::colink_proto::SubscriptionMessage; use crate::mq::common::MQ; use crate::storage::common::Storage; use prost::Message; +use sha2::{Digest, Sha256}; use std::collections::HashMap; use tokio::sync::{Mutex, RwLock}; @@ -86,8 +87,16 @@ impl crate::subscription::common::StorageWithSubscription for StorageWithMQSubsc .lock() .await .insert(queue_name.clone(), user_id_key_name.clone()); + let routing_key = if user_id_key_name.len() > 200 { + let mut hasher = Sha256::new(); + hasher.update(user_id_key_name.as_bytes()); + let sha256 = hasher.finalize(); + format!("sha256:{}", hex::encode(sha256)) + } else { + user_id_key_name.clone() + }; self.mq - .queue_bind(&mq_uri, &queue_name, &user_id_key_name) + .queue_bind(&mq_uri, &queue_name, &routing_key) .await?; let key_list = self .storage @@ -190,8 +199,16 @@ impl StorageWithMQSubscription { let mut payload = vec![]; message.encode(&mut payload).unwrap(); let mq_uri = self.get_mq_uri(user_id).await?; + let routing_key = if user_id_key_name.len() > 200 { + let mut hasher = Sha256::new(); + hasher.update(user_id_key_name.as_bytes()); + let sha256 = hasher.finalize(); + format!("sha256:{}", hex::encode(sha256)) + } else { + user_id_key_name.clone() + }; self.mq - .publish_message(&mq_uri, &user_id_key_name, &payload) + .publish_message(&mq_uri, &routing_key, &payload) .await?; } Ok(()) diff --git a/tests/grpc_service_storage.rs b/tests/grpc_service_storage.rs index 223084c..2d371fb 100644 --- a/tests/grpc_service_storage.rs +++ b/tests/grpc_service_storage.rs @@ -6,8 +6,8 @@ use ::colink_server::server::init_and_run_server; use chrono::Duration; use colink_proto::co_link_client::CoLinkClient; use colink_proto::*; -use openssl::sha::sha256; use secp256k1::{All, Message, PublicKey, Secp256k1, SecretKey}; +use sha2::{Digest, Sha256}; use tonic::metadata::MetadataValue; use tonic::transport::Channel; use tonic::{Response, Status}; @@ -31,7 +31,10 @@ async fn send_import_user_request( msg.extend_from_slice(×tamp.to_le_bytes()); msg.extend_from_slice(&(timestamp + Duration::hours(24).num_seconds()).to_le_bytes()); msg.extend_from_slice(&core_pub_key.serialize()); - let signature = secp.sign_ecdsa(&Message::from_slice(&sha256(&msg)).unwrap(), &secret_key); + let mut hasher = Sha256::new(); + hasher.update(&msg); + let sha256 = hasher.finalize(); + let signature = secp.sign_ecdsa(&Message::from_slice(&sha256).unwrap(), &secret_key); let mut request = tonic::Request::new(UserConsent { public_key: public_key_vec.to_vec(), diff --git a/user_init_config.template.toml b/user_init_config.template.toml new file mode 100644 index 0000000..d933f71 --- /dev/null +++ b/user_init_config.template.toml @@ -0,0 +1,21 @@ +[policy_module] +operator_num = 1 +[[policy_module.create_entry]] +key_name = "_policy_module:init:accept_all_tasks" +value = "true" + +[remote_storage] +operator_num = 1 + +[remote_command] +operator_num = 1 + +[registry] +operator_num = 1 +start_after = [ "policy_module", "remote_storage" ] +[[registry.create_entry]] +key_name = "_registry:init:registry_addr" +value = "https://test.registry.colearn.cloud" +[[registry.create_entry]] +key_name = "_registry:init:registry_jwt" +value = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJwcml2aWxlZ2UiOiJndWVzdCIsInVzZXJfaWQiOiIwMzNmMTFjNGJhZGEzMzhhZWZkNjExYmE5ZmFjNWU1NmMwNmQ3NjFjNTYwNzFkYjVmYmQyYjFiNTBiMjg3MDkwYjUiLCJleHAiOjE2OTYyODI4MDh9.aiS-cEkTKhCUlNKwZqQFlGQgXIG8R6jK5tpDY35IHBo"