Skip to content

Commit

Permalink
Merge pull request #15 from anoma/tiago/block-index-builder-height
Browse files Browse the repository at this point in the history
include height in masp txs block index
  • Loading branch information
Fraccaman committed Aug 9, 2024
2 parents 6f154bc + f283413 commit bcfa07a
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 36 deletions.
74 changes: 52 additions & 22 deletions block-index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,32 +197,58 @@ async fn build_new_block_index(app_state: &AppState) -> Result<(), MainError> {

let conn = app_state.get_db_connection().await.into_db_error()?;

let block_heights = conn
let (last_height, block_heights_with_txs) = conn
.interact(|conn| {
use schema::tx::dsl::*;

tx.select(block_height)
.distinct()
.load_iter::<_, DbDefaultLoadingMode>(conn)
.context("Failed to query block heights with masp txs")?
.try_fold(Vec::new(), |mut accum, maybe_block_height| {
tracing::debug!("Reading block height entry from db");
let height: i32 = maybe_block_height.context(
"Failed to get tx block height row data from db",
)?;
tracing::debug!("Read block height entry from db");
accum.push(u64::try_from(height).context(
"Failed to convert block height from i32 to u64",
)?);
anyhow::Ok(accum)
})
conn.build_transaction().read_only().run(|conn| {
let last_height = {
use diesel::prelude::OptionalExtension;
use schema::chain_state::dsl::*;

chain_state
.select(block_height)
.first(conn)
.optional()
.context("Failed to query last block height")
}?;

let block_heights_with_txs = {
use schema::tx::dsl::*;

tx.select(block_height)
.distinct()
.load_iter::<_, DbDefaultLoadingMode>(conn)
.context("Failed to query block heights with masp txs")?
.try_fold(
Vec::new(),
|mut accum, maybe_block_height| {
tracing::debug!(
"Reading block height entry from db"
);
let height: i32 = maybe_block_height.context(
"Failed to get tx block height row data \
from db",
)?;
tracing::debug!(
"Read block height entry from db"
);
accum.push(u64::try_from(height).context(
"Failed to convert block height from i32 \
to u64",
)?);
anyhow::Ok(accum)
},
)
}?;

anyhow::Ok((last_height, block_heights_with_txs))
})
})
.await
.context_db_interact_error()
.into_db_error()?
.into_db_error()?;

let block_heights_len = block_heights.len();
let block_heights_len = block_heights_with_txs.len();
tracing::debug!(
num_blocks_with_masp_txs = block_heights_len,
"Read all block heights with masp transactions from db"
Expand All @@ -234,7 +260,7 @@ async fn build_new_block_index(app_state: &AppState) -> Result<(), MainError> {
transactions"
);

let filter: BinaryFuse16 = block_heights
let filter: BinaryFuse16 = block_heights_with_txs
.try_into()
.map_err(|err| {
anyhow!(
Expand All @@ -260,19 +286,23 @@ async fn build_new_block_index(app_state: &AppState) -> Result<(), MainError> {

tracing::debug!("Storing binary fuse xor filter in db");

conn.interact(|conn| {
conn.interact(move |conn| {
use schema::block_index::dsl::*;

let db_filter = BlockIndex {
id: 0,
serialized_data: serialized_filter,
block_height: last_height.unwrap_or_default(),
};

diesel::insert_into(block_index)
.values(&db_filter)
.on_conflict(id)
.do_update()
.set(serialized_data.eq(&db_filter.serialized_data))
.set((
block_height.eq(&db_filter.block_height),
serialized_data.eq(&db_filter.serialized_data),
))
.execute(conn)
.context("Failed to insert masp txs block index into db")?;

Expand Down
3 changes: 2 additions & 1 deletion orm/migrations/2024-07-23-171457_block_index/up.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
CREATE TABLE block_index (
id SERIAL PRIMARY KEY,
-- NB: serialized with `bincode`
serialized_data bytea NOT NULL
serialized_data bytea NOT NULL,
block_height INT NOT NULL
);
1 change: 1 addition & 0 deletions orm/src/block_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ use crate::schema::block_index;
pub struct BlockIndex {
pub id: i32,
pub serialized_data: Vec<u8>,
pub block_height: i32,
}
1 change: 1 addition & 0 deletions orm/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ diesel::table! {
block_index (id) {
id -> Int4,
serialized_data -> Bytea,
block_height -> Int4,
}
}

Expand Down
4 changes: 4 additions & 0 deletions swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ components:
BlockIndexResponse:
type: object
properties:
block_height:
type: integer
minimum: 0
description: The block height of the index.
index:
description: Compressed (lossy) index of all blocks containing masp txs.
type: object
Expand Down
7 changes: 5 additions & 2 deletions webserver/src/handler/namada_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ pub async fn get_block_index(
NamadaStateError::Database(err.to_string())
})?;

if let Some(index) = maybe_block_index {
Ok(Json(BlockIndexResponse { index }))
if let Some((height, index)) = maybe_block_index {
Ok(Json(BlockIndexResponse {
block_height: height.0,
index,
}))
} else {
Err(NamadaStateError::BlockIndexNotFound)
}
Expand Down
27 changes: 18 additions & 9 deletions webserver/src/repository/namada_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ pub trait NamadaStateRepositoryTrait {

async fn get_latest_height(&self) -> anyhow::Result<Option<BlockHeight>>;

async fn get_block_index(&self) -> anyhow::Result<Option<BinaryFuse16>>;
async fn get_block_index(
&self,
) -> anyhow::Result<Option<(i32, BinaryFuse16)>>;
}

impl NamadaStateRepositoryTrait for NamadaStateRepository {
Expand Down Expand Up @@ -46,13 +48,15 @@ impl NamadaStateRepositoryTrait for NamadaStateRepository {
Ok(block_height.map(BlockHeight::from))
}

async fn get_block_index(&self) -> anyhow::Result<Option<BinaryFuse16>> {
async fn get_block_index(
&self,
) -> anyhow::Result<Option<(i32, BinaryFuse16)>> {
let conn = self.app_state.get_db_connection().await.context(
"Failed to retrieve connection from the pool of database \
connections",
)?;

let maybe_serialized_data = conn
let maybe_index = conn
.interact(move |conn| {
use orm::block_index::BlockIndex;
use orm::schema::block_index::dsl::block_index;
Expand All @@ -65,21 +69,26 @@ impl NamadaStateRepositoryTrait for NamadaStateRepository {
.context("Failed to get latest block index from db")?
.map(
|BlockIndex {
serialized_data, ..
}| serialized_data,
block_height,
serialized_data,
..
}| {
(block_height, serialized_data)
},
),
)
})
.await
.context_db_interact_error()??;

tokio::task::block_in_place(|| {
maybe_serialized_data
.map(|data| {
bincode::deserialize(&data).context(
maybe_index
.map(|(height, data)| {
let filter = bincode::deserialize(&data).context(
"Failed to deserialize block index data returned from \
db",
)
)?;
anyhow::Ok((height, filter))
})
.transpose()
})
Expand Down
1 change: 1 addition & 0 deletions webserver/src/response/namada_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ pub struct LatestHeightResponse {

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct BlockIndexResponse {
pub block_height: u64,
pub index: BinaryFuse16,
}
10 changes: 8 additions & 2 deletions webserver/src/service/namada_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ impl NamadaStateService {

pub async fn get_block_index(
&self,
) -> anyhow::Result<Option<xorf::BinaryFuse16>> {
self.namada_state_repo.get_block_index().await
) -> anyhow::Result<Option<(BlockHeight, xorf::BinaryFuse16)>> {
self.namada_state_repo
.get_block_index()
.await
.map(|option| {
option
.map(|(height, filter)| (BlockHeight(height as _), filter))
})
}
}

0 comments on commit bcfa07a

Please sign in to comment.