Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance performance and rdf4j update module logging #189

Merged
merged 3 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ protected static final Property property(String local )
public static final Property is_parse_text = property("is-parse-text");
public static final Property has_max_iteration_count = property("has-max-iteration-count");
public static final Property has_resource_uri = property("has-resource-uri");
public static final Property stop_iteration_on_stable_triple_count = property("stop-iteration-on-stable-triple-count");

/**
returns the URI for this schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import cz.cvut.spipes.engine.VariablesBinding;
import cz.cvut.spipes.exception.ValidationConstraintFailedException;
import cz.cvut.spipes.util.JenaUtils;
import cz.cvut.spipes.util.QueryUtils;
import org.apache.jena.atlas.lib.NotImplemented;
import org.apache.jena.ontology.OntModel;
import org.apache.jena.query.*;
Expand Down Expand Up @@ -306,9 +307,9 @@ protected String getQueryComment(org.topbraid.spin.model.Query query) {
if (query.getComment() != null) {
return query.getComment();
}
String comment = query.toString().split(System.lineSeparator())[0];
if (comment.matches("\\s*#.*")) {
return comment.split("\\s*#\\s*", 2)[1];
String comment = QueryUtils.getQueryComment(query.toString());
if (comment != null) {
return comment;
}
// Resource obj = query.getPropertyResourceValue(RDFS.comment);
// if (obj == null) {
Expand Down
27 changes: 14 additions & 13 deletions s-pipes-core/src/main/java/cz/cvut/spipes/util/QueryUtils.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,18 @@
package cz.cvut.spipes.util;


import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import org.apache.jena.query.ARQ;
import org.apache.jena.query.ParameterizedSparqlString;
import org.apache.jena.query.Query;
import org.apache.jena.query.QueryExecution;
import org.apache.jena.query.QueryExecutionFactory;
import org.apache.jena.query.QuerySolution;
import org.apache.jena.query.ResultSet;
import org.apache.jena.query.*;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.sparql.mgt.Explain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

public class QueryUtils {

private static final Logger LOG = LoggerFactory.getLogger(QueryUtils.class);
Expand Down Expand Up @@ -176,4 +169,12 @@ private interface QueryExecutor<T> {
T execQuery(QueryExecution execution);
}

public static String getQueryComment(String query) {
String comment = query.split(System.lineSeparator())[0];
if (comment.matches("\\s*#.*")) {
return comment.split("\\s*#\\s*", 2)[1];
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import cz.cvut.spipes.engine.ExecutionContext;
import cz.cvut.spipes.exception.ModuleConfigurationInconsistentException;
import cz.cvut.spipes.exceptions.RepositoryAccessException;
import cz.cvut.spipes.util.QueryUtils;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.Property;
import org.apache.jena.rdf.model.Resource;
Expand Down Expand Up @@ -43,6 +44,12 @@ public class Rdf4jUpdateModule extends AbstractModule {
private String rdf4jRepositoryName;
private List<String> updateQueries;

static final Property P_RDF4J_STOP_ITERATION_ON_STABLE_TRIPLE_COUNT =
getParameter("p-stop-iteration-on-stable-triple-count");
private boolean onlyIfTripleCountChanges;

private int iterationCount;

private Repository updateRepository;

public void setUpdateQueries(List<String> updateQueries) {
Expand All @@ -53,9 +60,6 @@ public List<String> getUpdateQueries() {
return updateQueries;
}

private int iterationCount;
private boolean onlyIfTripleCountChanges;

public int getIterationCount() {
return iterationCount;
}
Expand Down Expand Up @@ -107,15 +111,32 @@ private static Property getParameter(final String name) {
ExecutionContext executeSelf() {
try (RepositoryConnection updateConnection = updateRepository.getConnection()) {
LOG.debug("Connected to {}", rdf4jRepositoryName);
for(int i = 0;i < iterationCount;i++) {
long oldTriplesCount = updateConnection.size();
LOG.debug("Number of triples before execution: {}",oldTriplesCount);
for (String updateQuery : updateQueries) {
long newTriplesCount = updateConnection.size();
long oldTriplesCount;
LOG.debug("Number of triples before execution of updates: {}", newTriplesCount);

for(int i = 0;i < iterationCount; i++) {
oldTriplesCount = newTriplesCount;
for (int j = 0; j < updateQueries.size(); j++) {
String updateQuery = updateQueries.get(j);

if (LOG.isTraceEnabled()) {
String queryComment = QueryUtils.getQueryComment(updateQuery);
LOG.trace(
"Executing iteration {}/{} with {}/{} query \"{}\" ...",
i+1, iterationCount, j + 1, updateQueries.size(), queryComment
);
}
makeUpdate(updateQuery, updateConnection);
}
long newTriplesCount = updateConnection.size();
LOG.debug("Number of triples after execution: {}",newTriplesCount);
if(onlyIfTripleCountChanges && (newTriplesCount == oldTriplesCount) )break;
newTriplesCount = updateConnection.size();
LOG.debug("Number of triples after finishing iteration {}/{}: {}",
i+1, iterationCount, newTriplesCount
);
if (onlyIfTripleCountChanges && (newTriplesCount == oldTriplesCount)) {
LOG.debug("Stopping execution of iterations as triples count did not change.");
break;
}
}
} catch (RepositoryException e) {
throw new RepositoryAccessException(rdf4jRepositoryName, e);
Expand Down Expand Up @@ -158,7 +179,7 @@ public void loadConfiguration() {
rdf4jServerURL = getEffectiveValue(P_RDF4J_SERVER_URL).asLiteral().getString();
rdf4jRepositoryName = getEffectiveValue(P_RDF4J_REPOSITORY_NAME).asLiteral().getString();
iterationCount = getPropertyValue(KBSS_MODULE.has_max_iteration_count,1);
onlyIfTripleCountChanges = getPropertyValue(KBSS_MODULE.stop_iteration_on_stable_triple_count,false);
onlyIfTripleCountChanges = getPropertyValue(P_RDF4J_STOP_ITERATION_ON_STABLE_TRIPLE_COUNT,false);
LOG.debug("Iteration count={}\nOnlyIf...Changes={}"
,iterationCount
,onlyIfTripleCountChanges);
Expand All @@ -167,7 +188,7 @@ public void loadConfiguration() {
"Repository is already initialized. Trying to override its configuration from RDF.");
}
updateRepository = new SPARQLRepository(
rdf4jServerURL + "repositories/" + rdf4jRepositoryName + "/statements"
rdf4jServerURL + "/repositories/" + rdf4jRepositoryName + "/statements"
);
updateQueries = loadUpdateQueries();
}
Expand Down
Loading