Skip to content

Commit

Permalink
Introduce server-sent events for bulk endpoints (#70)
Browse files Browse the repository at this point in the history
* Introduce universal SSE in server

* Implement better SSE message response

* Implement SSE for name & address endpoints
  • Loading branch information
Antony1060 authored Feb 13, 2024
1 parent 2cf6d9f commit 4cddd26
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 15 deletions.
12 changes: 12 additions & 0 deletions server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@ bech32 = "0.10.0-alpha"
crc32fast = "1.3.2"
ciborium = "0.2.1"
serde_qs = "0.12.0"
tokio-stream = "0.1.14"
3 changes: 3 additions & 0 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub fn setup(state: AppState) -> App {
.directory_route("/bulk/a", get(routes::address::get_bulk))
.directory_route("/bulk/n", get(routes::name::get_bulk))
.directory_route("/bulk/u", get(routes::universal::get_bulk))
.directory_route("/sse/a", get(routes::address::get_bulk_sse))
.directory_route("/sse/n", get(routes::name::get_bulk_sse))
.directory_route("/sse/u", get(routes::universal::get_bulk_sse))
.fallback(routes::four_oh_four::handler)
.layer(CorsLayer::permissive())
.layer(TraceLayer::new_for_http())
Expand Down
2 changes: 1 addition & 1 deletion server/src/models/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use utoipa::ToSchema;
use crate::models::error::ErrorResponse;
use crate::routes::profile_http_error_mapper;

#[derive(serde::Serialize)]
#[derive(Debug, serde::Serialize)]
#[serde(tag = "type")]
pub enum BulkResponse<Ok> {
#[serde(rename = "success")]
Expand Down
1 change: 1 addition & 0 deletions server/src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod bulk;
pub mod error;
pub mod profile;
pub mod sse;
9 changes: 9 additions & 0 deletions server/src/models/sse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use enstate_shared::core::Profile;

use crate::models::bulk::BulkResponse;

#[derive(Debug, serde::Serialize)]
pub struct SSEResponse {
pub(crate) query: String,
pub(crate) response: BulkResponse<Profile>,
}
52 changes: 51 additions & 1 deletion server/src/routes/address.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use axum::response::sse::Event;
use axum::response::{IntoResponse, Sse};
use axum::{
extract::{Path, Query, State},
http::StatusCode,
Expand All @@ -10,9 +14,14 @@ use enstate_shared::core::Profile;
use ethers_core::types::Address;
use futures::future::join_all;
use serde::Deserialize;
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::models::bulk::{BulkResponse, ListResponse};
use crate::routes::{http_simple_status_error, validate_bulk_input, FreshQuery, Qs, RouteError};
use crate::models::sse::SSEResponse;
use crate::routes::{
http_simple_status_error, profile_http_error_mapper, validate_bulk_input, FreshQuery, Qs,
RouteError,
};

#[utoipa::path(
get,
Expand Down Expand Up @@ -95,3 +104,44 @@ pub async fn get_bulk(

Ok(Json(joined))
}

pub async fn get_bulk_sse(
Qs(query): Qs<AddressGetBulkQuery>,
State(state): State<Arc<crate::AppState>>,
) -> impl IntoResponse {
let addresses = validate_bulk_input(&query.addresses, 10).unwrap();

let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<Result<Event, Infallible>>();

for address_input in addresses {
let state_clone = state.clone();
let event_tx_clone = event_tx.clone();
tokio::spawn(async move {
let profile = 'a: {
let address = address_input.parse::<Address>();

let Ok(address) = address else {
break 'a Err(http_simple_status_error(StatusCode::BAD_REQUEST));
};

state_clone
.service
.resolve_profile(LookupInfo::Address(address), query.fresh.fresh)
.await
.map_err(profile_http_error_mapper)
};

let sse_response = SSEResponse {
query: address_input,
response: profile.into(),
};

event_tx_clone.send(Ok(Event::default()
.json_data(sse_response)
.expect("json_data should've succeeded")))
});
}

Sse::new(UnboundedReceiverStream::new(event_rx))
.keep_alive(axum::response::sse::KeepAlive::new().interval(Duration::from_secs(1)))
}
19 changes: 8 additions & 11 deletions server/src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,14 @@ pub fn profile_http_error_mapper<T: AsRef<ProfileError>>(err: T) -> ErrorRespons
}
}

pub fn http_simple_status_error(status: StatusCode) -> RouteError {
(
status,
Json(ErrorResponse {
status: status.as_u16(),
error: status
.canonical_reason()
.unwrap_or("Unknown error")
.to_string(),
}),
)
pub fn http_simple_status_error(status: StatusCode) -> ErrorResponse {
ErrorResponse {
status: status.as_u16(),
error: status
.canonical_reason()
.unwrap_or("Unknown error")
.to_string(),
}
}

pub fn http_error(status: StatusCode, error: &str) -> RouteError {
Expand Down
41 changes: 40 additions & 1 deletion server/src/routes/name.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use axum::response::sse::Event;
use axum::response::{IntoResponse, Sse};
use axum::{
extract::{Path, Query, State},
Json,
Expand All @@ -8,9 +12,11 @@ use enstate_shared::core::lookup_data::LookupInfo;
use enstate_shared::core::Profile;
use futures::future::join_all;
use serde::Deserialize;
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::models::bulk::{BulkResponse, ListResponse};
use crate::routes::{validate_bulk_input, FreshQuery, Qs, RouteError};
use crate::models::sse::SSEResponse;
use crate::routes::{profile_http_error_mapper, validate_bulk_input, FreshQuery, Qs, RouteError};

#[utoipa::path(
get,
Expand Down Expand Up @@ -83,3 +89,36 @@ pub async fn get_bulk(

Ok(Json(joined))
}

pub async fn get_bulk_sse(
Qs(query): Qs<NameGetBulkQuery>,
State(state): State<Arc<crate::AppState>>,
) -> impl IntoResponse {
let names = validate_bulk_input(&query.names, 10).unwrap();

let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<Result<Event, Infallible>>();

for name in names {
let state_clone = state.clone();
let event_tx_clone = event_tx.clone();
tokio::spawn(async move {
let profile = state_clone
.service
.resolve_profile(LookupInfo::Name(name.clone()), query.fresh.fresh)
.await
.map_err(profile_http_error_mapper);

let sse_response = SSEResponse {
query: name,
response: profile.into(),
};

event_tx_clone.send(Ok(Event::default()
.json_data(sse_response)
.expect("json_data should've succeeded")))
});
}

Sse::new(UnboundedReceiverStream::new(event_rx))
.keep_alive(axum::response::sse::KeepAlive::new().interval(Duration::from_secs(1)))
}
43 changes: 42 additions & 1 deletion server/src/routes/universal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use axum::response::sse::Event;
use axum::response::{IntoResponse, Sse};
use axum::{
extract::{Path, Query, State},
Json,
Expand All @@ -9,9 +13,11 @@ use enstate_shared::core::lookup_data::{LookupInfo, NameParseError};
use enstate_shared::core::{ENSService, Profile};
use futures::future::join_all;
use serde::Deserialize;
use tokio_stream::wrappers::UnboundedReceiverStream;

use crate::models::bulk::{BulkResponse, ListResponse};
use crate::routes::{validate_bulk_input, FreshQuery, Qs, RouteError};
use crate::models::sse::SSEResponse;
use crate::routes::{profile_http_error_mapper, validate_bulk_input, FreshQuery, Qs, RouteError};

#[utoipa::path(
get,
Expand Down Expand Up @@ -85,6 +91,41 @@ pub async fn get_bulk(
Ok(Json(joined))
}

pub async fn get_bulk_sse(
Qs(query): Qs<UniversalGetBulkQuery>,
State(state): State<Arc<crate::AppState>>,
) -> impl IntoResponse {
let queries = validate_bulk_input(&query.queries, 10).unwrap();

let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<Result<Event, Infallible>>();

for input in queries {
let state_clone = state.clone();
let event_tx_clone = event_tx.clone();
tokio::spawn(async move {
let profile = profile_from_lookup_guess(
LookupInfo::guess(&input),
&state_clone.service,
query.fresh.fresh,
)
.await
.map_err(profile_http_error_mapper);

let sse_response = SSEResponse {
query: input,
response: profile.into(),
};

event_tx_clone.send(Ok(Event::default()
.json_data(sse_response)
.expect("json_data should've succeeded")))
});
}

Sse::new(UnboundedReceiverStream::new(event_rx))
.keep_alive(axum::response::sse::KeepAlive::new().interval(Duration::from_secs(1)))
}

// helper function for above
async fn profile_from_lookup_guess(
lookup: Result<LookupInfo, NameParseError>,
Expand Down

0 comments on commit 4cddd26

Please sign in to comment.