Skip to content

Commit

Permalink
working on improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
angelip2303 committed Jun 28, 2023
1 parent 80cccc3 commit fa719a5
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 117 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ keywords = ["pregel", "wikidata", "subsetting", "duckdb", "validation"]
categories = ["algorithms", "database", "mathematics", "science"]

[dependencies]
pregel-rs = { version = "0.0.13" }
pregel-rs = { path="../pregel-rs" }
wikidata-rs = { version = "0.0.4" }
polars = { version = "0.30.0", features = ["lazy", "is_in", "performant", "parquet", "chunked_ids", "list_eval", "dtype-categorical", "rows"] }
duckdb = { version = "0.7.1" }
Expand Down
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,18 @@ To use `pschema-rs` in your Rust project, you can add it as a dependency in your

```toml
[dependencies]
pschema = "0.0.3"
pschema = "0.0.4"
```

## Usage

Here's an example of how you can use `pschema-rs` to perform schema validation and generate a subset of data from Wikidata:
Here's an example of how you can use `pschema-rs` to perform schema validation and generate a subset of data from Wikidata.
First, we define the `ShapeExpression` we want the algorithm to validate. Next, we import
the Wikidata entities from a file. The import methods we have defined create an edge DataFrame, so we
need to call `GraphFrame::from_edges(edges)`, which builds the `GraphFrame` from the imported edges. Lastly,
calling `PSchema::new(start).validate(graph)` first constructs the `PSchema` algorithm from the `ShapeExpression`
we have defined, and then creates the subset of the graph. Finally, we print the results. Note that the results can also be
exported to a file. See the [examples](https://github.com/angelip2303/pschema-rs/tree/main/examples) for more information.

```rust
use pregel_rs::graph_frame::GraphFrame;
Expand Down
160 changes: 67 additions & 93 deletions examples/cardinality/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,110 +24,84 @@ static GLOBAL: MiMalloc = MiMalloc;

fn main() -> Result<(), String> {
// Define validation rules
let shape = Cardinality::new(
ShapeOr::new(
"type",
let shape = ShapeReference::new(
"protein",
"<http://purl.uniprot.org/core/annotation>",
ShapeAnd::new(
"annotation",
vec![
TripleConstraint::new(
"Transmembrane_Annotation",
"<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>",
NodeConstraint::Value(
"<http://purl.uniprot.org/core/Transmembrane_Annotation>",
),
ShapeReference::new(
"range",
"<http://purl.uniprot.org/core/range>",
ShapeAnd::new(
"grouping",
vec![
ShapeReference::new(
"lower_range",
"<http://biohackathon.org/resource/faldo#begin>",
TripleConstraint::new(
"begin",
"<http://biohackathon.org/resource/faldo#position>",
NodeConstraint::Any,
)
.into(),
)
.into(),
ShapeReference::new(
"upper_range",
"<http://biohackathon.org/resource/faldo#end>",
TripleConstraint::new(
"end",
"<http://biohackathon.org/resource/faldo#position>",
NodeConstraint::Any,
)
.into(),
)
.into(),
],
)
.into(),
)
.into(),
TripleConstraint::new(
"Transmembrane_Annotation",
"<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>",
NodeConstraint::Value(
"<http://purl.uniprot.org/core/Topological_Domain_Annotation>",
),
"comment",
"<http://www.w3.org/2000/01/rdf-schema#comment>",
NodeConstraint::Any,
)
.into(),
Cardinality::new(
"cardinality",
ShapeOr::new(
"type",
vec![
TripleConstraint::new(
"Transmembrane_Annotation",
"<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>",
NodeConstraint::Value(
"<http://purl.uniprot.org/core/Transmembrane_Annotation>",
),
)
.into(),
TripleConstraint::new(
"Transmembrane_Annotation",
"<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>",
NodeConstraint::Value(
"<http://purl.uniprot.org/core/Topological_Domain_Annotation>",
),
)
.into(),
],
)
.into(),
Bound::Zero,
Bound::Many,
)
.into(),
],
)
.into(),
Bound::Zero,
Bound::Many,
)
.into();
// let shape = ShapeReference::new(
// "protein",
// "<http://purl.uniprot.org/core/annotation>",
// ShapeAnd::new(
// "annotation",
// vec![
// ShapeReference::new(
// "range",
// "<http://purl.uniprot.org/core/range>",
// ShapeAnd::new(
// "grouping",
// vec![
// ShapeReference::new(
// "lower_range",
// "<http://biohackathon.org/resource/faldo#begin>",
// TripleConstraint::new(
// "begin",
// "<http://biohackathon.org/resource/faldo#position>",
// NodeConstraint::Any,
// )
// .into(),
// )
// .into(),
// ShapeReference::new(
// "upper_range",
// "<http://biohackathon.org/resource/faldo#end>",
// TripleConstraint::new(
// "end",
// "<http://biohackathon.org/resource/faldo#position>",
// NodeConstraint::Any,
// )
// .into(),
// )
// .into(),
// ],
// )
// .into(),
// )
// .into(),
// TripleConstraint::new(
// "comment",
// "<http://www.w3.org/2000/01/rdf-schema#comment>",
// NodeConstraint::Any,
// )
// .into(),
// Cardinality::new(
// ShapeOr::new(
// "type",
// vec![
// TripleConstraint::new(
// "Transmembrane_Annotation",
// "<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>",
// NodeConstraint::Value(
// "<http://purl.uniprot.org/core/Transmembrane_Annotation>",
// ),
// )
// .into(),
// TripleConstraint::new(
// "Transmembrane_Annotation",
// "<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>",
// NodeConstraint::Value(
// "<http://purl.uniprot.org/core/Topological_Domain_Annotation>",
// ),
// )
// .into(),
// ],
// )
// .into(),
// Bound::Zero,
// Bound::Many,
// )
// .into(),
// ],
// )
// .into(),
// )
// .into();

// Load Wikidata entities
let edges = NTriples::import("uniprotkb_reviewed_viruses_10239_0.nt")?;
Expand Down
2 changes: 1 addition & 1 deletion examples/wikidata_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn main() -> Result<(), String> {
));

// Load Wikidata entities
let edges = match DuckDB::import("../wd2duckdb/wikidata-20170821-all.duckdb") {
let edges = match DuckDB::import("wikidata-20170821-all.duckdb") {
Ok(edges) => edges,
Err(_) => return Err(String::from("Error creating the edges :(")),
};
Expand Down
43 changes: 32 additions & 11 deletions src/pschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::shape::shape_tree::{ShapeTree, ShapeTreeItem};
use crate::shape::shex::{Shape, Validate};
use crate::utils::check::check_field;

use polars::enable_string_cache;
use polars::prelude::*;
use pregel_rs::graph_frame::GraphFrame;
use pregel_rs::pregel::{Column, MessageReceiver, PregelBuilder};
Expand Down Expand Up @@ -62,6 +63,7 @@ impl<T: Literal + Clone> PSchema<T> {
/// there is an error during execution, it returns an `Err(PolarsError)` with a
/// description of the error.
pub fn validate(self, graph: GraphFrame) -> PolarsResult<DataFrame> {
enable_string_cache(true);
// First, we check if the graph has the required columns. If the graph does not have the
// required columns or in case they are empty, we return an error. The required columns are:
// - `subject`: the source vertex of the edge
Expand Down Expand Up @@ -96,7 +98,12 @@ impl<T: Literal + Clone> PSchema<T> {
.lazy()
.select(&[
col(Column::VertexId.as_ref()),
col(Column::Custom("labels").as_ref()),
col(Column::Custom("labels").as_ref())
.explode()
.drop_nulls()
.unique()
.implode()
.over([Column::VertexId.as_ref()]),
])
.filter(
col(Column::Custom("labels").as_ref())
Expand Down Expand Up @@ -143,7 +150,13 @@ impl<T: Literal + Clone> PSchema<T> {
}
}
}
ans.cast(DataType::Categorical(None))
match concat_list([
Column::subject(Column::Custom("labels")),
ans.cast(DataType::Categorical(None)),
]) {
Ok(concat) => concat,
Err(_) => Column::subject(Column::Custom("labels")),
}
}

/// The function returns an expression that aggregates messages by exploding a
Expand All @@ -156,7 +169,7 @@ impl<T: Literal + Clone> PSchema<T> {
/// element in the column), and drops any rows that have NULL values in the
/// resulting column.
fn aggregate_messages() -> Expr {
Column::msg(None).filter(Column::msg(None).is_not_null())
Column::msg(None).explode()
}

/// The function takes a shape iterator, validates the shapes in it, concatenates
Expand Down Expand Up @@ -191,10 +204,10 @@ impl<T: Literal + Clone> PSchema<T> {
#[cfg(test)]
mod tests {
use crate::pschema::PSchema;
use crate::shape::shex::Shape;
use crate::utils::examples::Value::*;
use crate::utils::examples::*;

use crate::shape::shex::Shape;
use polars::df;
use polars::prelude::*;
use pregel_rs::graph_frame::GraphFrame;
Expand All @@ -210,6 +223,7 @@ mod tests {
.select([col("labels").list().lengths()])
.collect()
.unwrap();
println!("count: {:?}", count);
match count == expected {
true => Ok(()),
false => return Err(String::from("The DataFrames are not equals")),
Expand All @@ -225,18 +239,20 @@ mod tests {
Ok(graph) => graph,
Err(error) => return Err(error),
};

let expected = match DataFrame::new(vec![Series::new(Custom("labels").as_ref(), result)]) {
Ok(expected) => expected,
Err(_) => return Err(String::from("Error creating the expected DataFrame")),
};

match PSchema::new(schema).validate(graph) {
Ok(actual) => {
println!("actual: {:?}", actual);
assert(expected, actual)
}
Err(error) => Err(error.to_string()),
Err(error) => {
println!("asd");
println!("{}", error);
Err(error.to_string())
}
}
}

Expand All @@ -252,17 +268,17 @@ mod tests {

#[test]
fn complex_test() -> Result<(), String> {
test(paper_graph(), vec![4u32, 1u32], complex_schema())
test(paper_graph(), vec![4u32, 1u32, 1u32], complex_schema())
}

#[test]
fn reference_test() -> Result<(), String> {
test(paper_graph(), vec![1u32], reference_schema())
test(paper_graph(), vec![2u32, 1u32, 1u32], reference_schema())
}

#[test]
fn optional_test() -> Result<(), String> {
test(paper_graph(), vec![1u32, 1u32], optional_schema())
test(paper_graph(), vec![3u32, 1u32, 1u32], optional_schema())
}

#[test]
Expand All @@ -277,7 +293,12 @@ mod tests {

#[test]
fn cardinality_test() -> Result<(), String> {
test(paper_graph(), vec![1u32, 1u32], cardinality_schema())
test(paper_graph(), vec![3u32, 1u32], cardinality_schema())
}

#[test]
fn vprog_to_vprog_test() -> Result<(), String> {
    // Runs the shared `test` helper on the paper graph with the
    // `vprog_to_vprog` schema, passing `[3]` as the expected values.
    let graph = paper_graph();
    let expected = vec![3u32];
    let schema = vprog_to_vprog();
    test(graph, expected, schema)
}

#[test]
Expand Down
5 changes: 5 additions & 0 deletions src/shape/shape_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,9 @@ pub mod tests {
fn optional_schema_test() {
    // Iterating the `ShapeTree` built from the optional schema must yield
    // exactly three items.
    let tree = ShapeTree::new(optional_schema());
    let items = tree.into_iter().count();
    assert_eq!(3, items)
}

#[test]
fn v_prog_to_vprog_schema_test() {
    // Iterating the `ShapeTree` built from the `vprog_to_vprog` schema must
    // yield exactly three items.
    let tree = ShapeTree::new(vprog_to_vprog());
    let items = tree.into_iter().count();
    assert_eq!(3, items)
}
}
Loading

0 comments on commit fa719a5

Please sign in to comment.