Skip to content

Commit

Permalink
Merge pull request #12 from ezyang/jjwu_generalize_parsers
Browse files Browse the repository at this point in the history
Generalize structured log parsers
  • Loading branch information
jamesjwu authored Apr 2, 2024
2 parents d0c1afa + f5cace0 commit 2ac593a
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 92 deletions.
124 changes: 37 additions & 87 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use anyhow::anyhow;
use fxhash::FxHashMap;
use md5::{Digest, Md5};
use std::ffi::{OsStr, OsString};

use regex::Regex;
use std::fs::File;
use std::io::{self, BufRead};
use std::path::Path;
use std::path::PathBuf;
use tinytemplate::TinyTemplate;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
Expand All @@ -15,13 +13,12 @@ use std::time::Instant;

use crate::types::*;
use crate::templates::*;

use crate::parsers::all_parsers;
mod parsers;
mod templates;
mod types;


pub type ParseOutput = Vec<(PathBuf, String)>;

pub struct ParseConfig {
pub strict: bool,
}
Expand Down Expand Up @@ -86,6 +83,8 @@ pub fn parse_path(path: &PathBuf, config: ParseConfig) -> anyhow::Result<ParseOu
})
.peekable();

let all_parsers = all_parsers(&tt);

while let Some((lineno, line)) = iter.next() {
bytes_read += line.len() as u64;
pb.set_position(bytes_read);
Expand Down Expand Up @@ -140,23 +139,8 @@ pub fn parse_path(path: &PathBuf, config: ParseConfig) -> anyhow::Result<ParseOu
}
};

let compile_id_dir: PathBuf = e
.compile_id
.as_ref()
.map_or(
format!("unknown_{lineno}"),
|CompileId {
frame_id,
frame_compile_id,
attempt,
}| { format!("{frame_id}_{frame_compile_id}_{attempt}") },
)
.into();

let subdir = &compile_id_dir;

let mut payload = String::new();
if let Some(expect) = e.has_payload {
if let Some(ref expect) = e.has_payload {
let mut first = true;
while let Some((_payload_lineno, payload_line)) =
iter.next_if(|(_, l)| l.starts_with('\t'))
Expand Down Expand Up @@ -185,8 +169,39 @@ pub fn parse_path(path: &PathBuf, config: ParseConfig) -> anyhow::Result<ParseOu
stats.ok += 1;

// lol this clone, probably shouldn't use entry
// TODO: output should be able to generate this without explicitly creating
let compile_directory = directory.entry(e.compile_id.clone()).or_default();

for parser in &all_parsers {
if let Some(md) = parser.get_metadata(&e) {
let results = parser.parse(lineno, md, e.rank, &e.compile_id, &payload);
match results {
Ok(results) => {
for (filename, out) in results {
output.push((filename.clone(), out));
compile_directory.push(filename);
}
}
Err(err) => {
match parser.name() {
"dynamo_guards" => {
eprintln!("Failed to parse guards json: {}", err);
stats.fail_dynamo_guards_json += 1;
}
name => {
eprintln!("Parser {name} failed: {err}");
stats.fail_parser += 1;
}
}
}
}

}
}

// TODO: implement this as StructuredLogParser
// This one is hard because it consumes the stack, and
// I don't want to clone the stack when passing it as reference
if let Some(m) = e.dynamo_start {
if let Some(stack) = m.stack {
stack_trie.insert(
Expand All @@ -198,72 +213,6 @@ pub fn parse_path(path: &PathBuf, config: ParseConfig) -> anyhow::Result<ParseOu
};
};

let mut output_dump =
|filename: &str, sentinel: Option<EmptyMetadata>| -> anyhow::Result<()> {
if sentinel.is_some() {
let f = subdir.join(filename);
output.push((f, payload.clone()));
compile_directory.push(compile_id_dir.join(filename));
}
Ok(())
};

output_dump("optimize_ddp_split_graph.txt", e.optimize_ddp_split_graph)?;
output_dump("compiled_autograd_graph.txt", e.compiled_autograd_graph)?;
output_dump("aot_forward_graph.txt", e.aot_forward_graph)?;
output_dump("aot_backward_graph.txt", e.aot_backward_graph)?;
output_dump("aot_joint_graph.txt", e.aot_joint_graph)?;
output_dump("inductor_post_grad_graph.txt", e.inductor_post_grad_graph)?;

if e.dynamo_output_graph.is_some() {
// TODO: dump sizes
let filename = "dynamo_output_graph.txt";
let f = subdir.join(filename);
output.push((f, payload.clone()));
compile_directory.push(compile_id_dir.join(filename));
}

if e.dynamo_guards.is_some() {
let filename = "dynamo_guards.html";
let f = subdir.join(filename);
match serde_json::from_str::<Vec<DynamoGuard>>(payload.as_str()) {
Ok(guards) => {
let guards_context = DynamoGuardsContext { guards };
output.push((f, tt.render("dynamo_guards.html", &guards_context)?));
compile_directory.push(compile_id_dir.join(filename));
}
Err(err) => {
eprintln!("Failed to parse guards json: {}", err);
stats.fail_dynamo_guards_json += 1;
}
}
}

if let Some(metadata) = e.inductor_output_code {
let filename = metadata
.filename
.as_ref()
.and_then(|p| Path::file_stem(p))
.map_or_else(
|| PathBuf::from("inductor_output_code.txt"),
|stem| {
let mut r = OsString::from("inductor_output_code_");
r.push(stem);
r.push(OsStr::new(".txt"));
r.into()
},
);
let f = subdir.join(&filename);
output.push((f, payload.clone()));
compile_directory.push(compile_id_dir.join(filename));
}

if let Some(metadata) = e.optimize_ddp_split_child {
let filename = format!("optimize_ddp_split_child_{}.txt", metadata.name);
let f = subdir.join(&filename);
output.push((f, payload.clone()));
compile_directory.push(compile_id_dir.join(filename));
}
}
pb.finish_with_message("done");
spinner.finish();
Expand All @@ -288,6 +237,7 @@ pub fn parse_path(path: &PathBuf, config: ParseConfig) -> anyhow::Result<ParseOu
+ stats.fail_payload_md5
+ stats.other_rank
+ stats.fail_dynamo_guards_json
+ stats.fail_parser
> 0)
{
// Report something went wrong
Expand Down
208 changes: 208 additions & 0 deletions src/parsers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
use crate::types::*;
use std::path::PathBuf;
use std::ffi::{OsStr, OsString};
use std::path::Path;
use tinytemplate::TinyTemplate;

/**
* StructuredLogParser
* Parses a structured log and returns a vec of file outputs.
* Implement this trait to add your own analyses.
*
* 'e is the lifetime of the envelope being parsed
*/
pub trait StructuredLogParser {
// If this returns Some value, the parser will be run on that metadata.
// Otherwise, it will be skipped.
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>>;

// Take a log input and the metadata you asked for, return a set of files to write
fn parse<'e>(&self,
lineno: usize, // Line number from log
metadata: Metadata<'e>, // Metadata from get_metadata
rank: Option<u32>, // Rank of the log
compile_id: &Option<CompileId>, // Compile ID of the envelope
payload: &str // Payload from the log (empty string when None)
) -> anyhow::Result<ParseOutput>;

// Name of the parser, for error logging
fn name(&self) -> &'static str;
}

// Takes a filename and a payload and writes that payload into a the file
fn simple_file_output(
filename: &str,
lineno: usize,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
let compile_id_dir: PathBuf = compile_id
.as_ref()
.map_or(
format!("unknown_{lineno}"),
|CompileId {
frame_id,
frame_compile_id,
attempt,
}| { format!("{frame_id}_{frame_compile_id}_{attempt}") },
)
.into();
let subdir = PathBuf::from(compile_id_dir);
let f = subdir.join(filename);
Ok(Vec::from([(f, String::from(payload))]))
}

/**
* Parser for simple output dumps where the metadata is a sentinel {}
*/
pub struct SentinelFileParser {
filename: &'static str,
get_sentinel: fn (&Envelope) -> Option<&EmptyMetadata>,
} impl SentinelFileParser {
pub fn new(filename: &'static str, get_sentinel: fn (&Envelope) -> Option<&EmptyMetadata>) -> Self {
Self { filename, get_sentinel }
}
}
impl StructuredLogParser for SentinelFileParser {
fn name(&self) -> &'static str {
self.filename
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
(self.get_sentinel)(e).map(|m| Metadata::Empty(m))
}
fn parse<'e>(&self,
lineno: usize,
_metadata: Metadata<'e>,
_rank: Option<u32>,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
simple_file_output(&format!("{}.txt",self.filename), lineno, compile_id, payload)
}
}

// Same as SentinelFileParser, but can log the size of the graph
pub struct DynamoOutputGraphParser;
impl StructuredLogParser for DynamoOutputGraphParser {
fn name(&self) -> &'static str {
"dynamo_output_graph"
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
e.dynamo_output_graph.as_ref().map(|m| Metadata::DynamoOutputGraph(m))
}
fn parse<'e>(&self,
lineno: usize,
_metadata: Metadata<'e>, // TODO: log size of graph
_rank: Option<u32>,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
simple_file_output("dynamo_output_graph.txt", lineno, compile_id, payload)
}
}

pub struct DynamoGuardParser<'t> {
tt: &'t TinyTemplate<'t>,
}
impl StructuredLogParser for DynamoGuardParser<'_> {
fn name(&self) -> &'static str {
"dynamo_guards"
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
e.dynamo_guards.as_ref().map(|m| Metadata::Empty(m))
}
fn parse<'e>(&self,
lineno: usize,
_metadata: Metadata<'e>,
_rank: Option<u32>,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
let filename = format!("{}.html", self.name());
let guards = serde_json::from_str::<Vec<DynamoGuard>>(payload)?;
let guards_context = DynamoGuardsContext { guards };
let output = self.tt.render(&filename, &guards_context)?;
simple_file_output(&filename, lineno, compile_id, &output)
}
}

pub struct InductorOutputCodeParser;
impl StructuredLogParser for InductorOutputCodeParser {
fn name(&self) -> &'static str {
"inductor_output_code"
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
e.inductor_output_code.as_ref().map(|m| Metadata::InductorOutputCode(m))
}

fn parse<'e>(&self,
lineno: usize,
metadata: Metadata<'e>,
_rank: Option<u32>,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
if let Metadata::InductorOutputCode(metadata) = metadata {
let filename = metadata
.filename
.as_ref()
.and_then(|p| Path::file_stem(p))
.map_or_else(
|| PathBuf::from("inductor_output_code.txt"),
|stem| {
let mut r = OsString::from("inductor_output_code_");
r.push(stem);
r.push(OsStr::new(".txt"));
r.into()
},
);
simple_file_output(&filename.to_string_lossy(), lineno, compile_id, payload)
} else {
Err(anyhow::anyhow!("Expected InductorOutputCode metadata"))
}
}
}

pub struct OptimizeDdpSplitChildParser;
impl StructuredLogParser for OptimizeDdpSplitChildParser {
fn name(&self) -> &'static str {
"optimize_ddp_split_child"
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
e.optimize_ddp_split_child.as_ref().map(|m| Metadata::OptimizeDdpSplitChild(m))
}

fn parse<'e>(&self,
lineno: usize,
metadata: Metadata<'e>,
_rank: Option<u32>,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
if let Metadata::OptimizeDdpSplitChild(m) = metadata {
let filename = format!("optimize_ddp_split_child_{}.txt", m.name);
simple_file_output(&filename, lineno, compile_id, payload)
} else {
Err(anyhow::anyhow!("Expected OptimizeDdpSplitChild metadata"))
}
}
}

// Register your parser here
pub fn all_parsers<'t>(tt: &'t TinyTemplate<'t>) -> Vec<Box<dyn StructuredLogParser + 't>> {
// We need to use Box wrappers here because vecs in Rust need to have known size
let result : Vec<Box<dyn StructuredLogParser>> = vec![
Box::new(SentinelFileParser::new("optimize_ddp_split_graph", |e| e.optimize_ddp_split_graph.as_ref())),
Box::new(SentinelFileParser::new("compiled_autograd_graph", |e| e.compiled_autograd_graph.as_ref())),
Box::new(SentinelFileParser::new("aot_forward_graph", |e| e.aot_forward_graph.as_ref())),
Box::new(SentinelFileParser::new("aot_backward_graph", |e| e.aot_backward_graph.as_ref())),
Box::new(SentinelFileParser::new("aot_joint_graph", |e| e.aot_joint_graph.as_ref())),
Box::new(SentinelFileParser::new("inductor_post_grad_graph", |e| e.inductor_post_grad_graph.as_ref())),
Box::new(DynamoOutputGraphParser),
Box::new(DynamoGuardParser { tt }),
Box::new(InductorOutputCodeParser),
Box::new(OptimizeDdpSplitChildParser),
];

result
}
Loading

0 comments on commit 2ac593a

Please sign in to comment.