Merge branch 'main' into migrate_to_new_workflow
otani88 authored Nov 14, 2024
2 parents bbb189f + 8620a8e commit 6f92063
Showing 28 changed files with 544 additions and 626 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

12 changes: 10 additions & 2 deletions core/bin/zksync_server/src/node_builder.rs
@@ -325,7 +325,11 @@ impl MainNodeBuilder {
            latest_values_cache_size: rpc_config.latest_values_cache_size() as u64,
            latest_values_max_block_lag: rpc_config.latest_values_max_block_lag(),
        };
-        let vm_config = try_load_config!(self.configs.experimental_vm_config);
+        let vm_config = self
+            .configs
+            .experimental_vm_config
+            .clone()
+            .unwrap_or_default();

        // On main node we always use master pool sink.
        self.node.add_layer(MasterPoolSinkLayer);
@@ -597,7 +601,11 @@ impl MainNodeBuilder {
    }

    fn add_vm_playground_layer(mut self) -> anyhow::Result<Self> {
-        let vm_config = try_load_config!(self.configs.experimental_vm_config);
+        let vm_config = self
+            .configs
+            .experimental_vm_config
+            .clone()
+            .unwrap_or_default();
        self.node.add_layer(VmPlaygroundLayer::new(
            vm_config.playground,
            self.genesis_config.l2_chain_id,
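Both hunks in this file replace `try_load_config!`, which fails hard when the config section is absent, with a clone-and-default fallback. A minimal sketch of that pattern, using hypothetical config types (the real `ExperimentalVmConfig` fields are not shown in this commit):

// Hypothetical stand-ins for illustration only.
#[derive(Clone, Debug, Default)]
struct ExperimentalVmConfig {
    playground_enabled: bool,
}

struct Configs {
    experimental_vm_config: Option<ExperimentalVmConfig>,
}

fn load_vm_config(configs: &Configs) -> ExperimentalVmConfig {
    // A missing section now yields `Default::default()` instead of an error;
    // `clone()` leaves the shared `Configs` untouched.
    configs.experimental_vm_config.clone().unwrap_or_default()
}

fn main() {
    let configs = Configs { experimental_vm_config: None };
    assert!(!load_vm_config(&configs).playground_enabled); // defaults apply
}

The trade-off: an omitted section no longer aborts startup, so the defaults must be safe to run with.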
1 change: 1 addition & 0 deletions core/node/consensus/Cargo.toml
@@ -42,6 +42,7 @@ thiserror.workspace = true
tracing.workspace = true
tokio.workspace = true
semver.workspace = true
+vise.workspace = true

[dev-dependencies]
zksync_node_genesis.workspace = true
96 changes: 48 additions & 48 deletions core/node/consensus/src/en.rs
@@ -17,13 +17,14 @@ use zksync_web3_decl::{

use super::{config, storage::Store, ConsensusConfig, ConsensusSecrets};
use crate::{
+    metrics::METRICS,
    registry,
    storage::{self, ConnectionPool},
};

-/// If less than TEMPORARY_FETCHER_THRESHOLD certificates are missing,
-/// the temporary fetcher will stop fetching blocks.
-pub(crate) const TEMPORARY_FETCHER_THRESHOLD: u64 = 10;
+/// Whenever more than FALLBACK_FETCHER_THRESHOLD certificates are missing,
+/// the fallback fetcher is active.
+pub(crate) const FALLBACK_FETCHER_THRESHOLD: u64 = 10;

/// External node.
pub(super) struct EN {
@@ -115,11 +116,9 @@ impl EN {
            let store = store.clone();
            async {
                let store = store;
-                self.temporary_block_fetcher(ctx, &store).await?;
-                tracing::info!(
-                    "temporary block fetcher finished, switching to p2p fetching only"
-                );
-                Ok(())
+                self.fallback_block_fetcher(ctx, &store)
+                    .await
+                    .wrap("fallback_block_fetcher()")
            }
        });

@@ -179,7 +178,7 @@ impl EN {
tracing::warn!("\
WARNING: this node is using ZKsync API synchronization, which will be deprecated soon. \
Please follow this instruction to switch to p2p synchronization: \
https://github.com/matter-labs/zksync-era/blob/main/docs/guides/external-node/09_decentralization.md");
https://github.com/matter-labs/zksync-era/blob/main/docs/guides/external-node/10_decentralization.md");
let res: ctx::Result<()> = scope::run!(ctx, |ctx, s| async {
// Update sync state in the background.
s.spawn_bg(self.fetch_state_loop(ctx));
@@ -191,7 +190,7 @@ impl EN {
                .new_payload_queue(ctx, actions, self.sync_state.clone())
                .await
                .wrap("new_fetcher_cursor()")?;
-            self.fetch_blocks(ctx, &mut payload_queue, None).await
+            self.fetch_blocks(ctx, &mut payload_queue).await
        })
        .await;
        match res {
@@ -362,9 +361,14 @@ impl EN {
    }

    /// Fetches (with retries) the given block from the main node.
-    async fn fetch_block(&self, ctx: &ctx::Ctx, n: L2BlockNumber) -> ctx::Result<FetchedBlock> {
+    async fn fetch_block(
+        &self,
+        ctx: &ctx::Ctx,
+        n: validator::BlockNumber,
+    ) -> ctx::Result<FetchedBlock> {
        const RETRY_INTERVAL: time::Duration = time::Duration::seconds(5);

+        let n = L2BlockNumber(n.0.try_into().context("overflow")?);
+        METRICS.fetch_block.inc();
        loop {
            match ctx.wait(self.client.sync_l2_block(n, true)).await? {
                Ok(Some(block)) => return Ok(block.try_into()?),
@@ -376,76 +380,72 @@ impl EN {
        }
    }

-    /// Fetches blocks from the main node directly, until the certificates
-    /// are backfilled. This allows for smooth transition from json RPC to p2p block syncing.
-    pub(crate) async fn temporary_block_fetcher(
+    /// Fetches blocks from the main node directly whenever the EN is lagging behind too much.
+    pub(crate) async fn fallback_block_fetcher(
        &self,
        ctx: &ctx::Ctx,
        store: &Store,
    ) -> ctx::Result<()> {
        const MAX_CONCURRENT_REQUESTS: usize = 30;
        scope::run!(ctx, |ctx, s| async {
            let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
-            s.spawn(async {
-                let Some(mut next) = store.next_block(ctx).await? else {
-                    return Ok(());
-                };
-                while store.persisted().borrow().next().0 + TEMPORARY_FETCHER_THRESHOLD < next.0 {
-                    let n = L2BlockNumber(next.0.try_into().context("overflow")?);
-                    self.sync_state.wait_for_main_node_block(ctx, n).await?;
-                    send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
-                    // TODO: metrics.
+            s.spawn::<()>(async {
+                let send = send;
+                let is_lagging =
+                    |main| main >= store.persisted().borrow().next() + FALLBACK_FETCHER_THRESHOLD;
+                let mut next = store.next_block(ctx).await.wrap("next_block()")?;
+                loop {
+                    // Wait until p2p syncing is lagging.
+                    self.sync_state
+                        .wait_for_main_node_block(ctx, is_lagging)
+                        .await?;
+                    // Determine the next block to fetch and wait for it to be available.
+                    next = next.max(store.next_block(ctx).await.wrap("next_block()")?);
+                    self.sync_state
+                        .wait_for_main_node_block(ctx, |main| main >= next)
+                        .await?;
+                    // Fetch the block asynchronously.
+                    send.send(ctx, s.spawn(self.fetch_block(ctx, next))).await?;
+                    next = next.next();
                }
-                drop(send);
-                Ok(())
            });
-            while let Ok(block) = recv.recv_or_disconnected(ctx).await? {
+            loop {
+                let block = recv.recv(ctx).await?;
                store
                    .queue_next_fetched_block(ctx, block.join(ctx).await?)
                    .await
                    .wrap("queue_next_fetched_block()")?;
            }
-            Ok(())
        })
        .await
    }

-    /// Fetches blocks from the main node in range `[cursor.next()..end)`.
+    /// Fetches blocks starting with `queue.next()`.
    async fn fetch_blocks(
        &self,
        ctx: &ctx::Ctx,
        queue: &mut storage::PayloadQueue,
-        end: Option<validator::BlockNumber>,
    ) -> ctx::Result<()> {
        const MAX_CONCURRENT_REQUESTS: usize = 30;
-        let first = queue.next();
-        let mut next = first;
+        let mut next = queue.next();
        scope::run!(ctx, |ctx, s| async {
            let (send, mut recv) = ctx::channel::bounded(MAX_CONCURRENT_REQUESTS);
-            s.spawn(async {
+            s.spawn::<()>(async {
                let send = send;
-                while end.map_or(true, |end| next < end) {
-                    let n = L2BlockNumber(next.0.try_into().context("overflow")?);
-                    self.sync_state.wait_for_main_node_block(ctx, n).await?;
-                    send.send(ctx, s.spawn(self.fetch_block(ctx, n))).await?;
+                loop {
+                    self.sync_state
+                        .wait_for_main_node_block(ctx, |main| main >= next)
+                        .await?;
+                    send.send(ctx, s.spawn(self.fetch_block(ctx, next))).await?;
                    next = next.next();
                }
-                Ok(())
            });
-            while end.map_or(true, |end| queue.next() < end) {
+            loop {
                let block = recv.recv(ctx).await?.join(ctx).await?;
                queue.send(block).await.context("queue.send()")?;
            }
-            Ok(())
        })
-        .await?;
-        // If fetched anything, wait for the last block to be stored persistently.
-        if first < queue.next() {
-            self.pool
-                .wait_for_payload(ctx, queue.next().prev().unwrap())
-                .await
-                .wrap("wait_for_payload()")?;
-        }
-        Ok(())
+        .await
    }
}
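Both rewritten loops above wait on a predicate over the main node's head block (`|main| main >= next`, `is_lagging`) rather than on a fixed block number. A minimal sketch of that predicate-based wait, assuming the head is published through a `tokio::sync::watch` channel and `tokio`/`anyhow` as dependencies; the real `SyncState` internals are not part of this diff, and all names below are illustrative:

use tokio::sync::watch;

// Resolves once the latest main-node block satisfies `pred`.
async fn wait_for_main_node_block(
    head: &mut watch::Receiver<u64>,
    pred: impl FnMut(&u64) -> bool,
) -> anyhow::Result<()> {
    head.wait_for(pred).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, mut rx) = watch::channel(0u64);
    tokio::spawn(async move {
        for n in 1..=20u64 {
            let _ = tx.send(n); // main node head advancing
        }
    });
    let next = 5u64;
    // "Wake me when block `next` is available on the main node."
    wait_for_main_node_block(&mut rx, |main| *main >= next).await?;
    // "Wake me once p2p syncing lags by more than the threshold."
    let (persisted, threshold) = (3u64, 10u64);
    wait_for_main_node_block(&mut rx, |main| *main >= persisted + threshold).await?;
    Ok(())
}

Encoding the condition as a closure lets one primitive express both "wake me when block N exists" and "wake me only when we are lagging by more than the threshold".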
1 change: 1 addition & 0 deletions core/node/consensus/src/lib.rs
@@ -9,6 +9,7 @@ mod abi;
mod config;
mod en;
pub mod era;
+mod metrics;
mod mn;
mod registry;
mod storage;
13 changes: 13 additions & 0 deletions core/node/consensus/src/metrics.rs
@@ -0,0 +1,13 @@
+//! Consensus related metrics.
+
+#[derive(Debug, vise::Metrics)]
+#[metrics(prefix = "zksync_node_consensus")]
+pub(crate) struct Metrics {
+    /// Number of blocks that have been fetched via JSON-RPC.
+    /// It is used only as a fallback when p2p syncing is disabled or falling behind,
+    /// so it shouldn't be increasing under normal circumstances if p2p syncing is enabled.
+    pub fetch_block: vise::Counter,
+}
+
+#[vise::register]
+pub(super) static METRICS: vise::Global<Metrics> = vise::Global::new();
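Together with the `METRICS.fetch_block.inc()` call added to `en.rs`, this file shows the full vise pattern: derive `vise::Metrics` on a struct, register one global instance, and bump counters at call sites. A minimal self-contained sketch of the same pattern (the `my_subsystem` prefix and `requests` field are illustrative, not part of this commit):

#[derive(Debug, vise::Metrics)]
#[metrics(prefix = "my_subsystem")]
struct MyMetrics {
    /// Counts handled requests; exported as `my_subsystem_requests`.
    requests: vise::Counter,
}

#[vise::register]
static METRICS: vise::Global<MyMetrics> = vise::Global::new();

fn handle_request() {
    // Cheap atomic increment, safe to call on hot paths.
    METRICS.requests.inc();
}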
8 changes: 3 additions & 5 deletions core/node/consensus/src/storage/store.rs
@@ -114,14 +114,12 @@ impl Store {
    }

    /// Number of the next block to queue.
-    pub(crate) async fn next_block(
-        &self,
-        ctx: &ctx::Ctx,
-    ) -> ctx::OrCanceled<Option<validator::BlockNumber>> {
+    pub(crate) async fn next_block(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::BlockNumber> {
        Ok(sync::lock(ctx, &self.block_payloads)
            .await?
            .as_ref()
-            .map(|p| p.next()))
+            .context("payload_queue not set")?
+            .next())
    }

    /// Queues the next block.
39 changes: 1 addition & 38 deletions core/node/consensus/src/testonly.rs
@@ -45,10 +45,7 @@ use zksync_types::{
};
use zksync_web3_decl::client::{Client, DynClient, L2};

-use crate::{
-    en,
-    storage::{ConnectionPool, Store},
-};
+use crate::{en, storage::ConnectionPool};

/// Fake StateKeeper for tests.
#[derive(Debug)]
@@ -413,40 +410,6 @@ impl StateKeeper {
            .await
    }

-    pub async fn run_temporary_fetcher(
-        self,
-        ctx: &ctx::Ctx,
-        client: Box<DynClient<L2>>,
-    ) -> ctx::Result<()> {
-        scope::run!(ctx, |ctx, s| async {
-            let payload_queue = self
-                .pool
-                .connection(ctx)
-                .await
-                .wrap("connection()")?
-                .new_payload_queue(ctx, self.actions_sender, self.sync_state.clone())
-                .await
-                .wrap("new_payload_queue()")?;
-            let (store, runner) = Store::new(
-                ctx,
-                self.pool.clone(),
-                Some(payload_queue),
-                Some(client.clone()),
-            )
-            .await
-            .wrap("Store::new()")?;
-            s.spawn_bg(async { Ok(runner.run(ctx).await?) });
-            en::EN {
-                pool: self.pool.clone(),
-                client,
-                sync_state: self.sync_state.clone(),
-            }
-            .temporary_block_fetcher(ctx, &store)
-            .await
-        })
-        .await
-    }
-
    /// Runs consensus node for the external node.
    pub async fn run_consensus(
        self,