Skip to content

Commit

Permalink
pass in notifier to update_database func
Browse files Browse the repository at this point in the history
  • Loading branch information
digizeph committed Aug 13, 2024
1 parent cb3f0cb commit 16f3dc3
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions src/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,9 @@ async fn update_database(
mut db: LocalBrokerDb,
collectors: Vec<Collector>,
days: Option<u32>,
notifier: &Option<NatsNotifier>,
send_heartbeat: bool,
) {
let notifier = match NatsNotifier::new(None).await {
Ok(n) => Some(n),
Err(e) => {
error!("want to set up notifier but failed: {}", e);
None
}
};

let now = Utc::now();

let latest_ts_map: HashMap<String, NaiveDateTime> = db
Expand Down Expand Up @@ -311,7 +304,7 @@ async fn update_database(
Ok(items) => {
let inserted = db.insert_items(&items, true).await.unwrap();
if !inserted.is_empty() {
if let Some(n) = &notifier {
if let Some(n) = notifier {
if let Err(e) = n.send(&inserted).await {
error!("{}", e);
}
Expand Down Expand Up @@ -412,16 +405,24 @@ fn main() {
let rt = get_tokio_runtime();

let collectors = load_collectors().unwrap();

rt.block_on(async {
let notifier = match NatsNotifier::new(None).await {
Ok(n) => Some(n),
Err(_e) => {
info!("no nats notifier configured, skip pushing notification");
None
}
};

let db = LocalBrokerDb::new(path.as_str()).await.unwrap();
let mut interval =
tokio::time::interval(std::time::Duration::from_secs(update_interval));

loop {
interval.tick().await;
// updating from the latest data available
update_database(db.clone(), collectors.clone(), None, true).await;
update_database(db.clone(), collectors.clone(), None, &notifier, true)
.await;
info!("wait for {} seconds before next update", update_interval);
}
});
Expand Down Expand Up @@ -535,7 +536,14 @@ fn main() {

rt.block_on(async {
let db = LocalBrokerDb::new(&db_path).await.unwrap();
update_database(db, collectors, days, false).await;
let notifier = match NatsNotifier::new(None).await {
Ok(n) => Some(n),
Err(_e) => {
info!("no nats notifier configured, skip pushing notification");
None
}
};
update_database(db, collectors, days, &notifier, false).await;
});
}
Commands::Search { query, json, url } => {
Expand Down Expand Up @@ -644,6 +652,7 @@ fn main() {
return;
}
};

if let Err(e) = notifier.start_subscription(subject).await {
error!("{}", e);
return;
Expand Down

0 comments on commit 16f3dc3

Please sign in to comment.