From f0309bb433091759b7f27cb1e33b87dff491815b Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Tue, 3 Sep 2024 12:47:51 +0200 Subject: [PATCH 1/3] Start parsing the `chunks` file with serde This implements a hand-written parser which scans through the `chunks` file line-by-line, and parses the various headers and line records with serde. The most complex part here is parsing the line records. If that complexity starts to be unreasonable, a hybrid approach is also possible in which the hand-written parser is used along with the simpler serde-based `header` parsers, and still falling back to the existing parser-combinator based parser for the line records. --- core/benches/pyreport.rs | 64 +++- core/src/parsers/pyreport/chunks_serde.rs | 375 ++++++++++++++++++++++ core/src/parsers/pyreport/mod.rs | 4 +- 3 files changed, 439 insertions(+), 4 deletions(-) create mode 100644 core/src/parsers/pyreport/chunks_serde.rs diff --git a/core/benches/pyreport.rs b/core/benches/pyreport.rs index fa69558..f28d909 100644 --- a/core/benches/pyreport.rs +++ b/core/benches/pyreport.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use codecov_rs::{ - parsers::pyreport::{chunks, report_json}, + parsers::pyreport::{chunks, chunks_serde, report_json}, report::test::{TestReport, TestReportBuilder}, }; use divan::Bencher; @@ -53,7 +53,7 @@ fn simple_chunks() { let chunks = &[ // Header and one chunk with an empty line "{}\n<<<<< end_of_header >>>>>\n{}\n", - // No header, one chunk with a populated line and an empty line + // No header, one chunk with a populated line and an empty line "{}\n[1, null, [[0, 1]]]\n", // No header, two chunks, the second having just one empty line "{}\n[1, null, [[0, 1]]]\n\n<<<<< end_of_chunk >>>>>\n{}\n", @@ -106,3 +106,63 @@ fn parse_chunks_file(input: &str, files: HashMap, sessions: HashMap< .parse_next(&mut chunks_stream) .unwrap(); } + +#[divan::bench] +fn simple_chunks_serde() { + let chunks: &[&[u8]] = &[ + // Header and one chunk with an empty line + 
b"{}\n<<<<< end_of_header >>>>>\n{}\n", + // No header, one chunk with a populated line and an empty line + b"{}\n[1, null, [[0, 1]]]\n", + // No header, two chunks, the second having just one empty line + b"{}\n[1, null, [[0, 1]]]\n\n<<<<< end_of_chunk >>>>>\n{}\n", + // Header, two chunks, the second having multiple data lines and an empty line + b"{}\n<<<<< end_of_header >>>>>\n{}\n[1, null, [[0, 1]]]\n\n<<<<< end_of_chunk >>>>>\n{}\n[1, null, [[0, 1]]]\n[1, null, [[0, 1]]]\n", + ]; + + for input in chunks { + parse_chunks_file_serde(input) + } +} + +// this is currently <300 ms on my machine +#[divan::bench(sample_count = 10)] +fn complex_chunks_serde(bencher: Bencher) { + // this is a ~96M `chunks` file + let chunks = + load_fixture("pyreport/large/worker-c71ddfd4cb1753c7a540e5248c2beaa079fc3341-chunks.txt"); + + bencher.bench(|| parse_chunks_file_serde(&chunks)); +} + +fn parse_chunks_file_serde(input: &[u8]) { + let mut parser = chunks_serde::Parser::new(input); + loop { + // TODO: these are just for debugging + let rest = parser.rest; + let expecting = parser.expecting; + let event = parser.next(); + match event { + Ok(None) => break, + Ok(Some(_)) => {} + Err(err) => { + let rest = std::str::from_utf8(rest).unwrap(); + let rest = rest.get(..32).unwrap_or(rest); + dbg!(rest, expecting); + panic!("{err}"); + } + } + } +} + +#[track_caller] +fn load_fixture(path: &str) -> Vec { + let path = format!("./fixtures/{path}"); + let contents = std::fs::read(path).unwrap(); + + if contents.starts_with(b"version https://git-lfs.github.com/spec/v1") { + panic!("Fixture has not been pulled from Git LFS"); + } + + contents +} diff --git a/core/src/parsers/pyreport/chunks_serde.rs b/core/src/parsers/pyreport/chunks_serde.rs new file mode 100644 index 0000000..9335d8c --- /dev/null +++ b/core/src/parsers/pyreport/chunks_serde.rs @@ -0,0 +1,375 @@ +//! A parser for the `chunks` file format. +//! +//! A chunks file contains an optional header and a series of 1 or more +//! 
"chunks", separated by an `END_OF_CHUNK` terminator. +//! +//! Chunks files sometimes begin with a JSON object followed by an +//! `END_OF_HEADER` terminator string. +//! The JSON object contains: +//! - `"labels_index"`: assigns a numeric ID to each label to save space +//! +//! If the `"labels_index"` key is present, this parser will insert each label +//! into the report as a [`crate::report::models::Context`] and create a mapping +//! in `buf.state.labels_index` from numeric ID in the header to the +//! new `Context`'s ID in the output report. If the `"labels_index"` key is +//! _not_ present, we will populate `buf.state.labels_index` gradually as we +//! encounter new labels during parsing. +//! +//! A chunk contains all of the line-by-line measurements for +//! a file. The Nth chunk corresponds to the file whose entry in +//! `buf.state.report_json_files` has N in its `chunks_index` field. +//! +//! Each new chunk will reset `buf.state.chunk.current_line` to 0 when it starts +//! and increment `buf.state.chunk.index` when it ends so that the next chunk +//! can associate its data with the correct file. +//! +//! A line may be empty, or it may contain a [`LineRecord`]. +//! A [`LineRecord`] itself does not correspond to anything in the output, +//! but it's an umbrella that includes all of the data +//! tied to a line/[`CoverageSample`]. +//! +//! This parser performs all the writes it can to the output +//! stream and only returns a `ReportLine` for tests. The `report_line_or_empty` +//! parser which wraps this and supports empty lines returns `Ok(())`. 
+ +use std::{collections::HashMap, fmt}; + +use serde::{de, de::IgnoredAny, Deserialize}; + +#[derive(Debug)] +pub struct Parser<'d> { + // TODO: these are pub just for debugging + pub rest: &'d [u8], + pub expecting: Expecting, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum ParserEvent { + EmptyLineRecord, + LineRecord(LineRecord), + EmptyChunk, + FileHeader(FileHeader), + ChunkHeader(ChunkHeader), +} + +#[derive(Debug, PartialEq, Eq, Default, Deserialize)] +pub struct FileHeader { + #[serde(default)] + pub labels_index: HashMap, +} + +#[derive(Debug, PartialEq, Eq, Default, Deserialize)] +pub struct ChunkHeader { + #[serde(default)] + pub present_sessions: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +struct IgnoredAnyEq(IgnoredAny); +impl PartialEq for IgnoredAnyEq { + fn eq(&self, _other: &Self) -> bool { + true + } +} +impl Eq for IgnoredAnyEq {} + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +pub struct LineRecord( + /// coverage + Coverage, + /// coverage type + Option, + /// sessions + Vec, + /// messages + #[serde(default)] + Option, + /// complexity + #[serde(default)] + Option, + /// TODO: datapoints + #[serde(default)] + Option, +); + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +pub struct LineSession( + /// session id + u32, + /// coverage + Coverage, + /// TODO: branches + #[serde(default)] + Option, + /// TODO: partials + #[serde(default)] + Option, + /// TODO: complexity + #[serde(default)] + Option, +); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)] +#[serde(try_from = "&str")] +pub enum CoverageType { + #[default] + Line, + Branch, + Method, +} + +impl<'s> TryFrom<&'s str> for CoverageType { + type Error = &'s str; + + fn try_from(value: &'s str) -> Result { + match value { + "line" => Ok(Self::Line), + "b" | "branch" => Ok(Self::Branch), + "m" | "method" => Ok(Self::Method), + s => Err(s), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Coverage { + Partial, + BranchTaken(u32, u32), 
+ HitCount(u32), +} + +impl<'de> Deserialize<'de> for Coverage { + fn deserialize(deserializer: D) -> Result + where + D: de::Deserializer<'de>, + { + struct CoverageVisitor; + impl<'de> de::Visitor<'de> for CoverageVisitor { + type Value = Coverage; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a coverage value") + } + + fn visit_bool(self, v: bool) -> Result + where + E: de::Error, + { + if v { + Ok(Coverage::Partial) + } else { + Err(de::Error::invalid_value(de::Unexpected::Bool(v), &self)) + } + } + + fn visit_u64(self, value: u64) -> Result + where + E: de::Error, + { + Ok(Coverage::HitCount(value as u32)) + } + + fn visit_str(self, v: &str) -> Result + where + E: de::Error, + { + let invalid = || de::Error::invalid_value(de::Unexpected::Str(v), &self); + let (covered, total) = v.split_once('/').ok_or_else(invalid)?; + + let covered: u32 = covered.parse().map_err(|_| invalid())?; + let total: u32 = total.parse().map_err(|_| invalid())?; + Ok(Coverage::BranchTaken(covered, total)) + } + } + + deserializer.deserialize_any(CoverageVisitor) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ParserError { + #[error("unexpected EOF")] + UnexpectedEof, + #[error("unexpected input")] + UnexpectedInput, + #[error("invalid file header")] + InvalidFileHeader(#[source] serde_json::Error), + #[error("invalid chunk header")] + InvalidChunkHeader(#[source] serde_json::Error), + #[error("invalid line record")] + InvalidLineRecord(#[source] serde_json::Error), +} + +impl PartialEq for ParserError { + fn eq(&self, other: &Self) -> bool { + core::mem::discriminant(self) == core::mem::discriminant(other) + } +} +impl Eq for ParserError {} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Expecting { + FileHeader, + ChunkHeader, + LineRecord, + EndOfChunk, +} + +const END_OF_CHUNK: &[u8] = b"<<<<< end_of_chunk >>>>>"; +const END_OF_HEADER: &[u8] = b"<<<<< end_of_header >>>>>"; + +// `slice::split_once` is still 
unstable: +// +fn slice_split_once(slice: &[u8], pred: u8) -> Option<(&[u8], &[u8])> { + let index = slice.iter().position(|b| *b == pred)?; + Some((&slice[..index], &slice[index + 1..])) +} + +impl<'d> Parser<'d> { + pub fn new(input: &'d [u8]) -> Self { + Self { + rest: input, + expecting: Expecting::FileHeader, + } + } + + pub fn next(&mut self) -> Result, ParserError> { + loop { + let Some((line, rest)) = slice_split_once(self.rest, b'\n') else { + return Ok(None); + }; + self.rest = rest; + + if self.expecting == Expecting::LineRecord { + if line.is_empty() { + return Ok(Some(ParserEvent::EmptyLineRecord)); + } + if line == END_OF_CHUNK { + self.expecting = Expecting::ChunkHeader; + continue; + } + + let line_record: LineRecord = + serde_json::from_slice(line).map_err(ParserError::InvalidLineRecord)?; + return Ok(Some(ParserEvent::LineRecord(line_record))); + } + + if self.expecting == Expecting::EndOfChunk { + if line != END_OF_CHUNK { + return Err(ParserError::UnexpectedInput); + } + + self.expecting = Expecting::ChunkHeader; + continue; + } + + // else: expecting a file or chunk header + + // this is an empty chunk (header) + if line == b"null" { + self.expecting = Expecting::EndOfChunk; + + return Ok(Some(ParserEvent::EmptyChunk)); + } + + // otherwise, the header has to be a JSON object + if !line.starts_with(b"{") { + return Err(ParserError::UnexpectedInput); + } + if self.expecting == Expecting::FileHeader { + if let Some((next_line, rest)) = slice_split_once(self.rest, b'\n') { + if next_line == END_OF_HEADER { + self.rest = rest; + self.expecting = Expecting::ChunkHeader; + + let file_header: FileHeader = + serde_json::from_slice(line).map_err(ParserError::InvalidFileHeader)?; + return Ok(Some(ParserEvent::FileHeader(file_header))); + } + } + } + // else: chunk header + + self.expecting = Expecting::LineRecord; + + let chunk_header: ChunkHeader = + serde_json::from_slice(line).map_err(ParserError::InvalidChunkHeader)?; + return 
Ok(Some(ParserEvent::ChunkHeader(chunk_header))); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + type ParserItem = Result, ParserError>; + + #[test] + fn test_parsing_events() { + let simple_line_record = LineRecord( + Coverage::HitCount(1), + None, + vec![LineSession(0, Coverage::HitCount(1), None, None, None)], + None, + None, + None, + ); + + let cases: &[(&[u8], &[ParserItem])] = &[ + ( + // Header and one chunk with an empty line + b"{}\n<<<<< end_of_header >>>>>\n{}\n", + &[ + Ok(Some(ParserEvent::FileHeader(FileHeader::default()))), + Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), + Ok(None), + ], + ), + ( + // No header, one chunk with a populated line and an empty line + b"{}\n[1, null, [[0, 1]]]\n", + &[ + Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), + Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))), + Ok(None), + ], + ), + ( + // No header, two chunks, the second having just one empty line + b"{}\n[1, null, [[0, 1]]]\n\n<<<<< end_of_chunk >>>>>\n{}\n", + &[ + Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), + Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))), + Ok(Some(ParserEvent::EmptyLineRecord)), + Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), + Ok(None), + ], + ), + ( + // Header, two chunks, the second having multiple data lines and an empty line + b"{}\n<<<<< end_of_header >>>>>\n{}\n[1, null, [[0, 1]]]\n\n<<<<< end_of_chunk >>>>>\n{}\n[1, null, [[0, 1]]]\n[1, null, [[0, 1]]]\n", + &[ + Ok(Some(ParserEvent::FileHeader(FileHeader::default()))), + Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), + Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))), + Ok(Some(ParserEvent::EmptyLineRecord)), + Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), + Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))), + Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))), + Ok(None), + ], + ), + ]; + + for (input, expected_events) 
in cases { + let mut parser = Parser::new(input); + + for expected_event in *expected_events { + dbg!(std::str::from_utf8(parser.rest).unwrap(), parser.expecting); + let event = parser.next(); + assert_eq!(dbg!(event), *expected_event); + } + } + } +} diff --git a/core/src/parsers/pyreport/mod.rs b/core/src/parsers/pyreport/mod.rs index 4a4b1f2..6d9ff6a 100644 --- a/core/src/parsers/pyreport/mod.rs +++ b/core/src/parsers/pyreport/mod.rs @@ -8,9 +8,9 @@ use crate::{ report::{SqliteReport, SqliteReportBuilder, SqliteReportBuilderTx}, }; -pub mod report_json; - pub mod chunks; +pub mod chunks_serde; +pub mod report_json; mod utils; From 816d6325f7dcc86c40281e2c6ea04fe20c89c858 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Wed, 4 Sep 2024 11:57:09 +0200 Subject: [PATCH 2/3] Use `memchr`-based splitting instead of an iterator/event-based interface --- Cargo.lock | 1 + core/Cargo.toml | 1 + core/benches/pyreport.rs | 22 +- core/src/parsers/pyreport/chunks_serde.rs | 330 +++++++++++----------- 4 files changed, 179 insertions(+), 175 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 54fff2b..eef7292 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,6 +85,7 @@ version = "0.1.0" dependencies = [ "divan", "include_dir", + "memchr", "memmap2", "rand", "rusqlite", diff --git a/core/Cargo.toml b/core/Cargo.toml index 4d0155f..8e167b9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -10,6 +10,7 @@ testing = [] [dependencies] include_dir = "0.7.3" +memchr = "2.7.4" memmap2 = "0.9.4" rand = "0.8.5" rusqlite = { version = "0.31.0", features = ["bundled", "limits", "serde_json"] } diff --git a/core/benches/pyreport.rs b/core/benches/pyreport.rs index f28d909..4e1fcb8 100644 --- a/core/benches/pyreport.rs +++ b/core/benches/pyreport.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, hint::black_box}; use codecov_rs::{ parsers::pyreport::{chunks, chunks_serde, report_json}, @@ -136,21 +136,11 @@ fn complex_chunks_serde(bencher: Bencher) { } 
fn parse_chunks_file_serde(input: &[u8]) { - let mut parser = chunks_serde::Parser::new(input); - loop { - // TODO: these are just for debugging - let rest = parser.rest; - let expecting = parser.expecting; - let event = parser.next(); - match event { - Ok(None) => break, - Ok(Some(_)) => {} - Err(err) => { - let rest = std::str::from_utf8(rest).unwrap(); - let rest = rest.get(..32).unwrap_or(rest); - dbg!(rest, expecting); - panic!("{err}"); - } + let chunks_file = chunks_serde::ChunksFile::new(input).unwrap(); + let mut chunks = chunks_file.chunks(); + while let Some(mut chunk) = chunks.next_chunk().unwrap() { + while let Some(line) = chunk.next_line().unwrap() { + black_box(line); } } } diff --git a/core/src/parsers/pyreport/chunks_serde.rs b/core/src/parsers/pyreport/chunks_serde.rs index 9335d8c..121057b 100644 --- a/core/src/parsers/pyreport/chunks_serde.rs +++ b/core/src/parsers/pyreport/chunks_serde.rs @@ -32,24 +32,146 @@ //! stream and only returns a `ReportLine` for tests. The `report_line_or_empty` //! parser which wraps this and supports empty lines returns `Ok(())`. 
-use std::{collections::HashMap, fmt}; +use std::{collections::HashMap, fmt, mem, sync::OnceLock}; +use memchr::{memchr, memmem}; use serde::{de, de::IgnoredAny, Deserialize}; +use crate::report::pyreport::{CHUNKS_FILE_END_OF_CHUNK, CHUNKS_FILE_HEADER_TERMINATOR}; + +#[derive(Debug, thiserror::Error)] +pub enum ParserError { + #[error("unexpected EOF")] + UnexpectedEof, + #[error("unexpected input")] + UnexpectedInput, + #[error("invalid file header")] + InvalidFileHeader(#[source] serde_json::Error), + #[error("invalid chunk header")] + InvalidChunkHeader(#[source] serde_json::Error), + #[error("invalid line record")] + InvalidLineRecord(#[source] serde_json::Error), +} + +impl PartialEq for ParserError { + fn eq(&self, other: &Self) -> bool { + core::mem::discriminant(self) == core::mem::discriminant(other) + } +} +impl Eq for ParserError {} + #[derive(Debug)] -pub struct Parser<'d> { - // TODO: these are pub just for debugging - pub rest: &'d [u8], - pub expecting: Expecting, +pub struct ChunksFile<'d> { + file_header: FileHeader, + input: &'d [u8], +} + +impl<'d> ChunksFile<'d> { + pub fn new(mut input: &'d [u8]) -> Result { + static HEADER_FINDER: OnceLock = OnceLock::new(); + let header_finder = + HEADER_FINDER.get_or_init(|| memmem::Finder::new(CHUNKS_FILE_HEADER_TERMINATOR)); + + let file_header = if let Some(pos) = header_finder.find(input) { + let header_bytes = &input[..pos]; + input = &input[pos + header_finder.needle().len()..]; + let file_header: FileHeader = + serde_json::from_slice(header_bytes).map_err(ParserError::InvalidFileHeader)?; + file_header + } else { + FileHeader::default() + }; + + Ok(Self { file_header, input }) + } + + pub fn labels_index(&self) -> &HashMap { + &self.file_header.labels_index + } + + pub fn chunks(&self) -> Chunks { + Chunks { input: self.input } + } } -#[derive(Debug, PartialEq, Eq)] -pub enum ParserEvent { - EmptyLineRecord, - LineRecord(LineRecord), - EmptyChunk, - FileHeader(FileHeader), - ChunkHeader(ChunkHeader), 
+pub struct Chunks<'d> { + input: &'d [u8], +} + +impl<'d> Chunks<'d> { + pub fn next_chunk(&mut self) -> Result>, ParserError> { + if self.input.is_empty() { + return Ok(None); + } + + static CHUNK_FINDER: OnceLock = OnceLock::new(); + let chunk_finder = + CHUNK_FINDER.get_or_init(|| memmem::Finder::new(CHUNKS_FILE_END_OF_CHUNK)); + + let mut chunk_bytes = if let Some(pos) = chunk_finder.find(self.input) { + let chunk_bytes = &self.input[..pos]; + self.input = &self.input[pos + chunk_finder.needle().len()..]; + chunk_bytes + } else { + mem::take(&mut self.input) + }; + + if chunk_bytes == b"null" { + return Ok(Some(Chunk { + chunk_header: ChunkHeader::default(), + input: &[], + })); + } + + let header_bytes = next_line(&mut chunk_bytes).ok_or(ParserError::UnexpectedInput)?; + let chunk_header: ChunkHeader = + serde_json::from_slice(header_bytes).map_err(ParserError::InvalidFileHeader)?; + + Ok(Some(Chunk { + chunk_header, + input: chunk_bytes, + })) + } +} + +pub struct Chunk<'d> { + chunk_header: ChunkHeader, + input: &'d [u8], +} + +impl<'d> Chunk<'d> { + pub fn present_sessions(&self) -> &[u32] { + &self.chunk_header.present_sessions + } + + pub fn next_line(&mut self) -> Result>, ParserError> { + let Some(line) = next_line(&mut self.input) else { + return Ok(None); + }; + + if line.is_empty() { + return Ok(Some(None)); + } + + let line_record: LineRecord = + serde_json::from_slice(line).map_err(ParserError::InvalidLineRecord)?; + return Ok(Some(Some(line_record))); + } +} + +fn next_line<'d>(input: &mut &'d [u8]) -> Option<&'d [u8]> { + if input.is_empty() { + return None; + } + + let line_bytes = if let Some(pos) = memchr(b'\n', input) { + let line_bytes = &input[..pos]; + *input = &input[pos + 1..]; + line_bytes + } else { + mem::take(input) + }; + Some(line_bytes) } #[derive(Debug, PartialEq, Eq, Default, Deserialize)] @@ -186,125 +308,10 @@ impl<'de> Deserialize<'de> for Coverage { } } -#[derive(Debug, thiserror::Error)] -pub enum ParserError { - 
#[error("unexpected EOF")] - UnexpectedEof, - #[error("unexpected input")] - UnexpectedInput, - #[error("invalid file header")] - InvalidFileHeader(#[source] serde_json::Error), - #[error("invalid chunk header")] - InvalidChunkHeader(#[source] serde_json::Error), - #[error("invalid line record")] - InvalidLineRecord(#[source] serde_json::Error), -} - -impl PartialEq for ParserError { - fn eq(&self, other: &Self) -> bool { - core::mem::discriminant(self) == core::mem::discriminant(other) - } -} -impl Eq for ParserError {} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Expecting { - FileHeader, - ChunkHeader, - LineRecord, - EndOfChunk, -} - -const END_OF_CHUNK: &[u8] = b"<<<<< end_of_chunk >>>>>"; -const END_OF_HEADER: &[u8] = b"<<<<< end_of_header >>>>>"; - -// `slice::split_once` is still unstable: -// -fn slice_split_once(slice: &[u8], pred: u8) -> Option<(&[u8], &[u8])> { - let index = slice.iter().position(|b| *b == pred)?; - Some((&slice[..index], &slice[index + 1..])) -} - -impl<'d> Parser<'d> { - pub fn new(input: &'d [u8]) -> Self { - Self { - rest: input, - expecting: Expecting::FileHeader, - } - } - - pub fn next(&mut self) -> Result, ParserError> { - loop { - let Some((line, rest)) = slice_split_once(self.rest, b'\n') else { - return Ok(None); - }; - self.rest = rest; - - if self.expecting == Expecting::LineRecord { - if line.is_empty() { - return Ok(Some(ParserEvent::EmptyLineRecord)); - } - if line == END_OF_CHUNK { - self.expecting = Expecting::ChunkHeader; - continue; - } - - let line_record: LineRecord = - serde_json::from_slice(line).map_err(ParserError::InvalidLineRecord)?; - return Ok(Some(ParserEvent::LineRecord(line_record))); - } - - if self.expecting == Expecting::EndOfChunk { - if line != END_OF_CHUNK { - return Err(ParserError::UnexpectedInput); - } - - self.expecting = Expecting::ChunkHeader; - continue; - } - - // else: expecting a file or chunk header - - // this is an empty chunk (header) - if line == b"null" { - 
self.expecting = Expecting::EndOfChunk; - - return Ok(Some(ParserEvent::EmptyChunk)); - } - - // otherwise, the header has to be a JSON object - if !line.starts_with(b"{") { - return Err(ParserError::UnexpectedInput); - } - if self.expecting == Expecting::FileHeader { - if let Some((next_line, rest)) = slice_split_once(self.rest, b'\n') { - if next_line == END_OF_HEADER { - self.rest = rest; - self.expecting = Expecting::ChunkHeader; - - let file_header: FileHeader = - serde_json::from_slice(line).map_err(ParserError::InvalidFileHeader)?; - return Ok(Some(ParserEvent::FileHeader(file_header))); - } - } - } - // else: chunk header - - self.expecting = Expecting::LineRecord; - - let chunk_header: ChunkHeader = - serde_json::from_slice(line).map_err(ParserError::InvalidChunkHeader)?; - return Ok(Some(ParserEvent::ChunkHeader(chunk_header))); - } - } -} - #[cfg(test)] mod tests { use super::*; - type ParserItem = Result, ParserError>; - #[test] fn test_parsing_events() { let simple_line_record = LineRecord( @@ -316,60 +323,65 @@ mod tests { None, ); - let cases: &[(&[u8], &[ParserItem])] = &[ + let cases: &[( + &[u8], // input + HashMap, // labels index + &[(&[u32], &[Option])], // chunks: session ids, line records + )] = &[ ( // Header and one chunk with an empty line b"{}\n<<<<< end_of_header >>>>>\n{}\n", - &[ - Ok(Some(ParserEvent::FileHeader(FileHeader::default()))), - Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), - Ok(None), - ], + HashMap::default(), + &[(&[], &[])], ), ( // No header, one chunk with a populated line and an empty line b"{}\n[1, null, [[0, 1]]]\n", - &[ - Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), - Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))), - Ok(None), - ], + HashMap::default(), + &[(&[], &[Some(simple_line_record.clone())])], ), ( // No header, two chunks, the second having just one empty line b"{}\n[1, null, [[0, 1]]]\n\n<<<<< end_of_chunk >>>>>\n{}\n", - &[ - 
Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), - Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))), - Ok(Some(ParserEvent::EmptyLineRecord)), - Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), - Ok(None), - ], + HashMap::default(), + &[(&[], &[Some(simple_line_record.clone())]), (&[], &[])], ), ( // Header, two chunks, the second having multiple data lines and an empty line b"{}\n<<<<< end_of_header >>>>>\n{}\n[1, null, [[0, 1]]]\n\n<<<<< end_of_chunk >>>>>\n{}\n[1, null, [[0, 1]]]\n[1, null, [[0, 1]]]\n", + HashMap::default(), &[ - Ok(Some(ParserEvent::FileHeader(FileHeader::default()))), - Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), - Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))), - Ok(Some(ParserEvent::EmptyLineRecord)), - Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))), - Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))), - Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))), - Ok(None), + (&[], &[Some(simple_line_record.clone())]), + ( + &[], + &[ + Some(simple_line_record.clone()), + Some(simple_line_record.clone()), + ], + ), ], ), ]; - for (input, expected_events) in cases { - let mut parser = Parser::new(input); + for (input, expected_labels_index, expected_chunks) in cases { + let chunks_file = ChunksFile::new(input).unwrap(); + let mut chunks = chunks_file.chunks(); + + assert_eq!(chunks_file.labels_index(), expected_labels_index); + + for (expected_sessions, expected_line_records) in *expected_chunks { + let mut chunk = chunks.next_chunk().unwrap().unwrap(); + + assert_eq!(chunk.present_sessions(), *expected_sessions); + + let mut lines = vec![]; + while let Some(line) = chunk.next_line().unwrap() { + lines.push(line); + } - for expected_event in *expected_events { - dbg!(std::str::from_utf8(parser.rest).unwrap(), parser.expecting); - let event = parser.next(); - assert_eq!(dbg!(event), *expected_event); + assert_eq!(lines, *expected_line_records); } + 
assert!(chunks.next_chunk().unwrap().is_none()); } } } From bd18f582fdbfaf93b08f0d8d47af304e3d6a2322 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Wed, 4 Sep 2024 13:03:38 +0200 Subject: [PATCH 3/3] get closer to the existing parser interface dealing with report builders --- core/benches/pyreport.rs | 28 +++--- core/src/error.rs | 5 + core/src/parsers/pyreport/chunks_serde.rs | 115 +++++++++++++++++++--- core/src/report/pyreport/types.rs | 17 ++++ 4 files changed, 140 insertions(+), 25 deletions(-) diff --git a/core/benches/pyreport.rs b/core/benches/pyreport.rs index 4e1fcb8..ed9b10a 100644 --- a/core/benches/pyreport.rs +++ b/core/benches/pyreport.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, hint::black_box}; +use std::collections::HashMap; use codecov_rs::{ parsers::pyreport::{chunks, chunks_serde, report_json}, @@ -120,8 +120,13 @@ fn simple_chunks_serde() { b"{}\n<<<<< end_of_header >>>>>\n{}\n[1, null, [[0, 1]]]\n\n<<<<< end_of_chunk >>>>>\n{}\n[1, null, [[0, 1]]]\n[1, null, [[0, 1]]]\n", ]; + let report_json = report_json::ParsedReportJson { + files: Default::default(), + sessions: Default::default(), + }; + for input in chunks { - parse_chunks_file_serde(input) + parse_chunks_file_serde(input, &report_json); } } @@ -132,17 +137,18 @@ fn complex_chunks_serde(bencher: Bencher) { let chunks = load_fixture("pyreport/large/worker-c71ddfd4cb1753c7a540e5248c2beaa079fc3341-chunks.txt"); - bencher.bench(|| parse_chunks_file_serde(&chunks)); + // parsing the chunks depends on having loaded the `report_json` + let report = load_fixture( + "pyreport/large/worker-c71ddfd4cb1753c7a540e5248c2beaa079fc3341-report_json.json", + ); + let report_json = parse_report_json(&report); + + bencher.bench(|| parse_chunks_file_serde(&chunks, &report_json)); } -fn parse_chunks_file_serde(input: &[u8]) { - let chunks_file = chunks_serde::ChunksFile::new(input).unwrap(); - let mut chunks = chunks_file.chunks(); - while let Some(mut chunk) = chunks.next_chunk().unwrap() { - 
while let Some(line) = chunk.next_line().unwrap() { - black_box(line); - } - } +fn parse_chunks_file_serde(input: &[u8], report_json: &report_json::ParsedReportJson) { + let mut report_builder = TestReportBuilder::default(); + chunks_serde::parse_chunks_file(input, report_json, &mut report_builder).unwrap(); } #[track_caller] diff --git a/core/src/error.rs b/core/src/error.rs index 5f793bb..b3d0563 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -1,5 +1,7 @@ use thiserror::Error; +use crate::parsers::pyreport::chunks_serde::ChunksFileParseError; + pub type Result = std::result::Result; #[derive(Error, Debug)] @@ -26,4 +28,7 @@ pub enum CodecovError { #[cfg(feature = "pyreport")] #[error("failed to convert sqlite to pyreport: '{0}'")] PyreportConversionError(String), + + #[error(transparent)] + ChunksFileParseError(#[from] ChunksFileParseError), } diff --git a/core/src/parsers/pyreport/chunks_serde.rs b/core/src/parsers/pyreport/chunks_serde.rs index 121057b..9372e7b 100644 --- a/core/src/parsers/pyreport/chunks_serde.rs +++ b/core/src/parsers/pyreport/chunks_serde.rs @@ -37,10 +37,84 @@ use std::{collections::HashMap, fmt, mem, sync::OnceLock}; use memchr::{memchr, memmem}; use serde::{de, de::IgnoredAny, Deserialize}; -use crate::report::pyreport::{CHUNKS_FILE_END_OF_CHUNK, CHUNKS_FILE_HEADER_TERMINATOR}; +use super::report_json::ParsedReportJson; +use crate::{ + error::CodecovError, + report::{ + models, + pyreport::{ + types::{self, PyreportCoverage, ReportLine}, + CHUNKS_FILE_END_OF_CHUNK, CHUNKS_FILE_HEADER_TERMINATOR, + }, + Report, ReportBuilder, + }, +}; + +pub fn parse_chunks_file( + input: &[u8], + _report_json: &ParsedReportJson, + builder: &mut B, +) -> Result<(), CodecovError> +where + B: ReportBuilder, + R: Report, +{ + let chunks_file = ChunksFile::new(input)?; + + let mut labels_index = HashMap::with_capacity(chunks_file.labels_index().len()); + for (index, name) in chunks_file.labels_index() { + let context = 
builder.insert_context(name)?; + labels_index.insert(index.clone(), context.id); + } + + let mut report_lines = vec![]; + + let mut chunks = chunks_file.chunks(); + while let Some(mut chunk) = chunks.next_chunk()? { + let mut line_no = 0; + report_lines.clear(); + while let Some(line) = chunk.next_line()? { + line_no += 1; + if let Some(line) = line { + let coverage_type = match line.1.unwrap_or_default() { + CoverageType::Line => models::CoverageType::Line, + CoverageType::Branch => models::CoverageType::Branch, + CoverageType::Method => models::CoverageType::Method, + }; + let sessions = line + .2 + .into_iter() + .map(|session| types::LineSession { + session_id: session.0, + coverage: session.1.into(), + branches: None, // TODO + partials: None, // TODO + complexity: None, // TODO + }) + .collect(); + + let mut report_line = ReportLine { + line_no, + coverage: line.0.into(), + coverage_type, + sessions, + _messages: None, + _complexity: None, + datapoints: None, // TODO + }; + report_line.normalize(); + report_lines.push(report_line); + } + } + // TODO: + // utils::save_report_lines()?; + } + + Ok(()) +} #[derive(Debug, thiserror::Error)] -pub enum ParserError { +pub enum ChunksFileParseError { #[error("unexpected EOF")] UnexpectedEof, #[error("unexpected input")] @@ -53,12 +127,12 @@ pub enum ParserError { InvalidLineRecord(#[source] serde_json::Error), } -impl PartialEq for ParserError { +impl PartialEq for ChunksFileParseError { fn eq(&self, other: &Self) -> bool { core::mem::discriminant(self) == core::mem::discriminant(other) } } -impl Eq for ParserError {} +impl Eq for ChunksFileParseError {} #[derive(Debug)] pub struct ChunksFile<'d> { @@ -67,7 +141,7 @@ pub struct ChunksFile<'d> { } impl<'d> ChunksFile<'d> { - pub fn new(mut input: &'d [u8]) -> Result { + pub fn new(mut input: &'d [u8]) -> Result { static HEADER_FINDER: OnceLock = OnceLock::new(); let header_finder = HEADER_FINDER.get_or_init(|| memmem::Finder::new(CHUNKS_FILE_HEADER_TERMINATOR)); @@ 
-75,8 +149,8 @@ impl<'d> ChunksFile<'d> { let file_header = if let Some(pos) = header_finder.find(input) { let header_bytes = &input[..pos]; input = &input[pos + header_finder.needle().len()..]; - let file_header: FileHeader = - serde_json::from_slice(header_bytes).map_err(ParserError::InvalidFileHeader)?; + let file_header: FileHeader = serde_json::from_slice(header_bytes) + .map_err(ChunksFileParseError::InvalidFileHeader)?; file_header } else { FileHeader::default() @@ -99,7 +173,7 @@ pub struct Chunks<'d> { } impl<'d> Chunks<'d> { - pub fn next_chunk(&mut self) -> Result>, ParserError> { + pub fn next_chunk(&mut self) -> Result>, ChunksFileParseError> { if self.input.is_empty() { return Ok(None); } @@ -123,9 +197,10 @@ impl<'d> Chunks<'d> { })); } - let header_bytes = next_line(&mut chunk_bytes).ok_or(ParserError::UnexpectedInput)?; - let chunk_header: ChunkHeader = - serde_json::from_slice(header_bytes).map_err(ParserError::InvalidFileHeader)?; + let header_bytes = + next_line(&mut chunk_bytes).ok_or(ChunksFileParseError::UnexpectedInput)?; + let chunk_header: ChunkHeader = serde_json::from_slice(header_bytes) + .map_err(ChunksFileParseError::InvalidChunkHeader)?; Ok(Some(Chunk { chunk_header, @@ -144,7 +219,7 @@ impl<'d> Chunk<'d> { &self.chunk_header.present_sessions } - pub fn next_line(&mut self) -> Result>, ParserError> { + pub fn next_line(&mut self) -> Result>, ChunksFileParseError> { let Some(line) = next_line(&mut self.input) else { return Ok(None); }; @@ -154,7 +229,7 @@ impl<'d> Chunk<'d> { } let line_record: LineRecord = - serde_json::from_slice(line).map_err(ParserError::InvalidLineRecord)?; + serde_json::from_slice(line).map_err(ChunksFileParseError::InvalidLineRecord)?; return Ok(Some(Some(line_record))); } } @@ -217,7 +292,7 @@ pub struct LineRecord( #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] pub struct LineSession( /// session id - u32, + usize, /// coverage Coverage, /// TODO: branches @@ -260,6 +335,18 @@ pub enum Coverage { 
HitCount(u32), } +impl From<Coverage> for PyreportCoverage { + fn from(coverage: Coverage) -> Self { + match coverage { + Coverage::Partial => PyreportCoverage::Partial(), + Coverage::BranchTaken(covered, total) => { + PyreportCoverage::BranchesTaken { covered, total } + } + Coverage::HitCount(hits) => PyreportCoverage::HitCount(hits), + } + } +} + impl<'de> Deserialize<'de> for Coverage { fn deserialize(deserializer: D) -> Result where diff --git a/core/src/report/pyreport/types.rs b/core/src/report/pyreport/types.rs index 2c3b06e..4dba6d7 100644 --- a/core/src/report/pyreport/types.rs +++ b/core/src/report/pyreport/types.rs @@ -184,6 +184,23 @@ pub struct ReportLine { pub datapoints: Option>>, } +impl ReportLine { + pub fn normalize(&mut self) { + // Fix issues like recording branch coverage with `CoverageType::Method` + let (correct_coverage, correct_type) = + normalize_coverage_measurement(&self.coverage, &self.coverage_type); + self.coverage = correct_coverage; + self.coverage_type = correct_type; + + // Fix the `coverage` values in each `LineSession` as well + for line_session in &mut self.sessions { + let (correct_coverage, _) = + normalize_coverage_measurement(&line_session.coverage, &self.coverage_type); + line_session.coverage = correct_coverage; + } + } +} + /// Account for some quirks and malformed data. See code comments for details. pub(crate) fn normalize_coverage_measurement( coverage: &PyreportCoverage,