Skip to content

Commit

Permalink
using chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
angelip2303 committed Oct 22, 2023
1 parent 9236be9 commit c69b0e4
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 88 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@ edition = "2021"
zarr3 = { git = "https://github.com/clbarnes/zarr3-rs.git" }
rdf-rs = { path = "./rdf-rs" }
sophia = { version = "0.7.2" }
ndarray = { version = "0.15.6", features = [ "rayon" ] }
bimap = "0.6.3"
ndarray = { version = "0.15.6", features = [ "rayon" ] }
3 changes: 2 additions & 1 deletion rdf-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ edition = "2021"

[dependencies]
sophia = { version = "0.7.2", features = ["xml"] }
bimap = "0.6.3"
bimap = "0.6.3"
rayon = "1.8.0"
41 changes: 7 additions & 34 deletions rdf-rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use bimap::BiMap;
use ntriples::NTriples;
use rdf_xml::RdfXml;
use sophia::parser::TripleParser;
use sophia::serializer::TripleSerializer;
use sophia::term::BoxTerm;
use sophia::term::Term;
use sophia::triple::stream::TripleSource;
use std::fs::File;
use std::io::{BufReader, BufWriter};
Expand All @@ -13,17 +12,17 @@ mod ntriples;
mod rdf_xml;
mod turtle;

pub type Graph = Vec<[BoxTerm; 3]>;
pub type RdfGraph = Vec<[Term<String>; 3]>;

pub struct RdfParser {
pub graph: Graph,
pub graph: RdfGraph,
}

pub struct RdfSerializer;

trait Backend<P: TripleParser<BufReader<File>>, F: TripleSerializer> {
fn parse(&self, path: &str) -> Result<Graph, String> {
let mut graph: Graph = vec![];
fn parse(&self, path: &str) -> Result<RdfGraph, String> {
let mut graph: RdfGraph = vec![];

let reader = BufReader::new(match File::open(path) {
Ok(file) => file,
Expand All @@ -40,7 +39,7 @@ trait Backend<P: TripleParser<BufReader<File>>, F: TripleSerializer> {
}
}

fn format(&self, path: &str, graph: Graph) -> Result<(), String> {
fn format(&self, path: &str, graph: RdfGraph) -> Result<(), String> {
let file = File::create(path).unwrap();
let writer = BufWriter::new(file);
let mut formatter = self.concrete_formatter(writer);
Expand Down Expand Up @@ -74,36 +73,10 @@ impl RdfParser {
},
})
}

pub fn extract(
&self,
) -> (
BiMap<BoxTerm, usize>,
BiMap<BoxTerm, usize>,
BiMap<BoxTerm, usize>,
) {
let mut subjects = BiMap::<BoxTerm, usize>::new();
let mut predicates = BiMap::<BoxTerm, usize>::new();
let mut objects = BiMap::<BoxTerm, usize>::new();

self.graph.iter().for_each(|triple| {
if !subjects.contains_left(&triple[0].to_owned()) {
subjects.insert(triple[0].to_owned(), subjects.len());
}
if !predicates.contains_left(&triple[1].to_owned()) {
predicates.insert(triple[1].to_owned(), predicates.len());
}
if !objects.contains_left(&triple[2].to_owned()) {
objects.insert(triple[2].to_owned(), objects.len());
}
});

(subjects, predicates, objects)
}
}

impl RdfSerializer {
pub fn serialize(path: &str, graph: Graph) -> Result<(), String> {
pub fn serialize(path: &str, graph: RdfGraph) -> Result<(), String> {
match path.split('.').last() {
Some("nt") => NTriples.format(path, graph),
Some("ttl") => Turtle.format(path, graph),
Expand Down
141 changes: 90 additions & 51 deletions src/remote_hdt.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use bimap::BiHashMap;
use ndarray::parallel::prelude::IntoParallelRefIterator;
use ndarray::parallel::prelude::ParallelIterator;
use ndarray::{ArcArray, ArcArray1, Array2, ArrayBase, Axis, Dim, Ix3, IxDynImpl, OwnedArcRepr};
use rdf_rs::RdfParser;
use sophia::term::BoxTerm;
use sophia::graph::GTripleSource;
use sophia::graph::Graph;
use sophia::term::Term;
use sophia::triple::Triple;
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::AtomicU8;
Expand Down Expand Up @@ -42,7 +45,7 @@ pub enum ReferenceSystem {
OPS,
}

pub enum Term {
pub enum Field {
Subject(usize),
Predicate(usize),
Object(usize),
Expand All @@ -68,7 +71,7 @@ pub trait Engine {
fn get(
&self,
array: &Option<ArcArray3>,
term: Term,
term: Field,
reference_system: &ReferenceSystem,
) -> Result<ArcArray3, String> {
let arr: ArcArray3 = match array {
Expand All @@ -80,23 +83,23 @@ pub trait Engine {
let shape = binding.shape();

let flattened: ArcArray1<u8> = match term {
Term::Subject(term) => match reference_system {
Field::Subject(term) => match reference_system {
ReferenceSystem::SPO => self.get_first_term(arr, shape[0], term),
ReferenceSystem::SOP => self.get_first_term(arr, shape[0], term),
ReferenceSystem::PSO => self.get_second_term(arr, shape[1], term),
ReferenceSystem::POS => self.get_third_term(arr, shape[2], term),
ReferenceSystem::OSP => self.get_second_term(arr, shape[1], term),
ReferenceSystem::OPS => self.get_third_term(arr, shape[2], term),
},
Term::Predicate(term) => match reference_system {
Field::Predicate(term) => match reference_system {
ReferenceSystem::SPO => self.get_second_term(arr, shape[1], term),
ReferenceSystem::SOP => self.get_third_term(arr, shape[2], term),
ReferenceSystem::PSO => self.get_first_term(arr, shape[0], term),
ReferenceSystem::POS => self.get_first_term(arr, shape[0], term),
ReferenceSystem::OSP => self.get_third_term(arr, shape[2], term),
ReferenceSystem::OPS => self.get_second_term(arr, shape[1], term),
},
Term::Object(term) => match reference_system {
Field::Object(term) => match reference_system {
ReferenceSystem::SPO => self.get_third_term(arr, shape[2], term),
ReferenceSystem::SOP => self.get_second_term(arr, shape[1], term),
ReferenceSystem::PSO => self.get_third_term(arr, shape[2], term),
Expand Down Expand Up @@ -330,14 +333,14 @@ impl ReferenceSystem {
}

fn index(&self, sidx: usize, pidx: usize, oidx: usize, domain: &Domain) -> usize {
let sidx = match self {
ReferenceSystem::SPO => sidx * domain.predicates_size * domain.objects_size,
ReferenceSystem::SOP => sidx * domain.predicates_size * domain.objects_size,
ReferenceSystem::PSO => sidx * domain.objects_size,
ReferenceSystem::POS => sidx,
ReferenceSystem::OSP => sidx * domain.predicates_size,
ReferenceSystem::OPS => sidx,
};
// let sidx = match self {
// ReferenceSystem::SPO => sidx * domain.predicates_size * domain.objects_size,
// ReferenceSystem::SOP => sidx * domain.predicates_size * domain.objects_size,
// ReferenceSystem::PSO => sidx * domain.objects_size,
// ReferenceSystem::POS => sidx,
// ReferenceSystem::OSP => sidx * domain.predicates_size,
// ReferenceSystem::OPS => sidx,
// };

let pidx = match self {
ReferenceSystem::SPO => pidx * domain.objects_size,
Expand All @@ -357,7 +360,7 @@ impl ReferenceSystem {
ReferenceSystem::OPS => oidx * domain.predicates_size * domain.subjects_size,
};

sidx + pidx + oidx
pidx + oidx
}
}

Expand Down Expand Up @@ -423,7 +426,24 @@ impl<'a> RemoteHDT<'a> {

// 3. Import the RDF dump using `rdf-rs`
let dump = RdfParser::new(self.rdf_path)?;
let (subjects, predicates, objects) = dump.extract();
let binding = dump.graph.subjects().unwrap();
let subjects = binding
.iter()
.enumerate()
.map(|(key, value)| (value.to_owned(), key)) // TODO: remove to_owned
.collect::<HashMap<Term<String>, usize>>();
let binding = dump.graph.predicates().unwrap();
let predicates = binding
.iter()
.enumerate()
.map(|(key, value)| (value.to_owned(), key))
.collect::<HashMap<Term<String>, usize>>(); // TODO: remove unwrap
let binding = dump.graph.objects().unwrap();
let objects = binding
.iter()
.enumerate()
.map(|(key, value)| (value.to_owned(), key))
.collect::<HashMap<Term<String>, usize>>(); // TODO: remove unwrap

let domain = &Domain {
subjects_size: subjects.len(), // size of the unique values for the Subjects
Expand All @@ -436,6 +456,14 @@ impl<'a> RemoteHDT<'a> {
// of the different dimensions and the default values
let arr_meta = ArrayMetadataBuilder::<u8>::new(&self.reference_system.shape_u64(domain))
.dimension_names(self.reference_system.dimension_names())?
.chunk_grid(
vec![
1,
self.reference_system.shape_u64(domain)[1],
self.reference_system.shape_u64(domain)[2],
] // TODO: improve this
.as_slice(),
)?
.push_bb_codec(GzipCodec::default())
.set_attribute(
"subjects".to_string(),
Expand Down Expand Up @@ -480,50 +508,61 @@ impl<'a> RemoteHDT<'a> {
// the provided values (second vector). What's more, an offset can be set;
// that is, we can insert the created array with and X and Y shift. Lastly,
// the region is written provided the aforementioned data and offset
let data = self.create_array(domain, dump, subjects, predicates, objects)?;
let offset = smallvec![0, 0, 0];

// TODO: could this be done using rayon or a multi-threaded approach.
// Maybe using chunks instead of a region and having several chunks of
// the same size (i.e 100x100). Then we write in parallel?
// This is the place where the system is currently taking more time
if arr.write_region(&offset, data).is_err() {
return Err(String::from("Error writing to the Array"));
};
subjects.par_iter().for_each(|(subject, i)| {
let _ = arr.write_chunk(
&smallvec![*i as u64, 0, 0],
self.create_array(
domain,
dump.graph.triples_with_s(subject),
&predicates,
&objects,
*i,
)
.unwrap(),
);
});

Ok(self)
}

fn create_array(
&self,
domain: &Domain,
dump: RdfParser,
subjects: BiHashMap<BoxTerm, usize>,
predicates: BiHashMap<BoxTerm, usize>,
objects: BiHashMap<BoxTerm, usize>,
triples: GTripleSource<Vec<[Term<String>; 3]>>,
predicates: &HashMap<Term<String>, usize>,
objects: &HashMap<Term<String>, usize>,
chunk_idx: usize,
) -> Result<ArrayBase<OwnedArcRepr<u8>, Dim<IxDynImpl>>, String> {
match ArcArrayD::from_shape_vec(self.reference_system.shape(domain).to_vec(), {
let slice: Vec<AtomicU8> =
vec![0u8; domain.subjects_size * domain.predicates_size * domain.objects_size]
match ArcArrayD::from_shape_vec(
vec![
1 as usize,
self.reference_system.shape(domain)[1],
self.reference_system.shape(domain)[2],
] // TODO: improve this
.as_slice(),
{
let slice: Vec<AtomicU8> = vec![0u8; domain.predicates_size * domain.objects_size]
.par_iter()
.map(|&n| AtomicU8::new(n))
.collect();
dump.graph
.par_iter()
.for_each(|[subject, predicate, object]| {
slice[self.reference_system.index(
subjects.get_by_left(subject).unwrap().to_owned(),
predicates.get_by_left(predicate).unwrap().to_owned(),
objects.get_by_left(object).unwrap().to_owned(),
domain,
)]
triples.for_each(|triple| {
let triple = triple.unwrap();

let pidx = predicates.get(triple.p()).unwrap().to_owned(); // TODO: remove unwrap
let oidx = objects.get(triple.o()).unwrap().to_owned();

slice[self
.reference_system
.index(chunk_idx as usize, pidx, oidx, domain)]
.store(1u8, Ordering::Relaxed);
});
slice
.iter()
.map(|elem| elem.load(Ordering::Relaxed))
.collect::<Vec<u8>>()
}) {

slice
.iter() // TODO: par_iter?
.map(|elem| elem.load(Ordering::Relaxed))
.collect::<Vec<u8>>()
},
) {
Ok(data) => Ok(data),
Err(_) => return Err(String::from("Error creating the data Array")),
}
Expand Down Expand Up @@ -614,16 +653,16 @@ impl<'a> RemoteHDT<'a> {

impl Engine for RemoteHDT<'_> {
fn get_subject(&self, index: usize) -> Result<ArcArray3, String> {
self.get(&self.array, Term::Subject(index), &self.reference_system)
self.get(&self.array, Field::Subject(index), &self.reference_system)
}

// TODO: the current implementation works for SELECT *, but what if we SELECT predicate?
fn get_predicate(&self, index: usize) -> Result<ArcArray3, String> {
self.get(&self.array, Term::Predicate(index), &self.reference_system)
self.get(&self.array, Field::Predicate(index), &self.reference_system)
}

fn get_object(&self, index: usize) -> Result<ArcArray3, String> {
self.get(&self.array, Term::Object(index), &self.reference_system)
self.get(&self.array, Field::Object(index), &self.reference_system)
}
}

Expand Down

0 comments on commit c69b0e4

Please sign in to comment.