Skip to content

Commit

Permalink
feat: remove bridge type in favor of channel setters
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed May 2, 2024
1 parent 986a83b commit 84a1a0f
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 49 deletions.
3 changes: 0 additions & 3 deletions src/main/kotlin/bridge/Bridge.kt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import bridge.Bridge
package technology.idlab.bridge

import bridge.Reader
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.runBlocking
import technology.idlab.logging.Log

class MemoryBridge : Bridge {
private var channel = Channel<ByteArray>(10)

override fun pushSync(value: ByteArray) {
Log.shared.debug("Pushing ${value.size} bytes")
runBlocking { channel.send(value) }
Log.shared.debug("Done")
}
class MemoryReader: Reader {
private var channel: Channel<ByteArray>? = null

override suspend fun push(value: ByteArray) {
try {
channel.trySend(value)
} catch (e: Exception) {
Log.shared.fatal(e)
fun setChannel(channel: Channel<ByteArray>) {
if (this.channel != null) {
Log.shared.fatal("Channel already set")
}

this.channel = channel
}

override fun readSync(): Reader.Result {
Log.shared.debug("Reading bytes")
val channel = this.channel ?: Log.shared.fatal("Channel not set")

val result = runBlocking { channel.receiveCatching() }

// Check if the channel got closed.
Expand All @@ -41,21 +38,21 @@ class MemoryBridge : Bridge {
}

override suspend fun read(): Reader.Result {
try {
Log.shared.debug("Reading bytes")
val channel = this.channel ?: Log.shared.fatal("Channel not set")

return try {
val result = channel.receive()
return Reader.Result.success(result)
Reader.Result.success(result)
} catch (e: ClosedReceiveChannelException) {
return Reader.Result.closed()
Reader.Result.closed()
} catch (e: Exception) {
Log.shared.fatal(e)
}
}

override fun isClosed(): Boolean {
val channel = this.channel ?: Log.shared.fatal("Channel not set")
return channel.isClosedForSend
}

override fun close() {
channel.close()
}
}
41 changes: 41 additions & 0 deletions src/main/kotlin/bridge/MemoryWriter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package technology.idlab.bridge

import bridge.Writer
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import technology.idlab.logging.Log

class MemoryWriter: Writer {
private var channel: Channel<ByteArray>? = null

fun setChannel(channel: Channel<ByteArray>) {
if (this.channel != null) {
Log.shared.fatal("Channel already set")
}

this.channel = channel
}

override fun pushSync(value: ByteArray) {
val channel = this.channel ?: Log.shared.fatal("Channel not set")

Log.shared.debug("Pushing ${value.size} bytes")
runBlocking { channel.send(value) }
Log.shared.debug("Done")
}

override suspend fun push(value: ByteArray) {
val channel = this.channel ?: Log.shared.fatal("Channel not set")

try {
channel.trySend(value)
} catch (e: Exception) {
Log.shared.fatal(e)
}
}

override fun close() {
val channel = this.channel ?: Log.shared.fatal("Channel not set")
channel.close()
}
}
1 change: 0 additions & 1 deletion src/main/kotlin/bridge/Reader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,3 @@ interface Reader {
fun readSync(): Result
fun isClosed(): Boolean
}

2 changes: 1 addition & 1 deletion src/main/kotlin/logging/Log.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Log private constructor() {
val time = tz.format(iso)

val caller = Throwable().stackTrace[2]
val name = "${caller.className.substringAfterLast(".")}::${caller.methodName}"
val name = "${caller.className.substringAfterLast(".")}::${caller.methodName}::${caller.lineNumber}"

val line = listOf(
time.padEnd(12, '0'),
Expand Down
150 changes: 136 additions & 14 deletions src/main/kotlin/runner/Parser.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package technology.idlab.runner

import MemoryBridge
import bridge.HttpReader
import bridge.Reader
import bridge.Writer
import kotlinx.coroutines.channels.Channel
import org.apache.jena.ontology.OntModelSpec
import org.apache.jena.query.QueryExecutionFactory
import org.apache.jena.query.QueryFactory
Expand All @@ -10,6 +13,9 @@ import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.Resource
import org.apache.jena.shacl.ShaclValidator
import org.apache.jena.vocabulary.OWL
import technology.idlab.bridge.HttpWriter
import technology.idlab.bridge.MemoryReader
import technology.idlab.bridge.MemoryWriter
import java.io.ByteArrayOutputStream
import java.io.File
import technology.idlab.compiler.Compiler
Expand Down Expand Up @@ -94,9 +100,6 @@ private fun Model.validate(): Model {
return this
}

// TODO: Create some sort of a factory.
val bridge = MemoryBridge()

class Parser(file: File) {
/**
* An RDF model of the configuration file.
Expand All @@ -108,6 +111,10 @@ class Parser(file: File) {
*/
private val processors: List<Class<Processor>>

private val readers: Map<String, Reader>

private val writers: Map<String, Writer>

/**
* The stages of the pipeline.
*/
Expand Down Expand Up @@ -143,6 +150,121 @@ class Parser(file: File) {
this.processors = processors
}

/**
* Parse the model for readers.
*/
init {
Log.shared.info("Parsing readers")
val readers = mutableMapOf<String, Reader>()

val query = this.javaClass.getResource("/queries/readers.sparql")
.let { it ?: Log.shared.fatal("Failed to read readers.sparql") }
.readText()
.let { QueryFactory.create(it) }

val iter = QueryExecutionFactory
.create(query, model)
.execSelect()

while (iter.hasNext()) {
val solution = iter.nextSolution()

val subClass = solution["subClass"]
.toString()
.substringAfterLast("#")

val identifier = solution["reader"]
.toString()

val reader = when (subClass) {
"MemoryChannelReader" -> MemoryReader()
"HttpChannelReader" -> HttpReader()
else -> Log.shared.fatal("Reader $subClass not found")
}

readers[identifier] = reader
}

this.readers = readers
}

/**
* Parse the model for writers.
*/
init {
Log.shared.info("Parsing writers")
val writers = mutableMapOf<String, Writer>()

val query = this.javaClass.getResource("/queries/writers.sparql")
.let { it ?: Log.shared.fatal("Failed to read writers.sparql") }
.readText()
.let { QueryFactory.create(it) }

val iter = QueryExecutionFactory
.create(query, model)
.execSelect()

while (iter.hasNext()) {
val solution = iter.nextSolution()

val subClass = solution["subClass"]
.toString()
.substringAfterLast("#")

val identifier = solution["writer"]
.toString()

val reader = when (subClass) {
"MemoryChannelWriter" -> MemoryWriter()
"HttpChannelWriter" -> HttpWriter("http://localhost:8080")
else -> Log.shared.fatal("Reader $subClass not found")
}

writers[identifier] = reader
}

this.writers = writers
}

/**
* Parse the model for bridges. Readers and writers that may be bridges in
* a single runner instance will be bound to each other here.
*/
init {
Log.shared.info("Parsing bridges")

val query = this.javaClass.getResource("/queries/bridges.sparql")
?.readText()
?.let { QueryFactory.create(it) }

val iter = QueryExecutionFactory
.create(query, model)
.execSelect()

while (iter.hasNext()) {
val solution = iter.nextSolution()

val readerId = solution["reader"].toString()
val writerId = solution["writer"].toString()

val reader = readers[readerId] ?: Log.shared.fatal("Reader $readerId not found")
val writer = writers[writerId] ?: Log.shared.fatal("Writer $writerId not found")
val channel = Channel<ByteArray>(1)

if (reader is MemoryReader) {
reader.setChannel(channel)
} else {
Log.shared.fatal("Reader $readerId is not a MemoryReader")
}

if (writer is MemoryWriter) {
writer.setChannel(channel)
} else {
Log.shared.fatal("Writer $writerId is not a MemoryWriter")
}
}
}

/**
* Parse the model for concrete stages, initializes the corresponding
* instances and saves the result to a field.
Expand Down Expand Up @@ -188,12 +310,12 @@ class Parser(file: File) {

val values = querySolution["values"].toString().split(";")

val keys = querySolution["keys"]
val argumentNames = querySolution["keys"]
.toString()
.split(";")
.map { it.substringAfterLast("#") }

val kinds = querySolution["kinds"]
val types = querySolution["kinds"]
.toString()
.split(";")
.map { it.substringAfterLast("#") }
Expand All @@ -202,18 +324,18 @@ class Parser(file: File) {
val processor = byName[name] ?: Log.shared.fatal("Processor $name not found")
val args = mutableMapOf<String, Any>()

for (i in keys.indices) {
val key = keys[i]
for (i in argumentNames.indices) {
val argumentName = argumentNames[i]
val value = values[i]
val kind = kinds[i]
val type = types[i]

Log.shared.debug("$key: $kind = $value")
Log.shared.debug("$argumentName: $type = $value")

args[key] = when (kind) {
args[argumentName] = when (type) {
"integer" -> value.toInt()
"ChannelWriter" -> bridge
"ChannelReader" -> bridge
else -> Log.shared.fatal("Unknown kind $kind")
"ChannelWriter" -> writers[value] ?: Log.shared.fatal("Writer $argumentName not found")
"ChannelReader" -> readers[value] ?: Log.shared.fatal("Reader $argumentName not found")
else -> Log.shared.fatal("Unknown type $type")
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/queries/bridges.sparql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ PREFIX jvm: <https://w3id.org/conn/jvm#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

SELECT ?channel ?subclass ?writer ?reader
SELECT ?channel ?writer ?reader
{
?channel a ?subclass.
?subclass rdfs:subClassOf jvm:Channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ PREFIX jvm: <https://w3id.org/conn/jvm#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

SELECT ?reader ?subclass
SELECT ?reader ?subClass
{
?reader a ?subclass.
?subclass rdfs:subClassOf jvm:ChannelReader.
?reader a ?subClass.
?subClass rdfs:subClassOf jvm:ChannelReader.
}
6 changes: 3 additions & 3 deletions src/main/resources/queries/writers.sparql
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ PREFIX jvm: <https://w3id.org/conn/jvm#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>

SELECT ?writer ?subclass
SELECT ?writer ?subClass
{
?writer a ?subclass.
?subclass rdfs:subClassOf jvm:ChannelWriter.
?writer a ?subClass.
?subClass rdfs:subClassOf jvm:ChannelWriter.
}

0 comments on commit 84a1a0f

Please sign in to comment.