Skip to content

Commit

Permalink
New: add 'dest' query to specify where messages are forwarded (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbackas authored Jul 31, 2023
1 parent c7b07bf commit 0d2b089
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
6 changes: 4 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc;

use serde_json::Value;
Expand All @@ -23,10 +24,11 @@ async fn main() {
.and(warp::header::headers_cloned())
.and(warp::body::json::<Value>())
.and(warp::path::full())
.and(warp::query::<HashMap<String, String>>())
.map({
let sonarr_handler = Arc::clone(&sonarr_handler);

move |headers: HeaderMap, body: Value, path: FullPath| {
move |headers: HeaderMap, body: Value, path: FullPath, map: HashMap<String, String>| {
let sonarr_handler = Arc::clone(&sonarr_handler);
let path = path.as_str().to_string();

Expand All @@ -35,7 +37,7 @@ async fn main() {
Ok(agent) if agent.to_lowercase().starts_with("sonarr") => {
// send the request to the sonarr handler async and move on
tokio::spawn(async move {
sonarr_handler.handle(path, body).await;
sonarr_handler.handle(path, body, map).await;
});

warp::reply::with_status(
Expand Down
22 changes: 16 additions & 6 deletions src/sonarr_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl SonarrHandler {
}
}

pub async fn handle(&self, request_path: String, body: Value) -> warp::reply::WithStatus<Json> {
pub async fn handle(&self, request_path: String, body: Value, query: HashMap<String, String>) -> warp::reply::WithStatus<Json> {
// parse the request body into a SonarrRequestBody
let mut sonarr_request: SonarrRequestBody = serde_json::from_value(body).unwrap();

Expand Down Expand Up @@ -93,12 +93,12 @@ impl SonarrHandler {

// now that the request has been added to the queue and the timer_end Instant has been updated
// we need to start the timer if it's not already running
self.start_timer(request_path).await;
self.start_timer(request_path, query.get("dest").cloned()).await;

warp::reply::with_status(warp::reply::json(&"Request added to queue"), warp::http::StatusCode::OK)
}

async fn start_timer(&self, request_path: String) {
async fn start_timer(&self, request_path: String, destination: Option<String>) {
// get the needed information first and then release the lock
let (timer_id, timer_end) = {
let mut timers = self.timers.lock().await;
Expand All @@ -119,12 +119,18 @@ impl SonarrHandler {

// now you're free to start the timer without holding the lock
let timers = Arc::clone(&self.timers);
tokio::spawn(process_timer(timers, request_path, timer_id, timer_end));
tokio::spawn(process_timer(timers, request_path, destination, timer_id, timer_end));
}
}

// this function is spawned when a url timer expires and it processes the queue of requests
async fn process_timer(timers: Arc<Mutex<HashMap<String, TimerState>>>, request_path: String, timer_id: usize, timer_end: Instant) {
async fn process_timer(
timers: Arc<Mutex<HashMap<String, TimerState>>>,
request_path: String,
destination: Option<String>,
timer_id: usize,
timer_end: Instant,
) {
let duration = timer_end - Instant::now();
tokio::time::sleep(duration).await;

Expand All @@ -150,7 +156,11 @@ async fn process_timer(timers: Arc<Mutex<HashMap<String, TimerState>>>, request_
}
};

let destination_url = crate::env::get_destination_url();
// if destination provded in query then use that else use the env variable
let destination_url = match destination {
Some(url) => url,
None => crate::env::get_destination_url(),
};

if let Some(mut queue) = timer_state_queue {
let grouped_requests = group_sonarr_requests(&mut queue);
Expand Down

0 comments on commit 0d2b089

Please sign in to comment.