Skip to content

Commit

Permalink
ssh access monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
maksimryndin committed May 21, 2024
1 parent 97bd09d commit 3e3df5b
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 70 deletions.
1 change: 1 addition & 0 deletions .github/site/src/system.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ With this configuration System service will create the following sheets:
* `top_open_files` (for Linux only) - among the processes with the same user as Goral (!) - process with the most opened files
* for every process with name containing one of the substrings in `names` - a sheet with process info. Note: the first match (_case sensitive_) is used so plan accordingly a unique name for your binary.
* for every mount in `mounts` - disk usage and free space.
* `ssh` - for Linux systems ssh access log is monitored

System service doesn't require root privileges to collect the telemetry.
For a process a cpu usage percent may be [more than 100%](https://blog.guillaume-gomez.fr/articles/2021-09-06+sysinfo%3A+how+to+extract+systems%27+information) in a case of a multi-threaded process on several cores. `memory_used` by process is a [resident-set size](https://www.baeldung.com/linux/resident-set-vs-virtual-memory-size).
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
* 0.1.7
* ssh log monitoring
* rules for text now support "is" and "is not" conditions

* 0.1.6
* safe numbers conversions
* ids collision tests
Expand Down
15 changes: 7 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "goral"
version = "0.1.6"
version = "0.1.7"
edition = "2021"
author = "Maksim Ryndin"
license = "Apache-2.0"
Expand Down Expand Up @@ -35,13 +35,13 @@ http = "0.2"
hyper = { version = "0.14", features = ["http1", "http2", "client", "server"]} # using the same version as google-sheets4
hyper-rustls = "0.24"
lazy_static = "1.4"
logwatcher2 = { git = "https://github.com/maksimryndin/logwatcher2.git" }
prometheus-parse = "0.2.4"
regex = { version = "1", features = ["std", "unicode"] } # use the same version as current crates
serde = "^1.0"
serde_derive = "^1.0"
serde_json = "^1.0"
serde_valid = { version = "0.16.3", features = ["toml"] }
staart = "0.7.2"
sysinfo = { version = "0.30", default_features = false }
tokio = { version = "^1.0", features = ["sync", "signal", "io-std", "process", "net"] }
tonic = { version = "^0.10", features = ["transport", "prost"]}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ So Goral provides the following features being deployed next to your app(s):
* [Periodic healthchecks](https://maksimryndin.github.io/goral/healthcheck.html) (aka [liveness probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/))
* [Metrics collection](https://maksimryndin.github.io/goral/metrics.html) (fully compatible with Prometheus to be easily replaced with more advanced stack as your project grows)
* [Logs](https://maksimryndin.github.io/goral/logs.html) collection (importing logs from stdout/stderr of the target process)
* [System telemetry](https://maksimryndin.github.io/goral/system.html) (CPU, Memory, Free/Busy storage space etc)
* [System telemetry](https://maksimryndin.github.io/goral/system.html) (CPU, Memory, Free/Busy storage space, ssh access log etc)
* A general key-value appendable log storage (see [the user case](https://maksimryndin.github.io/goral/kv-log.html))
* Features are modular - all [services](https://maksimryndin.github.io/goral/services.html) are switched on/off in the configuration.
* You can observe several instances of the same app or different apps on the same host with a single Goral daemon (except logs as logs are collected via stdin of Goral - see [Logs](https://maksimryndin.github.io/goral/logs.html))
Expand Down
40 changes: 20 additions & 20 deletions src/google/datavalue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,26 +405,26 @@ impl From<Datarow> for RuleApplicant {
"assert: sheet id should be initialized before the rule applicant transformation",
);
// convert datavalues into types supported by rules with O(1) access
let data =
data.into_iter()
.chain([(
DATETIME_COLUMN_NAME.to_string(),
Datavalue::Datetime(timestamp),
)])
.map(|(k, v)| {
let v = match v {
Text(t) | RedText(t) | OrangeText(t) | GreenText(t) => Text(t),
Number(n) => Number(n),
Integer(i) | IntegerID(i) => Integer(i),
Percent(p) | HeatmapPercent(p) => Number(p / 100.0),
Datetime(d) => Number(convert_datetime_to_spreadsheet_double(d)),
Bool(b) => Bool(b),
Size(s) => Number(s as f64), // Rounding errors are acceptable for Size datavalues
NotAvailable => NotAvailable,
};
(k, v)
})
.collect();
let data = data
.into_iter()
.chain([(
DATETIME_COLUMN_NAME.to_string(),
Datavalue::Datetime(timestamp),
)])
.map(|(k, v)| {
let v = match v {
Text(t) | RedText(t) | OrangeText(t) | GreenText(t) => Text(t),
Number(n) => Number(n),
Integer(i) | IntegerID(i) => Integer(i),
Percent(p) | HeatmapPercent(p) => Number(p / 100.0),
Datetime(d) => Number(convert_datetime_to_spreadsheet_double(d)),
Bool(b) => Bool(b),
Size(s) => Number(s as f64), // Rounding errors are acceptable for Size datavalues
NotAvailable => NotAvailable,
};
(k, v)
})
.collect();
RuleApplicant {
log_name,
data,
Expand Down
10 changes: 2 additions & 8 deletions src/rules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,10 +1003,7 @@ mod tests {
let mut datarow = Datarow::new(
"log_name1".to_string(),
Utc::now().naive_utc(),
vec![(
"key".to_string(),
Datavalue::Text("substring".to_string()),
)],
vec![("key".to_string(), Datavalue::Text("substring".to_string()))],
);
datarow.sheet_id(TEST_HOST_ID, "test");
match rule.unwrap().apply(&datarow.into()) {
Expand All @@ -1030,10 +1027,7 @@ mod tests {
let mut datarow = Datarow::new(
"log_name1".to_string(),
Utc::now().naive_utc(),
vec![(
"key".to_string(),
Datavalue::Text("second".to_string()),
)],
vec![("key".to_string(), Datavalue::Text("second".to_string()))],
);
datarow.sheet_id(TEST_HOST_ID, "test");
match rule.unwrap().apply(&datarow.into()) {
Expand Down
9 changes: 2 additions & 7 deletions src/services/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl Service for SystemService {
Ok(data) => data,
Err(Data::Message(msg)) => {
tracing::error!("{}", msg);
self.send_error(format!("`{}` while scraping sysinfo", msg))
self.send_error(format!("`{}` while observing system", msg))
.await;
Data::Empty
}
Expand Down Expand Up @@ -319,12 +319,7 @@ impl Service for SystemService {
let cloned_sender = sender.clone();
let cloned_messenger = messenger.clone();
vec![tokio::spawn(async move {
Self::ssh_observer(
cloned_is_shutdown,
cloned_sender,
cloned_messenger,
)
.await;
Self::ssh_observer(cloned_is_shutdown, cloned_sender, cloned_messenger).await;
})]
} else {
vec![]
Expand Down
67 changes: 43 additions & 24 deletions src/services/system/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::notifications::{Notification, Sender};
use crate::services::{Data, TaskResult};
use chrono::{NaiveDateTime, Utc};
use lazy_static::lazy_static;
use logwatcher::{LogWatcher, LogWatcherAction, LogWatcherEvent};
use regex::Regex;
use staart::TailedFile;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand All @@ -15,7 +15,7 @@ use tracing::Level;

pub const SSH_LOG: &str = "ssh";
pub const SSH_LOG_STATUS: &str = "status";
pub const SSH_LOG_STATUS_CONNECTED: &str = "status";
pub const SSH_LOG_STATUS_CONNECTED: &str = "connected";

pub(super) fn process_sshd_log(
is_shutdown: Arc<AtomicBool>,
Expand All @@ -25,8 +25,8 @@ pub(super) fn process_sshd_log(
) {
tracing::info!("started ssh monitoring thread");

let mut f = match TailedFile::new("/var/log/auth.log")
.or_else(|_| TailedFile::new("/var/log/secure"))
let mut log_watcher = match LogWatcher::register("/var/log/auth.log")
.or_else(|_| LogWatcher::register("/var/log/secure"))
{
Ok(f) => f,
Err(_) => {
Expand All @@ -39,26 +39,36 @@ pub(super) fn process_sshd_log(
}
};

loop {
f.read_and(|d| {
let result = match std::str::from_utf8(d).map(|s| parse(s)) {
Ok(Some(line)) => Ok(Data::Single(line)),
Ok(None) => {
return;
log_watcher.watch(&mut move |result| {
let result = match result {
Ok(event) => match event {
LogWatcherEvent::Line(line) => match parse(&line) {
Some(line) => Ok(Data::Single(line)),
None => {
return LogWatcherAction::None;
}
},
Err(e) => Err(Data::Message(format!("ssh monitoring thread error {e}"))),
};
if sender.blocking_send(TaskResult { id: 0, result }).is_err() {
if is_shutdown.load(Ordering::Relaxed) {
return;
LogWatcherEvent::LogRotation => {
tracing::info!("auth log file rotation");
return LogWatcherAction::None;
}
panic!("assert: ssh monitoring messages queue shouldn't be closed before shutdown signal");
},
Err(err) => {
let message = format!("error watching ssh access log: {err}");
Err(Data::Message(message))
}
});
if is_shutdown.load(Ordering::Relaxed) {
break;
};
if sender.blocking_send(TaskResult { id: 0, result }).is_err() {
if is_shutdown.load(Ordering::Relaxed) {
return LogWatcherAction::Finish;
}
panic!(
"assert: ssh monitoring messages queue shouldn't be closed before shutdown signal"
);
}
}
LogWatcherAction::None
});

let _ = tx.send(());
tracing::info!("exiting ssh monitoring thread");
}
Expand All @@ -76,7 +86,7 @@ fn parse(line: &str) -> Option<Datarow> {
Disconnected\sfrom\s(authenticating|invalid)\suser\s(?P<username_rejected>\S+)|
Accepted\spublickey\sfor\s(?P<username_accepted>\S+)\sfrom|
pam_unix\(sshd:session\):\ssession\sclosed\sfor\suser\s(?P<username_terminated>\S+)|
fatal:\sTimeout\sbefore\sauthentication\sfor|
fatal:\sTimeout\sbefore\sauthentication\sfor
)
\s?
((?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\sport\s(?P<port>\d{2,5}))?
Expand Down Expand Up @@ -104,7 +114,10 @@ fn parse(line: &str) -> Option<Datarow> {
let (username, status) = if let Some(username) = cap.name("username_rejected") {
(Datavalue::Text(username.as_str().to_string()), "rejected")
} else if let Some(username) = cap.name("username_accepted") {
(Datavalue::Text(username.as_str().to_string()), SSH_LOG_STATUS_CONNECTED)
(
Datavalue::Text(username.as_str().to_string()),
SSH_LOG_STATUS_CONNECTED,
)
} else if let Some(username) = cap.name("username_terminated") {
(Datavalue::Text(username.as_str().to_string()), "terminated")
} else {
Expand All @@ -124,10 +137,13 @@ fn parse(line: &str) -> Option<Datarow> {
),
(
"port".to_string(),
port.map(|p| Datavalue::IntegerID(p))
port.map(Datavalue::IntegerID)
.unwrap_or(Datavalue::NotAvailable),
),
(SSH_LOG_STATUS.to_string(), Datavalue::Text(status.to_string())),
(
SSH_LOG_STATUS.to_string(),
Datavalue::Text(status.to_string()),
),
(
"pubkey".to_string(),
key.map(|key| Datavalue::Text(key.to_string()))
Expand Down Expand Up @@ -218,5 +234,8 @@ mod tests {
);
assert_eq!(parsed.data[3].1, Datavalue::IntegerID(47014));
assert_eq!(parsed.data[4].1, Datavalue::Text("timeout".to_string()));

let line = "May 21 19:26:17 server1 sshd[59895]: pam_unix(sshd:session): session opened for user los(uid=1000) by (uid=0)";
assert!(parse(line).is_none());
}
}

0 comments on commit 3e3df5b

Please sign in to comment.