Skip to content

Commit

Permalink
feat: extend demo pipeline with square
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed May 2, 2024
1 parent 84a1a0f commit f070dc9
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 33 deletions.
2 changes: 0 additions & 2 deletions src/main/kotlin/bridge/MemoryReader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class MemoryReader: Reader {
}

override fun readSync(): Reader.Result {
Log.shared.debug("Reading bytes")
val channel = this.channel ?: Log.shared.fatal("Channel not set")

val result = runBlocking { channel.receiveCatching() }
Expand All @@ -38,7 +37,6 @@ class MemoryReader: Reader {
}

override suspend fun read(): Reader.Result {
Log.shared.debug("Reading bytes")
val channel = this.channel ?: Log.shared.fatal("Channel not set")

return try {
Expand Down
3 changes: 0 additions & 3 deletions src/main/kotlin/bridge/MemoryWriter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ class MemoryWriter: Writer {

override fun pushSync(value: ByteArray) {
val channel = this.channel ?: Log.shared.fatal("Channel not set")

Log.shared.debug("Pushing ${value.size} bytes")
runBlocking { channel.send(value) }
Log.shared.debug("Done")
}

override suspend fun push(value: ByteArray) {
Expand Down
12 changes: 1 addition & 11 deletions src/main/kotlin/runner/Pipeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,8 @@ class Pipeline(config: File) {
* all are done.
*/
fun executeSync() {
// Run setup phase.
Log.shared.info("Running setup phase")
runBlocking {
processors.map {
async { it.setup() }
}.map {
it.await()
}
}
Log.shared.info("Executing pipeline.")

// Run execution phase.
Log.shared.info("Running execution phase")
processors.map {
thread { it.exec() }
}.map {
Expand Down
4 changes: 1 addition & 3 deletions src/main/kotlin/runner/Processor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,5 @@ abstract class Processor {
return Optional.ofNullable(result) as Optional<T>
}

open fun setup() {}

open fun exec() {}
abstract fun exec()
}
33 changes: 25 additions & 8 deletions src/test/resources/pipelines/range_reporter.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,44 @@
# Include processor definitions.
<> owl:imports
<../pipeline.ttl>,
<../processors/square.ttl>,
<../processors/range.ttl>,
<../processors/reporter.ttl>.

# Define a memory bridge.
<reader> a jvm:MemoryChannelReader.
<writer> a jvm:MemoryChannelWriter.
# Range -> Square
<reader1> a jvm:MemoryChannelReader.
<writer1> a jvm:MemoryChannelWriter.

<bridge>
<bridge1>
a jvm:MemoryChannel;
jvm:reader <reader>;
jvm:writer <writer>.
jvm:reader <reader1>;
jvm:writer <writer1>.

<reader2> a jvm:MemoryChannelReader.
<writer2> a jvm:MemoryChannelWriter.

# Square -> Reporter
<bridge2>
a jvm:MemoryChannel;
jvm:reader <reader2>;
jvm:writer <writer2>.

# Define a range processor.
[]
a jvm:Range;
jvm:start "0"^^xsd:integer;
jvm:end "10"^^xsd:integer;
jvm:step "1"^^xsd:integer;
jvm:output <writer>.
jvm:output <writer1>.

# Define a square processor.
[]
a jvm:Square;
jvm:input <reader1>;
jvm:output <writer2>.

# Define a reporter.
[]
a jvm:Reporter;
jvm:input <reader>.
jvm:input <reader2>.

32 changes: 32 additions & 0 deletions src/test/resources/processors/square.ttl
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
@prefix jvm: <https://w3id.org/conn/jvm#>.
@prefix fno: <https://w3id.org/function/ontology#>.
@prefix fnom: <https://w3id.org/function/vocabulary/mapping#>.
@prefix xsd: <http://www.w3.org/2001/XMLSchema#>.
@prefix owl: <http://www.w3.org/2002/07/owl#>.
@prefix : <https://w3id.org/conn#>.
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>.

<> owl:imports <../pipeline.ttl>.

jvm:Square a jvm:Processor;
jvm:file <../sources/Square.java>;
jvm:language "Java".

[] a sh:NodeShape;
sh:targetClass jvm:Square;
sh:property [
sh:path jvm:input;
sh:name "input";
sh:class jvm:ChannelReader;
sh:minCount 1;
sh:maxCount 1;
], [
sh:path jvm:output;
sh:name "output";
sh:class jvm:ChannelWriter;
sh:minCount 1;
sh:maxCount 1;
];
sh:closed true;
sh:ignoredProperties (rdf:type).
6 changes: 1 addition & 5 deletions src/test/resources/sources/Range.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,10 @@ public Range(Map<String, Object> args) {
}

public void exec() {
log.info("Initializing emitting loop.");

for (int i = start; i < end; i += step) {
log.info("Emitting " + i);
log.info(Integer.toString(i));
writer.pushSync(Integer.toString(i).getBytes());
}

log.info("Closing outgoing channel.");
writer.close();
}
}
2 changes: 1 addition & 1 deletion src/test/resources/sources/Reporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void exec() {
break;
}

log.info("Received item: " + new String(result.getValue()));
log.info(new String(result.getValue()));
}
}
}
39 changes: 39 additions & 0 deletions src/test/resources/sources/Square.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import bridge.Reader;
import bridge.Writer;
import java.util.Map;

import technology.idlab.logging.Log;
import technology.idlab.runner.Processor;

public class Square extends Processor {
// Channels
private final Reader reader;
private final Writer writer;

public Square(Map<String, Object> args) {
// Call super constructor.
super(args);

// Channels
this.reader = this.getArgument("input");
this.writer = this.getArgument("output");
}

public void exec() {
while (true) {
Reader.Result data = reader.readSync();

if (data.isClosed()) {
break;
}

int value = Integer.parseInt(new String(data.getValue()));
int square = value * value;
byte[] result = Integer.toString(square).getBytes();

log.info(value + " * " + value + " = " + square);
writer.pushSync(result);
}
writer.close();
}
}

0 comments on commit f070dc9

Please sign in to comment.