diff --git a/server/Cargo.lock b/server/Cargo.lock index 1a3ad47..e7657b7 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -1029,6 +1029,7 @@ dependencies = [ "sha2", "thiserror", "tokio", + "tokio-stream", "tokio-util", "tower-http", "tracing", @@ -3877,6 +3878,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.20.1" diff --git a/server/Cargo.toml b/server/Cargo.toml index a31200b..dcb18dc 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -53,3 +53,4 @@ bech32 = "0.10.0-alpha" crc32fast = "1.3.2" ciborium = "0.2.1" serde_qs = "0.12.0" +tokio-stream = "0.1.14" diff --git a/server/src/http.rs b/server/src/http.rs index 143fec1..a7848b7 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -63,6 +63,9 @@ pub fn setup(state: AppState) -> App { .directory_route("/bulk/a", get(routes::address::get_bulk)) .directory_route("/bulk/n", get(routes::name::get_bulk)) .directory_route("/bulk/u", get(routes::universal::get_bulk)) + .directory_route("/sse/a", get(routes::address::get_bulk_sse)) + .directory_route("/sse/n", get(routes::name::get_bulk_sse)) + .directory_route("/sse/u", get(routes::universal::get_bulk_sse)) .fallback(routes::four_oh_four::handler) .layer(CorsLayer::permissive()) .layer(TraceLayer::new_for_http()) diff --git a/server/src/models/bulk.rs b/server/src/models/bulk.rs index 6dca74f..a4e5bb4 100644 --- a/server/src/models/bulk.rs +++ b/server/src/models/bulk.rs @@ -4,7 +4,7 @@ use utoipa::ToSchema; use crate::models::error::ErrorResponse; use crate::routes::profile_http_error_mapper; -#[derive(serde::Serialize)] +#[derive(Debug, serde::Serialize)] #[serde(tag = "type")] pub enum BulkResponse { #[serde(rename = "success")] diff --git a/server/src/models/mod.rs b/server/src/models/mod.rs index 78f5cdd..8373dfe 100644 --- a/server/src/models/mod.rs +++ b/server/src/models/mod.rs @@ -1,3 +1,4 @@ pub mod bulk; pub mod error; pub mod profile; +pub mod sse; diff --git a/server/src/models/sse.rs b/server/src/models/sse.rs new file mode 100644 index 0000000..009931f --- /dev/null +++ b/server/src/models/sse.rs @@ -0,0 +1,9 @@ +use enstate_shared::core::Profile; + +use crate::models::bulk::BulkResponse; + +#[derive(Debug, serde::Serialize)] +pub struct SSEResponse { + pub(crate) query: String, + pub(crate) response: BulkResponse, +} diff --git a/server/src/routes/address.rs b/server/src/routes/address.rs index 5280ab3..b468775 100644 --- a/server/src/routes/address.rs +++ b/server/src/routes/address.rs @@ -1,5 +1,9 @@ +use std::convert::Infallible; use std::sync::Arc; +use std::time::Duration; +use axum::response::sse::Event; +use axum::response::{IntoResponse, Sse}; use axum::{ extract::{Path, Query, State}, http::StatusCode, @@ -10,9 +14,14 @@ use enstate_shared::core::Profile; use ethers_core::types::Address; use futures::future::join_all; use serde::Deserialize; +use tokio_stream::wrappers::UnboundedReceiverStream; use crate::models::bulk::{BulkResponse, ListResponse}; -use crate::routes::{http_simple_status_error, validate_bulk_input, FreshQuery, Qs, RouteError}; +use crate::models::sse::SSEResponse; +use crate::routes::{ + http_simple_status_error, profile_http_error_mapper, validate_bulk_input, FreshQuery, Qs, + RouteError, +}; #[utoipa::path( get, @@ -95,3 +104,44 @@ pub async fn get_bulk( Ok(Json(joined)) } + +pub async fn get_bulk_sse( + Qs(query): Qs, + State(state): State>, +) -> impl IntoResponse { + let addresses = validate_bulk_input(&query.addresses, 10).unwrap(); + + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::>(); + + for address_input in addresses { + let state_clone = state.clone(); + let event_tx_clone = event_tx.clone(); + tokio::spawn(async move { + let profile = 'a: { + let address = address_input.parse::
(); + + let Ok(address) = address else { + break 'a Err(http_simple_status_error(StatusCode::BAD_REQUEST)); + }; + + state_clone + .service + .resolve_profile(LookupInfo::Address(address), query.fresh.fresh) + .await + .map_err(profile_http_error_mapper) + }; + + let sse_response = SSEResponse { + query: address_input, + response: profile.into(), + }; + + event_tx_clone.send(Ok(Event::default() + .json_data(sse_response) + .expect("json_data should've succeeded"))) + }); + } + + Sse::new(UnboundedReceiverStream::new(event_rx)) + .keep_alive(axum::response::sse::KeepAlive::new().interval(Duration::from_secs(1))) +} diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs index 7bf46a1..94ecef3 100644 --- a/server/src/routes/mod.rs +++ b/server/src/routes/mod.rs @@ -62,17 +62,14 @@ pub fn profile_http_error_mapper>(err: T) -> ErrorRespons } } -pub fn http_simple_status_error(status: StatusCode) -> RouteError { - ( - status, - Json(ErrorResponse { - status: status.as_u16(), - error: status - .canonical_reason() - .unwrap_or("Unknown error") - .to_string(), - }), - ) +pub fn http_simple_status_error(status: StatusCode) -> ErrorResponse { + ErrorResponse { + status: status.as_u16(), + error: status + .canonical_reason() + .unwrap_or("Unknown error") + .to_string(), + } } pub fn http_error(status: StatusCode, error: &str) -> RouteError { diff --git a/server/src/routes/name.rs b/server/src/routes/name.rs index 0137222..1381da8 100644 --- a/server/src/routes/name.rs +++ b/server/src/routes/name.rs @@ -1,5 +1,9 @@ +use std::convert::Infallible; use std::sync::Arc; +use std::time::Duration; +use axum::response::sse::Event; +use axum::response::{IntoResponse, Sse}; use axum::{ extract::{Path, Query, State}, Json, @@ -8,9 +12,11 @@ use enstate_shared::core::lookup_data::LookupInfo; use enstate_shared::core::Profile; use futures::future::join_all; use serde::Deserialize; +use tokio_stream::wrappers::UnboundedReceiverStream; use crate::models::bulk::{BulkResponse, ListResponse}; -use crate::routes::{validate_bulk_input, FreshQuery, Qs, RouteError}; +use crate::models::sse::SSEResponse; +use crate::routes::{profile_http_error_mapper, validate_bulk_input, FreshQuery, Qs, RouteError}; #[utoipa::path( get, @@ -83,3 +89,36 @@ pub async fn get_bulk( Ok(Json(joined)) } + +pub async fn get_bulk_sse( + Qs(query): Qs, + State(state): State>, +) -> impl IntoResponse { + let names = validate_bulk_input(&query.names, 10).unwrap(); + + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::>(); + + for name in names { + let state_clone = state.clone(); + let event_tx_clone = event_tx.clone(); + tokio::spawn(async move { + let profile = state_clone + .service + .resolve_profile(LookupInfo::Name(name.clone()), query.fresh.fresh) + .await + .map_err(profile_http_error_mapper); + + let sse_response = SSEResponse { + query: name, + response: profile.into(), + }; + + event_tx_clone.send(Ok(Event::default() + .json_data(sse_response) + .expect("json_data should've succeeded"))) + }); + } + + Sse::new(UnboundedReceiverStream::new(event_rx)) + .keep_alive(axum::response::sse::KeepAlive::new().interval(Duration::from_secs(1))) +} diff --git a/server/src/routes/universal.rs b/server/src/routes/universal.rs index 58c9cac..be72d95 100644 --- a/server/src/routes/universal.rs +++ b/server/src/routes/universal.rs @@ -1,5 +1,9 @@ +use std::convert::Infallible; use std::sync::Arc; +use std::time::Duration; +use axum::response::sse::Event; +use axum::response::{IntoResponse, Sse}; use axum::{ extract::{Path, Query, State}, Json, @@ -9,9 +13,11 @@ use enstate_shared::core::lookup_data::{LookupInfo, NameParseError}; use enstate_shared::core::{ENSService, Profile}; use futures::future::join_all; use serde::Deserialize; +use tokio_stream::wrappers::UnboundedReceiverStream; use crate::models::bulk::{BulkResponse, ListResponse}; -use crate::routes::{validate_bulk_input, FreshQuery, Qs, RouteError}; +use crate::models::sse::SSEResponse; +use crate::routes::{profile_http_error_mapper, validate_bulk_input, FreshQuery, Qs, RouteError}; #[utoipa::path( get, @@ -85,6 +91,41 @@ pub async fn get_bulk( Ok(Json(joined)) } +pub async fn get_bulk_sse( + Qs(query): Qs, + State(state): State>, +) -> impl IntoResponse { + let queries = validate_bulk_input(&query.queries, 10).unwrap(); + + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::>(); + + for input in queries { + let state_clone = state.clone(); + let event_tx_clone = event_tx.clone(); + tokio::spawn(async move { + let profile = profile_from_lookup_guess( + LookupInfo::guess(&input), + &state_clone.service, + query.fresh.fresh, + ) + .await + .map_err(profile_http_error_mapper); + + let sse_response = SSEResponse { + query: input, + response: profile.into(), + }; + + event_tx_clone.send(Ok(Event::default() + .json_data(sse_response) + .expect("json_data should've succeeded"))) + }); + } + + Sse::new(UnboundedReceiverStream::new(event_rx)) + .keep_alive(axum::response::sse::KeepAlive::new().interval(Duration::from_secs(1))) +} + // helper function for above async fn profile_from_lookup_guess( lookup: Result,