From d85e59d42f18d8d0c4f281f69254bc235b981054 Mon Sep 17 00:00:00 2001 From: Maksim Ryndin Date: Tue, 21 May 2024 20:07:45 +0000 Subject: [PATCH] 0.1.7rc2 --- src/services/system/mod.rs | 19 +++--- src/services/system/ssh.rs | 117 ++++++++++++++++++++++++++++++++++--- 2 files changed, 119 insertions(+), 17 deletions(-) diff --git a/src/services/system/mod.rs b/src/services/system/mod.rs index b5fedf8..5c569b0 100644 --- a/src/services/system/mod.rs +++ b/src/services/system/mod.rs @@ -18,7 +18,6 @@ use std::sync::{ }; use std::time::Duration; use tokio::sync::mpsc::{self}; -use tokio::sync::oneshot; use tokio::task::JoinHandle; pub const SYSTEM_SERVICE_NAME: &str = "system"; @@ -104,13 +103,14 @@ impl SystemService { } } + #[cfg(target_os = "linux")] async fn ssh_observer( is_shutdown: Arc, sender: mpsc::Sender, messenger: Sender, ) { tracing::info!("starting ssh monitoring"); - let (tx, rx) = oneshot::channel::<()>(); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); std::thread::Builder::new() .name("ssh-observer".into()) .spawn(move || ssh::process_sshd_log(is_shutdown, sender, messenger, tx)) @@ -187,7 +187,8 @@ impl Service for SystemService { fn get_example_rules(&self) -> Vec { let mut rows = Vec::with_capacity(3 + self.mounts.len() + 2 * self.process_names.len()); - if cfg!(target_os = "linux") { + #[cfg(target_os = "linux")] + { rows.push( Rule { log_name: ssh::SSH_LOG.to_string(), @@ -314,16 +315,16 @@ impl Service for SystemService { let scrape_interval = self.scrape_interval; let scrape_timeout = self.scrape_timeout; - let mut tasks = if cfg!(target_os = "linux") { + let mut tasks = vec![]; + #[cfg(target_os = "linux")] + { let cloned_is_shutdown = is_shutdown.clone(); let cloned_sender = sender.clone(); let cloned_messenger = messenger.clone(); - vec![tokio::spawn(async move { + tasks.push(tokio::spawn(async move { Self::ssh_observer(cloned_is_shutdown, cloned_sender, cloned_messenger).await; - })] - } else { - vec![] - }; + })); + } tasks.push(tokio::spawn(async move { Self::sys_observer( diff --git a/src/services/system/ssh.rs b/src/services/system/ssh.rs index f53a9ed..4c4904e 100644 --- a/src/services/system/ssh.rs +++ b/src/services/system/ssh.rs @@ -5,6 +5,7 @@ use chrono::{NaiveDateTime, Utc}; use lazy_static::lazy_static; use logwatcher::{LogWatcher, LogWatcherAction, LogWatcherEvent}; use regex::Regex; +use std::collections::HashMap; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -16,6 +17,7 @@ use tracing::Level; pub const SSH_LOG: &str = "ssh"; pub const SSH_LOG_STATUS: &str = "status"; pub const SSH_LOG_STATUS_CONNECTED: &str = "connected"; +pub const SSH_LOG_STATUS_TERMINATED: &str = "terminated"; pub(super) fn process_sshd_log( is_shutdown: Arc, @@ -39,11 +41,25 @@ pub(super) fn process_sshd_log( } }; + let mut connections = HashMap::new(); + log_watcher.watch(&mut move |result| { let result = match result { Ok(event) => match event { LogWatcherEvent::Line(line) => match parse(&line) { - Some(line) => Ok(Data::Single(line)), + Some(mut datarow) => { + lookup_connection(&mut datarow, &mut connections); + let Datavalue::Text(ref status) = datarow.data[4].1 else { + panic!("assert: ssh status is parsed") + }; + if status == SSH_LOG_STATUS_CONNECTED && connections.len() > 100 { + let message = + format!("there are {} active ssh connections", connections.len()); + tracing::warn!("{}", message); + messenger.send_nonblock(Notification::new(message, Level::WARN)); + } + Ok(Data::Single(datarow)) + } None => { return LogWatcherAction::None; } @@ -66,6 +82,7 @@ pub(super) fn process_sshd_log( "assert: ssh monitoring messages queue shouldn't be closed before shutdown signal" ); } + LogWatcherAction::None }); @@ -73,6 +90,61 @@ pub(super) fn process_sshd_log( tracing::info!("exiting ssh monitoring thread"); } +struct Connection { + user_ip: String, + user_port: u32, + user_key: String, +} + +fn lookup_connection(datarow: &mut Datarow, connections: &mut HashMap) { + let Datavalue::Text(ref status) = datarow.data[4].1 else { + panic!("assert: ssh status is parsed") + }; + if status != SSH_LOG_STATUS_CONNECTED && status != SSH_LOG_STATUS_TERMINATED { + return; + } + let Datavalue::IntegerID(id) = datarow.data[0].1 else { + panic!("assert: ssh id is parsed") + }; + + // For terminated + if let Some(Connection { + user_ip, + user_port, + user_key, + }) = connections.remove(&id) + { + datarow.data[2].1 = Datavalue::Text(user_ip); + datarow.data[3].1 = Datavalue::IntegerID(user_port); + datarow.data[5].1 = Datavalue::Text(user_key); + return; + } + + if status == SSH_LOG_STATUS_TERMINATED { + return; + } + + // For connected + let Datavalue::Text(ref ip) = datarow.data[2].1 else { + panic!("assert: connected ssh user has an ip") + }; + let Datavalue::IntegerID(port) = datarow.data[3].1 else { + panic!("assert: connected ssh user has a port") + }; + let Datavalue::Text(ref key) = datarow.data[5].1 else { + panic!("assert: connected ssh user has a pubkey") + }; + + connections.insert( + id, + Connection { + user_ip: ip.to_string(), + user_port: port, + user_key: key.to_string(), + }, + ); +} + fn parse(line: &str) -> Option { lazy_static! { static ref RE: Regex = Regex::new( @@ -107,9 +179,9 @@ fn parse(line: &str) -> Option { }) .expect("assert: can get auth log datetime"); let id = cap.name("id").and_then(|d| d.as_str().parse().ok())?; - let ip = cap.name("ip").map(|d| d.as_str()); + let ip = cap.name("ip").map(|d| d.as_str().to_string()); let port = cap.name("port").and_then(|d| d.as_str().parse().ok()); - let key = cap.name("key").map(|d| d.as_str()); + let key = cap.name("key").map(|d| d.as_str().to_string()); let (username, status) = if let Some(username) = cap.name("username_rejected") { (Datavalue::Text(username.as_str().to_string()), "rejected") @@ -119,7 +191,10 @@ fn parse(line: &str) -> Option { SSH_LOG_STATUS_CONNECTED, ) } else if let Some(username) = cap.name("username_terminated") { - (Datavalue::Text(username.as_str().to_string()), "terminated") + ( + Datavalue::Text(username.as_str().to_string()), + SSH_LOG_STATUS_TERMINATED, + ) } else { (Datavalue::NotAvailable, "timeout") }; @@ -132,8 +207,7 @@ fn parse(line: &str) -> Option { ("user".to_string(), username), ( "ip".to_string(), - ip.map(|ip| Datavalue::Text(ip.to_string())) - .unwrap_or(Datavalue::NotAvailable), + ip.map(Datavalue::Text).unwrap_or(Datavalue::NotAvailable), ), ( "port".to_string(), @@ -146,8 +220,7 @@ fn parse(line: &str) -> Option { ), ( "pubkey".to_string(), - key.map(|key| Datavalue::Text(key.to_string())) - .unwrap_or(Datavalue::NotAvailable), + key.map(Datavalue::Text).unwrap_or(Datavalue::NotAvailable), ), ], )) @@ -238,4 +311,32 @@ mod tests { let line = "May 21 19:26:17 server1 sshd[59895]: pam_unix(sshd:session): session opened for user los(uid=1000) by (uid=0)"; assert!(parse(line).is_none()); } + + #[test] + fn lookup_connections() { + let mut connections = HashMap::new(); + + let line = "May 21 11:22:18 server1 sshd[136063]: Accepted publickey for ubuntu from 77.222.27.80 port 17827 ssh2: RSA SHA256:D726XJ0DkstyhsyH2rAbfYuIaeBOa3Su2l2WWbyXnXs"; + let mut parsed = parse(line).unwrap(); + lookup_connection(&mut parsed, &mut connections); + assert_eq!(connections.len(), 1); + + let line = "May 21 11:22:56 server1 sshd[136063]: pam_unix(sshd:session): session closed for user ubuntu"; + let mut parsed = parse(line).unwrap(); + lookup_connection(&mut parsed, &mut connections); + assert_eq!(connections.len(), 0); + + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(136063)); + assert_eq!(parsed.data[1].1, Datavalue::Text("ubuntu".to_string())); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("77.222.27.80".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(17827)); + assert_eq!(parsed.data[4].1, Datavalue::Text("terminated".to_string())); + assert_eq!( + parsed.data[5].1, + Datavalue::Text("RSA SHA256:D726XJ0DkstyhsyH2rAbfYuIaeBOa3Su2l2WWbyXnXs".to_string()) + ); + } }