diff --git a/Cargo.lock b/Cargo.lock index caa230e3..679f92cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1408,11 +1408,11 @@ name = "cache-storage" version = "0.2.12" dependencies = [ "anyhow", - "borsh 1.5.1", "futures", "near-indexer-primitives", "readnode-primitives", "redis", + "serde", "serde_json", ] diff --git a/cache-storage/Cargo.toml b/cache-storage/Cargo.toml index c0e00b66..df7f1e68 100644 --- a/cache-storage/Cargo.toml +++ b/cache-storage/Cargo.toml @@ -11,9 +11,9 @@ license.workspace = true [dependencies] anyhow = "1.0.86" -borsh = "1.5.1" futures = "0.3.5" redis = { version = "0.25.2", features = ["tokio-comp", "connection-manager"] } +serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0.85" readnode-primitives.workspace = true diff --git a/cache-storage/src/lib.rs b/cache-storage/src/lib.rs index c32fa854..ae928d4b 100644 --- a/cache-storage/src/lib.rs +++ b/cache-storage/src/lib.rs @@ -1,3 +1,5 @@ +mod utils; + #[derive(Clone)] struct RedisCacheStorage { client: redis::aio::ConnectionManager, @@ -7,8 +9,8 @@ impl RedisCacheStorage { // Create a new instance of the `RedisCacheStorage` struct. // param `redis_url` - Redis connection URL. // param `database_number` - Number of the database to use. - // We use database 1 for handling the blocks by finality cache. - // We use database 3 for collecting transactions cache. + // We use database 0 for handling the blocks by finality cache. + // We use database 2 for collecting transactions cache. // Different databases are used to avoid key conflicts. async fn new(redis_url: String, database_number: usize) -> anyhow::Result { let redis_client = redis::Client::open(redis_url)? @@ -162,7 +164,7 @@ impl TxIndexerCache { // Use redis database 3 for collecting transactions cache pub async fn new(redis_url: String) -> anyhow::Result { Ok(Self { - cache_storage: RedisCacheStorage::new(redis_url, 3).await?, + cache_storage: RedisCacheStorage::new(redis_url, 2).await?, }) } @@ -205,9 +207,7 @@ impl TxIndexerCache { .cache_storage .get(format!("transaction_{}", transaction_key)) .await?; - Ok(borsh::from_slice::< - readnode_primitives::CollectingTransactionDetails, - >(&result)?) + utils::from_slice::(&result) } pub async fn get_tx_outcomes( @@ -220,9 +220,9 @@ impl TxIndexerCache { .await? .iter() .map(|outcome| { - borsh::from_slice::(outcome) - .expect("Failed to deserialize ExecutionOutcome") + utils::from_slice::(outcome) }) + .filter_map(|outcome| outcome.ok()) .collect()) } @@ -245,7 +245,7 @@ impl TxIndexerCache { self.cache_storage .set( format!("transaction_{}", transaction_details.transaction_key()), - borsh::to_vec(&transaction_details)?, + utils::to_vec(&transaction_details)?, ) .await } @@ -280,7 +280,7 @@ impl TxIndexerCache { self.cache_storage .rpush( format!("outcomes_{}", transaction_key), - borsh::to_vec(&execution_outcome_with_receipt)?, + utils::to_vec(&execution_outcome_with_receipt)?, ) .await? }; diff --git a/cache-storage/src/utils.rs b/cache-storage/src/utils.rs new file mode 100644 index 00000000..49037399 --- /dev/null +++ b/cache-storage/src/utils.rs @@ -0,0 +1,11 @@ +// Help functions to serialize and deserialize the data to be stored in the cache. +// This is needed to handle the backward incompatible changes in the TransactionDetails + +pub fn to_vec(value: &T) -> anyhow::Result> { + let value_json = serde_json::to_value(value)?.to_string(); + Ok(value_json.into_bytes()) +} + +pub fn from_slice<'a, T: serde::de::Deserialize<'a>>(data: &'a [u8]) -> anyhow::Result { + Ok(serde_json::from_slice(data)?) +}