From 6b176c094b51ac83364786a92c7db915f6a6af63 Mon Sep 17 00:00:00 2001 From: Jens Pots Date: Thu, 2 May 2024 14:47:34 +0200 Subject: [PATCH] refactor: cleaned up parser --- src/main/kotlin/runner/Parser.kt | 210 +++++++++++++++++------------ src/main/kotlin/runner/Pipeline.kt | 2 +- 2 files changed, 125 insertions(+), 87 deletions(-) diff --git a/src/main/kotlin/runner/Parser.kt b/src/main/kotlin/runner/Parser.kt index 7454528..1e16ce4 100644 --- a/src/main/kotlin/runner/Parser.kt +++ b/src/main/kotlin/runner/Parser.kt @@ -5,6 +5,7 @@ import org.apache.jena.ontology.OntModelSpec import org.apache.jena.query.QueryExecutionFactory import org.apache.jena.query.QueryFactory import org.apache.jena.query.QuerySolution +import org.apache.jena.rdf.model.Model import org.apache.jena.rdf.model.ModelFactory import org.apache.jena.rdf.model.Resource import org.apache.jena.shacl.ShaclValidator @@ -16,7 +17,7 @@ import technology.idlab.compiler.MemoryClassLoader import technology.idlab.logging.Log /** - * Parse a solution to a runner.Processor instance. + * Parse a solution to a Processor instance. */ private fun QuerySolution.toProcessor(): Class { val file = this["file"] @@ -37,99 +38,88 @@ private fun QuerySolution.toProcessor(): Class { .let { it as Class } } -private fun QuerySolution.toStage(processors: List>): Processor { - val byName = processors.associateBy { it.simpleName } - - // Extract the list of arguments. - val name = this["processor"] - .toString() - .substringAfterLast("#") - - val values = this["values"].toString().split(";") - - val keys = this["keys"] - .toString() - .split(";") - .map { it.substringAfterLast("#") } +/** + * Read a model from a file and recursively import all referenced ontologies + * based on `owl:imports` statements.
+ */ +private fun File.readModelRecursively(): Model { + val result = ModelFactory.createDefaultModel() + + val onthology = ModelFactory.createOntologyModel(OntModelSpec.OWL_MEM) + onthology.read(this.toURI().toString(), "TURTLE") + + // Import any referenced ontologies. + val imported: MutableSet = mutableSetOf() + val iter = onthology.listStatements(null, OWL.imports, null as Resource?) + while (iter.hasNext()) { + val statement = iter.nextStatement() + val uri = statement.getObject().toString() + + // Check if we still need to import the referenced ontology. + if (imported.contains(uri)) { + continue + } - val kinds = this["kinds"] - .toString() - .split(";") - .map { it.substringAfterLast("#") } + // Attempt importing the dataset. + Log.shared.info("Importing $uri") + try { + result.read(uri) + } catch (e: Exception) { + Log.shared.fatal(e) + } - // Retrieve a class instance of the Processor. - val processor = byName[name] ?: Log.shared.fatal("Processor $name not found") - val args = mutableMapOf() + imported.add(uri) + } - for (i in keys.indices) { - val key = keys[i] - val value = values[i] - val kind = kinds[i] + // Import original onthology into the model. + result.add(onthology) - Log.shared.debug("$key: $kind = $value") + return result +} - args[key] = when (kind) { - "integer" -> value.toInt() - "ChannelWriter" -> bridge - "ChannelReader" -> bridge - else -> Log.shared.fatal("Unknown kind $kind") - } +/** + * Validates a model against the SHACL schema defined inside the model itself. + */ +private fun Model.validate(): Model { + val graph = this.graph + val report = ShaclValidator.get().validate(graph, graph) + + // Exit if the validation failed. + if (!report.conforms()) { + val out = ByteArrayOutputStream() + report.model.write(out, "TURTLE") + Log.shared.fatal("Validation failed\n$out") } - val constructor = processor.getConstructor(Map::class.java) - return constructor.newInstance(args) + return this } // TODO: Create some sort of a factory. 
val bridge = MemoryBridge() class Parser(file: File) { - private val model = ModelFactory.createDefaultModel() - + /** + * An RDF model of the configuration file. + */ + private val model = file.readModelRecursively().validate() + + /** + * Class references to the different processors. + */ + private val processors: List> + + /** + * The stages of the pipeline. + */ + val stages: List + + /** + * Parse the model for processor declarations and save results as a field. + */ init { - val onthology = ModelFactory.createOntologyModel(OntModelSpec.OWL_MEM) - onthology.read(file.toURI().toString(), "TURTLE") - - // Import any referenced ontologies. - val imported: MutableSet = mutableSetOf() - val iter = onthology.listStatements(null, OWL.imports, null as Resource?) - while (iter.hasNext()) { - val statement = iter.nextStatement() - val uri = statement.getObject().toString() - - // Check if we still need to import the referenced ontology. - if (imported.contains(uri)) { - continue - } - - // Attempt importing the dataset. - Log.shared.info("Importing $uri") - try { - model.read(uri) - } catch (e: Exception) { - Log.shared.fatal(e) - } - - imported.add(uri) - } - - // Import original onthology into the model. - model.add(onthology) - - // Validate using SHACL. - val report = ShaclValidator.get().validate(model.graph, model.graph) - - // Exit if the validation failed. 
- if (!report.conforms()) { - val out = ByteArrayOutputStream() - report.model.write(out, "TURTLE") - Log.shared.fatal("Validation failed\n$out") - } - } - - private fun getProcessors(): List> { Log.shared.info("Parsing processors") val processors = mutableListOf>() + val query = this.javaClass.getResource("/queries/processors.sparql") ?.readText() ?.let { QueryFactory.create(it) } @@ -150,12 +140,16 @@ class Parser(file: File) { processors.add(processor) } - return processors + this.processors = processors } - fun getStages(): List { - val processors = getProcessors() + /** + * Parse the model for concrete stages, initializes the corresponding + * instances and saves the result to a field. + */ + init { Log.shared.info("Parsing stages") + val stages = mutableListOf() // Execute the stages query. val query = this.javaClass.getResource("/queries/stages.sparql") @@ -171,15 +165,59 @@ class Parser(file: File) { Log.shared.fatal("No processors found in the configuration") } - val result = mutableListOf() - while (iter.hasNext()) { val solution = iter.nextSolution() - val stage = solution.toStage(processors) + val stage = this.parseStage(solution) Log.shared.info("Stage ${stage.javaClass.name} initialised successfully") - result.add(stage) + stages.add(stage) + } + + this.stages = stages + } + + /** + * Initialize a Processor instance based on the query solution. + */ + private fun parseStage(querySolution: QuerySolution): Processor { + val byName = processors.associateBy { it.simpleName } + + // Extract the list of arguments. + val name = querySolution["processor"] + .toString() + .substringAfterLast("#") + + val values = querySolution["values"].toString().split(";") + + val keys = querySolution["keys"] + .toString() + .split(";") + .map { it.substringAfterLast("#") } + + val kinds = querySolution["kinds"] + .toString() + .split(";") + .map { it.substringAfterLast("#") } + + // Retrieve a class instance of the Processor. 
+ val processor = byName[name] ?: Log.shared.fatal("Processor $name not found") + val args = mutableMapOf() + + for (i in keys.indices) { + val key = keys[i] + val value = values[i] + val kind = kinds[i] + + Log.shared.debug("$key: $kind = $value") + + args[key] = when (kind) { + "integer" -> value.toInt() + "ChannelWriter" -> bridge + "ChannelReader" -> bridge + else -> Log.shared.fatal("Unknown kind $kind") + } } - return result + val constructor = processor.getConstructor(Map::class.java) + return constructor.newInstance(args) } } diff --git a/src/main/kotlin/runner/Pipeline.kt b/src/main/kotlin/runner/Pipeline.kt index 13c5390..418a2d7 100644 --- a/src/main/kotlin/runner/Pipeline.kt +++ b/src/main/kotlin/runner/Pipeline.kt @@ -8,7 +8,7 @@ import kotlin.concurrent.thread class Pipeline(config: File) { /** Processors described in the config. */ - private val processors: List = Parser(config).getStages() + private val processors: List = Parser(config).stages /** * Execute all processors in the configuration in parallel, and block until