diff --git a/src/builder_state.rs b/src/builder_state.rs index f9e96230..a069be6c 100644 --- a/src/builder_state.rs +++ b/src/builder_state.rs @@ -761,15 +761,13 @@ impl BuilderProgress for BuilderState { response.builder_hash ); - // // form the response message and send it back + // form the response message let response_msg = ResponseMessage { builder_hash: response.builder_hash.clone(), block_size: response.block_size, offered_fee: response.offered_fee, }; - self.response_sender.send(response_msg).await.unwrap(); - // write to global state as well // only write if the entry does not exist if self @@ -795,7 +793,8 @@ impl BuilderProgress for BuilderState { }); } - // self.response_sender.send(response_msg).await.unwrap(); + // ... and finally, send the response + self.response_sender.send(response_msg).await.unwrap(); } None => { tracing::warn!("No response to send"); diff --git a/src/service.rs b/src/service.rs index 3d09afb1..a23b253c 100644 --- a/src/service.rs +++ b/src/service.rs @@ -43,24 +43,20 @@ use hotshot_events_service::{ events_source::{BuilderEvent, BuilderEventType}, }; use sha2::{Digest, Sha256}; -use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; +use std::{ + collections::{HashMap, HashSet}, + ops::Deref, +}; use tagged_base64::TaggedBase64; use tide_disco::method::ReadState; #[allow(clippy::type_complexity)] #[derive(Debug)] pub struct GlobalState { - // identity keys for the builder - // May be ideal place as GlobalState interacts with hotshot apis - // and then can sign on responders as desired - pub builder_keys: ( - Types::BuilderSignatureKey, // pub key - <::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey, // private key - ), // data store for the blocks pub block_hash_to_block: HashMap< BuilderCommitment, @@ -87,37 +83,27 @@ pub struct GlobalState { // Instance state pub instance_state: Types::InstanceState, - - // max waiting time to serve first api request - pub max_api_waiting_time: Duration, } impl GlobalState { #[allow(clippy::too_many_arguments)] pub fn new( - builder_keys: ( - Types::BuilderSignatureKey, - <::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey, - ), request_sender: BroadcastSender>, response_receiver: UnboundedReceiver, tx_sender: BroadcastSender>, instance_state: Types::InstanceState, bootstrapped_builder_state_id: VidCommitment, bootstrapped_view_num: Types::Time, - max_api_waiting_time: Duration, ) -> Self { let mut spawned_builder_states = HashMap::new(); spawned_builder_states.insert(bootstrapped_builder_state_id, bootstrapped_view_num); GlobalState { - builder_keys, block_hash_to_block: Default::default(), spawned_builder_states, request_sender, response_receiver, tx_sender, instance_state, - max_api_waiting_time, } } @@ -165,11 +151,44 @@ impl GlobalState { } } +pub struct ProxyGlobalState { + // global state + global_state: Arc>>, + + // identity keys for the builder + // May be ideal place as GlobalState interacts with hotshot apis + // and then can sign on responders as desired + builder_keys: ( + Types::BuilderSignatureKey, // pub key + <::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey, // private key + ), + + // max waiting time to serve first api request + max_api_waiting_time: Duration, +} + +impl ProxyGlobalState { + pub fn new( + global_state: Arc>>, + builder_keys: ( + Types::BuilderSignatureKey, + <::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey, + ), + max_api_waiting_time: Duration, + ) -> Self { + ProxyGlobalState { + global_state, + builder_keys, + max_api_waiting_time, + } + } +} + /* Handling Builder API responses */ #[async_trait] -impl BuilderDataSource for GlobalState +impl BuilderDataSource for ProxyGlobalState where for<'a> <::PureAssembledSignatureType as TryFrom< &'a TaggedBase64, @@ -191,7 +210,13 @@ where let mut bootstrapped_state_build_block = false; // check in the local spawned builder states, if it doesn't exist, and then let bootstrapped build a block for it - if !self.spawned_builder_states.contains_key(for_parent) { + if !self + .global_state + .read_arc() + .await + .spawned_builder_states + .contains_key(for_parent) + { bootstrapped_state_build_block = true; } @@ -205,7 +230,10 @@ where req_msg.requested_vid_commitment ); - self.request_sender + self.global_state + .read_arc() + .await + .request_sender .broadcast(MessageType::RequestMessage(req_msg.clone())) .await .unwrap(); @@ -217,16 +245,17 @@ where let response_received = async_compatibility_layer::art::async_timeout( self.max_api_waiting_time, - self.response_receiver.recv(), + self.global_state.read_arc().await.response_receiver.recv(), ) .await; match response_received { - // We got available blocks Ok(Ok(response)) => { + let (pub_key, sign_key) = self.builder_keys.clone(); + // sign over the block info let signature_over_block_info = ::BuilderSignatureKey::sign_block_info( - &self.builder_keys.1, + &sign_key, response.block_size, response.offered_fee, &response.builder_hash, @@ -239,7 +268,7 @@ where block_size: response.block_size, offered_fee: response.offered_fee, signature: signature_over_block_info, - sender: self.builder_keys.0.clone(), + sender: pub_key.clone(), _phantom: Default::default(), }; tracing::info!( @@ -284,13 +313,20 @@ where message: "Signature validation failed in claim block".to_string(), }); } - if let Some(block) = self.block_hash_to_block.get(block_hash) { + let (pub_key, sign_key) = self.builder_keys.clone(); + if let Some(block) = self + .global_state + .read_arc() + .await + .block_hash_to_block + .get(block_hash) + { // sign over the builder commitment, as the proposer can computer it based on provide block_payload // and the metata data let response_block_hash = block.0.builder_commitment(&block.1); let signature_over_builder_commitment = ::BuilderSignatureKey::sign_builder_message( - &self.builder_keys.1, + &sign_key, response_block_hash.as_ref(), ) .expect("Claim block signing failed"); @@ -298,7 +334,7 @@ where block_payload: block.0.clone(), metadata: block.1.clone(), signature: signature_over_builder_commitment, - sender: self.builder_keys.0.clone(), + sender: pub_key.clone(), }; tracing::info!( "Sending claimed block data for block hash: {:?}", @@ -329,18 +365,25 @@ where message: "Signature validation failed in claim block header input".to_string(), }); } - if let Some(block) = self.block_hash_to_block.get(block_hash) { + let (pub_key, sign_key) = self.builder_keys.clone(); + if let Some(block) = self + .global_state + .read_arc() + .await + .block_hash_to_block + .get(block_hash) + { tracing::debug!("Waiting for vid commitment for block {:?}", block_hash); let (vid_commitment, vid_precompute_data) = block.2.write().await.get().await?; let signature_over_vid_commitment = ::BuilderSignatureKey::sign_builder_message( - &self.builder_keys.1, + &sign_key, vid_commitment.as_ref(), ) .expect("Claim block header input message signing failed"); let signature_over_fee_info = Types::BuilderSignatureKey::sign_fee( - &self.builder_keys.1, + &sign_key, block.3, block_hash, &vid_commitment, @@ -352,7 +395,7 @@ where vid_precompute_data, fee_signature: signature_over_fee_info, message_signature: signature_over_vid_commitment, - sender: self.builder_keys.0.clone(), + sender: pub_key.clone(), }; tracing::info!( "Sending claimed block header input response for block hash: {:?}", @@ -373,14 +416,19 @@ where } } #[async_trait] -impl AcceptsTxnSubmits for GlobalState { +impl AcceptsTxnSubmits for ProxyGlobalState { async fn submit_txn( &mut self, txn: ::Transaction, ) -> Result<(), BuildError> { tracing::debug!("Submitting transaction to the builder states{:?}", txn); - let response = self.submit_client_txn(txn).await; - tracing::debug!( + let response = self + .global_state + .read_arc() + .await + .submit_client_txn(txn) + .await; + tracing::info!( "Transaction submitted to the builder states, sending response: {:?}", response ); @@ -388,14 +436,14 @@ impl AcceptsTxnSubmits for GlobalState { } } #[async_trait] -impl ReadState for GlobalState { - type State = Self; +impl ReadState for ProxyGlobalState { + type State = GlobalState; async fn read( &self, op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait, ) -> T { - op(self).await + op(self.global_state.read_arc().await.deref()).await } } diff --git a/src/testing/basic_test.rs b/src/testing/basic_test.rs index 31de42a5..361c3a9b 100644 --- a/src/testing/basic_test.rs +++ b/src/testing/basic_test.rs @@ -48,7 +48,6 @@ mod tests { use committable::{Commitment, CommitmentBoundsArkless, Committable}; use sha2::{Digest, Sha256}; use std::sync::Arc; - use std::time::Duration; use serde::{Deserialize, Serialize}; /// This test simulates multiple builder states receiving messages from the channels and processing them @@ -104,18 +103,16 @@ mod tests { // generate the keys for the buidler let seed = [201_u8; 32]; - let (builder_pub_key, builder_private_key) = + let (_builder_pub_key, _builder_private_key) = BLSPubKey::generated_from_seed_indexed(seed, 2011_u64); // instantiate the global state also let global_state = GlobalState::::new( - (builder_pub_key, builder_private_key), req_sender, res_receiver, tx_sender.clone(), TestInstanceState {}, vid_commitment(&vec![], 8), ViewNumber::new(0), - Duration::from_millis(10), ); // to store all the sent messages