diff --git a/src/main/kotlin/runner/Parser.kt b/src/main/kotlin/runner/Parser.kt index f251b2f..ed77f4e 100644 --- a/src/main/kotlin/runner/Parser.kt +++ b/src/main/kotlin/runner/Parser.kt @@ -22,24 +22,6 @@ import technology.idlab.compiler.Compiler import technology.idlab.compiler.MemoryClassLoader import technology.idlab.logging.Log -/** Parse a solution to a Processor instance. */ -private fun QuerySolution.toProcessor(): Class { - val file = this["file"].toString().drop(7).let { File(it) } - - // Either compile or load the file. - val bytes = - if (file.absolutePath.endsWith(".java")) { - Compiler.compile(file) - } else { - file.readBytes() - } - - // Load the class and return. - return MemoryClassLoader().fromBytes(bytes, file.nameWithoutExtension).let { - it as Class - } -} - /** * Read a model from a file and recursively import all referenced ontologies based on * statements. @@ -79,6 +61,23 @@ private fun File.readModelRecursively(): Model { return result } +private fun Model.query(resource: String, func: (QuerySolution) -> Unit) { + val query = + object {} + .javaClass + .getResource(resource) + .let { it ?: Log.shared.fatal("Failed to read $resource") } + .readText() + .let { QueryFactory.create(it) } + + val iter = QueryExecutionFactory.create(query, this).execSelect() + + while (iter.hasNext()) { + val solution = iter.nextSolution() + func(solution) + } +} + /** Validates a model against the SHACL schema defined inside the model itself. */ private fun Model.validate(): Model { val graph = this.graph @@ -99,62 +98,45 @@ class Parser(file: File) { private val model = file.readModelRecursively().validate() /** Class references to the different processors. */ - private val processors: List> + private val processors: MutableMap> = mutableMapOf() - private val readers: Map + /** A list of all the readers in the model. */ + private val readers: MutableMap = mutableMapOf() - private val writers: Map + /** A list of all the writers in the model. */ + private val writers: MutableMap = mutableMapOf() /** The stages of the pipeline. */ - val stages: List + private val stages: MutableList = mutableListOf() /** Parse the model for processor declarations and save results as a field. */ init { Log.shared.info("Parsing processors") - val processors = mutableListOf>() - - val query = - this.javaClass.getResource("/queries/processors.sparql")?.readText()?.let { - QueryFactory.create(it) - } - // Execute the query. - val iter = QueryExecutionFactory.create(query, model).execSelect() + model.query("/queries/processors.sparql") { + val path = it["file"].toString().drop(7) + val sourceFile = File(path) - if (!iter.hasNext()) { - Log.shared.fatal("No processors found in the configuration") - } + val bytes = + if (sourceFile.absolutePath.endsWith(".java")) { + Compiler.compile(sourceFile) + } else { + sourceFile.readBytes() + } - while (iter.hasNext()) { - val solution = iter.nextSolution() - val processor = solution.toProcessor() - Log.shared.info("Class ${processor.name} initialised successfully") - processors.add(processor) + val processor = + MemoryClassLoader().fromBytes(bytes, sourceFile.nameWithoutExtension) as Class + processors[processor.simpleName] = processor } - - this.processors = processors } /** Parse the model for readers. */ init { Log.shared.info("Parsing readers") - val readers = mutableMapOf() - val query = - this.javaClass - .getResource("/queries/readers.sparql") - .let { it ?: Log.shared.fatal("Failed to read readers.sparql") } - .readText() - .let { QueryFactory.create(it) } - - val iter = QueryExecutionFactory.create(query, model).execSelect() - - while (iter.hasNext()) { - val solution = iter.nextSolution() - - val subClass = solution["subClass"].toString().substringAfterLast("#") - - val identifier = solution["reader"].toString() + model.query("/queries/readers.sparql") { + val subClass = it["subClass"].toString().substringAfterLast("#") + val identifier = it["reader"].toString() val reader = when (subClass) { @@ -165,42 +147,25 @@ class Parser(file: File) { readers[identifier] = reader } - - this.readers = readers } /** Parse the model for writers. */ init { Log.shared.info("Parsing writers") - val writers = mutableMapOf() - val query = - this.javaClass - .getResource("/queries/writers.sparql") - .let { it ?: Log.shared.fatal("Failed to read writers.sparql") } - .readText() - .let { QueryFactory.create(it) } + model.query("/queries/writers.sparql") { + val subClass = it["subClass"].toString().substringAfterLast("#") + val identifier = it["writer"].toString() - val iter = QueryExecutionFactory.create(query, model).execSelect() - - while (iter.hasNext()) { - val solution = iter.nextSolution() - - val subClass = solution["subClass"].toString().substringAfterLast("#") - - val identifier = solution["writer"].toString() - - val reader = + val writer = when (subClass) { "MemoryChannelWriter" -> MemoryWriter() "HttpChannelWriter" -> HttpWriter("http://localhost:8080") else -> Log.shared.fatal("Reader $subClass not found") } - writers[identifier] = reader + writers[identifier] = writer } - - this.writers = writers } /** @@ -210,18 +175,9 @@ class Parser(file: File) { init { Log.shared.info("Parsing bridges") - val query = - this.javaClass.getResource("/queries/bridges.sparql")?.readText()?.let { - QueryFactory.create(it) - } - - val iter = QueryExecutionFactory.create(query, model).execSelect() - - while (iter.hasNext()) { - val solution = iter.nextSolution() - - val readerId = solution["reader"].toString() - val writerId = solution["writer"].toString() + model.query("/queries/bridges.sparql") { + val readerId = it["reader"].toString() + val writerId = it["writer"].toString() val reader = readers[readerId] ?: Log.shared.fatal("Reader $readerId not found") val writer = writers[writerId] ?: Log.shared.fatal("Writer $writerId not found") @@ -247,66 +203,45 @@ class Parser(file: File) { */ init { Log.shared.info("Parsing stages") - val stages = mutableListOf() - - // Execute the stages query. - val query = - this.javaClass.getResource("/queries/stages.sparql")?.readText()?.let { - QueryFactory.create(it) - } - // Execute the query. - val iter = QueryExecutionFactory.create(query, model).execSelect() + model.query("/queries/stages.sparql") { + val name = it["processor"].toString().substringAfterLast("#") - if (!iter.hasNext()) { - Log.shared.fatal("No processors found in the configuration") - } - - while (iter.hasNext()) { - val solution = iter.nextSolution() - val stage = this.parseStage(solution) - Log.shared.info("Stage ${stage.javaClass.name} initialised successfully") - stages.add(stage) - } - - this.stages = stages - } + val values = it["values"].toString().split(";") - /** Initialize a Processor instance based on the query solution. */ - private fun parseStage(querySolution: QuerySolution): Processor { - val byName = processors.associateBy { it.simpleName } + val argumentNames = it["keys"].toString().split(";").map { it.substringAfterLast("#") } - // Extract the list of arguments. - val name = querySolution["processor"].toString().substringAfterLast("#") + val types = it["kinds"].toString().split(";").map { it.substringAfterLast("#") } - val values = querySolution["values"].toString().split(";") + // Retrieve a class instance of the Processor. + val processor = processors[name] ?: Log.shared.fatal("Processor $name not found") + val args = mutableMapOf() - val argumentNames = - querySolution["keys"].toString().split(";").map { it.substringAfterLast("#") } + for (i in argumentNames.indices) { + val argumentName = argumentNames[i] + val value = values[i] + val type = types[i] - val types = querySolution["kinds"].toString().split(";").map { it.substringAfterLast("#") } + Log.shared.debug("$argumentName: $type = $value") - // Retrieve a class instance of the Processor. - val processor = byName[name] ?: Log.shared.fatal("Processor $name not found") - val args = mutableMapOf() - - for (i in argumentNames.indices) { - val argumentName = argumentNames[i] - val value = values[i] - val type = types[i] - - Log.shared.debug("$argumentName: $type = $value") + args[argumentName] = + when (type) { + "integer" -> value.toInt() + "ChannelWriter" -> + writers[value] ?: Log.shared.fatal("Writer $argumentName not found") + "ChannelReader" -> + readers[value] ?: Log.shared.fatal("Reader $argumentName not found") + else -> Log.shared.fatal("Unknown type $type") + } + } - args[argumentName] = - when (type) { - "integer" -> value.toInt() - "ChannelWriter" -> writers[value] ?: Log.shared.fatal("Writer $argumentName not found") - "ChannelReader" -> readers[value] ?: Log.shared.fatal("Reader $argumentName not found") - else -> Log.shared.fatal("Unknown type $type") - } + val constructor = processor.getConstructor(Map::class.java) + val instance = constructor.newInstance(args) + this.stages.add(instance) } + } - val constructor = processor.getConstructor(Map::class.java) - return constructor.newInstance(args) + fun getStages(): List { + return stages } } diff --git a/src/main/kotlin/runner/Pipeline.kt b/src/main/kotlin/runner/Pipeline.kt index 9344d52..ce4033e 100644 --- a/src/main/kotlin/runner/Pipeline.kt +++ b/src/main/kotlin/runner/Pipeline.kt @@ -6,7 +6,7 @@ import technology.idlab.logging.Log class Pipeline(config: File) { /** Processors described in the config. */ - private val processors: List = Parser(config).stages + private val processors: List = Parser(config).getStages() /** Execute all processors in the configuration in parallel, and block until all are done. */ fun executeSync() {