From 84a1a0f4d632df8cb2ad5ef46fd792e684a85d57 Mon Sep 17 00:00:00 2001 From: Jens Pots Date: Thu, 2 May 2024 16:02:07 +0200 Subject: [PATCH] feat: remove bridge type in favor of channel setters --- src/main/kotlin/bridge/Bridge.kt | 3 - .../{MemoryBridge.kt => MemoryReader.kt} | 43 +++-- src/main/kotlin/bridge/MemoryWriter.kt | 41 +++++ src/main/kotlin/bridge/Reader.kt | 1 - src/main/kotlin/logging/Log.kt | 2 +- src/main/kotlin/runner/Parser.kt | 150 ++++++++++++++++-- src/main/resources/queries/bridges.sparql | 2 +- .../queries/{reader.sparql => readers.sparql} | 6 +- src/main/resources/queries/writers.sparql | 6 +- 9 files changed, 205 insertions(+), 49 deletions(-) delete mode 100644 src/main/kotlin/bridge/Bridge.kt rename src/main/kotlin/bridge/{MemoryBridge.kt => MemoryReader.kt} (62%) create mode 100644 src/main/kotlin/bridge/MemoryWriter.kt rename src/main/resources/queries/{reader.sparql => readers.sparql} (61%) diff --git a/src/main/kotlin/bridge/Bridge.kt b/src/main/kotlin/bridge/Bridge.kt deleted file mode 100644 index 4475d33..0000000 --- a/src/main/kotlin/bridge/Bridge.kt +++ /dev/null @@ -1,3 +0,0 @@ -package bridge - -interface Bridge: Reader, Writer diff --git a/src/main/kotlin/bridge/MemoryBridge.kt b/src/main/kotlin/bridge/MemoryReader.kt similarity index 62% rename from src/main/kotlin/bridge/MemoryBridge.kt rename to src/main/kotlin/bridge/MemoryReader.kt index e0cb055..db90786 100644 --- a/src/main/kotlin/bridge/MemoryBridge.kt +++ b/src/main/kotlin/bridge/MemoryReader.kt @@ -1,29 +1,26 @@ -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.runBlocking -import bridge.Bridge +package technology.idlab.bridge + import bridge.Reader +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.runBlocking import technology.idlab.logging.Log -class MemoryBridge : Bridge { - private var channel = Channel(10) - - override fun pushSync(value: ByteArray) { - Log.shared.debug("Pushing ${value.size} bytes") - runBlocking { channel.send(value) } - Log.shared.debug("Done") - } +class MemoryReader: Reader { + private var channel: Channel? = null - override suspend fun push(value: ByteArray) { - try { - channel.trySend(value) - } catch (e: Exception) { - Log.shared.fatal(e) + fun setChannel(channel: Channel) { + if (this.channel != null) { + Log.shared.fatal("Channel already set") } + + this.channel = channel } override fun readSync(): Reader.Result { Log.shared.debug("Reading bytes") + val channel = this.channel ?: Log.shared.fatal("Channel not set") + val result = runBlocking { channel.receiveCatching() } // Check if the channel got closed. @@ -41,21 +38,21 @@ class MemoryBridge : Bridge { } override suspend fun read(): Reader.Result { - try { + Log.shared.debug("Reading bytes") + val channel = this.channel ?: Log.shared.fatal("Channel not set") + + return try { val result = channel.receive() - return Reader.Result.success(result) + Reader.Result.success(result) } catch (e: ClosedReceiveChannelException) { - return Reader.Result.closed() + Reader.Result.closed() } catch (e: Exception) { Log.shared.fatal(e) } } override fun isClosed(): Boolean { + val channel = this.channel ?: Log.shared.fatal("Channel not set") return channel.isClosedForSend } - - override fun close() { - channel.close() - } } diff --git a/src/main/kotlin/bridge/MemoryWriter.kt b/src/main/kotlin/bridge/MemoryWriter.kt new file mode 100644 index 0000000..a514359 --- /dev/null +++ b/src/main/kotlin/bridge/MemoryWriter.kt @@ -0,0 +1,41 @@ +package technology.idlab.bridge + +import bridge.Writer +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking +import technology.idlab.logging.Log + +class MemoryWriter: Writer { + private var channel: Channel? = null + + fun setChannel(channel: Channel) { + if (this.channel != null) { + Log.shared.fatal("Channel already set") + } + + this.channel = channel + } + + override fun pushSync(value: ByteArray) { + val channel = this.channel ?: Log.shared.fatal("Channel not set") + + Log.shared.debug("Pushing ${value.size} bytes") + runBlocking { channel.send(value) } + Log.shared.debug("Done") + } + + override suspend fun push(value: ByteArray) { + val channel = this.channel ?: Log.shared.fatal("Channel not set") + + try { + channel.trySend(value) + } catch (e: Exception) { + Log.shared.fatal(e) + } + } + + override fun close() { + val channel = this.channel ?: Log.shared.fatal("Channel not set") + channel.close() + } +} diff --git a/src/main/kotlin/bridge/Reader.kt b/src/main/kotlin/bridge/Reader.kt index 1f422ab..a7b5f76 100644 --- a/src/main/kotlin/bridge/Reader.kt +++ b/src/main/kotlin/bridge/Reader.kt @@ -41,4 +41,3 @@ interface Reader { fun readSync(): Result fun isClosed(): Boolean } - diff --git a/src/main/kotlin/logging/Log.kt b/src/main/kotlin/logging/Log.kt index c3f65f4..05c467e 100644 --- a/src/main/kotlin/logging/Log.kt +++ b/src/main/kotlin/logging/Log.kt @@ -34,7 +34,7 @@ class Log private constructor() { val time = tz.format(iso) val caller = Throwable().stackTrace[2] - val name = "${caller.className.substringAfterLast(".")}::${caller.methodName}" + val name = "${caller.className.substringAfterLast(".")}::${caller.methodName}::${caller.lineNumber}" val line = listOf( time.padEnd(12, '0'), diff --git a/src/main/kotlin/runner/Parser.kt b/src/main/kotlin/runner/Parser.kt index 1e16ce4..f261c97 100644 --- a/src/main/kotlin/runner/Parser.kt +++ b/src/main/kotlin/runner/Parser.kt @@ -1,6 +1,9 @@ package technology.idlab.runner -import MemoryBridge +import bridge.HttpReader +import bridge.Reader +import bridge.Writer +import kotlinx.coroutines.channels.Channel import org.apache.jena.ontology.OntModelSpec import org.apache.jena.query.QueryExecutionFactory import org.apache.jena.query.QueryFactory @@ -10,6 +13,9 @@ import org.apache.jena.rdf.model.ModelFactory import org.apache.jena.rdf.model.Resource import org.apache.jena.shacl.ShaclValidator import org.apache.jena.vocabulary.OWL +import technology.idlab.bridge.HttpWriter +import technology.idlab.bridge.MemoryReader +import technology.idlab.bridge.MemoryWriter import java.io.ByteArrayOutputStream import java.io.File import technology.idlab.compiler.Compiler @@ -94,9 +100,6 @@ private fun Model.validate(): Model { return this } -// TODO: Create some sort of a factory. -val bridge = MemoryBridge() - class Parser(file: File) { /** * An RDF model of the configuration file. @@ -108,6 +111,10 @@ class Parser(file: File) { */ private val processors: List> + private val readers: Map + + private val writers: Map + /** * The stages of the pipeline. */ @@ -143,6 +150,121 @@ class Parser(file: File) { this.processors = processors } + /** + * Parse the model for readers. + */ + init { + Log.shared.info("Parsing readers") + val readers = mutableMapOf() + + val query = this.javaClass.getResource("/queries/readers.sparql") + .let { it ?: Log.shared.fatal("Failed to read readers.sparql") } + .readText() + .let { QueryFactory.create(it) } + + val iter = QueryExecutionFactory + .create(query, model) + .execSelect() + + while (iter.hasNext()) { + val solution = iter.nextSolution() + + val subClass = solution["subClass"] + .toString() + .substringAfterLast("#") + + val identifier = solution["reader"] + .toString() + + val reader = when (subClass) { + "MemoryChannelReader" -> MemoryReader() + "HttpChannelReader" -> HttpReader() + else -> Log.shared.fatal("Reader $subClass not found") + } + + readers[identifier] = reader + } + + this.readers = readers + } + + /** + * Parse the model for writers. + */ + init { + Log.shared.info("Parsing writers") + val writers = mutableMapOf() + + val query = this.javaClass.getResource("/queries/writers.sparql") + .let { it ?: Log.shared.fatal("Failed to read writers.sparql") } + .readText() + .let { QueryFactory.create(it) } + + val iter = QueryExecutionFactory + .create(query, model) + .execSelect() + + while (iter.hasNext()) { + val solution = iter.nextSolution() + + val subClass = solution["subClass"] + .toString() + .substringAfterLast("#") + + val identifier = solution["writer"] + .toString() + + val reader = when (subClass) { + "MemoryChannelWriter" -> MemoryWriter() + "HttpChannelWriter" -> HttpWriter("http://localhost:8080") + else -> Log.shared.fatal("Reader $subClass not found") + } + + writers[identifier] = reader + } + + this.writers = writers + } + + /** + * Parse the model for bridges. Readers and writers that may be bridges in + * a single runner instance will be bound to each other here. + */ + init { + Log.shared.info("Parsing bridges") + + val query = this.javaClass.getResource("/queries/bridges.sparql") + ?.readText() + ?.let { QueryFactory.create(it) } + + val iter = QueryExecutionFactory + .create(query, model) + .execSelect() + + while (iter.hasNext()) { + val solution = iter.nextSolution() + + val readerId = solution["reader"].toString() + val writerId = solution["writer"].toString() + + val reader = readers[readerId] ?: Log.shared.fatal("Reader $readerId not found") + val writer = writers[writerId] ?: Log.shared.fatal("Writer $writerId not found") + val channel = Channel(1) + + if (reader is MemoryReader) { + reader.setChannel(channel) + } else { + Log.shared.fatal("Reader $readerId is not a MemoryReader") + } + + if (writer is MemoryWriter) { + writer.setChannel(channel) + } else { + Log.shared.fatal("Writer $writerId is not a MemoryWriter") + } + } + } + /** * Parse the model for concrete stages, initializes the corresponding * instances and saves the result to a field. @@ -188,12 +310,12 @@ class Parser(file: File) { val values = querySolution["values"].toString().split(";") - val keys = querySolution["keys"] + val argumentNames = querySolution["keys"] .toString() .split(";") .map { it.substringAfterLast("#") } - val kinds = querySolution["kinds"] + val types = querySolution["kinds"] .toString() .split(";") .map { it.substringAfterLast("#") } @@ -202,18 +324,18 @@ class Parser(file: File) { val processor = byName[name] ?: Log.shared.fatal("Processor $name not found") val args = mutableMapOf() - for (i in keys.indices) { - val key = keys[i] + for (i in argumentNames.indices) { + val argumentName = argumentNames[i] val value = values[i] - val kind = kinds[i] + val type = types[i] - Log.shared.debug("$key: $kind = $value") + Log.shared.debug("$argumentName: $type = $value") - args[key] = when (kind) { + args[argumentName] = when (type) { "integer" -> value.toInt() - "ChannelWriter" -> bridge - "ChannelReader" -> bridge - else -> Log.shared.fatal("Unknown kind $kind") + "ChannelWriter" -> writers[value] ?: Log.shared.fatal("Writer $argumentName not found") + "ChannelReader" -> readers[value] ?: Log.shared.fatal("Reader $argumentName not found") + else -> Log.shared.fatal("Unknown type $type") } } diff --git a/src/main/resources/queries/bridges.sparql b/src/main/resources/queries/bridges.sparql index 4b0332d..fbe4198 100644 --- a/src/main/resources/queries/bridges.sparql +++ b/src/main/resources/queries/bridges.sparql @@ -2,7 +2,7 @@ PREFIX jvm: PREFIX rdf: PREFIX rdfs: -SELECT ?channel ?subclass ?writer ?reader +SELECT ?channel ?writer ?reader { ?channel a ?subclass. ?subclass rdfs:subClassOf jvm:Channel. diff --git a/src/main/resources/queries/reader.sparql b/src/main/resources/queries/readers.sparql similarity index 61% rename from src/main/resources/queries/reader.sparql rename to src/main/resources/queries/readers.sparql index de4f313..ec9faf2 100644 --- a/src/main/resources/queries/reader.sparql +++ b/src/main/resources/queries/readers.sparql @@ -2,8 +2,8 @@ PREFIX jvm: PREFIX rdf: PREFIX rdfs: -SELECT ?reader ?subclass +SELECT ?reader ?subClass { - ?reader a ?subclass. - ?subclass rdfs:subClassOf jvm:ChannelReader. + ?reader a ?subClass. + ?subClass rdfs:subClassOf jvm:ChannelReader. } diff --git a/src/main/resources/queries/writers.sparql b/src/main/resources/queries/writers.sparql index ccb956d..3d3be2c 100644 --- a/src/main/resources/queries/writers.sparql +++ b/src/main/resources/queries/writers.sparql @@ -2,8 +2,8 @@ PREFIX jvm: PREFIX rdf: PREFIX rdfs: -SELECT ?writer ?subclass +SELECT ?writer ?subClass { - ?writer a ?subclass. - ?subclass rdfs:subClassOf jvm:ChannelWriter. + ?writer a ?subClass. + ?subClass rdfs:subClassOf jvm:ChannelWriter. }