From 1ea1626724dac0359400658b6fe5bdb4797fe0cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Walski?= Date: Wed, 22 May 2024 12:59:29 +0200 Subject: [PATCH] LossyLinesCodec --- Cargo.lock | 1 + Cargo.toml | 1 + src/process.rs | 48 ++++++++++++++++++++++++++++++++ src/process/automatic.rs | 16 ++++++++--- src/process/automatic/monitor.rs | 2 +- 5 files changed, 63 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f2d1e8..a248b90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6155,6 +6155,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "ya-agreement-utils 0.5.0", "ya-client-model 0.6.0", "ya-core-model 0.9.1", diff --git a/Cargo.toml b/Cargo.toml index e7c4388..b8c7076 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ yansi = "1.0" chrono = "0.4.34" tokio = { version = "1.32", features = ["macros", "signal"] } tokio-stream = { version = "0.1", features = ["io-util"] } +tokio-util = { version = "0.7", features = ["codec"] } futures = "0.3" flexi_logger = { version = "0.28", features = ["colors"] } regex = "1" diff --git a/src/process.rs b/src/process.rs index bb36544..e05c711 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,8 +1,10 @@ use anyhow::Context; use async_trait::async_trait; +use bytes::{Buf, BytesMut}; use futures::TryFutureExt; use serde::de::DeserializeOwned; use serde_json::Value; +use tokio_util::codec::Decoder; use std::cell::RefCell; use std::env::current_exe; @@ -142,3 +144,49 @@ impl Future for ProcessController { } } } + +pub struct LossyLinesCodec { + max_length: usize, +} + +impl Default for LossyLinesCodec { + fn default() -> Self { + Self { + max_length: usize::MAX, + } + } +} + +impl Decoder for LossyLinesCodec { + type Item = String; + + type Error = anyhow::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + let read_to = std::cmp::min(self.max_length.saturating_add(1), buf.len()); + let new_line_offset = buf[0..read_to].iter().position(|b| *b == b'\n'); + let has_new_line = new_line_offset.is_some(); + let offset = new_line_offset + .map(|offset| std::cmp::min(offset, read_to)) + .unwrap_or(read_to); + let mut line = buf.split_to(offset); + if has_new_line { + buf.advance(1); + } + let mut line: &[u8] = &line; + if let Some(&b'\r') = line.last() { + // skip carriage return + line = &line[..line.len() - 1]; + } + if line.is_empty() { + return Ok(None); + } + let line = String::from_utf8_lossy(line).to_string(); + Ok(Some(line)) + } +} + +#[cfg(test)] +mod tests { + // TODO +} diff --git a/src/process/automatic.rs b/src/process/automatic.rs index b46c062..1484e84 100644 --- a/src/process/automatic.rs +++ b/src/process/automatic.rs @@ -4,11 +4,13 @@ mod monitor; use self::config::Config; -use super::Runtime; +use super::{LossyLinesCodec, Runtime}; use crate::process::automatic::monitor::OutputMonitor; use anyhow::Context; use async_trait::async_trait; +use bytes::{Buf, BytesMut}; +use futures::TryStreamExt; use tokio::{ io::AsyncBufReadExt, io::BufReader, @@ -17,9 +19,11 @@ use tokio::{ time::timeout, }; use tokio_stream::{wrappers::LinesStream, StreamExt}; +use tokio_util::codec::{Decoder, FramedRead, LinesCodec}; use std::pin::Pin; use std::{ + io::Read, path::PathBuf, process::{ExitStatus, Stdio}, sync::Arc, @@ -132,7 +136,7 @@ fn format_path(path: std::path::PathBuf) -> Option { path.to_str().map(str::to_string) } -type OutputLines = Pin> + Send>>; +type OutputLines = Pin> + Send>>; fn output_lines(child: &mut Child) -> anyhow::Result { let stdout = child @@ -144,8 +148,12 @@ fn output_lines(child: &mut Child) -> anyhow::Result { .take() .context("Failed to read Automatic stderr")?; - let stdout = LinesStream::new(BufReader::new(stdout).lines()); - let stderr = LinesStream::new(BufReader::new(stderr).lines()); + let stdout = BufReader::new(stdout); + let stdout = FramedRead::new(stdout, LossyLinesCodec::default()); + + let stderr = BufReader::new(stderr); + let stderr = FramedRead::new(stderr, LossyLinesCodec::default()); + Ok(futures::StreamExt::boxed(stdout.merge(stderr))) } diff --git a/src/process/automatic/monitor.rs b/src/process/automatic/monitor.rs index 9a9f1e3..87f2546 100644 --- a/src/process/automatic/monitor.rs +++ b/src/process/automatic/monitor.rs @@ -26,7 +26,7 @@ impl OutputMonitor { on_startup_rx .await - .context("Monitoring Automatic startup failed")??; + .context("Automatic failed on startup")??; Ok(Self { output_task,