From 044a19d0e14c74d13c0f7b373bcc5c7ea236748f Mon Sep 17 00:00:00 2001 From: 0xZensh Date: Mon, 18 Dec 2023 17:34:51 +0800 Subject: [PATCH] feat: implement Indexer API --- crates/internal/axum-web/Cargo.toml | 2 + crates/internal/axum-web/src/erring.rs | 33 ++- crates/ns-indexer/Cargo.toml | 3 +- crates/ns-indexer/src/api/inscription.rs | 196 ++++++++++++++++++ crates/ns-indexer/src/api/mod.rs | 108 ++++++++++ crates/ns-indexer/src/api/name.rs | 80 +++++++ crates/ns-indexer/src/api/service.rs | 69 ++++++ crates/ns-indexer/src/bin/main.rs | 12 +- crates/ns-indexer/src/bitcoin.rs | 37 ++-- crates/ns-indexer/src/db/model_inscription.rs | 107 ++++++++++ crates/ns-indexer/src/db/model_name_state.rs | 48 +++++ .../ns-indexer/src/db/model_service_state.rs | 27 +++ crates/ns-indexer/src/db/scylladb.rs | 3 +- crates/ns-indexer/src/indexer.rs | 36 +++- crates/ns-indexer/src/indexer_api.rs | 47 ----- crates/ns-indexer/src/lib.rs | 5 +- crates/ns-indexer/src/router.rs | 64 +++--- crates/ns-indexer/src/scanner.rs | 19 +- crates/ns-protocol/src/ns.rs | 16 ++ 19 files changed, 798 insertions(+), 114 deletions(-) create mode 100644 crates/ns-indexer/src/api/inscription.rs create mode 100644 crates/ns-indexer/src/api/mod.rs create mode 100644 crates/ns-indexer/src/api/name.rs create mode 100644 crates/ns-indexer/src/api/service.rs delete mode 100644 crates/ns-indexer/src/indexer_api.rs diff --git a/crates/internal/axum-web/Cargo.toml b/crates/internal/axum-web/Cargo.toml index 13743f6..18e95fd 100644 --- a/crates/internal/axum-web/Cargo.toml +++ b/crates/internal/axum-web/Cargo.toml @@ -25,4 +25,6 @@ serde = { workspace = true } serde_json = { workspace = true } structured-logger = { workspace = true } tokio = { workspace = true } +scylla = "0.10" zstd = "0.12" +validator = { version = "0.16", features = ["derive"] } diff --git a/crates/internal/axum-web/src/erring.rs b/crates/internal/axum-web/src/erring.rs index c2e0e34..41984ef 100644 --- a/crates/internal/axum-web/src/erring.rs +++ b/crates/internal/axum-web/src/erring.rs @@ -3,10 +3,10 @@ use axum::{ response::{IntoResponse, Response}, Json, }; +use scylla::transport::query_result::SingleRowError; use serde::{Deserialize, Serialize}; use std::{error::Error, fmt, fmt::Debug}; - -use crate::object::PackObject; +use validator::{ValidationError, ValidationErrors}; /// ErrorResponse is the response body for error. #[derive(Deserialize, Serialize)] @@ -20,7 +20,7 @@ pub struct SuccessResponse { #[serde(skip_serializing_if = "Option::is_none")] pub total_size: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub next_page_token: Option>>, + pub next_page_token: Option, pub result: T, } @@ -76,3 +76,30 @@ impl IntoResponse for HTTPError { (status, body).into_response() } } + +impl From for HTTPError { + fn from(err: anyhow::Error) -> Self { + match err.downcast::() { + Ok(err) => err, + Err(sel) => match sel.downcast::() { + Ok(sel) => HTTPError::new(400, format!("{:?}", sel)), + Err(sel) => match sel.downcast::() { + Ok(_) => HTTPError::new(404, "data not found".to_string()), + Err(sel) => HTTPError::new(500, format!("{:?}", sel)), + }, + }, + } + } +} + +impl From for HTTPError { + fn from(err: ValidationError) -> Self { + HTTPError::new(400, format!("{:?}", err)) + } +} + +impl From for HTTPError { + fn from(err: ValidationErrors) -> Self { + HTTPError::new(400, format!("{:?}", err)) + } +} diff --git a/crates/ns-indexer/Cargo.toml b/crates/ns-indexer/Cargo.toml index 7dc27d9..dc4745b 100644 --- a/crates/ns-indexer/Cargo.toml +++ b/crates/ns-indexer/Cargo.toml @@ -40,7 +40,7 @@ reqwest = { version = "0.11", features = [ ], default-features = false } bitcoin = { version = "0.31", features = ["serde", "base64", "rand"] } dotenvy = "0.15" -faster-hex = "0.8" +faster-hex = "0.9" bitcoincore-rpc-json = "0.18.0" scylla = "0.10" tower = "0.4" @@ -52,6 +52,7 @@ tower-http = { version = "0.5", features = [ "decompression-zstd", "propagate-header", ] } +validator = { version = "0.16", features = ["derive"] } [dev-dependencies] hex-literal = "0.4" diff --git a/crates/ns-indexer/src/api/inscription.rs b/crates/ns-indexer/src/api/inscription.rs new file mode 100644 index 0000000..118164d --- /dev/null +++ b/crates/ns-indexer/src/api/inscription.rs @@ -0,0 +1,196 @@ +use axum::{ + extract::{Query, State}, + Extension, +}; +use std::sync::Arc; +use validator::Validate; + +use axum_web::{ + context::ReqContext, + erring::{HTTPError, SuccessResponse}, + object::PackObject, +}; +use ns_protocol::index::{Inscription, InvalidInscription}; + +use crate::api::{IndexerAPI, QueryHeight, QueryName, QueryNamePagination}; +use crate::db; + +pub struct InscriptionAPI; + +impl InscriptionAPI { + pub async fn get_last_accepted( + Extension(ctx): Extension>, + to: PackObject<()>, + State(api): State>, + ) -> Result>>, HTTPError> { + ctx.set("action", "get_last_accepted_inscription".into()) + .await; + + let last_accepted_state = api.state.last_accepted.read().await; + + Ok(to.with(SuccessResponse::new(last_accepted_state.clone()))) + } + + pub async fn get_best( + Extension(ctx): Extension>, + to: PackObject<()>, + State(api): State>, + ) -> Result>>, HTTPError> { + ctx.set("action", "get_best_inscription".into()).await; + + let best_inscriptions_state = api.state.best_inscriptions.read().await; + let mut inscription = best_inscriptions_state.last().cloned(); + if inscription.is_none() { + let last_accepted_state = api.state.last_accepted.read().await; + inscription = last_accepted_state.clone(); + } + + Ok(to.with(SuccessResponse::new(inscription))) + } + + pub async fn get( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, + ) -> Result>, HTTPError> { + input.validate()?; + if input.sequence.is_none() { + return Err(HTTPError::new(400, "sequence is required".to_string())); + } + + let name = input.name.clone(); + let sequence = input.sequence.unwrap(); + ctx.set_kvs(vec![ + ("action", "get_inscription".into()), + ("name", name.clone().into()), + ("sequence", sequence.into()), + ]) + .await; + + let mut inscription = db::Inscription::with_pk(name, sequence); + inscription.get_one(&app.scylla, vec![]).await?; + + Ok(to.with(SuccessResponse::new(inscription.to_index()?))) + } + + pub async fn get_by_height( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, + ) -> Result>, HTTPError> { + input.validate()?; + + let height = input.height; + ctx.set_kvs(vec![ + ("action", "get_inscription_by_height".into()), + ("height", height.into()), + ]) + .await; + + let inscription = db::Inscription::get_by_height(&app.scylla, height, vec![]).await?; + + Ok(to.with(SuccessResponse::new(inscription.to_index()?))) + } + + pub async fn list_best( + Extension(ctx): Extension>, + to: PackObject<()>, + State(api): State>, + ) -> Result>>, HTTPError> { + ctx.set("action", "list_best_inscriptions".into()).await; + let best_inscriptions_state = api.state.best_inscriptions.read().await; + Ok(to.with(SuccessResponse::new(best_inscriptions_state.clone()))) + } + + pub async fn list_by_block_height( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, + ) -> Result>>, HTTPError> { + input.validate()?; + + let height = input.height; + ctx.set_kvs(vec![ + ("action", "list_inscriptions_block_height".into()), + ("height", height.into()), + ]) + .await; + + let res = db::Inscription::list_by_block_height(&app.scylla, height, vec![]).await?; + let mut inscriptions: Vec = Vec::with_capacity(res.len()); + for i in res { + inscriptions.push(i.to_index()?); + } + Ok(to.with(SuccessResponse::new(inscriptions))) + } + + pub async fn list_by_name( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, + ) -> Result>>, HTTPError> { + input.validate()?; + + let name = input.name.clone(); + ctx.set_kvs(vec![ + ("action", "list_inscriptions_by_name".into()), + ("name", name.clone().into()), + ]) + .await; + + let res = db::Inscription::list_by_name( + &app.scylla, + &name, + vec![], + input.page_size.unwrap_or(10), + input.page_token, + ) + .await?; + let mut inscriptions: Vec = Vec::with_capacity(res.len()); + for i in res { + inscriptions.push(i.to_index()?); + } + let next_sequence = if let Some(last) = inscriptions.last() { + last.sequence + } else { + 0 + }; + Ok(to.with(SuccessResponse { + total_size: None, + next_page_token: if next_sequence > 0 { + Some(next_sequence.to_string()) + } else { + None + }, + result: inscriptions, + })) + } + + pub async fn list_invalid_by_name( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, + ) -> Result>>, HTTPError> { + input.validate()?; + + let name = input.name.clone(); + ctx.set_kvs(vec![ + ("action", "list_invalid_inscriptions_by_name".into()), + ("name", name.clone().into()), + ]) + .await; + + let res = db::InvalidInscription::list_by_name(&app.scylla, &name).await?; + let mut inscriptions: Vec = Vec::with_capacity(res.len()); + for i in res { + inscriptions.push(i.to_index()?); + } + + Ok(to.with(SuccessResponse::new(inscriptions))) + } +} diff --git a/crates/ns-indexer/src/api/mod.rs b/crates/ns-indexer/src/api/mod.rs new file mode 100644 index 0000000..51c56f2 --- /dev/null +++ b/crates/ns-indexer/src/api/mod.rs @@ -0,0 +1,108 @@ +use axum::extract::State; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use validator::{Validate, ValidationError}; + +use axum_web::erring::{HTTPError, SuccessResponse}; +use axum_web::object::PackObject; +use ns_protocol::ns; + +use crate::db::scylladb::ScyllaDB; +use crate::indexer::{Indexer, IndexerState}; + +mod inscription; +mod name; +mod service; + +pub use inscription::InscriptionAPI; +pub use name::NameAPI; +pub use service::ServiceAPI; + +#[derive(Serialize, Deserialize)] +pub struct AppVersion { + pub name: String, + pub version: String, +} + +#[derive(Serialize, Deserialize)] +pub struct AppHealth { + pub block_height: u64, + pub inscription_height: u64, +} + +pub struct IndexerAPI { + pub(crate) scylla: Arc, + pub(crate) state: Arc, +} + +impl IndexerAPI { + pub fn new(indexer: Arc) -> Self { + Self { + scylla: indexer.scylla.clone(), + state: indexer.state.clone(), + } + } +} + +pub async fn version( + to: PackObject<()>, + State(_): State>, +) -> PackObject { + to.with(AppVersion { + name: crate::APP_NAME.to_string(), + version: crate::APP_VERSION.to_string(), + }) +} + +pub async fn healthz( + to: PackObject<()>, + State(api): State>, +) -> Result>, HTTPError> { + let last_accepted_state = api.state.last_accepted.read().await; + let (block_height, height) = match *last_accepted_state { + Some(ref last_accepted) => (last_accepted.block_height, last_accepted.height), + None => (0, 0), + }; + Ok(to.with(SuccessResponse::new(AppHealth { + block_height, + inscription_height: height, + }))) +} + +#[derive(Debug, Deserialize, Validate)] +pub struct QueryName { + #[validate(custom = "validate_name")] + pub name: String, + #[validate(range(min = 0))] + pub sequence: Option, + #[validate(range(min = 0))] + pub code: Option, +} + +#[derive(Debug, Deserialize, Validate)] +pub struct QueryHeight { + #[validate(range(min = 0))] + pub height: i64, +} + +#[derive(Debug, Deserialize, Validate)] +pub struct QueryNamePagination { + #[validate(custom = "validate_name")] + pub name: String, + pub page_token: Option, + #[validate(range(min = 2, max = 1000))] + pub page_size: Option, +} + +#[derive(Debug, Deserialize, Validate)] +pub struct QueryPubkey { + pub pubkey: String, +} + +fn validate_name(name: &str) -> Result<(), ValidationError> { + if !ns::valid_name(name) { + return Err(ValidationError::new("invalid name")); + } + + Ok(()) +} diff --git a/crates/ns-indexer/src/api/name.rs b/crates/ns-indexer/src/api/name.rs new file mode 100644 index 0000000..0131a33 --- /dev/null +++ b/crates/ns-indexer/src/api/name.rs @@ -0,0 +1,80 @@ +use axum::{ + extract::{Query, State}, + Extension, +}; +use std::sync::Arc; +use validator::Validate; + +use axum_web::{ + context::ReqContext, + erring::{HTTPError, SuccessResponse}, + object::PackObject, +}; +use ns_protocol::index::NameState; + +use crate::api::{IndexerAPI, QueryName, QueryPubkey}; +use crate::db; + +pub struct NameAPI; + +impl NameAPI { + pub async fn get( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, + ) -> Result>, HTTPError> { + input.validate()?; + + let name = input.name.clone(); + ctx.set_kvs(vec![ + ("action", "get_name_state".into()), + ("name", name.clone().into()), + ]) + .await; + + let mut name_state = db::NameState::with_pk(name); + name_state.get_one(&app.scylla, vec![]).await?; + + Ok(to.with(SuccessResponse::new(name_state.to_index()?))) + } + + pub async fn list_by_query( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, + ) -> Result>>, HTTPError> { + input.validate()?; + + let query = input.name.clone(); + ctx.set_kvs(vec![ + ("action", "list_names_by_query".into()), + ("query", query.clone().into()), + ]) + .await; + + let names = db::NameState::list_by_query(&app.scylla, query).await?; + + Ok(to.with(SuccessResponse::new(names))) + } + + pub async fn list_by_pubkey( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, + ) -> Result>>, HTTPError> { + input.validate()?; + + let mut pubkey = [0u8; 32]; + faster_hex::hex_decode(input.pubkey.as_bytes(), &mut pubkey) + .map_err(|_| HTTPError::new(400, format!("Invalid pubkey: {}", input.pubkey)))?; + ctx.set_kvs(vec![("action", "list_names_by_pubkey".into())]) + .await; + + let mut names = db::NameState::list_by_pubkey(&app.scylla, pubkey.to_vec()).await?; + names.sort(); + Ok(to.with(SuccessResponse::new(names))) + } +} diff --git a/crates/ns-indexer/src/api/service.rs b/crates/ns-indexer/src/api/service.rs new file mode 100644 index 0000000..6878c6d --- /dev/null +++ b/crates/ns-indexer/src/api/service.rs @@ -0,0 +1,69 @@ +use axum::{ + extract::{Query, State}, + Extension, +}; +use std::sync::Arc; +use validator::Validate; + +use axum_web::{ + context::ReqContext, + erring::{HTTPError, SuccessResponse}, + object::PackObject, +}; +use ns_protocol::index::ServiceState; + +use crate::api::{IndexerAPI, QueryName}; +use crate::db; + +pub struct ServiceAPI; + +impl ServiceAPI { + pub async fn get( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, + ) -> Result>, HTTPError> { + input.validate()?; + if input.code.is_none() { + return Err(HTTPError::new(400, "service code is required".to_string())); + } + + let name = input.name.clone(); + let code = input.code.unwrap(); + ctx.set_kvs(vec![ + ("action", "get_service_state".into()), + ("name", name.clone().into()), + ("code", code.into()), + ]) + .await; + + let mut service_state = db::ServiceState::with_pk(name, code); + service_state.get_one(&app.scylla, vec![]).await?; + + Ok(to.with(SuccessResponse::new(service_state.to_index()?))) + } + + pub async fn list_by_name( + State(app): State>, + Extension(ctx): Extension>, + to: PackObject<()>, + input: Query, + ) -> Result>>, HTTPError> { + input.validate()?; + + let name = input.name.clone(); + ctx.set_kvs(vec![ + ("action", "list_service_states_by_name".into()), + ("name", name.clone().into()), + ]) + .await; + + let res = db::ServiceState::list_by_name(&app.scylla, &name, vec![]).await?; + let mut service_states: Vec = Vec::with_capacity(res.len()); + for i in res { + service_states.push(i.to_index()?); + } + Ok(to.with(SuccessResponse::new(service_states))) + } +} diff --git a/crates/ns-indexer/src/bin/main.rs b/crates/ns-indexer/src/bin/main.rs index 6c8078a..c46027c 100644 --- a/crates/ns-indexer/src/bin/main.rs +++ b/crates/ns-indexer/src/bin/main.rs @@ -4,10 +4,10 @@ use std::sync::Arc; use structured_logger::{async_json::new_writer, get_env_level, Builder}; use tokio::signal; +use ns_indexer::api::IndexerAPI; use ns_indexer::bitcoin::{BitcoinRPC, BitcoinRPCOptions}; use ns_indexer::db::scylladb::ScyllaDBOptions; use ns_indexer::indexer::{Indexer, IndexerOptions}; -use ns_indexer::indexer_api::{self, IndexerAPI}; use ns_indexer::router; use ns_indexer::scanner::Scanner; @@ -76,8 +76,8 @@ fn main() -> anyhow::Result<()> { let addr = std::env::var("INDEXER_SERVER_ADDR").unwrap_or("127.0.0.1:3000".to_string()); log::info!( "{}@{} start at {}", - indexer_api::APP_NAME, - indexer_api::APP_VERSION, + ns_indexer::APP_NAME, + ns_indexer::APP_VERSION, &addr ); let listener = tokio::net::TcpListener::bind(&addr) @@ -102,7 +102,11 @@ fn main() -> anyhow::Result<()> { let background_job = async { match scanner.run(shutdown.clone(), start_height).await { Ok(_) => log::info!(target: "server", "scanner finished"), - Err(err) => log::error!(target: "server", "scanner error: {}", err), + Err(err) => { + log::error!(target: "server", "scanner error: {}", err); + // should exit the process and restart + return Err(err); + } } Ok::<(), anyhow::Error>(()) diff --git a/crates/ns-indexer/src/bitcoin.rs b/crates/ns-indexer/src/bitcoin.rs index 688aab5..316af88 100644 --- a/crates/ns-indexer/src/bitcoin.rs +++ b/crates/ns-indexer/src/bitcoin.rs @@ -117,23 +117,32 @@ impl BitcoinRPC { }; let input = to_vec(&input)?; - let mut res = self - .client - .post(self.url.clone()) - .body(input.clone()) - .send() - .await?; - - // retry once if server error - if res.status().as_u16() > 500 { - sleep(Duration::from_secs(1)).await; - res = self + // retry if server error + let mut retry_secs = 0; + let res = loop { + match self .client .post(self.url.clone()) - .body(input) + .body(input.clone()) .send() - .await?; - } + .await + { + Ok(res) => break res, + Err(err) => { + retry_secs += 1; + if retry_secs <= 5 { + log::warn!(target: "ns-indexer", + action = "bitcoin_rpc_retry"; + "{}", err.to_string(), + ); + sleep(Duration::from_secs(retry_secs)).await; + continue; + } else { + anyhow::bail!("BitcoinRPC: {}", err.to_string()); + } + } + } + }; let data = res.bytes().await?; let output: RPCResponse = serde_json::from_slice(&data).map_err(|err| { diff --git a/crates/ns-indexer/src/db/model_inscription.rs b/crates/ns-indexer/src/db/model_inscription.rs index be17474..c825bec 100644 --- a/crates/ns-indexer/src/db/model_inscription.rs +++ b/crates/ns-indexer/src/db/model_inscription.rs @@ -389,6 +389,90 @@ impl Inscription { let _ = db.batch(statements, values).await?; Ok(()) } + + pub async fn get_by_height( + db: &scylladb::ScyllaDB, + height: i64, + select_fields: Vec, + ) -> anyhow::Result { + let fields = Self::select_fields(select_fields, true)?; + + let query = format!( + "SELECT {} FROM inscription WHERE height=? LIMIT 1", + fields.clone().join(",") + ); + let params = (height,); + let res = db.execute(query, params).await?.single_row()?; + + let mut cols = ColumnsMap::with_capacity(fields.len()); + cols.fill(res, &fields)?; + let mut doc = Self::default(); + doc.fill(&cols); + doc._fields = fields.clone(); + + Ok(doc) + } + + pub async fn list_by_block_height( + db: &scylladb::ScyllaDB, + height: i64, + select_fields: Vec, + ) -> anyhow::Result> { + let fields = Self::select_fields(select_fields, true)?; + + let query = format!( + "SELECT {} FROM inscription WHERE block_height=?", + fields.clone().join(",") + ); + let params = (height,); + let rows = db.execute_iter(query, params).await?; + + let mut res: Vec = Vec::with_capacity(rows.len()); + for row in rows { + let mut doc = Self::default(); + let mut cols = ColumnsMap::with_capacity(fields.len()); + cols.fill(row, &fields)?; + doc.fill(&cols); + doc._fields = fields.clone(); + res.push(doc); + } + + Ok(res) + } + + pub async fn list_by_name( + db: &scylladb::ScyllaDB, + name: &String, + select_fields: Vec, + page_size: u16, + page_token: Option, + ) -> anyhow::Result> { + let fields = Self::select_fields(select_fields, true)?; + + let token = match page_token { + Some(i) => i, + None => i64::MAX, + }; + + let query = format!( + "SELECT {} FROM inscription WHERE name=? AND sequence = Vec::with_capacity(rows.len()); + for row in rows { + let mut doc = Self::default(); + let mut cols = ColumnsMap::with_capacity(fields.len()); + cols.fill(row, &fields)?; + doc.fill(&cols); + doc._fields = fields.clone(); + res.push(doc); + } + + Ok(res) + } } impl InvalidInscription { @@ -439,4 +523,27 @@ impl InvalidInscription { let _ = db.execute(query, params).await?; Ok(true) } + + pub async fn list_by_name(db: &scylladb::ScyllaDB, name: &String) -> anyhow::Result> { + let fields = Self::fields(); + + let query = format!( + "SELECT {} FROM invalid_inscription WHERE name=? USING TIMEOUT 3s", + fields.clone().join(",") + ); + let params = (name.to_cql(),); + let rows = db.execute_iter(query, params).await?; + + let mut res: Vec = Vec::with_capacity(rows.len()); + for row in rows { + let mut doc = Self::default(); + let mut cols = ColumnsMap::with_capacity(fields.len()); + cols.fill(row, &fields)?; + doc.fill(&cols); + doc._fields = fields.clone(); + res.push(doc); + } + + Ok(res) + } } diff --git a/crates/ns-indexer/src/db/model_name_state.rs b/crates/ns-indexer/src/db/model_name_state.rs index d937d2d..19ffc64 100644 --- a/crates/ns-indexer/src/db/model_name_state.rs +++ b/crates/ns-indexer/src/db/model_name_state.rs @@ -235,4 +235,52 @@ impl NameState { let _ = db.batch(statements, values).await?; Ok(()) } + + pub async fn list_by_query(db: &scylladb::ScyllaDB, q: String) -> anyhow::Result> { + let fields = NameIndex::fields(); + + let query = format!( + "SELECT {} FROM name_index WHERE name LIKE ? LIMIT 100 ALLOW FILTERING", + fields.clone().join(",") + ); + let params = (q + "%",); + let rows = db.execute_iter(query, params).await?; + + let mut res: Vec = Vec::with_capacity(rows.len()); + for row in rows { + let mut doc = NameIndex::default(); + let mut cols = ColumnsMap::with_capacity(fields.len()); + cols.fill(row, &fields)?; + doc.fill(&cols); + res.push(doc); + } + + res.sort_by(|a, b| b.block_time.partial_cmp(&a.block_time).unwrap()); + Ok(res.into_iter().map(|name| name.name).collect()) + } + + pub async fn list_by_pubkey( + db: &scylladb::ScyllaDB, + pubkey: Vec, + ) -> anyhow::Result> { + let fields = vec!["name".to_string()]; + if pubkey.len() != 32 { + return Ok(vec![]); + } + + let query = "SELECT name FROM pubkey_name WHERE pubkey=?"; + let params = (pubkey,); + let rows = db.execute_iter(query, params).await?; + + let mut res: Vec = Vec::with_capacity(rows.len()); + for row in rows { + let mut doc = PubkeyName::default(); + let mut cols = ColumnsMap::with_capacity(fields.len()); + cols.fill(row, &fields)?; + doc.fill(&cols); + res.push(doc); + } + + Ok(res.into_iter().map(|name| name.name).collect()) + } } diff --git a/crates/ns-indexer/src/db/model_service_state.rs b/crates/ns-indexer/src/db/model_service_state.rs index 896d436..e6c5f36 100644 --- a/crates/ns-indexer/src/db/model_service_state.rs +++ b/crates/ns-indexer/src/db/model_service_state.rs @@ -99,4 +99,31 @@ impl ServiceState { Ok(()) } + + pub async fn list_by_name( + db: &scylladb::ScyllaDB, + name: &String, + select_fields: Vec, + ) -> anyhow::Result> { + let fields = Self::select_fields(select_fields, true)?; + + let query = format!( + "SELECT {} FROM service_state WHERE name=? USING TIMEOUT 3s", + fields.clone().join(",") + ); + let params = (name.to_cql(),); + let rows = db.execute_iter(query, params).await?; + + let mut res: Vec = Vec::with_capacity(rows.len()); + for row in rows { + let mut doc = Self::default(); + let mut cols = ColumnsMap::with_capacity(fields.len()); + cols.fill(row, &fields)?; + doc.fill(&cols); + doc._fields = fields.clone(); + res.push(doc); + } + + Ok(res) + } } diff --git a/crates/ns-indexer/src/db/scylladb.rs b/crates/ns-indexer/src/db/scylladb.rs index 80d6727..d0bd146 100644 --- a/crates/ns-indexer/src/db/scylladb.rs +++ b/crates/ns-indexer/src/db/scylladb.rs @@ -176,7 +176,7 @@ mod tests { .collect(), username: std::env::var("SCYLLA_USERNAME").unwrap_or_default(), password: std::env::var("SCYLLA_PASSWORD").unwrap_or_default(), - keyspace: std::env::var("SCYLLA_KEYSPACE").unwrap_or_default(), + keyspace: "".to_string(), }; let res = db::scylladb::ScyllaDB::new(&cfg).await; res.unwrap() @@ -187,7 +187,6 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn exec_cqls_works() { dotenvy::from_filename(".env.sample").expect(".env file not found"); - let db = get_db().await; let schema = std::include_str!("../../cql/schema.cql"); diff --git a/crates/ns-indexer/src/indexer.rs b/crates/ns-indexer/src/indexer.rs index f76ffae..6178f3d 100644 --- a/crates/ns-indexer/src/indexer.rs +++ b/crates/ns-indexer/src/indexer.rs @@ -79,10 +79,6 @@ impl Indexer { let mut last_accepted_state = self.state.last_accepted.write().await; *last_accepted_state = Some(last_accepted.clone()); } - { - let mut best_inscriptions_state = self.state.best_inscriptions.write().await; - best_inscriptions_state.push(last_accepted); - } } Ok(last_accepted_height.block_height as u64) } @@ -96,7 +92,7 @@ impl Indexer { ) -> anyhow::Result<()> { let accepted_height = { let last_accepted_height_state = self.state.last_accepted_height.read().await; - if *last_accepted_height_state + ACCEPTED_DISTANCE <= block_height { + if *last_accepted_height_state + ACCEPTED_DISTANCE < block_height { block_height - ACCEPTED_DISTANCE } else { 0 @@ -110,12 +106,13 @@ impl Indexer { for envelope in envelopes { for name in envelope.payload { log::info!(target: "ns-indexer", + action = "index_name", block_height = block_height, block_time = block_time, txid = envelope.txid.to_string(), name = name.name, sequence = name.sequence; - "indexing name", + "", ); match self.index_name(block_height, block_time, &name).await { Err(err) => { @@ -158,6 +155,7 @@ impl Indexer { { let mut best_inscriptions_state = self.state.best_inscriptions.write().await; + match best_inscriptions_state.last() { Some(prev_best_inscription) => { inscription.height = prev_best_inscription.height + 1; @@ -165,10 +163,18 @@ impl Indexer { .hash() .expect("hash_sha3(inscription) should not fail"); } - None => { - // this is the first inscription - inscription.previous_hash = [0u8; 32].to_vec(); - } + None => match *self.state.last_accepted.read().await { + Some(ref last_accepted_state) => { + inscription.height = last_accepted_state.height + 1; + inscription.previous_hash = last_accepted_state + .hash() + .expect("hash_sha3(inscription) should not fail"); + } + None => { + // this is the first inscription + inscription.previous_hash = [0u8; 32].to_vec(); + } + }, } best_inscriptions_state.push(inscription); @@ -449,6 +455,16 @@ impl Indexer { ) .await?; + log::info!(target: "ns-indexer", + action = "save_accepted", + block_height = accepted_height, + name_states = name_states.len(), + service_states = service_states.len(), + protocol_states = protocol_states.len(), + inscriptions = inscriptions.len(); + "", + ); + { let mut last_accepted_state = self.state.last_accepted.write().await; *last_accepted_state = Some(inscriptions.last().unwrap().clone()); diff --git a/crates/ns-indexer/src/indexer_api.rs b/crates/ns-indexer/src/indexer_api.rs deleted file mode 100644 index 89a0f4c..0000000 --- a/crates/ns-indexer/src/indexer_api.rs +++ /dev/null @@ -1,47 +0,0 @@ -use axum::extract::State; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -use axum_web::erring::{HTTPError, SuccessResponse}; -use axum_web::object::PackObject; -use ns_protocol::index::Inscription; - -use crate::indexer::Indexer; - -pub const APP_NAME: &str = env!("CARGO_PKG_NAME"); -pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); - -#[derive(Serialize, Deserialize)] -pub struct AppVersion { - pub name: String, - pub version: String, -} - -pub struct IndexerAPI { - indexer: Arc, -} - -impl IndexerAPI { - pub fn new(indexer: Arc) -> Self { - Self { indexer } - } -} - -pub async fn version( - to: PackObject<()>, - State(_): State>, -) -> PackObject { - to.with(AppVersion { - name: APP_NAME.to_string(), - version: APP_VERSION.to_string(), - }) -} - -pub async fn get_last_accepted( - to: PackObject<()>, - State(api): State>, -) -> Result>>, HTTPError> { - let last_accepted_state = api.indexer.state.last_accepted.read().await; - - Ok(to.with(SuccessResponse::new(last_accepted_state.clone()))) -} diff --git a/crates/ns-indexer/src/lib.rs b/crates/ns-indexer/src/lib.rs index 2731936..221c0b7 100644 --- a/crates/ns-indexer/src/lib.rs +++ b/crates/ns-indexer/src/lib.rs @@ -1,7 +1,10 @@ +pub mod api; pub mod bitcoin; pub mod db; pub mod envelope; pub mod indexer; -pub mod indexer_api; pub mod router; pub mod scanner; + +pub const APP_NAME: &str = env!("CARGO_PKG_NAME"); +pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/crates/ns-indexer/src/router.rs b/crates/ns-indexer/src/router.rs index 90705b6..84806db 100644 --- a/crates/ns-indexer/src/router.rs +++ b/crates/ns-indexer/src/router.rs @@ -9,55 +9,71 @@ use tower_http::{ use axum_web::context; use axum_web::encoding; -use crate::indexer_api; +use crate::api; -pub fn new(state: Arc) -> Router { +pub fn new(state: Arc) -> Router { let mds = ServiceBuilder::new() .layer(CatchPanicLayer::new()) .layer(middleware::from_fn(context::middleware)) .layer(CompressionLayer::new().compress_when(SizeAbove::new(encoding::MIN_ENCODING_SIZE))); Router::new() - .route("/", routing::get(indexer_api::version)) - .route("/healthz", routing::get(indexer_api::version)) + .route("/", routing::get(api::version)) + .route("/healthz", routing::get(api::healthz)) .nest( "/v1/name", Router::new() - .route("/", routing::get(indexer_api::version)) - .route("/list", routing::get(indexer_api::version)) - .route("/list_by_pubkey", routing::get(indexer_api::version)), + .route("/", routing::get(api::NameAPI::get)) + .route("/list_by_query", routing::get(api::NameAPI::list_by_query)) + .route( + "/list_by_pubkey", + routing::get(api::NameAPI::list_by_pubkey), + ), ) .nest( "/v1/service", Router::new() - .route("/", routing::get(indexer_api::version)) - .route("/list", routing::get(indexer_api::version)), + .route("/", routing::get(api::ServiceAPI::get)) + .route("/list_by_name", routing::get(api::ServiceAPI::list_by_name)), ) .nest( "/v1/inscription", Router::new() - .route("/", routing::get(indexer_api::version)) + .route("/", routing::get(api::InscriptionAPI::get)) .route( "/get_last_accepted", - routing::get(indexer_api::get_last_accepted), + routing::get(api::InscriptionAPI::get_last_accepted), + ) + .route("/get_best", routing::get(api::InscriptionAPI::get_best)) + .route( + "/get_by_height", + routing::get(api::InscriptionAPI::get_by_height), + ) + .route("/list_best", routing::get(api::InscriptionAPI::list_best)) + .route( + "/list_by_block_height", + routing::get(api::InscriptionAPI::list_by_block_height), ) - .route("/get_best", routing::get(indexer_api::version)) - .route("/get_by_height", routing::get(indexer_api::version)) - .route("/list_by_name", routing::get(indexer_api::version)) - .route("/list_by_block_height", routing::get(indexer_api::version)), + .route( + "/list_by_name", + routing::get(api::InscriptionAPI::list_by_name), + ), ) .nest( "/v1/invalid_inscription", - Router::new().route("/list_by_name", routing::get(indexer_api::version)), - ) - .nest( - "/v1/service_protocol", - Router::new() - .route("/", routing::get(indexer_api::version)) - .route("/list", routing::get(indexer_api::version)) - .route("/list_by_code", routing::get(indexer_api::version)) - .route("/list_by_submitter", routing::get(indexer_api::version)), + Router::new().route( + "/list_by_name", + routing::get(api::InscriptionAPI::list_invalid_by_name), + ), ) + // .nest( + // "/v1/service_protocol", + // Router::new() + // .route("/", routing::get(api::ServiceAPI::get)) + // .route("/list", routing::get(api::ServiceAPI::get)) + // .route("/list_by_code", routing::get(api::ServiceAPI::get)) + // .route("/list_by_submitter", routing::get(api::ServiceAPI::get)), + // ) .route_layer(mds) .with_state(state) } diff --git a/crates/ns-indexer/src/scanner.rs b/crates/ns-indexer/src/scanner.rs index 9c88f39..819e3f5 100644 --- a/crates/ns-indexer/src/scanner.rs +++ b/crates/ns-indexer/src/scanner.rs @@ -49,9 +49,10 @@ impl Scanner { bestblock = self.bitcoin.wait_for_new_block(1).await?; if height > bestblock.height { log::info!(target: "ns-indexer", - blockhash = bestblock.hash.to_string(), - height = bestblock.height; - "waiting for new block", + action = "waiting_block", + block_hash = bestblock.hash.to_string(), + block_height = bestblock.height; + "", ); continue; } @@ -63,11 +64,6 @@ impl Scanner { }; self.index_block(&blockhash).await?; - log::info!(target: "ns-indexer", - blockhash = blockhash.to_string(), - height = height; - "scanned block", - ); height += 1; } } @@ -79,6 +75,13 @@ impl Scanner { return Ok(()); } + log::info!(target: "ns-indexer", + action = "index_block", + block_hash = blockhash.to_string(), + block_height = block_height; + "", + ); + for tx in block.txdata { let envelopes = envelope::Envelope::from_transaction(&tx); self.indexer diff --git a/crates/ns-protocol/src/ns.rs b/crates/ns-protocol/src/ns.rs index 3ccbeb3..05d7cf1 100644 --- a/crates/ns-protocol/src/ns.rs +++ b/crates/ns-protocol/src/ns.rs @@ -214,6 +214,22 @@ impl Name { return Err(Error::Custom(format!("invalid name {}", self.name))); } + if self.sequence > i64::MAX as u64 { + return Err(Error::Custom(format!( + "invalid sequence {}, expected less than {}", + self.sequence, + i64::MAX + ))); + } + + if self.payload.code > i64::MAX as u64 { + return Err(Error::Custom(format!( + "invalid payload code {}, expected less than {}", + self.payload.code, + i64::MAX + ))); + } + if let Some(approver) = &self.payload.approver { if !valid_name(approver) { return Err(Error::Custom(format!("invalid approver {}", approver)));