Skip to content

Commit

Permalink
Add decompression traits and a test case (#2295)
Browse files Browse the repository at this point in the history
#1609 (comment)

Add a test case for decompressing DA-compressed blocks. FIx
`CombinedDb::from_config` to respects `state_rewind_policy` with tmp
RocksDB.

---------

Co-authored-by: Rafał Chabowski <88321181+rafal-ch@users.noreply.github.com>
Co-authored-by: Green Baneling <XgreenX9999@gmail.com>
  • Loading branch information
3 people authored Nov 27, 2024
1 parent b6cbb35 commit 9af3ad2
Show file tree
Hide file tree
Showing 12 changed files with 290 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2389](https://github.com/FuelLabs/fuel-core/pull/2389): Fix construction of reverse iterator in RocksDB.

### Changed
- [2295](https://github.com/FuelLabs/fuel-core/pull/2295): `CombinedDb::from_config` now respects `state_rewind_policy` with tmp RocksDB.
- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.
- [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce custom enum for representing result of running service tasks
- [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently.
Expand Down
4 changes: 3 additions & 1 deletion benches/benches/block_target_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use fuel_core::{
Config,
FuelService,
},
state::historical_rocksdb::StateRewindPolicy,
};
use fuel_core_benches::{
default_gas_costs::default_gas_costs,
Expand Down Expand Up @@ -265,7 +266,8 @@ fn service_with_many_contracts(
.build()
.unwrap();
let _drop = rt.enter();
let mut database = Database::rocksdb_temp();
let mut database = Database::rocksdb_temp(StateRewindPolicy::NoRewind)
.expect("Failed to create database");

let mut chain_config = ChainConfig::local_testnet();

Expand Down
10 changes: 10 additions & 0 deletions crates/compression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use registry::RegistryKeyspace;
use fuel_core_types::{
blockchain::header::PartialBlockHeader,
fuel_tx::CompressedTransaction,
fuel_types::BlockHeight,
};
use registry::RegistrationsPerTable;

Expand All @@ -42,6 +43,15 @@ impl Default for VersionedCompressedBlock {
}
}

impl VersionedCompressedBlock {
/// Returns the height of the compressed block.
pub fn height(&self) -> &BlockHeight {
match self {
VersionedCompressedBlock::V0(block) => block.header.height(),
}
}
}

#[cfg(test)]
mod tests {
use fuel_core_compression as _;
Expand Down
2 changes: 1 addition & 1 deletion crates/compression/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ macro_rules! tables {


impl RegistrationsPerTable {
pub(crate) fn write_to_registry<R>(&self, registry: &mut R, timestamp: Tai64) -> anyhow::Result<()>
pub fn write_to_registry<R>(&self, registry: &mut R, timestamp: Tai64) -> anyhow::Result<()>
where
R: TemporalRegistryAll
{
Expand Down
17 changes: 16 additions & 1 deletion crates/fuel-core/src/combined_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ impl CombinedDatabase {
})
}

/// A test-only temporary rocksdb database with given rewind policy.
#[cfg(feature = "rocksdb")]
pub fn temp_database_with_state_rewind_policy(
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
Ok(Self {
on_chain: Database::rocksdb_temp(state_rewind_policy)?,
off_chain: Database::rocksdb_temp(state_rewind_policy)?,
relayer: Default::default(),
gas_price: Default::default(),
})
}

pub fn from_config(config: &CombinedDatabaseConfig) -> DatabaseResult<Self> {
let combined_database = match config.database_type {
#[cfg(feature = "rocksdb")]
Expand All @@ -114,7 +127,9 @@ impl CombinedDatabase {
tracing::warn!(
"No RocksDB path configured, initializing database with a tmp directory"
);
CombinedDatabase::default()
CombinedDatabase::temp_database_with_state_rewind_policy(
config.state_rewind_policy,
)?
} else {
tracing::info!(
"Opening database {:?} with cache size \"{}\" and state rewind policy \"{:?}\"",
Expand Down
14 changes: 7 additions & 7 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,11 @@ where
}

#[cfg(feature = "rocksdb")]
pub fn rocksdb_temp() -> Self {
let db = RocksDb::<Historical<Description>>::default_open_temp(None).unwrap();
let historical_db =
HistoricalRocksDB::new(db, StateRewindPolicy::NoRewind).unwrap();
pub fn rocksdb_temp(rewind_policy: StateRewindPolicy) -> Result<Self> {
let db = RocksDb::<Historical<Description>>::default_open_temp(None)?;
let historical_db = HistoricalRocksDB::new(db, rewind_policy)?;
let data = Arc::new(historical_db);
Self::from_storage(DataSource::new(data, Stage::default()))
Ok(Self::from_storage(DataSource::new(data, Stage::default())))
}
}

Expand All @@ -275,7 +274,8 @@ where
}
#[cfg(feature = "rocksdb")]
{
Self::rocksdb_temp()
Self::rocksdb_temp(StateRewindPolicy::NoRewind)
.expect("Failed to create a temporary database")
}
}
}
Expand Down Expand Up @@ -408,7 +408,7 @@ impl Modifiable for GenesisDatabase<Relayer> {
}
}

fn commit_changes_with_height_update<Description>(
pub fn commit_changes_with_height_update<Description>(
database: &mut Database<Description>,
changes: Changes,
heights_lookup: impl Fn(
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

pub mod api_service;
mod da_compression;
pub mod da_compression;
pub mod database;
pub(crate) mod metrics_extension;
pub mod ports;
Expand Down
181 changes: 172 additions & 9 deletions crates/fuel-core/src/graphql_api/da_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,21 @@ use fuel_core_compression::{
config::Config,
ports::{
EvictorDb,
HistoryLookup,
TemporalRegistry,
UtxoIdToPointer,
},
};
use fuel_core_storage::{
not_found,
tables::{
Coins,
FuelBlocks,
Messages,
},
StorageAsMut,
StorageAsRef,
StorageInspect,
};
use fuel_core_types::{
blockchain::block::Block,
Expand Down Expand Up @@ -49,8 +56,8 @@ where
{
let compressed = compress(
config,
CompressTx {
db_tx,
CompressDbTx {
db_tx: DbTx { db_tx },
block_events,
},
block,
Expand All @@ -65,14 +72,23 @@ where
Ok(())
}

struct CompressTx<'a, Tx> {
db_tx: &'a mut Tx,
pub struct DbTx<'a, Tx> {
pub db_tx: &'a mut Tx,
}

struct CompressDbTx<'a, Tx> {
db_tx: DbTx<'a, Tx>,
block_events: &'a [Event],
}

pub struct DecompressDbTx<'a, Tx, Onchain> {
pub db_tx: DbTx<'a, Tx>,
pub onchain_db: Onchain,
}

macro_rules! impl_temporal_registry {
($type:ty) => { paste::paste! {
impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx>
impl<'a, Tx> TemporalRegistry<$type> for DbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
Expand Down Expand Up @@ -150,15 +166,87 @@ macro_rules! impl_temporal_registry {
}
}

impl<'a, Tx> EvictorDb<$type> for CompressTx<'a, Tx>
impl<'a, Tx> TemporalRegistry<$type> for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
fn read_registry(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<$type> {
self.db_tx.read_registry(key)
}

fn read_timestamp(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<Tai64> {
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
}

fn write_registry(
&mut self,
key: &fuel_core_types::fuel_compression::RegistryKey,
value: &$type,
timestamp: Tai64,
) -> anyhow::Result<()> {
self.db_tx.write_registry(key, value, timestamp)
}

fn registry_index_lookup(
&self,
value: &$type,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
{
self.db_tx.registry_index_lookup(value)
}
}

impl<'a, Tx, Offchain> TemporalRegistry<$type> for DecompressDbTx<'a, Tx, Offchain>
where
Tx: OffChainDatabaseTransaction,
{
fn read_registry(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<$type> {
self.db_tx.read_registry(key)
}

fn read_timestamp(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<Tai64> {
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
}

fn write_registry(
&mut self,
key: &fuel_core_types::fuel_compression::RegistryKey,
value: &$type,
timestamp: Tai64,
) -> anyhow::Result<()> {
self.db_tx.write_registry(key, value, timestamp)
}

fn registry_index_lookup(
&self,
value: &$type,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
{
self.db_tx.registry_index_lookup(value)
}
}

impl<'a, Tx> EvictorDb<$type> for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
fn set_latest_assigned_key(
&mut self,
key: fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<()> {
self.db_tx
self.db_tx.db_tx
.storage_as_mut::<DaCompressionTemporalRegistryEvictorCache>()
.insert(&MetadataKey::$type, &key)?;
Ok(())
Expand All @@ -168,7 +256,7 @@ macro_rules! impl_temporal_registry {
&self,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>> {
Ok(self
.db_tx
.db_tx.db_tx
.storage_as_ref::<DaCompressionTemporalRegistryEvictorCache>()
.get(&MetadataKey::$type)?
.map(|v| v.into_owned())
Expand All @@ -185,7 +273,7 @@ impl_temporal_registry!(ContractId);
impl_temporal_registry!(ScriptCode);
impl_temporal_registry!(PredicateCode);

impl<'a, Tx> UtxoIdToPointer for CompressTx<'a, Tx>
impl<'a, Tx> UtxoIdToPointer for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
Expand All @@ -210,3 +298,78 @@ where
anyhow::bail!("UtxoId not found in the block events");
}
}

impl<'a, Tx, Onchain> HistoryLookup for DecompressDbTx<'a, Tx, Onchain>
where
Tx: OffChainDatabaseTransaction,
Onchain: StorageInspect<Coins, Error = fuel_core_storage::Error>
+ StorageInspect<Messages, Error = fuel_core_storage::Error>
+ StorageInspect<FuelBlocks, Error = fuel_core_storage::Error>,
{
fn utxo_id(
&self,
c: fuel_core_types::fuel_tx::CompressedUtxoId,
) -> anyhow::Result<fuel_core_types::fuel_tx::UtxoId> {
if c.tx_pointer.block_height() == 0u32.into() {
// This is a genesis coin, which is handled differently.
// See CoinConfigGenerator::generate which generates the genesis coins.
let mut bytes = [0u8; 32];
let tx_index = c.tx_pointer.tx_index();
bytes[..std::mem::size_of_val(&tx_index)]
.copy_from_slice(&tx_index.to_be_bytes());
return Ok(fuel_core_types::fuel_tx::UtxoId::new(
fuel_core_types::fuel_tx::TxId::from(bytes),
0,
));
}

let block_info = self
.onchain_db
.storage_as_ref::<FuelBlocks>()
.get(&c.tx_pointer.block_height())?
.ok_or(not_found!(FuelBlocks))?;

let tx_id = *block_info
.transactions()
.get(c.tx_pointer.tx_index() as usize)
.ok_or(anyhow::anyhow!(
"Transaction not found in the block: {:?}",
c.tx_pointer
))?;

Ok(fuel_core_types::fuel_tx::UtxoId::new(tx_id, c.output_index))
}

fn coin(
&self,
utxo_id: fuel_core_types::fuel_tx::UtxoId,
) -> anyhow::Result<fuel_core_compression::ports::CoinInfo> {
let coin = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::Coins>()
.get(&utxo_id)?
.ok_or(not_found!(fuel_core_storage::tables::Coins))?;
Ok(fuel_core_compression::ports::CoinInfo {
owner: *coin.owner(),
asset_id: *coin.asset_id(),
amount: *coin.amount(),
})
}

fn message(
&self,
nonce: fuel_core_types::fuel_types::Nonce,
) -> anyhow::Result<fuel_core_compression::ports::MessageInfo> {
let message = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::Messages>()
.get(&nonce)?
.ok_or(not_found!(fuel_core_storage::tables::Messages))?;
Ok(fuel_core_compression::ports::MessageInfo {
sender: *message.sender(),
recipient: *message.recipient(),
amount: message.amount(),
data: message.data().clone(),
})
}
}
2 changes: 1 addition & 1 deletion crates/fuel-core/src/state/generic_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<Storage> GenericDatabase<Storage> {
}

pub fn into_inner(self) -> Storage {
self.storage.into_inner()
self.storage.into_storage()
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/storage/src/structured_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<S> StructuredStorage<S> {
}

/// Returns the inner storage.
pub fn into_inner(self) -> S {
pub fn into_storage(self) -> S {
self.inner
}
}
Expand Down
Loading

0 comments on commit 9af3ad2

Please sign in to comment.