Skip to content

Commit

Permalink
refactor: parser uses higher order query function
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed May 2, 2024
1 parent a0a397a commit cecd6d4
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 141 deletions.
215 changes: 75 additions & 140 deletions src/main/kotlin/runner/Parser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,6 @@ import technology.idlab.compiler.Compiler
import technology.idlab.compiler.MemoryClassLoader
import technology.idlab.logging.Log

/** Parse a solution to a Processor instance. */
private fun QuerySolution.toProcessor(): Class<Processor> {
val file = this["file"].toString().drop(7).let { File(it) }

// Either compile or load the file.
val bytes =
if (file.absolutePath.endsWith(".java")) {
Compiler.compile(file)
} else {
file.readBytes()
}

// Load the class and return.
return MemoryClassLoader().fromBytes(bytes, file.nameWithoutExtension).let {
it as Class<Processor>
}
}

/**
* Read a model from a file and recursively import all referenced ontologies based on <owl:import>
* statements.
Expand Down Expand Up @@ -79,6 +61,23 @@ private fun File.readModelRecursively(): Model {
return result
}

private fun Model.query(resource: String, func: (QuerySolution) -> Unit) {
val query =
object {}
.javaClass
.getResource(resource)
.let { it ?: Log.shared.fatal("Failed to read $resource") }
.readText()
.let { QueryFactory.create(it) }

val iter = QueryExecutionFactory.create(query, this).execSelect()

while (iter.hasNext()) {
val solution = iter.nextSolution()
func(solution)
}
}

/** Validates a model against the SHACL schema defined inside the model itself. */
private fun Model.validate(): Model {
val graph = this.graph
Expand All @@ -99,62 +98,45 @@ class Parser(file: File) {
private val model = file.readModelRecursively().validate()

/** Class references to the different processors. */
private val processors: List<Class<Processor>>
private val processors: MutableMap<String, Class<Processor>> = mutableMapOf()

private val readers: Map<String, Reader>
/** A list of all the readers in the model. */
private val readers: MutableMap<String, Reader> = mutableMapOf()

private val writers: Map<String, Writer>
/** A list of all the writers in the model. */
private val writers: MutableMap<String, Writer> = mutableMapOf()

/** The stages of the pipeline. */
val stages: List<Processor>
private val stages: MutableList<Processor> = mutableListOf()

/** Parse the model for processor declarations and save results as a field. */
init {
Log.shared.info("Parsing processors")
val processors = mutableListOf<Class<Processor>>()

val query =
this.javaClass.getResource("/queries/processors.sparql")?.readText()?.let {
QueryFactory.create(it)
}

// Execute the query.
val iter = QueryExecutionFactory.create(query, model).execSelect()
model.query("/queries/processors.sparql") {
val path = it["file"].toString().drop(7)
val sourceFile = File(path)

if (!iter.hasNext()) {
Log.shared.fatal("No processors found in the configuration")
}
val bytes =
if (sourceFile.absolutePath.endsWith(".java")) {
Compiler.compile(sourceFile)
} else {
sourceFile.readBytes()
}

while (iter.hasNext()) {
val solution = iter.nextSolution()
val processor = solution.toProcessor()
Log.shared.info("Class ${processor.name} initialised successfully")
processors.add(processor)
val processor =
MemoryClassLoader().fromBytes(bytes, sourceFile.nameWithoutExtension) as Class<Processor>
processors[processor.simpleName] = processor
}

this.processors = processors
}

/** Parse the model for readers. */
init {
Log.shared.info("Parsing readers")
val readers = mutableMapOf<String, Reader>()

val query =
this.javaClass
.getResource("/queries/readers.sparql")
.let { it ?: Log.shared.fatal("Failed to read readers.sparql") }
.readText()
.let { QueryFactory.create(it) }

val iter = QueryExecutionFactory.create(query, model).execSelect()

while (iter.hasNext()) {
val solution = iter.nextSolution()

val subClass = solution["subClass"].toString().substringAfterLast("#")

val identifier = solution["reader"].toString()
model.query("/queries/readers.sparql") {
val subClass = it["subClass"].toString().substringAfterLast("#")
val identifier = it["reader"].toString()

val reader =
when (subClass) {
Expand All @@ -165,42 +147,25 @@ class Parser(file: File) {

readers[identifier] = reader
}

this.readers = readers
}

/** Parse the model for writers. */
init {
Log.shared.info("Parsing writers")
val writers = mutableMapOf<String, Writer>()

val query =
this.javaClass
.getResource("/queries/writers.sparql")
.let { it ?: Log.shared.fatal("Failed to read writers.sparql") }
.readText()
.let { QueryFactory.create(it) }
model.query("/queries/writers.sparql") {
val subClass = it["subClass"].toString().substringAfterLast("#")
val identifier = it["writer"].toString()

val iter = QueryExecutionFactory.create(query, model).execSelect()

while (iter.hasNext()) {
val solution = iter.nextSolution()

val subClass = solution["subClass"].toString().substringAfterLast("#")

val identifier = solution["writer"].toString()

val reader =
val writer =
when (subClass) {
"MemoryChannelWriter" -> MemoryWriter()
"HttpChannelWriter" -> HttpWriter("http://localhost:8080")
else -> Log.shared.fatal("Reader $subClass not found")
}

writers[identifier] = reader
writers[identifier] = writer
}

this.writers = writers
}

/**
Expand All @@ -210,18 +175,9 @@ class Parser(file: File) {
init {
Log.shared.info("Parsing bridges")

val query =
this.javaClass.getResource("/queries/bridges.sparql")?.readText()?.let {
QueryFactory.create(it)
}

val iter = QueryExecutionFactory.create(query, model).execSelect()

while (iter.hasNext()) {
val solution = iter.nextSolution()

val readerId = solution["reader"].toString()
val writerId = solution["writer"].toString()
model.query("/queries/bridges.sparql") {
val readerId = it["reader"].toString()
val writerId = it["writer"].toString()

val reader = readers[readerId] ?: Log.shared.fatal("Reader $readerId not found")
val writer = writers[writerId] ?: Log.shared.fatal("Writer $writerId not found")
Expand All @@ -247,66 +203,45 @@ class Parser(file: File) {
*/
init {
Log.shared.info("Parsing stages")
val stages = mutableListOf<Processor>()

// Execute the stages query.
val query =
this.javaClass.getResource("/queries/stages.sparql")?.readText()?.let {
QueryFactory.create(it)
}

// Execute the query.
val iter = QueryExecutionFactory.create(query, model).execSelect()
model.query("/queries/stages.sparql") {
val name = it["processor"].toString().substringAfterLast("#")

if (!iter.hasNext()) {
Log.shared.fatal("No processors found in the configuration")
}

while (iter.hasNext()) {
val solution = iter.nextSolution()
val stage = this.parseStage(solution)
Log.shared.info("Stage ${stage.javaClass.name} initialised successfully")
stages.add(stage)
}

this.stages = stages
}
val values = it["values"].toString().split(";")

/** Initialize a Processor instance based on the query solution. */
private fun parseStage(querySolution: QuerySolution): Processor {
val byName = processors.associateBy { it.simpleName }
val argumentNames = it["keys"].toString().split(";").map { it.substringAfterLast("#") }

// Extract the list of arguments.
val name = querySolution["processor"].toString().substringAfterLast("#")
val types = it["kinds"].toString().split(";").map { it.substringAfterLast("#") }

val values = querySolution["values"].toString().split(";")
// Retrieve a class instance of the Processor.
val processor = processors[name] ?: Log.shared.fatal("Processor $name not found")
val args = mutableMapOf<String, Any>()

val argumentNames =
querySolution["keys"].toString().split(";").map { it.substringAfterLast("#") }
for (i in argumentNames.indices) {
val argumentName = argumentNames[i]
val value = values[i]
val type = types[i]

val types = querySolution["kinds"].toString().split(";").map { it.substringAfterLast("#") }
Log.shared.debug("$argumentName: $type = $value")

// Retrieve a class instance of the Processor.
val processor = byName[name] ?: Log.shared.fatal("Processor $name not found")
val args = mutableMapOf<String, Any>()

for (i in argumentNames.indices) {
val argumentName = argumentNames[i]
val value = values[i]
val type = types[i]

Log.shared.debug("$argumentName: $type = $value")
args[argumentName] =
when (type) {
"integer" -> value.toInt()
"ChannelWriter" ->
writers[value] ?: Log.shared.fatal("Writer $argumentName not found")
"ChannelReader" ->
readers[value] ?: Log.shared.fatal("Reader $argumentName not found")
else -> Log.shared.fatal("Unknown type $type")
}
}

args[argumentName] =
when (type) {
"integer" -> value.toInt()
"ChannelWriter" -> writers[value] ?: Log.shared.fatal("Writer $argumentName not found")
"ChannelReader" -> readers[value] ?: Log.shared.fatal("Reader $argumentName not found")
else -> Log.shared.fatal("Unknown type $type")
}
val constructor = processor.getConstructor(Map::class.java)
val instance = constructor.newInstance(args)
this.stages.add(instance)
}
}

val constructor = processor.getConstructor(Map::class.java)
return constructor.newInstance(args)
fun getStages(): List<Processor> {
return stages
}
}
2 changes: 1 addition & 1 deletion src/main/kotlin/runner/Pipeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import technology.idlab.logging.Log

class Pipeline(config: File) {
/** Processors described in the config. */
private val processors: List<Processor> = Parser(config).stages
private val processors: List<Processor> = Parser(config).getStages()

/** Execute all processors in the configuration in parallel, and block until all are done. */
fun executeSync() {
Expand Down

0 comments on commit cecd6d4

Please sign in to comment.