Skip to content

Commit

Permalink
feat: synchronous communication
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Apr 29, 2024
1 parent 217bbe4 commit 2394112
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 44 deletions.
16 changes: 10 additions & 6 deletions src/main/java/runner/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,23 @@ public Processor() {
}

protected <T> T getArgument(String name) {
Log.Companion.getShared().debug(name);

Object result = arguments.get(name);

if (result == null) {
Log.Companion.getShared().fatal("Argument " + name + " is missing.");
}

return (T) result;
}

protected <T> Optional<T> getOptionalArgument(String name) {
Log.Companion.getShared().debug(name + " (optional)");
return Optional.ofNullable(getArgument(name));
}

public void setup() {
log.info("Setting up processor");
}
public void setup() {}

public void exec() {
log.info("Executing processor");
}
public void exec() {}
}
3 changes: 3 additions & 0 deletions src/main/kotlin/bridge/Bridge.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package bridge

interface Bridge: Reader, Writer
41 changes: 41 additions & 0 deletions src/main/kotlin/bridge/MemoryBridge.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import bridge.Bridge
import bridge.Reader
import technology.idlab.logging.Log

class MemoryBridge : Bridge {
private var channel = Channel<ByteArray>(10)

override fun pushSync(value: ByteArray) {
Log.shared.debug("Pushing ${value.size} bytes")
runBlocking { channel.send(value) }
Log.shared.debug("Done")
}

override fun readSync(): Reader.Result {
Log.shared.debug("Reading bytes")
val result = runBlocking { channel.receiveCatching() }

// Check if the channel got closed.
if (result.isClosed) {
return Reader.Result.closed()
}

// If an error occurred, the runner must handle it itself.
if (result.isFailure) {
Log.shared.fatal("Failed to read bytes")
}

val bytes = result.getOrThrow()
return Reader.Result.success(bytes)
}

override fun isClosed(): Boolean {
return channel.isClosedForSend
}

override fun close() {
channel.close()
}
}
43 changes: 43 additions & 0 deletions src/main/kotlin/bridge/Reader.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package bridge

import technology.idlab.logging.Log


interface Reader {
enum class ResultType {
SUCCESS, CLOSED;
}

class Result(private val type: ResultType, value: ByteArray) {
val value: ByteArray
get() {
if (type == ResultType.SUCCESS) {
return field
} else {
Log.shared.fatal("Cannot get value from invalid read.")
}
}

init {
this.value = value
}

fun isClosed(): Boolean {
return type == ResultType.CLOSED
}

companion object {
fun success(value: ByteArray): Result {
return Result(ResultType.SUCCESS, value)
}

fun closed(): Result {
return Result(ResultType.CLOSED, ByteArray(0))
}
}
}

fun readSync(): Result
fun isClosed(): Boolean
}

6 changes: 6 additions & 0 deletions src/main/kotlin/bridge/Writer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package bridge

interface Writer {
fun pushSync(value: ByteArray)
fun close()
}
10 changes: 7 additions & 3 deletions src/main/kotlin/logging/Log.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Log {
"TIME".padEnd(12, ' '),
"THREAD".padEnd(6, ' '),
"LEVEL".padEnd(7, ' '),
"LOCATION".padEnd(30, ' '),
"LOCATION".padEnd(50, ' '),
"MESSAGE",
).joinToString(" ")
println(header)
Expand All @@ -21,7 +21,7 @@ class Log {
"----".padEnd(12, ' '),
"------".padEnd(6, ' '),
"-----".padEnd(7, ' '),
"--------".padEnd(30, ' '),
"--------".padEnd(50, ' '),
"-------",
).joinToString(" ")
println(separator)
Expand All @@ -40,7 +40,7 @@ class Log {
time.padEnd(12, '0'),
"[${Thread.currentThread().id}]".padEnd(6, ' '),
level.padEnd(7, ' '),
name.padEnd(30, ' '),
name.padEnd(50, ' '),
message,
).joinToString(" ")

Expand Down Expand Up @@ -70,6 +70,10 @@ class Log {
exitProcess(1)
}

fun debug(message: String) {
print(message, "DEBUG")
}

companion object {
val shared = Log()
}
Expand Down
7 changes: 4 additions & 3 deletions src/main/kotlin/runner/Parser.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package technology.idlab.runner

import MemoryBridge
import io.reactivex.rxjava3.subjects.PublishSubject
import org.apache.jena.ontology.OntModelSpec
import org.apache.jena.query.QueryExecutionFactory
Expand Down Expand Up @@ -110,22 +111,22 @@ class Parser(file: File) {
Log.shared.info("Parsing stages")

// Initialize the channel.
val channel = PublishSubject.create<String>()
val bridge = MemoryBridge()

// Initialize the producer.
val producerClass = processors[0]
val producerArgs: MutableMap<String, Any> = mutableMapOf()
producerArgs["start"] = 0
producerArgs["end"] = 5
producerArgs["step"] = 1
producerArgs["outgoing"] = channel
producerArgs["writer"] = bridge
val producerConstructor = producerClass.getConstructor(Map::class.java)
val producer = producerConstructor.newInstance(producerArgs)

// Initialize the consumer.
val consumerClass = processors[1]
val consumerArgs: MutableMap<String, Any> = mutableMapOf()
consumerArgs["incoming"] = channel
consumerArgs["reader"] = bridge
val consumerConstructor = consumerClass.getConstructor(Map::class.java)
val consumer = consumerConstructor.newInstance(consumerArgs)

Expand Down
11 changes: 5 additions & 6 deletions src/main/kotlin/runner/Pipeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.coroutines.runBlocking
import runner.Processor
import technology.idlab.logging.Log
import java.io.File
import kotlin.concurrent.thread

class Pipeline(config: File) {
/** Processors described in the config. */
Expand All @@ -27,12 +28,10 @@ class Pipeline(config: File) {

// Run execution phase.
Log.shared.info("Running execution phase")
runBlocking {
processors.map {
async { it.exec() }
}.map {
it.await()
}
processors.map {
thread { it.exec() }
}.map {
it.join()
}

Log.shared.info("Pipeline executed successfully")
Expand Down
8 changes: 4 additions & 4 deletions src/main/resources/pipeline.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
sh:maxCount 1;
].

# A channel takes in a reader and writer.
# A bridge takes in a reader and writer.
[]
a sh:NodeShape;
sh:targetClass jvm:Channel;
Expand All @@ -44,21 +44,21 @@ jvm:MemoryChannel rdfs:subClassOf jvm:Channel.
jvm:MemoryChannelWriter rdfs:subClassOf jvm:ChannelWriter.
jvm:MemoryChannelReader rdfs:subClassOf jvm:ChannelReader.

# Define a memory channel reader.
# Define a memory bridge reader.
[]
a sh:NodeShape;
sh:targetClass jvm:MemoryChannelReader;
sh:closed true;
sh:ignoredProperties ( rdf:type ).

# Define a memory channel writer.
# Define a memory bridge writer.
[]
a sh:NodeShape;
sh:targetClass jvm:MemoryChannelWriter;
sh:closed true;
sh:ignoredProperties ( rdf:type ).

# A memory channel takes in a single reader and writer.
# A memory bridge takes in a single reader and writer.
[]
a sh:NodeShape;
sh:targetClass jvm:MemoryChannel;
Expand Down
4 changes: 2 additions & 2 deletions src/test/resources/pipelines/range_reporter.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
<../processors/range.ttl>,
<../processors/reporter.ttl>.

# Define a memory channel.
# Define a memory bridge.
<reader> a jvm:MemoryChannelReader.
<writer> a jvm:MemoryChannelWriter.

<channel>
<bridge>
a jvm:MemoryChannel;
jvm:reader <reader>;
jvm:writer <writer>.
Expand Down
14 changes: 5 additions & 9 deletions src/test/resources/sources/Range.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import io.reactivex.rxjava3.subjects.PublishSubject;
import bridge.Writer;
import java.util.Map;
import runner.Processor;

Expand All @@ -9,7 +9,7 @@ public class Range extends Processor {
private final int step;

// Channels
private final PublishSubject<String> outgoing;
private final Writer writer;

public Range(Map<String, Object> args) {
// Call super constructor.
Expand All @@ -21,22 +21,18 @@ public Range(Map<String, Object> args) {
this.step = this.getArgument("step");

// Channels
this.outgoing = this.getArgument("outgoing");
}

public void setup() {
log.info("Binding to outgoing channel.");
this.writer = this.getArgument("writer");
}

public void exec() {
log.info("Initializing emitting loop.");

for (int i = start; i < end; i += step) {
log.info("Emitting " + i);
outgoing.onNext(Integer.toString(i));
writer.pushSync(Integer.toString(i).getBytes());
}

log.info("Closing outgoing channel.");
outgoing.onComplete();
writer.close();
}
}
24 changes: 13 additions & 11 deletions src/test/resources/sources/Reporter.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import bridge.Reader;
import java.util.Map;
import runner.Processor;

public class Reporter extends Processor {
private final Observable<String> incoming;
private final Reader reader;

public Reporter(Map<String, Object> args) {
// Call super constructor.
super(args);

// Parameters
this.incoming = this.getArgument("incoming");
this.reader = this.getArgument("reader");
}

public void setup() {
// Local variables
Disposable disposable = incoming.subscribe(
item -> log.info("Received item: " + item),
error -> log.severe("Error: " + error),
() -> log.info("Channel closed.")
);
public void exec() {
while (!reader.isClosed()) {
Reader.Result result = reader.readSync();

if (result.isClosed()) {
break;
}

log.info("Received item: " + new String(result.getValue()));
}
}
}

0 comments on commit 2394112

Please sign in to comment.