Skip to content

Commit

Permalink
minor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
bharath-123 committed Oct 29, 2024
1 parent 9f2df92 commit 61988df
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub(crate) async fn initialize_app_with_storage(
let snapshot = storage.latest_snapshot();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics, 100);
let mut app = App::new(snapshot, mempool, metrics).await.unwrap();
let mut app = App::new(snapshot, mempool, None, metrics).await.unwrap();

let genesis_state = genesis_state.unwrap_or_else(self::genesis_state);

Expand Down
41 changes: 22 additions & 19 deletions crates/astria-sequencer/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use astria_core::{
v1::block::SequencerBlock,
v1alpha1::optimistic_block::SequencerBlockCommit,
},
Protobuf as _,
};
use astria_eyre::{
anyhow_to_eyre,
Expand Down Expand Up @@ -203,6 +204,22 @@ impl OptimisticBlockChannels {
pub(crate) fn committed_block_sender(&self) -> Sender<Option<SequencerBlockCommit>> {
self.committed_block_sender.clone()
}

pub(crate) fn send_optimistic_block(&self, block: Option<SequencerBlock>) {
if self.optimistic_block_sender.receiver_count() > 0 {
if let Err(e) = self.optimistic_block_sender.send(block) {
error!(error = %e, "failed to send optimistic block");
}
}
}

pub(crate) fn send_committed_block(&self, block: Option<SequencerBlockCommit>) {
if self.committed_block_sender.receiver_count() > 0 {
if let Err(e) = self.committed_block_sender.send(block) {
error!(error = %e, "failed to send committed block");
}
}
}
}

/// The Sequencer application, written as a bundle of [`Component`]s.
Expand Down Expand Up @@ -485,12 +502,7 @@ impl App {
.wrap_err("failed to run post execute transactions handler")?;

if let Some(optimistic_block_channels) = &self.optimistic_block_channels {
if let Err(e) = optimistic_block_channels
.optimistic_block_sender()
.send(Some(sequencer_block))
{
error!(error = %e, "failed to send sequencer block to optimistic block sender");
}
optimistic_block_channels.send_optimistic_block(Some(sequencer_block));
}

return Ok(());
Expand Down Expand Up @@ -595,12 +607,7 @@ impl App {
.wrap_err("failed to run post execute transactions handler")?;

if let Some(optimistic_block_channels) = &self.optimistic_block_channels {
if let Err(e) = optimistic_block_channels
.optimistic_block_sender()
.send(Some(sequencer_block))
{
error!(error = %e, "failed to send sequencer block to optimistic block sender");
}
optimistic_block_channels.send_optimistic_block(Some(sequencer_block));
}

Ok(())
Expand Down Expand Up @@ -1135,17 +1142,13 @@ impl App {
tx_results: post_transaction_execution_result.tx_results,
};

if let Some(obc) = &self.optimistic_block_channels {
if let Some(optimistic_block_channels) = &self.optimistic_block_channels {
let Hash::Sha256(block_hash) = block_hash else {
bail!("block hash is empty; this should not occur")
};

if let Err(e) = obc
.committed_block_sender
.send(Some(SequencerBlockCommit::new(height.value(), block_hash)))
{
error!(error = %e, "failed to send committed block to optimistic block sender");
};
optimistic_block_channels
.send_committed_block(Some(SequencerBlockCommit::new(height.value(), block_hash)));
}

Ok(finalize_block)
Expand Down
91 changes: 45 additions & 46 deletions crates/astria-sequencer/src/grpc/optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use tonic::{
Response,
Status,
};
use tracing::{
debug,
info,
};
use tracing::error;

use crate::app::OptimisticBlockChannels;

Expand Down Expand Up @@ -74,31 +71,32 @@ impl OptimisticBlockService for OptimisticBlockServer {

tokio::spawn(async move {
loop {
while let Ok(()) = optimistic_block_receiver.changed().await {
let optimistic_block = optimistic_block_receiver
.borrow_and_update()
.clone()
.expect("received an invalid optimistic block");

let filtered_optimistic_block =
optimistic_block.to_filtered_block(vec![rollup_id]);
let raw_filtered_optimistic_block = filtered_optimistic_block.into_raw();

let get_optimistic_block_stream_response = GetOptimisticBlockStreamResponse {
block: Some(raw_filtered_optimistic_block),
};

match tx.send(Ok(get_optimistic_block_stream_response)).await {
Ok(()) => {
debug!("sent optimistic block");
}
Err(_item) => {
info!("receiver for optimistic block has been dropped");
match optimistic_block_receiver.changed().await {
Ok(()) => {
let optimistic_block = optimistic_block_receiver
.borrow_and_update()
.clone()
.expect("received an invalid optimistic block");

let filtered_optimistic_block =
optimistic_block.to_filtered_block(vec![rollup_id]);
let raw_filtered_optimistic_block = filtered_optimistic_block.into_raw();

let get_optimistic_block_stream_response =
GetOptimisticBlockStreamResponse {
block: Some(raw_filtered_optimistic_block),
};

if let Err(e) = tx.send(Ok(get_optimistic_block_stream_response)).await {
error!(error = %e, "receiver for optimistic block has been dropped");
break;
}
};
};
}
Err(e) => {
error!(error = %e, "optimistic block sender has been dropped");
break;
}
}
debug!("optimistic block sender has dropped");
}
});

Expand All @@ -121,27 +119,28 @@ impl OptimisticBlockService for OptimisticBlockServer {

tokio::spawn(async move {
loop {
while let Ok(()) = committed_block_receiver.changed().await {
let sequencer_block_commit = committed_block_receiver
.borrow_and_update()
.clone()
.expect("received an invalid sequencer block commit");

let get_block_commitment_stream_response = GetBlockCommitmentStreamResponse {
commitment: Some(sequencer_block_commit.to_raw()),
};

match tx.send(Ok(get_block_commitment_stream_response)).await {
Ok(()) => {
debug!("sent block commitment");
}
Err(_item) => {
debug!("receiver for block commitment failed");
match committed_block_receiver.changed().await {
Ok(()) => {
let sequencer_block_commit = committed_block_receiver
.borrow_and_update()
.clone()
.expect("received an invalid sequencer block commit");

let get_block_commitment_stream_response =
GetBlockCommitmentStreamResponse {
commitment: Some(sequencer_block_commit.to_raw()),
};

if let Err(e) = tx.send(Ok(get_block_commitment_stream_response)).await {
error!(error = %e, "receiver for block commitment failed");
break;
}
};
};
}
Err(e) => {
error!(error = %e, "committed block sender has been dropped");
break;
}
}
debug!("commited block sender has dropped");
}
});

Expand Down
9 changes: 4 additions & 5 deletions crates/astria-sequencer/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,17 @@ impl Sequencer {

let mempool = Mempool::new(metrics, config.mempool_parked_max_tx_count);

let optimistic_block_channels = if config.no_optimistic_block {
let mut optimistic_block_channels: Option<OptimisticBlockChannels> = None;
if !config.no_optimistic_block {
let (optimistic_block_sender, _) =
tokio::sync::watch::channel::<Option<SequencerBlock>>(None);
let (committed_block_sender, _) =
tokio::sync::watch::channel::<Option<SequencerBlockCommit>>(None);

Some(OptimisticBlockChannels::new(
optimistic_block_channels = Some(OptimisticBlockChannels::new(
optimistic_block_sender,
committed_block_sender,
))
} else {
None
));
};

let app = App::new(
Expand Down

0 comments on commit 61988df

Please sign in to comment.