From c69b0e4477bd355ab83a9b5bac49463c3c6dbdad Mon Sep 17 00:00:00 2001 From: angelip2303 Date: Sun, 22 Oct 2023 17:27:20 +0000 Subject: [PATCH] using chunks --- Cargo.toml | 3 +- rdf-rs/Cargo.toml | 3 +- rdf-rs/src/lib.rs | 41 +++----------- src/remote_hdt.rs | 141 +++++++++++++++++++++++++++++----------------- 4 files changed, 100 insertions(+), 88 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8f03618..927a51f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,5 +7,4 @@ edition = "2021" zarr3 = { git = "https://github.com/clbarnes/zarr3-rs.git" } rdf-rs = { path = "./rdf-rs" } sophia = { version = "0.7.2" } -ndarray = { version = "0.15.6", features = [ "rayon" ] } -bimap = "0.6.3" \ No newline at end of file +ndarray = { version = "0.15.6", features = [ "rayon" ] } \ No newline at end of file diff --git a/rdf-rs/Cargo.toml b/rdf-rs/Cargo.toml index 1812a07..cb9a15a 100644 --- a/rdf-rs/Cargo.toml +++ b/rdf-rs/Cargo.toml @@ -5,4 +5,5 @@ edition = "2021" [dependencies] sophia = { version = "0.7.2", features = ["xml"] } -bimap = "0.6.3" \ No newline at end of file +bimap = "0.6.3" +rayon = "1.8.0" \ No newline at end of file diff --git a/rdf-rs/src/lib.rs b/rdf-rs/src/lib.rs index 327b164..23f27a3 100644 --- a/rdf-rs/src/lib.rs +++ b/rdf-rs/src/lib.rs @@ -1,9 +1,8 @@ -use bimap::BiMap; use ntriples::NTriples; use rdf_xml::RdfXml; use sophia::parser::TripleParser; use sophia::serializer::TripleSerializer; -use sophia::term::BoxTerm; +use sophia::term::Term; use sophia::triple::stream::TripleSource; use std::fs::File; use std::io::{BufReader, BufWriter}; @@ -13,17 +12,17 @@ mod ntriples; mod rdf_xml; mod turtle; -pub type Graph = Vec<[BoxTerm; 3]>; +pub type RdfGraph = Vec<[Term; 3]>; pub struct RdfParser { - pub graph: Graph, + pub graph: RdfGraph, } pub struct RdfSerializer; trait Backend>, F: TripleSerializer> { - fn parse(&self, path: &str) -> Result { - let mut graph: Graph = vec![]; + fn parse(&self, path: &str) -> Result { + let mut graph: RdfGraph = vec![]; let reader = BufReader::new(match File::open(path) { Ok(file) => file, @@ -40,7 +39,7 @@ trait Backend>, F: TripleSerializer> { } } - fn format(&self, path: &str, graph: Graph) -> Result<(), String> { + fn format(&self, path: &str, graph: RdfGraph) -> Result<(), String> { let file = File::create(path).unwrap(); let writer = BufWriter::new(file); let mut formatter = self.concrete_formatter(writer); @@ -74,36 +73,10 @@ impl RdfParser { }, }) } - - pub fn extract( - &self, - ) -> ( - BiMap, - BiMap, - BiMap, - ) { - let mut subjects = BiMap::::new(); - let mut predicates = BiMap::::new(); - let mut objects = BiMap::::new(); - - self.graph.iter().for_each(|triple| { - if !subjects.contains_left(&triple[0].to_owned()) { - subjects.insert(triple[0].to_owned(), subjects.len()); - } - if !predicates.contains_left(&triple[1].to_owned()) { - predicates.insert(triple[1].to_owned(), predicates.len()); - } - if !objects.contains_left(&triple[2].to_owned()) { - objects.insert(triple[2].to_owned(), objects.len()); - } - }); - - (subjects, predicates, objects) - } } impl RdfSerializer { - pub fn serialize(path: &str, graph: Graph) -> Result<(), String> { + pub fn serialize(path: &str, graph: RdfGraph) -> Result<(), String> { match path.split('.').last() { Some("nt") => NTriples.format(path, graph), Some("ttl") => Turtle.format(path, graph), diff --git a/src/remote_hdt.rs b/src/remote_hdt.rs index be02388..f455c1f 100644 --- a/src/remote_hdt.rs +++ b/src/remote_hdt.rs @@ -1,9 +1,12 @@ -use bimap::BiHashMap; use ndarray::parallel::prelude::IntoParallelRefIterator; use ndarray::parallel::prelude::ParallelIterator; use ndarray::{ArcArray, ArcArray1, Array2, ArrayBase, Axis, Dim, Ix3, IxDynImpl, OwnedArcRepr}; use rdf_rs::RdfParser; -use sophia::term::BoxTerm; +use sophia::graph::GTripleSource; +use sophia::graph::Graph; +use sophia::term::Term; +use sophia::triple::Triple; +use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; use std::sync::atomic::AtomicU8; @@ -42,7 +45,7 @@ pub enum ReferenceSystem { OPS, } -pub enum Term { +pub enum Field { Subject(usize), Predicate(usize), Object(usize), @@ -68,7 +71,7 @@ pub trait Engine { fn get( &self, array: &Option, - term: Term, + term: Field, reference_system: &ReferenceSystem, ) -> Result { let arr: ArcArray3 = match array { @@ -80,7 +83,7 @@ pub trait Engine { let shape = binding.shape(); let flattened: ArcArray1 = match term { - Term::Subject(term) => match reference_system { + Field::Subject(term) => match reference_system { ReferenceSystem::SPO => self.get_first_term(arr, shape[0], term), ReferenceSystem::SOP => self.get_first_term(arr, shape[0], term), ReferenceSystem::PSO => self.get_second_term(arr, shape[1], term), @@ -88,7 +91,7 @@ pub trait Engine { ReferenceSystem::OSP => self.get_second_term(arr, shape[1], term), ReferenceSystem::OPS => self.get_third_term(arr, shape[2], term), }, - Term::Predicate(term) => match reference_system { + Field::Predicate(term) => match reference_system { ReferenceSystem::SPO => self.get_second_term(arr, shape[1], term), ReferenceSystem::SOP => self.get_third_term(arr, shape[2], term), ReferenceSystem::PSO => self.get_first_term(arr, shape[0], term), @@ -96,7 +99,7 @@ pub trait Engine { ReferenceSystem::OSP => self.get_third_term(arr, shape[2], term), ReferenceSystem::OPS => self.get_second_term(arr, shape[1], term), }, - Term::Object(term) => match reference_system { + Field::Object(term) => match reference_system { ReferenceSystem::SPO => self.get_third_term(arr, shape[2], term), ReferenceSystem::SOP => self.get_second_term(arr, shape[1], term), ReferenceSystem::PSO => self.get_third_term(arr, shape[2], term), @@ -330,14 +333,14 @@ impl ReferenceSystem { } fn index(&self, sidx: usize, pidx: usize, oidx: usize, domain: &Domain) -> usize { - let sidx = match self { - ReferenceSystem::SPO => sidx * domain.predicates_size * domain.objects_size, - ReferenceSystem::SOP => sidx * domain.predicates_size * domain.objects_size, - ReferenceSystem::PSO => sidx * domain.objects_size, - ReferenceSystem::POS => sidx, - ReferenceSystem::OSP => sidx * domain.predicates_size, - ReferenceSystem::OPS => sidx, - }; + // let sidx = match self { + // ReferenceSystem::SPO => sidx * domain.predicates_size * domain.objects_size, + // ReferenceSystem::SOP => sidx * domain.predicates_size * domain.objects_size, + // ReferenceSystem::PSO => sidx * domain.objects_size, + // ReferenceSystem::POS => sidx, + // ReferenceSystem::OSP => sidx * domain.predicates_size, + // ReferenceSystem::OPS => sidx, + // }; let pidx = match self { ReferenceSystem::SPO => pidx * domain.objects_size, @@ -357,7 +360,7 @@ impl ReferenceSystem { ReferenceSystem::OPS => oidx * domain.predicates_size * domain.subjects_size, }; - sidx + pidx + oidx + pidx + oidx } } @@ -423,7 +426,24 @@ impl<'a> RemoteHDT<'a> { // 3. Import the RDF dump using `rdf-rs` let dump = RdfParser::new(self.rdf_path)?; - let (subjects, predicates, objects) = dump.extract(); + let binding = dump.graph.subjects().unwrap(); + let subjects = binding + .iter() + .enumerate() + .map(|(key, value)| (value.to_owned(), key)) // TODO: remove to_owned + .collect::, usize>>(); + let binding = dump.graph.predicates().unwrap(); + let predicates = binding + .iter() + .enumerate() + .map(|(key, value)| (value.to_owned(), key)) + .collect::, usize>>(); // TODO: remove unwrap + let binding = dump.graph.objects().unwrap(); + let objects = binding + .iter() + .enumerate() + .map(|(key, value)| (value.to_owned(), key)) + .collect::, usize>>(); // TODO: remove unwrap let domain = &Domain { subjects_size: subjects.len(), // size of the unique values for the Subjects @@ -436,6 +456,14 @@ impl<'a> RemoteHDT<'a> { // of the different dimensions and the default values let arr_meta = ArrayMetadataBuilder::::new(&self.reference_system.shape_u64(domain)) .dimension_names(self.reference_system.dimension_names())? + .chunk_grid( + vec![ + 1, + self.reference_system.shape_u64(domain)[1], + self.reference_system.shape_u64(domain)[2], + ] // TODO: improve this + .as_slice(), + )? .push_bb_codec(GzipCodec::default()) .set_attribute( "subjects".to_string(), @@ -480,16 +508,19 @@ impl<'a> RemoteHDT<'a> { // the provided values (second vector). What's more, an offset can be set; // that is, we can insert the created array with and X and Y shift. Lastly, // the region is written provided the aforementioned data and offset - let data = self.create_array(domain, dump, subjects, predicates, objects)?; - let offset = smallvec![0, 0, 0]; - - // TODO: could this be done using rayon or a multi-threaded approach. - // Maybe using chunks instead of a region and having several chunks of - // the same size (i.e 100x100). Then we write in parallel? - // This is the place where the system is currently taking more time - if arr.write_region(&offset, data).is_err() { - return Err(String::from("Error writing to the Array")); - }; + subjects.par_iter().for_each(|(subject, i)| { + let _ = arr.write_chunk( + &smallvec![*i as u64, 0, 0], + self.create_array( + domain, + dump.graph.triples_with_s(subject), + &predicates, + &objects, + *i, + ) + .unwrap(), + ); + }); Ok(self) } @@ -497,33 +528,41 @@ impl<'a> RemoteHDT<'a> { fn create_array( &self, domain: &Domain, - dump: RdfParser, - subjects: BiHashMap, - predicates: BiHashMap, - objects: BiHashMap, + triples: GTripleSource; 3]>>, + predicates: &HashMap, usize>, + objects: &HashMap, usize>, + chunk_idx: usize, ) -> Result, Dim>, String> { - match ArcArrayD::from_shape_vec(self.reference_system.shape(domain).to_vec(), { - let slice: Vec = - vec![0u8; domain.subjects_size * domain.predicates_size * domain.objects_size] + match ArcArrayD::from_shape_vec( + vec![ + 1 as usize, + self.reference_system.shape(domain)[1], + self.reference_system.shape(domain)[2], + ] // TODO: improve this + .as_slice(), + { + let slice: Vec = vec![0u8; domain.predicates_size * domain.objects_size] .par_iter() .map(|&n| AtomicU8::new(n)) .collect(); - dump.graph - .par_iter() - .for_each(|[subject, predicate, object]| { - slice[self.reference_system.index( - subjects.get_by_left(subject).unwrap().to_owned(), - predicates.get_by_left(predicate).unwrap().to_owned(), - objects.get_by_left(object).unwrap().to_owned(), - domain, - )] + triples.for_each(|triple| { + let triple = triple.unwrap(); + + let pidx = predicates.get(triple.p()).unwrap().to_owned(); // TODO: remove unwrap + let oidx = objects.get(triple.o()).unwrap().to_owned(); + + slice[self + .reference_system + .index(chunk_idx as usize, pidx, oidx, domain)] .store(1u8, Ordering::Relaxed); }); - slice - .iter() - .map(|elem| elem.load(Ordering::Relaxed)) - .collect::>() - }) { + + slice + .iter() // TODO: par_iter? + .map(|elem| elem.load(Ordering::Relaxed)) + .collect::>() + }, + ) { Ok(data) => Ok(data), Err(_) => return Err(String::from("Error creating the data Array")), } @@ -614,16 +653,16 @@ impl<'a> RemoteHDT<'a> { impl Engine for RemoteHDT<'_> { fn get_subject(&self, index: usize) -> Result { - self.get(&self.array, Term::Subject(index), &self.reference_system) + self.get(&self.array, Field::Subject(index), &self.reference_system) } // TODO: the current implementation works for SELECT *, but what if we SELECT predicate? fn get_predicate(&self, index: usize) -> Result { - self.get(&self.array, Term::Predicate(index), &self.reference_system) + self.get(&self.array, Field::Predicate(index), &self.reference_system) } fn get_object(&self, index: usize) -> Result { - self.get(&self.array, Term::Object(index), &self.reference_system) + self.get(&self.array, Field::Object(index), &self.reference_system) } }