Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node): more rewind params #2270

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions node/src/actors/chain_manager/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl Handler<EpochNotification<EveryEpochPayload>> for ChainManager {
}) = best_candidate
{
// Persist block and update ChainState
self.consolidate_block(ctx, block, utxo_diff, false);
self.consolidate_block(ctx, block, utxo_diff, None);
} else if msg.checkpoint > 0 {
let previous_epoch = msg.checkpoint - 1;
log::warn!(
Expand Down Expand Up @@ -309,7 +309,7 @@ impl Handler<AddBlocks> for ChainManager {
if msg.blocks.len() == 1 && msg.blocks[0].hash() == consensus_constants.genesis_hash
{
let block = msg.blocks.into_iter().next().unwrap();
match act.process_requested_block(ctx, block, false) {
match act.process_requested_block(ctx, block, None) {
Ok(()) => {
log::debug!("Successfully consolidated genesis block");

Expand Down Expand Up @@ -1024,7 +1024,7 @@ impl Handler<PeersBeacons> for ChainManager {
}
let mut consolidated_consensus_candidate = false;
for consensus_block in candidates {
match self.process_requested_block(ctx, consensus_block, false) {
match self.process_requested_block(ctx, consensus_block, None) {
Ok(()) => {
consolidated_consensus_candidate = true;
log::info!(
Expand Down Expand Up @@ -1720,12 +1720,20 @@ impl Handler<Rewind> for ChainManager {

fn handle(&mut self, msg: Rewind, ctx: &mut Self::Context) -> Self::Result {
// Save list of blocks that are known to be valid
let old_block_chain: VecDeque<(Epoch, Hash)> = self
.chain_state
.block_chain
.range(0..=msg.epoch)
.map(|(k, v)| (*k, *v))
.collect();
let old_block_chain: VecDeque<(Epoch, Hash)> = if let Some(epoch) = msg.epoch {
self.chain_state
.block_chain
.range(0..=epoch)
.map(|(k, v)| (*k, *v))
.collect()
} else {
// If rewind epoch is None, rewind to the latest block
self.chain_state
.block_chain
.range(0..)
.map(|(k, v)| (*k, *v))
.collect()
};

self.delete_chain_state_and_reinitialize()
.map(|_res, act, ctx| {
Expand All @@ -1741,7 +1749,7 @@ impl Handler<Rewind> for ChainManager {
.into_actor(act)
.map(|_res, _act, _ctx| ())
.spawn(ctx);
act.resync_from_storage(old_block_chain, ctx, |act, ctx| {
act.resync_from_storage(old_block_chain, msg, ctx, |act, ctx| {
// After the resync is done:
// Persist chain state to storage
ctx.wait(
Expand Down
44 changes: 28 additions & 16 deletions node/src/actors/chain_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use crate::{
json_rpc::JsonRpcServer,
messages::{
AddItem, AddItems, AddTransaction, Anycast, BlockNotify, Broadcast, DropOutboundPeers,
GetBlocksEpochRange, GetItemBlock, NodeStatusNotify, RemoveAddressesFromTried,
GetBlocksEpochRange, GetItemBlock, NodeStatusNotify, RemoveAddressesFromTried, Rewind,
SendInventoryItem, SendInventoryRequest, SendLastBeacon, SendSuperBlockVote,
SetLastBeacon, SetSuperBlockTargetBeacon, StoreInventoryItem, SuperBlockNotify,
},
Expand Down Expand Up @@ -376,6 +376,7 @@ impl ChainManager {
fn resync_from_storage<F>(
&mut self,
mut block_list: VecDeque<(Epoch, Hash)>,
msg: Rewind,
ctx: &mut Context<Self>,
done: F,
) where
Expand Down Expand Up @@ -403,7 +404,7 @@ impl ChainManager {
last_epoch,
hash
);
act.process_requested_block(ctx, block, true)
act.process_requested_block(ctx, block, Some(&msg))
.expect("resync from storage fail");
// We need to persist the chain state periodically, otherwise the entire
// UTXO set will be in memory, consuming a huge amount of memory.
Expand All @@ -413,7 +414,7 @@ impl ChainManager {
.wait(ctx);
}
// Recursion
act.resync_from_storage(block_list, ctx, done);
act.resync_from_storage(block_list, msg, ctx, done);
}
Ok(Err(e)) => {
panic!("{:?}", e);
Expand Down Expand Up @@ -496,7 +497,7 @@ impl ChainManager {
&mut self,
ctx: &mut Context<Self>,
block: Block,
resynchronizing: bool,
rewind: Option<&Rewind>,
) -> Result<(), failure::Error> {
if let (
Some(epoch_constants),
Expand Down Expand Up @@ -538,12 +539,12 @@ impl ChainManager {
secp_ctx,
block_number,
&chain_info.consensus_constants,
resynchronizing,
rewind,
&active_wips,
)?;

// Persist block and update ChainState
self.consolidate_block(ctx, block, utxo_diff, resynchronizing);
self.consolidate_block(ctx, block, utxo_diff, rewind);

Ok(())
} else {
Expand Down Expand Up @@ -672,7 +673,7 @@ impl ChainManager {
.expect("No initialized SECP256K1 context"),
self.chain_state.block_number(),
&chain_info.consensus_constants,
false,
None,
&active_wips,
) {
Ok(utxo_diff) => {
Expand Down Expand Up @@ -721,7 +722,7 @@ impl ChainManager {
ctx: &mut Context<Self>,
block: Block,
utxo_diff: Diff,
resynchronizing: bool,
rewind: Option<&Rewind>,
) {
// Update chain_info and reputation_engine
let own_pkh = match self.own_pkh {
Expand All @@ -732,6 +733,12 @@ impl ChainManager {
}
};

let persist_to_storage = if let Some(rewind) = rewind {
rewind.mode.write_items_to_storage
} else {
true
};

match self.chain_state {
ChainState {
chain_info: Some(ref mut chain_info),
Expand Down Expand Up @@ -826,7 +833,7 @@ impl ChainManager {
let to_be_stored =
self.chain_state.data_request_pool.finished_data_requests();

if !resynchronizing {
if persist_to_storage {
self.persist_data_requests(ctx, to_be_stored);
}

Expand All @@ -844,7 +851,7 @@ impl ChainManager {
})
}

if !resynchronizing {
if persist_to_storage {
self.persist_items(
ctx,
vec![StoreInventoryItem::Block(Box::new(block))],
Expand Down Expand Up @@ -876,7 +883,7 @@ impl ChainManager {
show_tally_info(dr_info.tally.as_ref().unwrap(), block_epoch);
}

if !resynchronizing {
if persist_to_storage {
self.persist_data_requests(ctx, to_be_stored);
}

Expand All @@ -902,7 +909,7 @@ impl ChainManager {
// getTransaction will show the content without any warning that the block
// is not on the main chain. To fix this we could remove forked blocks when
// a reorganization is detected.
if !resynchronizing {
if persist_to_storage {
self.persist_items(
ctx,
vec![StoreInventoryItem::Block(Box::new(block.clone()))],
Expand Down Expand Up @@ -1916,7 +1923,7 @@ impl ChainManager {
let mut num_processed_blocks = 0;

for block in blocks.iter() {
if let Err(e) = self.process_requested_block(ctx, block.clone(), false) {
if let Err(e) = self.process_requested_block(ctx, block.clone(), None) {
log::error!("Error processing block: {}", e);
if num_processed_blocks > 0 {
// Restore only in case there were several blocks consolidated before
Expand Down Expand Up @@ -2420,10 +2427,15 @@ pub fn process_validations(
secp_ctx: &CryptoEngine,
block_number: u32,
consensus_constants: &ConsensusConstants,
resynchronizing: bool,
rewind: Option<&Rewind>,
active_wips: &ActiveWips,
) -> Result<Diff, failure::Error> {
if !resynchronizing {
let validate_signatures = if let Some(rewind) = rewind {
rewind.mode.validate_signatures
} else {
true
};
if validate_signatures {
let mut signatures_to_verify = vec![];
validate_block(
block,
Expand Down Expand Up @@ -2452,7 +2464,7 @@ pub fn process_validations(
active_wips,
)?;

if !resynchronizing {
if validate_signatures {
verify_signatures(signatures_to_verify, vrf_ctx, secp_ctx)?;
}

Expand Down
55 changes: 44 additions & 11 deletions node/src/actors/json_rpc/json_rpc_methods.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
convert::TryFrom,
future::Future,
net::SocketAddr,
sync::atomic::{AtomicUsize, Ordering},
sync::Arc,
Expand All @@ -9,6 +10,8 @@ use std::{
use actix::MailboxError;
#[cfg(not(test))]
use actix::SystemService;
use futures::FutureExt;
use futures_util::compat::Compat;
use itertools::Itertools;
use jsonrpc_core::{MetaIoHandler, Params, Value};
use jsonrpc_pubsub::{PubSubHandler, Session, Subscriber, SubscriptionId};
Expand All @@ -35,7 +38,8 @@ use crate::{
GetBalance, GetBlocksEpochRange, GetConsolidatedPeers, GetDataRequestInfo, GetEpoch,
GetHighestCheckpointBeacon, GetItemBlock, GetItemSuperblock, GetItemTransaction,
GetKnownPeers, GetMemoryTransaction, GetMempool, GetNodeStats, GetReputation,
GetSignalingInfo, GetState, GetUtxoInfo, InitializePeers, IsConfirmedBlock, Rewind,
GetSignalingInfo, GetState, GetSupplyInfo, GetUtxoInfo, InitializePeers,
IsConfirmedBlock, Rewind, RewindMode,
},
peers_manager::PeersManager,
sessions_manager::SessionsManager,
Expand All @@ -47,10 +51,6 @@ use super::Subscriptions;

#[cfg(test)]
use self::mock_actix::SystemService;
use crate::actors::messages::GetSupplyInfo;
use futures::FutureExt;
use futures_util::compat::Compat;
use std::future::Future;

type JsonRpcResult = Result<Value, jsonrpc_core::Error>;

Expand Down Expand Up @@ -238,7 +238,7 @@ pub fn jsonrpc_io_handler(
enable_sensitive_methods,
"rewind",
params,
|params| rewind(params.parse()),
rewind,
)))
});

Expand Down Expand Up @@ -1648,15 +1648,48 @@ pub async fn get_consensus_constants(params: Result<(), jsonrpc_core::Error>) ->
}

/// Rewind
pub async fn rewind(params: Result<(Epoch,), jsonrpc_core::Error>) -> JsonRpcResult {
let epoch = match params {
Ok((epoch,)) => epoch,
Err(e) => return Err(e),
pub async fn rewind(params: Params) -> JsonRpcResult {
let rewind_params: Rewind;

// Handle parameters as an array with an epoch field, or an object which is deserialized as a
// Rewind struct
if let Params::Array(params) = params {
if params.len() != 1 {
return Err(jsonrpc_core::Error::invalid_params(
"Argument of `rewind` must be either a one-element array or an object",
));
} else if let Some(Value::Number(epoch)) = params.get(0) {
// Convert Number to Epoch, return error on out of range
match epoch.as_u64().and_then(|epoch| Epoch::try_from(epoch).ok()) {
Some(epoch) => {
rewind_params = Rewind {
epoch: Some(epoch),
mode: RewindMode::default(),
}
}
None => {
return Err(jsonrpc_core::Error::invalid_params(
"First argument of `rewind` must have type `Epoch`",
));
}
}
} else {
return Err(jsonrpc_core::Error::invalid_params(
"First argument of `rewind` must have type `Epoch`",
));
};
} else if let Params::Map(_map) = &params {
let parsed_params = params.parse()?;
rewind_params = parsed_params;
} else {
return Err(jsonrpc_core::Error::invalid_params(
"Argument of `rewind` must be either a one-element array or an object",
));
};

let chain_manager_addr = ChainManager::from_registry();
chain_manager_addr
.send(Rewind { epoch })
.send(rewind_params)
.map(|res| {
res.map_err(internal_error)
.and_then(|success| match success {
Expand Down
23 changes: 20 additions & 3 deletions node/src/actors/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,27 @@ impl Message for IsConfirmedBlock {
type Result = Result<bool, failure::Error>;
}

/// Rewind
/// Additional configuration for the rewind method
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct RewindMode {
/// Validate block and transaction signatures
#[serde(default)]
pub validate_signatures: bool,
/// Write all the blocks, transactions, and data request reports to storage, regardless of
/// whether they already exist or not
#[serde(default)]
pub write_items_to_storage: bool,
}

/// Rewind chain state back to some epoch
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct Rewind {
/// Epoch
pub epoch: u32,
/// Epoch of the last block that will be consolidated by the rewind method
#[serde(default)]
pub epoch: Option<Epoch>,
/// Additional configuration for the rewind method
#[serde(default)]
pub mode: RewindMode,
}

impl Message for Rewind {
Expand Down