Commit

including support for NTriples export
angelip2303 committed Jun 27, 2023
1 parent aa74086 commit 93d6787
Showing 11 changed files with 86 additions and 23 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -12,9 +12,9 @@ keywords = ["pregel", "wikidata", "subsetting", "duckdb", "validation"]
categories = ["algorithms", "database", "mathematics", "science"]

[dependencies]
- pregel-rs = { version = "0.0.12" }
+ pregel-rs = { version = "0.0.13" }
wikidata-rs = { version = "0.0.4" }
- polars = { version = "0.30.0", features = ["lazy", "is_in", "performant", "parquet", "chunked_ids", "list_eval", "dtype-categorical"] }
+ polars = { version = "0.30.0", features = ["lazy", "is_in", "performant", "parquet", "chunked_ids", "list_eval", "dtype-categorical", "rows"] }
duckdb = { version = "0.7.1" }
rayon = "1.7.0"
wikidata = "0.3.0"
2 changes: 1 addition & 1 deletion README.md
@@ -35,7 +35,7 @@ To use `pschema-rs` in your Rust project, you can add it as a dependency in your

```toml
[dependencies]
- pschema = "0.0.3"
+ pschema = "0.0.4"
```

## Usage
7 changes: 2 additions & 5 deletions examples/from_duckdb/main.rs
@@ -1,5 +1,6 @@
use pregel_rs::graph_frame::GraphFrame;
use pschema_rs::backends::duckdb::DuckDB;
+ use pschema_rs::backends::parquet::Parquet;
use pschema_rs::backends::Backend;
use pschema_rs::pschema::PSchema;
use pschema_rs::shape::shex::Shape;
@@ -20,11 +21,7 @@ fn main() -> Result<(), String> {
// Perform schema validation
match GraphFrame::from_edges(edges) {
Ok(graph) => match PSchema::new(start).validate(graph) {
- Ok(result) => {
- println!("Schema validation result:");
- println!("{:?}", result);
- Ok(())
- }
+ Ok(mut result) => Parquet::export("3000lines-subset.parquet", &mut result),
Err(error) => Err(error.to_string()),
},
Err(error) => Err(format!("Cannot create a GraphFrame: {}", error)),
4 changes: 2 additions & 2 deletions examples/from_ntriples/main.rs
@@ -19,10 +19,10 @@ fn main() -> Result<(), String> {
// Perform schema validation
match GraphFrame::from_edges(edges) {
Ok(graph) => match PSchema::new(start).validate(graph) {
- Ok(result) => {
+ Ok(mut result) => {
println!("Schema validation result:");
println!("{:?}", result);
- Ok(())
+ NTriples::export("linkedmdb-latest-subset.nt", &mut result)
}
Err(error) => Err(error.to_string()),
},
5 changes: 2 additions & 3 deletions examples/from_uniprot/main.rs
@@ -2,7 +2,6 @@ use std::time::Instant;

use pregel_rs::graph_frame::GraphFrame;
use pschema_rs::backends::ntriples::NTriples;
- use pschema_rs::backends::parquet::Parquet;
use pschema_rs::backends::Backend;
use pschema_rs::pschema::PSchema;
use pschema_rs::shape::shex::{ShapeAnd, ShapeReference, TripleConstraint};
@@ -54,10 +53,10 @@ fn main() -> Result<(), String> {
let start = Instant::now();
match GraphFrame::from_edges(edges) {
Ok(graph) => match PSchema::new(shape).validate(graph) {
- Ok(subset) => {
+ Ok(mut subset) => {
let duration = start.elapsed();
println!("Time elapsed in validate() is: {:?}", duration);
- Parquet::export("uniprotkb_reviewed_viruses_10239.parquet", subset)
+ NTriples::export("uniprotkb_reviewed_viruses_10239_0-subset.nt", &mut subset)
}
Err(error) => Err(error.to_string()),
},
4 changes: 2 additions & 2 deletions examples/wikidata_dump.rs
@@ -44,10 +44,10 @@ fn main() -> Result<(), String> {
// Perform schema validation
let start = Instant::now();
match PSchema::new(shape).validate(graph) {
- Ok(subset) => {
+ Ok(mut subset) => {
let duration = start.elapsed();
println!("Time elapsed in validate() is: {:?}", duration);
- Parquet::export("wikidata-20170821-subset.parquet", subset)
+ Parquet::export("wikidata-20170821-subset.parquet", &mut subset)
}
Err(_) => return Err(String::from("Error creating the sub-graph :(")),
}
2 changes: 1 addition & 1 deletion src/backends/duckdb.rs
@@ -114,7 +114,7 @@ impl Backend for DuckDB {
.reduce(DataFrame::empty, |acc, e| acc.vstack(&e).unwrap()))
}

- fn export(_path: &str, _df: DataFrame) -> Result<(), String> {
+ fn export(_path: &str, _df: &mut DataFrame) -> Result<(), String> {
todo!()
}
}
2 changes: 1 addition & 1 deletion src/backends/mod.rs
@@ -11,5 +11,5 @@ pub mod ntriples;

pub trait Backend {
fn import(path: &str) -> Result<DataFrame, String>;
- fn export(path: &str, df: DataFrame) -> Result<(), String>;
+ fn export(path: &str, df: &mut DataFrame) -> Result<(), String>;
}
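With this change, every `Backend` exporter borrows the validated `DataFrame` mutably instead of taking ownership of it. A minimal usage sketch consistent with the examples above (the triple values and output path are illustrative placeholders, not taken from the repository):

```rust
use polars::df;
use polars::prelude::*;
use pschema_rs::backends::ntriples::NTriples;
use pschema_rs::backends::Backend;

fn main() -> Result<(), String> {
    // Placeholder triples; in the examples above the DataFrame is the subset
    // returned by PSchema::validate. Values keep their surrounding angle
    // brackets, which the NTriples exporter strips before formatting.
    let mut subset = df![
        "subject" => ["<http://example.org/alice>"],
        "predicate" => ["<http://example.org/knows>"],
        "object" => ["<http://example.org/bob>"],
    ]
    .map_err(|e| e.to_string())?;

    // The new trait signature: export(path: &str, df: &mut DataFrame).
    NTriples::export("example-subset.nt", &mut subset)
}
```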
60 changes: 57 additions & 3 deletions src/backends/ntriples.rs
@@ -1,10 +1,13 @@
+ use std::io::BufWriter;
use std::{fs::File, io::BufReader};

use polars::df;
use polars::prelude::*;
use pregel_rs::pregel::Column;
- use rio_api::model::Triple;
+ use rio_api::formatter::TriplesFormatter;
+ use rio_api::model::{NamedNode, Triple};
use rio_api::parser::TriplesParser;
+ use rio_turtle::NTriplesFormatter;
use rio_turtle::NTriplesParser;
use rio_turtle::TurtleError;

@@ -50,7 +53,58 @@ impl Backend for NTriples {
}
}

- fn export(_path: &str, _df: DataFrame) -> Result<(), String> {
- unimplemented!()
+ fn export(path: &str, df: &mut DataFrame) -> Result<(), String> {
+ let file = File::create(path).unwrap();
+ let writer = BufWriter::new(file);
+ let mut formatter = NTriplesFormatter::new(writer);
+
+ for i in 0..df.height() {
+ let row = match df.get_row(i) {
+ Ok(row) => row.0,
+ Err(_) => return Err(format!("Error retrieving the {}th row", i)),
+ };
+
+ if let Err(_) = formatter.format(&Triple {
[Clippy (stable), line 67 in src/backends/ntriples.rs: redundant pattern matching, consider using `is_err()`]
+ subject: match row.get(0) {
+ Some(subject) => match subject {
+ AnyValue::Utf8(iri) => NamedNode {
+ iri: &iri[1..iri.len() - 1],
+ }
+ .into(),
+ _ => return Err(format!("Cannot parse from non-string at {}th row", i)),
+ },
+ None => return Err(format!("Error obtaining the subject of the {}th row", i)),
+ },
+ predicate: match row.get(1) {
+ Some(predicate) => match predicate {
+ AnyValue::Utf8(iri) => NamedNode {
[Clippy (stable), line 80 in src/backends/ntriples.rs: useless conversion to the same type: `rio_api::model::NamedNode<'_>`]
+ iri: &iri[1..iri.len() - 1],
+ }
+ .into(),
+ _ => return Err(format!("Cannot parse from non-string at {}th row", i)),
+ },
+ None => {
+ return Err(format!("Error obtaining the predicate of the {}th row", i))
+ }
+ },
+ object: match row.get(2) {
+ Some(object) => match object {
+ AnyValue::Utf8(iri) => NamedNode {
+ iri: &iri[1..iri.len() - 1],
+ }
+ .into(),
+ _ => return Err(format!("Cannot parse from non-string at {}th row", i)),
+ },
+ None => return Err(format!("Error obtaining the object of the {}th row", i)),
+ },
+ }) {
+ return Err(format!("Error parsing the {}th row", i));
+ }
+ }
+
+ match formatter.finish() {
+ Ok(_) => Ok(()),
+ Err(_) => return Err(format!("Error storing the results to the file")),
[Clippy (stable), line 107 in src/backends/ntriples.rs: unneeded `return` statement; useless use of `format!`]
+ }
}
}
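The Clippy annotations above flag small style issues in the new export code rather than functional bugs. For reference, a short self-contained sketch (assumed illustrative code, not part of this commit) of the forms Clippy suggests:

```rust
// `is_err()` replaces `if let Err(_) = ...` when the error value is unused.
fn write_row(outcome: Result<(), std::fmt::Error>, i: usize) -> Result<(), String> {
    if outcome.is_err() {
        return Err(format!("Error parsing the {}th row", i));
    }
    Ok(())
}

// A trailing match arm needs no `return`, and a fixed message needs
// `String::from` rather than `format!`.
fn store(result: std::io::Result<()>) -> Result<(), String> {
    match result {
        Ok(_) => Ok(()),
        Err(_) => Err(String::from("Error storing the results to the file")),
    }
}

fn main() {
    assert!(write_row(Ok(()), 0).is_ok());
    assert!(store(Ok(())).is_ok());
}
```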
2 changes: 1 addition & 1 deletion src/backends/parquet.rs
@@ -12,7 +12,7 @@ impl Backend for Parquet {
todo!()
}

- fn export(path: &str, mut df: DataFrame) -> Result<(), String> {
+ fn export(path: &str, mut df: &mut DataFrame) -> Result<(), String> {
let buffer = match File::create(path) {
Ok(buffer) => buffer,
Err(_) => return Err(String::from("Error creating the Parquet file")),
17 changes: 15 additions & 2 deletions src/pschema.rs
@@ -79,7 +79,7 @@ impl<T: Literal + Clone> PSchema<T> {
v_prog_iter.next(); // skip the leaf nodes :D
// Then, we can define the algorithm that will be executed on the graph. The algorithm
// will be executed in parallel on all vertices of the graph.
- let pregel = PregelBuilder::new(graph)
+ let pregel = PregelBuilder::new(graph.to_owned())
.max_iterations(ShapeTree::new(start).iterations())
.with_vertex_column(Column::Custom("labels"))
.initial_message(Self::initial_message())
@@ -104,6 +104,17 @@
.lengths()
.gt(lit(0)),
)
+ .left_join(
+ graph.to_owned().edges.lazy(),
[Clippy (stable), line 108 in src/pschema.rs: redundant clone]
+ Column::VertexId.as_ref(),
+ Column::Subject.as_ref(),
+ )
+ .select(&[
+ col(Column::VertexId.as_ref()).alias(Column::Subject.as_ref()),
+ col(Column::Predicate.as_ref()),
+ col(Column::Object.as_ref()),
+ col(Column::Custom("labels").as_ref()),
+ ])
.collect(),
Err(error) => Err(error),
}
@@ -193,7 +204,9 @@
fn assert(expected: DataFrame, actual: DataFrame) -> Result<(), String> {
let count = actual
.lazy()
- .sort(Column::VertexId.as_ref(), Default::default())
+ .groupby([Column::Subject.as_ref()])
+ .agg([col("labels").first()])
+ .sort(Column::Subject.as_ref(), Default::default())
.select([col("labels").list().lengths()])
.collect()
.unwrap();
