diff --git a/s-pipes-core/src/main/java/cz/cvut/spipes/constants/KBSS_MODULE.java b/s-pipes-core/src/main/java/cz/cvut/spipes/constants/KBSS_MODULE.java index f634309d..b55659fe 100644 --- a/s-pipes-core/src/main/java/cz/cvut/spipes/constants/KBSS_MODULE.java +++ b/s-pipes-core/src/main/java/cz/cvut/spipes/constants/KBSS_MODULE.java @@ -20,7 +20,6 @@ protected static final Property property(String local ) public static final Property is_parse_text = property("is-parse-text"); public static final Property has_max_iteration_count = property("has-max-iteration-count"); public static final Property has_resource_uri = property("has-resource-uri"); - public static final Property stop_iteration_on_stable_triple_count = property("stop-iteration-on-stable-triple-count"); /** returns the URI for this schema diff --git a/s-pipes-core/src/main/java/cz/cvut/spipes/modules/AbstractModule.java b/s-pipes-core/src/main/java/cz/cvut/spipes/modules/AbstractModule.java index 2a1fe6f0..a94fd7c5 100644 --- a/s-pipes-core/src/main/java/cz/cvut/spipes/modules/AbstractModule.java +++ b/s-pipes-core/src/main/java/cz/cvut/spipes/modules/AbstractModule.java @@ -9,6 +9,7 @@ import cz.cvut.spipes.engine.VariablesBinding; import cz.cvut.spipes.exception.ValidationConstraintFailedException; import cz.cvut.spipes.util.JenaUtils; +import cz.cvut.spipes.util.QueryUtils; import org.apache.jena.atlas.lib.NotImplemented; import org.apache.jena.ontology.OntModel; import org.apache.jena.query.*; @@ -306,9 +307,9 @@ protected String getQueryComment(org.topbraid.spin.model.Query query) { if (query.getComment() != null) { return query.getComment(); } - String comment = query.toString().split(System.lineSeparator())[0]; - if (comment.matches("\\s*#.*")) { - return comment.split("\\s*#\\s*", 2)[1]; + String comment = QueryUtils.getQueryComment(query.toString()); + if (comment != null) { + return comment; } // Resource obj = query.getPropertyResourceValue(RDFS.comment); // if (obj == null) { diff --git a/s-pipes-core/src/main/java/cz/cvut/spipes/util/QueryUtils.java b/s-pipes-core/src/main/java/cz/cvut/spipes/util/QueryUtils.java index c8fe5724..05dbe87a 100644 --- a/s-pipes-core/src/main/java/cz/cvut/spipes/util/QueryUtils.java +++ b/s-pipes-core/src/main/java/cz/cvut/spipes/util/QueryUtils.java @@ -1,25 +1,18 @@ package cz.cvut.spipes.util; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.LinkedList; -import java.util.List; -import java.util.regex.Matcher; -import java.util.stream.Collectors; -import org.apache.jena.query.ARQ; -import org.apache.jena.query.ParameterizedSparqlString; -import org.apache.jena.query.Query; -import org.apache.jena.query.QueryExecution; -import org.apache.jena.query.QueryExecutionFactory; -import org.apache.jena.query.QuerySolution; -import org.apache.jena.query.ResultSet; +import org.apache.jena.query.*; import org.apache.jena.rdf.model.Model; import org.apache.jena.rdf.model.RDFNode; import org.apache.jena.sparql.mgt.Explain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.regex.Matcher; +import java.util.stream.Collectors; + public class QueryUtils { private static final Logger LOG = LoggerFactory.getLogger(QueryUtils.class); @@ -176,4 +169,12 @@ private interface QueryExecutor { T execQuery(QueryExecution execution); } + public static String getQueryComment(String query) { + String comment = query.split(System.lineSeparator())[0]; + if (comment.matches("\\s*#.*")) { + return comment.split("\\s*#\\s*", 2)[1]; + } + return null; + } + } diff --git a/s-pipes-modules/module-rdf4j/src/main/java/cz/cvut/spipes/modules/Rdf4jUpdateModule.java b/s-pipes-modules/module-rdf4j/src/main/java/cz/cvut/spipes/modules/Rdf4jUpdateModule.java index 262ec5e9..034eb606 100644 --- a/s-pipes-modules/module-rdf4j/src/main/java/cz/cvut/spipes/modules/Rdf4jUpdateModule.java +++ b/s-pipes-modules/module-rdf4j/src/main/java/cz/cvut/spipes/modules/Rdf4jUpdateModule.java @@ -5,6 +5,7 @@ import cz.cvut.spipes.engine.ExecutionContext; import cz.cvut.spipes.exception.ModuleConfigurationInconsistentException; import cz.cvut.spipes.exceptions.RepositoryAccessException; +import cz.cvut.spipes.util.QueryUtils; import org.apache.jena.rdf.model.Model; import org.apache.jena.rdf.model.Property; import org.apache.jena.rdf.model.Resource; @@ -43,6 +44,12 @@ public class Rdf4jUpdateModule extends AbstractModule { private String rdf4jRepositoryName; private List updateQueries; + static final Property P_RDF4J_STOP_ITERATION_ON_STABLE_TRIPLE_COUNT = + getParameter("p-stop-iteration-on-stable-triple-count"); + private boolean onlyIfTripleCountChanges; + + private int iterationCount; + private Repository updateRepository; public void setUpdateQueries(List updateQueries) { @@ -53,9 +60,6 @@ public List getUpdateQueries() { return updateQueries; } - private int iterationCount; - private boolean onlyIfTripleCountChanges; - public int getIterationCount() { return iterationCount; } @@ -107,15 +111,32 @@ private static Property getParameter(final String name) { ExecutionContext executeSelf() { try (RepositoryConnection updateConnection = updateRepository.getConnection()) { LOG.debug("Connected to {}", rdf4jRepositoryName); - for(int i = 0;i < iterationCount;i++) { - long oldTriplesCount = updateConnection.size(); - LOG.debug("Number of triples before execution: {}",oldTriplesCount); - for (String updateQuery : updateQueries) { + long newTriplesCount = updateConnection.size(); + long oldTriplesCount; + LOG.debug("Number of triples before execution of updates: {}", newTriplesCount); + + for(int i = 0;i < iterationCount; i++) { + oldTriplesCount = newTriplesCount; + for (int j = 0; j < updateQueries.size(); j++) { + String updateQuery = updateQueries.get(j); + + if (LOG.isTraceEnabled()) { + String queryComment = QueryUtils.getQueryComment(updateQuery); + LOG.trace( + "Executing iteration {}/{} with {}/{} query \"{}\" ...", + i+1, iterationCount, j + 1, updateQueries.size(), queryComment + ); + } makeUpdate(updateQuery, updateConnection); } - long newTriplesCount = updateConnection.size(); - LOG.debug("Number of triples after execution: {}",newTriplesCount); - if(onlyIfTripleCountChanges && (newTriplesCount == oldTriplesCount) )break; + newTriplesCount = updateConnection.size(); + LOG.debug("Number of triples after finishing iteration {}/{}: {}", + i+1, iterationCount, newTriplesCount + ); + if (onlyIfTripleCountChanges && (newTriplesCount == oldTriplesCount)) { + LOG.debug("Stopping execution of iterations as triples count did not change."); + break; + } } } catch (RepositoryException e) { throw new RepositoryAccessException(rdf4jRepositoryName, e); @@ -158,7 +179,7 @@ public void loadConfiguration() { rdf4jServerURL = getEffectiveValue(P_RDF4J_SERVER_URL).asLiteral().getString(); rdf4jRepositoryName = getEffectiveValue(P_RDF4J_REPOSITORY_NAME).asLiteral().getString(); iterationCount = getPropertyValue(KBSS_MODULE.has_max_iteration_count,1); - onlyIfTripleCountChanges = getPropertyValue(KBSS_MODULE.stop_iteration_on_stable_triple_count,false); + onlyIfTripleCountChanges = getPropertyValue(P_RDF4J_STOP_ITERATION_ON_STABLE_TRIPLE_COUNT,false); LOG.debug("Iteration count={}\nOnlyIf...Changes={}" ,iterationCount ,onlyIfTripleCountChanges); @@ -167,7 +188,7 @@ public void loadConfiguration() { "Repository is already initialized. Trying to override its configuration from RDF."); } updateRepository = new SPARQLRepository( - rdf4jServerURL + "repositories/" + rdf4jRepositoryName + "/statements" + rdf4jServerURL + "/repositories/" + rdf4jRepositoryName + "/statements" ); updateQueries = loadUpdateQueries(); }