Skip to content

Commit

Permalink
Merge pull request #90 from EspressoSystems/nfy/dont_overwrite_global…
Browse files Browse the repository at this point in the history
…_response

Avoid potentially overwriting with entry(..).or_insert_with(...)
Reorder update of storage and response, with requisite rewrite of locking.
  • Loading branch information
nyospe authored Apr 23, 2024
2 parents 59f2f9e + b05760a commit 8dc46a8
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 46 deletions.
7 changes: 3 additions & 4 deletions src/builder_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,15 +761,13 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
response.builder_hash
);

// // form the response message and send it back
// form the response message
let response_msg = ResponseMessage {
builder_hash: response.builder_hash.clone(),
block_size: response.block_size,
offered_fee: response.offered_fee,
};

self.response_sender.send(response_msg).await.unwrap();

// write to global state as well
// only write if the entry does not exist
if self
Expand All @@ -795,7 +793,8 @@ impl<TYPES: NodeType> BuilderProgress<TYPES> for BuilderState<TYPES> {
});
}

// self.response_sender.send(response_msg).await.unwrap();
// ... and finally, send the response
self.response_sender.send(response_msg).await.unwrap();
}
None => {
tracing::warn!("No response to send");
Expand Down
124 changes: 86 additions & 38 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,20 @@ use hotshot_events_service::{
events_source::{BuilderEvent, BuilderEventType},
};
use sha2::{Digest, Sha256};
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
ops::Deref,
};
use tagged_base64::TaggedBase64;
use tide_disco::method::ReadState;

#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub struct GlobalState<Types: NodeType> {
// identity keys for the builder
// May be ideal place as GlobalState interacts with hotshot apis
// and then can sign on responders as desired
pub builder_keys: (
Types::BuilderSignatureKey, // pub key
<<Types as NodeType>::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey, // private key
),
// data store for the blocks
pub block_hash_to_block: HashMap<
BuilderCommitment,
Expand All @@ -87,37 +83,27 @@ pub struct GlobalState<Types: NodeType> {

// Instance state
pub instance_state: Types::InstanceState,

// max waiting time to serve first api request
pub max_api_waiting_time: Duration,
}

impl<Types: NodeType> GlobalState<Types> {
#[allow(clippy::too_many_arguments)]
pub fn new(
builder_keys: (
Types::BuilderSignatureKey,
<<Types as NodeType>::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey,
),
request_sender: BroadcastSender<MessageType<Types>>,
response_receiver: UnboundedReceiver<ResponseMessage>,
tx_sender: BroadcastSender<MessageType<Types>>,
instance_state: Types::InstanceState,
bootstrapped_builder_state_id: VidCommitment,
bootstrapped_view_num: Types::Time,
max_api_waiting_time: Duration,
) -> Self {
let mut spawned_builder_states = HashMap::new();
spawned_builder_states.insert(bootstrapped_builder_state_id, bootstrapped_view_num);
GlobalState {
builder_keys,
block_hash_to_block: Default::default(),
spawned_builder_states,
request_sender,
response_receiver,
tx_sender,
instance_state,
max_api_waiting_time,
}
}

Expand Down Expand Up @@ -165,11 +151,44 @@ impl<Types: NodeType> GlobalState<Types> {
}
}

pub struct ProxyGlobalState<Types: NodeType> {
// global state
global_state: Arc<RwLock<GlobalState<Types>>>,

// identity keys for the builder
// May be ideal place as GlobalState interacts with hotshot apis
// and then can sign on responders as desired
builder_keys: (
Types::BuilderSignatureKey, // pub key
<<Types as NodeType>::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey, // private key
),

// max waiting time to serve first api request
max_api_waiting_time: Duration,
}

impl<Types: NodeType> ProxyGlobalState<Types> {
pub fn new(
global_state: Arc<RwLock<GlobalState<Types>>>,
builder_keys: (
Types::BuilderSignatureKey,
<<Types as NodeType>::BuilderSignatureKey as BuilderSignatureKey>::BuilderPrivateKey,
),
max_api_waiting_time: Duration,
) -> Self {
ProxyGlobalState {
global_state,
builder_keys,
max_api_waiting_time,
}
}
}

/*
Handling Builder API responses
*/
#[async_trait]
impl<Types: NodeType> BuilderDataSource<Types> for GlobalState<Types>
impl<Types: NodeType> BuilderDataSource<Types> for ProxyGlobalState<Types>
where
for<'a> <<Types::SignatureKey as SignatureKey>::PureAssembledSignatureType as TryFrom<
&'a TaggedBase64,
Expand All @@ -191,7 +210,13 @@ where

let mut bootstrapped_state_build_block = false;
// check in the local spawned builder states, if it doesn't exist, and then let bootstrapped build a block for it
if !self.spawned_builder_states.contains_key(for_parent) {
if !self
.global_state
.read_arc()
.await
.spawned_builder_states
.contains_key(for_parent)
{
bootstrapped_state_build_block = true;
}

Expand All @@ -205,7 +230,10 @@ where
req_msg.requested_vid_commitment
);

self.request_sender
self.global_state
.read_arc()
.await
.request_sender
.broadcast(MessageType::RequestMessage(req_msg.clone()))
.await
.unwrap();
Expand All @@ -217,16 +245,17 @@ where

let response_received = async_compatibility_layer::art::async_timeout(
self.max_api_waiting_time,
self.response_receiver.recv(),
self.global_state.read_arc().await.response_receiver.recv(),
)
.await;

match response_received {
// We got available blocks
Ok(Ok(response)) => {
let (pub_key, sign_key) = self.builder_keys.clone();
// sign over the block info
let signature_over_block_info =
<Types as NodeType>::BuilderSignatureKey::sign_block_info(
&self.builder_keys.1,
&sign_key,
response.block_size,
response.offered_fee,
&response.builder_hash,
Expand All @@ -239,7 +268,7 @@ where
block_size: response.block_size,
offered_fee: response.offered_fee,
signature: signature_over_block_info,
sender: self.builder_keys.0.clone(),
sender: pub_key.clone(),
_phantom: Default::default(),
};
tracing::info!(
Expand Down Expand Up @@ -284,21 +313,28 @@ where
message: "Signature validation failed in claim block".to_string(),
});
}
if let Some(block) = self.block_hash_to_block.get(block_hash) {
let (pub_key, sign_key) = self.builder_keys.clone();
if let Some(block) = self
.global_state
.read_arc()
.await
.block_hash_to_block
.get(block_hash)
{
// sign over the builder commitment, as the proposer can computer it based on provide block_payload
// and the metata data
let response_block_hash = block.0.builder_commitment(&block.1);
let signature_over_builder_commitment =
<Types as NodeType>::BuilderSignatureKey::sign_builder_message(
&self.builder_keys.1,
&sign_key,
response_block_hash.as_ref(),
)
.expect("Claim block signing failed");
let block_data = AvailableBlockData::<Types> {
block_payload: block.0.clone(),
metadata: block.1.clone(),
signature: signature_over_builder_commitment,
sender: self.builder_keys.0.clone(),
sender: pub_key.clone(),
};
tracing::info!(
"Sending claimed block data for block hash: {:?}",
Expand Down Expand Up @@ -329,18 +365,25 @@ where
message: "Signature validation failed in claim block header input".to_string(),
});
}
if let Some(block) = self.block_hash_to_block.get(block_hash) {
let (pub_key, sign_key) = self.builder_keys.clone();
if let Some(block) = self
.global_state
.read_arc()
.await
.block_hash_to_block
.get(block_hash)
{
tracing::debug!("Waiting for vid commitment for block {:?}", block_hash);
let (vid_commitment, vid_precompute_data) = block.2.write().await.get().await?;
let signature_over_vid_commitment =
<Types as NodeType>::BuilderSignatureKey::sign_builder_message(
&self.builder_keys.1,
&sign_key,
vid_commitment.as_ref(),
)
.expect("Claim block header input message signing failed");

let signature_over_fee_info = Types::BuilderSignatureKey::sign_fee(
&self.builder_keys.1,
&sign_key,
block.3,
block_hash,
&vid_commitment,
Expand All @@ -352,7 +395,7 @@ where
vid_precompute_data,
fee_signature: signature_over_fee_info,
message_signature: signature_over_vid_commitment,
sender: self.builder_keys.0.clone(),
sender: pub_key.clone(),
};
tracing::info!(
"Sending claimed block header input response for block hash: {:?}",
Expand All @@ -373,29 +416,34 @@ where
}
}
#[async_trait]
impl<Types: NodeType> AcceptsTxnSubmits<Types> for GlobalState<Types> {
impl<Types: NodeType> AcceptsTxnSubmits<Types> for ProxyGlobalState<Types> {
async fn submit_txn(
&mut self,
txn: <Types as NodeType>::Transaction,
) -> Result<(), BuildError> {
tracing::debug!("Submitting transaction to the builder states{:?}", txn);
let response = self.submit_client_txn(txn).await;
tracing::debug!(
let response = self
.global_state
.read_arc()
.await
.submit_client_txn(txn)
.await;
tracing::info!(
"Transaction submitted to the builder states, sending response: {:?}",
response
);
response
}
}
#[async_trait]
impl<Types: NodeType> ReadState for GlobalState<Types> {
type State = Self;
impl<Types: NodeType> ReadState for ProxyGlobalState<Types> {
type State = GlobalState<Types>;

async fn read<T>(
&self,
op: impl Send + for<'a> FnOnce(&'a Self::State) -> BoxFuture<'a, T> + 'async_trait,
) -> T {
op(self).await
op(self.global_state.read_arc().await.deref()).await
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/testing/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ mod tests {
use committable::{Commitment, CommitmentBoundsArkless, Committable};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use std::time::Duration;

use serde::{Deserialize, Serialize};
/// This test simulates multiple builder states receiving messages from the channels and processing them
Expand Down Expand Up @@ -104,18 +103,16 @@ mod tests {

// generate the keys for the buidler
let seed = [201_u8; 32];
let (builder_pub_key, builder_private_key) =
let (_builder_pub_key, _builder_private_key) =
BLSPubKey::generated_from_seed_indexed(seed, 2011_u64);
// instantiate the global state also
let global_state = GlobalState::<TestTypes>::new(
(builder_pub_key, builder_private_key),
req_sender,
res_receiver,
tx_sender.clone(),
TestInstanceState {},
vid_commitment(&vec![], 8),
ViewNumber::new(0),
Duration::from_millis(10),
);

// to store all the sent messages
Expand Down

0 comments on commit 8dc46a8

Please sign in to comment.