Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ctrl-c handling #28

Merged
merged 1 commit into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ serde = { version = "1.0.188", features = ["derive"] }
serde_json = { version = "1.0.107", features = ["std"] }
serde_json_any_key = "2.0.0"
thiserror = "1.0"
tokio = { version = "1.32.0", features = ["macros"] }
tokio = { version = "1.32.0", features = ["macros", "signal"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
zksync_merkle_tree = { git = "https://github.com/matter-labs/zksync-era.git" }
136 changes: 76 additions & 60 deletions src/l1_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use eyre::Result;
use rand::random;
use thiserror::Error;
use tokio::{
sync::{mpsc, Mutex},
sync::{mpsc, oneshot, Mutex},
time::{sleep, Duration},
};

Expand Down Expand Up @@ -126,77 +126,93 @@ impl L1Fetcher {
}
});

{
// NOTE: The channel should close once it goes out of scope we move it here.
let hash_tx = hash_tx;
let mut latest_l2_block_number = U256::zero();

// If an `end_block` was supplied we shouldn't poll for newer blocks.
if end_block.is_some() {
disable_polling = true;
}
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let main_handle = tokio::spawn({
let provider_clone = self.provider.clone();
let snapshot_clone = self.snapshot.clone();
async move {
let mut latest_l2_block_number = U256::zero();

let end_block_number = end_block.unwrap_or(
self.provider
.get_block(BlockNumber::Latest)
.await?
.unwrap()
.number
.unwrap(),
);

loop {
if disable_polling && current_l1_block_number > end_block_number {
tracing::info!("Successfully reached end block. Shutting down...");
break;
// If an `end_block` was supplied we shouldn't poll for newer blocks.
if end_block.is_some() {
disable_polling = true;
}

// Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`].
// TODO: Filter by executed blocks too.
let filter = Filter::new()
.address(ZK_SYNC_ADDR.parse::<Address>()?)
.topic0(event.signature())
.from_block(current_l1_block_number)
.to_block(current_l1_block_number + BLOCK_STEP);

// Grab all relevant logs.
if let Ok(logs) =
L1Fetcher::retry_call(|| self.provider.get_logs(&filter), L1FetchError::GetLogs)
let end_block_number = end_block.unwrap_or(
provider_clone
.get_block(BlockNumber::Latest)
.await
{
for log in logs {
// log.topics:
// topics[1]: L2 block number.
// topics[2]: L2 block hash.
// topics[3]: L2 commitment.

let new_l2_block_number =
U256::from_big_endian(log.topics[1].as_fixed_bytes());
if new_l2_block_number <= latest_l2_block_number {
continue;
}
.unwrap()
.unwrap()
.number
.unwrap(),
);

loop {
// Break when reaching the `end_block` or on the receivement of a `ctrl_c` signal.
if (disable_polling && current_l1_block_number > end_block_number)
|| shutdown_rx.try_recv().is_ok()
{
break;
}

if let Some(tx_hash) = log.transaction_hash {
hash_tx.send(tx_hash).await?;
// Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`].
// TODO: Filter by executed blocks too.
let filter = Filter::new()
.address(ZK_SYNC_ADDR.parse::<Address>().unwrap())
.topic0(event.signature())
.from_block(current_l1_block_number)
.to_block(current_l1_block_number + BLOCK_STEP);

// Grab all relevant logs.
if let Ok(logs) = L1Fetcher::retry_call(
|| provider_clone.get_logs(&filter),
L1FetchError::GetLogs,
)
.await
{
for log in logs {
// log.topics:
// topics[1]: L2 block number.
// topics[2]: L2 block hash.
// topics[3]: L2 commitment.

let new_l2_block_number =
U256::from_big_endian(log.topics[1].as_fixed_bytes());
if new_l2_block_number <= latest_l2_block_number {
continue;
}

if let Some(tx_hash) = log.transaction_hash {
hash_tx.send(tx_hash).await.unwrap();
}

latest_l2_block_number = new_l2_block_number;
}
} else {
tokio::time::sleep(Duration::from_secs(LONG_POLLING_INTERVAL_S)).await;
continue;
};

latest_l2_block_number = new_l2_block_number;
// Store our current L1 block number so we can resume if the process exits.
if let Some(snapshot) = &snapshot_clone {
snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
}
} else {
tokio::time::sleep(Duration::from_secs(LONG_POLLING_INTERVAL_S)).await;
continue;
};

// Store our current L1 block number so we can resume if the process exits.
if let Some(snapshot) = &self.snapshot {
snapshot.lock().await.latest_l1_block_number = current_l1_block_number;
// Increment current block index.
current_l1_block_number += BLOCK_STEP.into();
}

// Increment current block index.
current_l1_block_number += BLOCK_STEP.into();
}
}
});

// Wait for shutdown signal in background.
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
tracing::info!("Shutdown signal received, finishing up and shutting down...");
let _ = shutdown_tx.send("");
});

main_handle.await?;
tx_handle.await?;
parse_handle.await?;

Expand Down
3 changes: 1 addition & 2 deletions src/processor/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub struct TreeProcessor<'a> {

impl TreeProcessor<'static> {
pub async fn new(db_path: PathBuf, snapshot: Arc<Mutex<StateSnapshot>>) -> Result<Self> {
// TODO: Implement graceful shutdown.
// If database directory already exists, we try to restore the latest state.
// The state contains the last processed block and a mapping of index to key
// values, if a state file does not exist, we simply use the defaults instead.
Expand Down Expand Up @@ -60,7 +59,7 @@ impl Processor for TreeProcessor<'static> {
let mut snapshot = self.snapshot.lock().await;
// Check if we've already processed this block.
if snapshot.latest_l2_block_number >= block.block_number {
tracing::info!(
tracing::debug!(
"Block {} has already been processed, skipping.",
block.block_number
);
Expand Down