Skip to content

Commit

Permalink
refactor: cleaned up parser
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed May 2, 2024
1 parent f844b43 commit 6b176c0
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 87 deletions.
210 changes: 124 additions & 86 deletions src/main/kotlin/runner/Parser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.apache.jena.ontology.OntModelSpec
import org.apache.jena.query.QueryExecutionFactory
import org.apache.jena.query.QueryFactory
import org.apache.jena.query.QuerySolution
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
import org.apache.jena.shacl.ShaclValidator
Expand All @@ -16,7 +17,7 @@ import technology.idlab.compiler.MemoryClassLoader
import technology.idlab.logging.Log

/**
* Parse a solution to a runner.Processor instance.
* Parse a solution to a Processor instance.
*/
private fun QuerySolution.toProcessor(): Class<Processor> {
val file = this["file"]
Expand All @@ -37,99 +38,88 @@ private fun QuerySolution.toProcessor(): Class<Processor> {
.let { it as Class<Processor> }
}

private fun QuerySolution.toStage(processors: List<Class<Processor>>): Processor {
val byName = processors.associateBy { it.simpleName }

// Extract the list of arguments.
val name = this["processor"]
.toString()
.substringAfterLast("#")

val values = this["values"].toString().split(";")

val keys = this["keys"]
.toString()
.split(";")
.map { it.substringAfterLast("#") }
/**
* Read a model from a file and recursively import all referenced ontologies
* based on `owl:imports` statements.
*/
private fun File.readModelRecursively(): Model {
val result = ModelFactory.createDefaultModel()

val onthology = ModelFactory.createOntologyModel(OntModelSpec.OWL_MEM)
onthology.read(this.toURI().toString(), "TURTLE")

// Import any referenced ontologies.
val imported: MutableSet<String> = mutableSetOf()
val iter = onthology.listStatements(null, OWL.imports, null as Resource?)
while (iter.hasNext()) {
val statement = iter.nextStatement()
val uri = statement.getObject().toString()

// Check if we still need to import the referenced ontology.
if (imported.contains(uri)) {
continue
}

val kinds = this["kinds"]
.toString()
.split(";")
.map { it.substringAfterLast("#") }
// Attempt importing the dataset.
Log.shared.info("Importing $uri")
try {
result.read(uri)
} catch (e: Exception) {
Log.shared.fatal(e)
}

// Retrieve a class instance of the Processor.
val processor = byName[name] ?: Log.shared.fatal("Processor $name not found")
val args = mutableMapOf<String, Any>()
imported.add(uri)
}

for (i in keys.indices) {
val key = keys[i]
val value = values[i]
val kind = kinds[i]
// Import original ontology into the model.
result.add(onthology)

Log.shared.debug("$key: $kind = $value")
return result
}

args[key] = when (kind) {
"integer" -> value.toInt()
"ChannelWriter" -> bridge
"ChannelReader" -> bridge
else -> Log.shared.fatal("Unknown kind $kind")
}
/**
* Validates a model against the SHACL schema defined inside the model itself.
*/
private fun Model.validate(): Model {
val graph = this.graph
val report = ShaclValidator.get().validate(graph, graph)

// Exit if the validation failed.
if (!report.conforms()) {
val out = ByteArrayOutputStream()
report.model.write(out, "TURTLE")
Log.shared.fatal("Validation failed\n$out")
}

val constructor = processor.getConstructor(Map::class.java)
return constructor.newInstance(args)
return this
}

// TODO: Create some sort of a factory.
val bridge = MemoryBridge()

class Parser(file: File) {
private val model = ModelFactory.createDefaultModel()

/**
* An RDF model of the configuration file.
*/
private val model = file.readModelRecursively().validate()

/**
* Class references to the different processors.
*/
private val processors: List<Class<Processor>>

/**
* The stages of the pipeline.
*/
val stages: List<Processor>

/**
* Parse the model for processor declarations and save results as a field.
*/
init {
val onthology = ModelFactory.createOntologyModel(OntModelSpec.OWL_MEM)
onthology.read(file.toURI().toString(), "TURTLE")

// Import any referenced ontologies.
val imported: MutableSet<String> = mutableSetOf()
val iter = onthology.listStatements(null, OWL.imports, null as Resource?)
while (iter.hasNext()) {
val statement = iter.nextStatement()
val uri = statement.getObject().toString()

// Check if we still need to import the referenced ontology.
if (imported.contains(uri)) {
continue
}

// Attempt importing the dataset.
Log.shared.info("Importing $uri")
try {
model.read(uri)
} catch (e: Exception) {
Log.shared.fatal(e)
}

imported.add(uri)
}

// Import original onthology into the model.
model.add(onthology)

// Validate using SHACL.
val report = ShaclValidator.get().validate(model.graph, model.graph)

// Exit if the validation failed.
if (!report.conforms()) {
val out = ByteArrayOutputStream()
report.model.write(out, "TURTLE")
Log.shared.fatal("Validation failed\n$out")
}
}

private fun getProcessors(): List<Class<Processor>> {
Log.shared.info("Parsing processors")
val processors = mutableListOf<Class<Processor>>()

val query = this.javaClass.getResource("/queries/processors.sparql")
?.readText()
?.let { QueryFactory.create(it) }
Expand All @@ -150,12 +140,16 @@ class Parser(file: File) {
processors.add(processor)
}

return processors
this.processors = processors
}

fun getStages(): List<Processor> {
val processors = getProcessors()
/**
* Parse the model for concrete stages, initialize the corresponding
* instances, and save the result to a field.
*/
init {
Log.shared.info("Parsing stages")
val stages = mutableListOf<Processor>()

// Execute the stages query.
val query = this.javaClass.getResource("/queries/stages.sparql")
Expand All @@ -171,15 +165,59 @@ class Parser(file: File) {
Log.shared.fatal("No processors found in the configuration")
}

val result = mutableListOf<Processor>()

while (iter.hasNext()) {
val solution = iter.nextSolution()
val stage = solution.toStage(processors)
val stage = this.parseStage(solution)
Log.shared.info("Stage ${stage.javaClass.name} initialised successfully")
result.add(stage)
stages.add(stage)
}

this.stages = stages
}

/**
* Initialize a Processor instance based on the query solution.
*/
private fun parseStage(querySolution: QuerySolution): Processor {
val byName = processors.associateBy { it.simpleName }

// Extract the list of arguments.
val name = querySolution["processor"]
.toString()
.substringAfterLast("#")

val values = querySolution["values"].toString().split(";")

val keys = querySolution["keys"]
.toString()
.split(";")
.map { it.substringAfterLast("#") }

val kinds = querySolution["kinds"]
.toString()
.split(";")
.map { it.substringAfterLast("#") }

// Retrieve a class instance of the Processor.
val processor = byName[name] ?: Log.shared.fatal("Processor $name not found")
val args = mutableMapOf<String, Any>()

for (i in keys.indices) {
val key = keys[i]
val value = values[i]
val kind = kinds[i]

Log.shared.debug("$key: $kind = $value")

args[key] = when (kind) {
"integer" -> value.toInt()
"ChannelWriter" -> bridge
"ChannelReader" -> bridge
else -> Log.shared.fatal("Unknown kind $kind")
}
}

return result
val constructor = processor.getConstructor(Map::class.java)
return constructor.newInstance(args)
}
}
2 changes: 1 addition & 1 deletion src/main/kotlin/runner/Pipeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import kotlin.concurrent.thread

class Pipeline(config: File) {
/** Processors described in the config. */
private val processors: List<Processor> = Parser(config).getStages()
private val processors: List<Processor> = Parser(config).stages

/**
* Execute all processors in the configuration in parallel, and block until
Expand Down

0 comments on commit 6b176c0

Please sign in to comment.