Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move SSE swap update streams to sidecar #656

Merged
merged 2 commits into from
Aug 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions boltzr/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions boltzr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ build = "build.rs"
[features]
default = ["metrics", "loki", "otel"]
metrics = [
"dep:axum",
"dep:metrics",
"dep:axum-prometheus",
"dep:metrics-process",
Expand All @@ -33,7 +32,7 @@ codegen-units = 1
panic = "abort"

[dependencies]
axum = { version = "0.7.5", optional = true }
axum = "0.7.5"
bitcoin_hashes = "0.14.0"
clap = { version = "4.5.16", features = ["derive"] }
crossbeam-channel = "0.5.13"
Expand Down Expand Up @@ -73,12 +72,15 @@ alloy = { version = "0.2.1", features = ["reqwest", "sol-types", "serde", "eip71
alloy-transport-http = "0.2.1"
async-tungstenite = { version = "0.27.0", features = ["tokio-native-tls", "tokio-runtime"] }
async-trait = "0.1.81"
futures-util = "0.3.30"
async-stream = "0.3.5"

[build-dependencies]
built = { version = "0.7.4", features = ["git2"] }
tonic-build = "0.12.1"

[dev-dependencies]
eventsource-client = "0.13.0"
mockall = "0.13.0"
rand = "0.8.5"
serial_test = "3.1.1"
164 changes: 164 additions & 0 deletions boltzr/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
use crate::api::sse::sse_handler;
use crate::ws::status::SwapInfos;
use crate::ws::types::SwapStatus;
use axum::routing::get;
use axum::{Extension, Router};
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};

#[cfg(feature = "metrics")]
use crate::metrics::server::MetricsLayer;

mod sse;

#[derive(Deserialize, Serialize, PartialEq, Clone, Debug)]
pub struct Config {
pub host: String,
pub port: u16,
}

pub struct Server<S> {
swap_infos: S,
config: Config,
cancellation_token: CancellationToken,
swap_status_update_tx: tokio::sync::broadcast::Sender<Vec<SwapStatus>>,
}

struct ServerState<S> {
swap_infos: S,
swap_status_update_tx: tokio::sync::broadcast::Sender<Vec<SwapStatus>>,
}

impl<S> Server<S>
where
S: SwapInfos + Clone + Send + Sync + 'static,
{
pub fn new(
config: Config,
cancellation_token: CancellationToken,
swap_infos: S,
swap_status_update_tx: tokio::sync::broadcast::Sender<Vec<SwapStatus>>,
) -> Self {
Server {
config,
swap_infos,
cancellation_token,
swap_status_update_tx,
}
}

#[cfg(feature = "metrics")]
pub async fn start(&self, metrics_layer: Option<MetricsLayer>) -> Result<(), Box<dyn Error>> {
let mut router = Router::new();
router = Self::add_routes(router);

if let Some(metrics_layer) = metrics_layer {
router = router.layer(metrics_layer);
}

self.listen(router).await
}

#[cfg(not(feature = "metrics"))]
pub async fn start(&self) -> Result<(), Box<dyn Error>> {
self.listen(Self::add_routes(Router::new())).await
}

async fn listen(&self, router: Router) -> Result<(), Box<dyn Error>> {
let address = format!("{}:{}", self.config.host, self.config.port);
info!("Starting API server on: {}", address);

let cancellation_token = self.cancellation_token.clone();
let listener = tokio::net::TcpListener::bind(address).await;
match listener {
Ok(listener) => {
axum::serve(
listener,
router.layer(Extension(Arc::new(ServerState {
swap_infos: self.swap_infos.clone(),
swap_status_update_tx: self.swap_status_update_tx.clone(),
}))),
)
.with_graceful_shutdown(async move {
cancellation_token.cancelled().await;
debug!("Shutting down API server");
})
.await?;
Ok(())
}
Err(err) => Err(err.into()),
}
}

fn add_routes(router: Router) -> Router {
router.route("/streamswapstatus", get(sse_handler::<S>))
}
}

#[cfg(test)]
mod test {
use crate::api::{Config, Server};
use crate::ws::status::SwapInfos;
use crate::ws::types::SwapStatus;
use async_trait::async_trait;
use reqwest::StatusCode;
use std::time::Duration;
use tokio::sync::broadcast::Sender;
use tokio_util::sync::CancellationToken;

#[derive(Debug, Clone)]
struct Fetcher {
status_tx: Sender<Vec<SwapStatus>>,
}

#[async_trait]
impl SwapInfos for Fetcher {
async fn fetch_status_info(&self, ids: &[String]) {
let mut res = Vec::new();
ids.iter().for_each(|id| {
res.push(SwapStatus::default(id.clone(), "swap.created".into()));
});

self.status_tx.send(res).unwrap();
}
}

#[tokio::test]
async fn start_server() {
let port = 13_001;
let (cancel, _) = start(port).await;

let res = reqwest::get(format!("http://127.0.0.1:{}", port))
.await
.unwrap();
assert_eq!(res.status(), StatusCode::NOT_FOUND);

cancel.cancel();
}

pub async fn start(port: u16) -> (CancellationToken, Sender<Vec<SwapStatus>>) {
let cancel = CancellationToken::new();
let (status_tx, _) = tokio::sync::broadcast::channel::<Vec<SwapStatus>>(1);

let server = Server::new(
Config {
port,
host: "127.0.0.1".to_string(),
},
cancel.clone(),
Fetcher {
status_tx: status_tx.clone(),
},
status_tx.clone(),
);
tokio::spawn(async move {
server.start(None).await.unwrap();
});
tokio::time::sleep(Duration::from_millis(100)).await;

(cancel, status_tx)
}
}
Loading