Skip to content

Commit

Permalink
LossyLinesCodec
Browse files Browse the repository at this point in the history
  • Loading branch information
pwalski committed May 22, 2024
1 parent 69c6578 commit 1ea1626
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ yansi = "1.0"
chrono = "0.4.34"
tokio = { version = "1.32", features = ["macros", "signal"] }
tokio-stream = { version = "0.1", features = ["io-util"] }
tokio-util = { version = "0.7", features = ["codec"] }
futures = "0.3"
flexi_logger = { version = "0.28", features = ["colors"] }
regex = "1"
Expand Down
48 changes: 48 additions & 0 deletions src/process.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use anyhow::Context;
use async_trait::async_trait;
use bytes::{Buf, BytesMut};
use futures::TryFutureExt;
use serde::de::DeserializeOwned;
use serde_json::Value;
use tokio_util::codec::Decoder;

use std::cell::RefCell;
use std::env::current_exe;
Expand Down Expand Up @@ -142,3 +144,49 @@ impl<T: Runtime> Future for ProcessController<T> {
}
}
}

pub struct LossyLinesCodec {
max_length: usize,
}

impl Default for LossyLinesCodec {
fn default() -> Self {
Self {
max_length: usize::MAX,
}
}
}

impl Decoder for LossyLinesCodec {
type Item = String;

type Error = anyhow::Error;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let read_to = std::cmp::min(self.max_length.saturating_add(1), buf.len());
let new_line_offset = buf[0..read_to].iter().position(|b| *b == b'\n');
let has_new_line = new_line_offset.is_some();
let offset = new_line_offset
.map(|offset| std::cmp::min(offset, read_to))
.unwrap_or(read_to);
let mut line = buf.split_to(offset);
if has_new_line {
buf.advance(1);
}
let mut line: &[u8] = &line;
if let Some(&b'\r') = line.last() {
// skip carriage return
line = &line[..line.len() - 1];
}
if line.is_empty() {
return Ok(None);
}
let line = String::from_utf8_lossy(line).to_string();
Ok(Some(line))
}
}

#[cfg(test)]
mod tests {
// TODO
}
16 changes: 12 additions & 4 deletions src/process/automatic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ mod monitor;

use self::config::Config;

use super::Runtime;
use super::{LossyLinesCodec, Runtime};

use crate::process::automatic::monitor::OutputMonitor;
use anyhow::Context;
use async_trait::async_trait;
use bytes::{Buf, BytesMut};
use futures::TryStreamExt;
use tokio::{
io::AsyncBufReadExt,
io::BufReader,
Expand All @@ -17,9 +19,11 @@ use tokio::{
time::timeout,
};
use tokio_stream::{wrappers::LinesStream, StreamExt};
use tokio_util::codec::{Decoder, FramedRead, LinesCodec};

use std::pin::Pin;
use std::{
io::Read,
path::PathBuf,
process::{ExitStatus, Stdio},
sync::Arc,
Expand Down Expand Up @@ -132,7 +136,7 @@ fn format_path(path: std::path::PathBuf) -> Option<String> {
path.to_str().map(str::to_string)
}

type OutputLines = Pin<Box<dyn futures::Stream<Item = std::io::Result<String>> + Send>>;
type OutputLines = Pin<Box<dyn futures::Stream<Item = anyhow::Result<String>> + Send>>;

fn output_lines(child: &mut Child) -> anyhow::Result<OutputLines> {
let stdout = child
Expand All @@ -144,8 +148,12 @@ fn output_lines(child: &mut Child) -> anyhow::Result<OutputLines> {
.take()
.context("Failed to read Automatic stderr")?;

let stdout = LinesStream::new(BufReader::new(stdout).lines());
let stderr = LinesStream::new(BufReader::new(stderr).lines());
let stdout = BufReader::new(stdout);
let stdout = FramedRead::new(stdout, LossyLinesCodec::default());

let stderr = BufReader::new(stderr);
let stderr = FramedRead::new(stderr, LossyLinesCodec::default());

Ok(futures::StreamExt::boxed(stdout.merge(stderr)))
}

Expand Down
2 changes: 1 addition & 1 deletion src/process/automatic/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl OutputMonitor {

on_startup_rx
.await
.context("Monitoring Automatic startup failed")??;
.context("Automatic failed on startup")??;

Ok(Self {
output_task,
Expand Down

0 comments on commit 1ea1626

Please sign in to comment.