Skip to content

Commit

Permalink
LossyLinesCodec in process module + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pwalski committed May 22, 2024
1 parent 1ea1626 commit 50db07f
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 40 deletions.
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ thiserror = "1.0.58"
[dev-dependencies]
assert_cmd = "2.0"
predicates = "3.1"
test-case = "3.3"

[patch.crates-io]
ya-core-model = { git = "https://github.com/golemfactory/yagna.git", rev = "891f319668add90938183d7f266790329bcb5f08" }
Expand Down
66 changes: 58 additions & 8 deletions src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ use bytes::{Buf, BytesMut};
use futures::TryFutureExt;
use serde::de::DeserializeOwned;
use serde_json::Value;
use tokio_util::codec::Decoder;
use tokio::{io::BufReader, process::Child};
use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, FramedRead};

use std::cell::RefCell;
use std::env::current_exe;
use std::fmt::Debug;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::{pin, Pin};
use std::pin::pin;
use std::pin::Pin;
use std::process::ExitStatus;
use std::rc::Rc;
use std::task::Poll;
Expand Down Expand Up @@ -157,6 +160,26 @@ impl Default for LossyLinesCodec {
}
}

pub type OutputLines = Pin<Box<dyn futures::Stream<Item = anyhow::Result<String>> + Send>>;

/// Reads process stdout and stderr using `LossyLinesCodec`
pub fn process_output(child: &mut Child) -> anyhow::Result<OutputLines> {
let stdout = child
.stdout
.take()
.context("Failed to access process stdout")?;
let stderr = child
.stderr
.take()
.context("Failed to access process stderr")?;

let stdout = FramedRead::new(BufReader::new(stdout), LossyLinesCodec::default());
let stderr = FramedRead::new(BufReader::new(stderr), LossyLinesCodec::default());

Ok(futures::StreamExt::boxed(stdout.merge(stderr)))
}

/// Decodes lines as UTF-8 (lossly) up to `max_length` characters per line.
impl Decoder for LossyLinesCodec {
type Item = String;

Expand All @@ -166,27 +189,54 @@ impl Decoder for LossyLinesCodec {
let read_to = std::cmp::min(self.max_length.saturating_add(1), buf.len());
let new_line_offset = buf[0..read_to].iter().position(|b| *b == b'\n');
let has_new_line = new_line_offset.is_some();
let offset = new_line_offset
.map(|offset| std::cmp::min(offset, read_to))
.unwrap_or(read_to);
let mut line = buf.split_to(offset);
let offset = new_line_offset.unwrap_or(read_to);
let line = buf.split_to(offset);
if has_new_line {
// Move cursor pass new line character so next call of `decode` will not read it.
buf.advance(1);
}
let mut line: &[u8] = &line;
if let Some(&b'\r') = line.last() {
// skip carriage return
// Skip carriage return.
line = &line[..line.len() - 1];
}
if line.is_empty() {
return Ok(None);
}
// Lossy conversion to avoid errors when converting UTF-16 characters on Windows process output.
let line = String::from_utf8_lossy(line).to_string();
Ok(Some(line))
}
}

#[cfg(test)]
mod tests {
// TODO

use test_case::test_case;

use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;

use super::LossyLinesCodec;

#[test_case("foo\nbar\nbaz".as_bytes(), &["foo", "bar", "baz"]; "CL multi line")]
#[test_case("foo\r\nbar\r\nbaz".as_bytes(), &["foo", "bar", "baz"]; "CRCL multi line")]
#[test_case("foo".as_bytes(), &["foo"]; "one line")]
#[test_case("fóó\r\nbąr\r\nbąż".as_bytes(), &["fóó", "bąr", "bąż"]; "diacritics in UTF-8")]
#[test_case("".as_bytes(), &[]; "empty")]
#[test_case(&[0x66, 0x6F, 0x80], &["fo�"]; "invalid characters")]
#[tokio::test]
async fn lines_codec_test(encoded: &[u8], expected: &[&str]) {
let mut reader: FramedRead<&[u8], LossyLinesCodec> =
FramedRead::new(encoded, LossyLinesCodec::default());
let mut decoded = Vec::new();
while let Some(line) = reader.next().await {
match line {
Ok(line) => decoded.push(line),
Err(e) => panic!("Error reading line: {}", e),
}
}
let decoded = decoded.iter().map(String::as_str).collect::<Vec<&str>>();
assert_eq!(expected, decoded.as_slice());
}
}
35 changes: 3 additions & 32 deletions src/process/automatic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,18 @@ mod monitor;

use self::config::Config;

use super::{LossyLinesCodec, Runtime};
use super::Runtime;

use crate::process::automatic::monitor::OutputMonitor;
use crate::process::{automatic::monitor::OutputMonitor, process_output};
use anyhow::Context;
use async_trait::async_trait;
use bytes::{Buf, BytesMut};
use futures::TryStreamExt;
use tokio::{
io::AsyncBufReadExt,
io::BufReader,
process::{Child, Command},
sync::Mutex,
time::timeout,
};
use tokio_stream::{wrappers::LinesStream, StreamExt};
use tokio_util::codec::{Decoder, FramedRead, LinesCodec};

use std::pin::Pin;
use std::{
io::Read,
path::PathBuf,
process::{ExitStatus, Stdio},
sync::Arc,
Expand All @@ -48,7 +40,7 @@ impl Runtime for Automatic {
log::info!("Spawning Automatic process");
let mut child = cmd.kill_on_drop(true).spawn()?;

let output = output_lines(&mut child)?;
let output = process_output(&mut child)?;

log::info!("Waiting for Automatic startup");
let output_monitor = timeout(
Expand Down Expand Up @@ -136,27 +128,6 @@ fn format_path(path: std::path::PathBuf) -> Option<String> {
path.to_str().map(str::to_string)
}

type OutputLines = Pin<Box<dyn futures::Stream<Item = anyhow::Result<String>> + Send>>;

fn output_lines(child: &mut Child) -> anyhow::Result<OutputLines> {
let stdout = child
.stdout
.take()
.context("Failed to read Automatic stdout")?;
let stderr = child
.stderr
.take()
.context("Failed to read Automatic stderr")?;

let stdout = BufReader::new(stdout);
let stdout = FramedRead::new(stdout, LossyLinesCodec::default());

let stderr = BufReader::new(stderr);
let stderr = FramedRead::new(stderr, LossyLinesCodec::default());

Ok(futures::StreamExt::boxed(stdout.merge(stderr)))
}

#[cfg(target_family = "windows")]
#[cfg(test)]
mod windows_tests {
Expand Down
2 changes: 2 additions & 0 deletions src/process/automatic/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::process::OutputLines;

use super::*;

use reqwest::Client;
Expand Down

0 comments on commit 50db07f

Please sign in to comment.