Skip to content

Commit

Permalink
Merge pull request #6 from w3f/handle-escalation-interval
Browse files Browse the repository at this point in the history
Add lock for escalation handling subtask
  • Loading branch information
ironoa authored Dec 2, 2021
2 parents 6854ab8 + 0c57f2b commit 5c55d38
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
2 changes: 1 addition & 1 deletion charts/matrixbot-ack/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
description: Matrixbot Ack
name: matrixbot-ack
version: v0.2.4
version: v0.2.5
apiVersion: v2
2 changes: 1 addition & 1 deletion charts/matrixbot-ack/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ environment: production

image:
repository: web3f/matrixbot-ack
tag: v0.2.4
tag: v0.2.5
pullPolicy: IfNotPresent

config:
Expand Down
25 changes: 19 additions & 6 deletions src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use crate::matrix::MatrixClient;
use crate::webhook::Alert;
use crate::{unix_time, AlertId, Result};
use actix::prelude::*;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

const CRON_JON_INTERVAL: u64 = 5;
const CRON_JOB_INTERVAL: u64 = 5;

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct AlertContext {
Expand Down Expand Up @@ -103,6 +103,8 @@ pub struct Processor {
db: Option<Arc<Database>>,
escalation_window: u64,
should_escalate: bool,
// Ensures that only one escalation task is running at the time.
escalation_lock: Arc<Mutex<()>>,
}

impl Processor {
Expand All @@ -111,6 +113,7 @@ impl Processor {
db: db.map(|db| Arc::new(db)),
escalation_window: escalation_window,
should_escalate: should_escalate,
escalation_lock: Default::default(),
}
}
fn db(&self) -> Arc<Database> {
Expand Down Expand Up @@ -159,15 +162,25 @@ impl Actor for Processor {
Result::<()>::Ok(())
};

let lock = Arc::clone(&self.escalation_lock);
ctx.run_interval(
Duration::from_secs(CRON_JON_INTERVAL),
Duration::from_secs(CRON_JOB_INTERVAL),
move |_proc, _ctx| {
// Acquire new handles for async task.
let db = Arc::clone(&db);
let lock = Arc::clone(&lock);

actix::spawn(async move {
match local(db, escalation_window).await {
Ok(_) => {}
Err(err) => error!("{:?}", err),
// Immediately exits if the lock cannot be acquired.
if let Ok(locked) = lock.try_lock() {
// Lock acquired and will remain locked until
// `_l` goes out of scope.
let _l = locked;

match local(db, escalation_window).await {
Ok(_) => {}
Err(err) => error!("{:?}", err),
}
}
});
},
Expand Down

0 comments on commit 5c55d38

Please sign in to comment.