From 8d0e0167023e4386fdea9f76c0d98fa9d3c6ca7e Mon Sep 17 00:00:00 2001 From: Jens Pots Date: Sun, 23 Jun 2024 23:25:21 +0200 Subject: [PATCH] feat: fixed integration with nodejs runner --- .github/workflows/test.yml | 11 +++ runners/nodejs/src/proto/.keep | 0 runners/nodejs/src/proto/README.md | 1 + runners/nodejs/src/runtime/index.ts | 8 +- runners/nodejs/src/runtime/server.ts | 5 + runners/nodejs/src/std/transparent.ts | 5 + src/main/kotlin/Main.kt | 14 +-- src/main/kotlin/Orchestrator.kt | 96 +++++++++++++++++++ src/main/kotlin/runner/Runner.kt | 8 +- src/main/kotlin/runner/impl/GRPCRunner.kt | 9 +- src/main/kotlin/runner/impl/NodeRunner.kt | 11 ++- src/main/kotlin/runner/jvm/JVMRunner.kt | 9 +- src/main/kotlin/runner/jvm/Reader.kt | 25 +---- src/main/kotlin/runner/jvm/Writer.kt | 4 +- src/main/kotlin/std/Transparent.kt | 2 + src/main/kotlin/util/Log.kt | 63 ++++++++---- src/test/kotlin/OrchestratorTest.kt | 28 ++++++ src/test/kotlin/processors/NodeTransparent.kt | 42 ++++++++ src/test/kotlin/processors/TappedReader.kt | 55 +++++++++++ src/test/kotlin/processors/TappedWriter.kt | 55 +++++++++++ src/test/kotlin/runner/impl/NodeRunnerTest.kt | 2 +- 21 files changed, 385 insertions(+), 68 deletions(-) create mode 100644 runners/nodejs/src/proto/.keep create mode 100644 runners/nodejs/src/proto/README.md create mode 100644 src/main/kotlin/Orchestrator.kt create mode 100644 src/test/kotlin/OrchestratorTest.kt create mode 100644 src/test/kotlin/processors/NodeTransparent.kt create mode 100644 src/test/kotlin/processors/TappedReader.kt create mode 100644 src/test/kotlin/processors/TappedWriter.kt diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2bce5e7..30d0d8e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -25,5 +25,16 @@ jobs: distribution: 'adopt' cache: 'gradle' + - name: Set up NPM + uses: actions/setup-node@v2 + with: + node-version: '21' + + - name: Install NPM dependencies + run: npm install --prefix runners/nodejs + + - name: Build NPM runner + run: npm run build --prefix runners/nodejs + - name: Run tests run: ./gradlew test diff --git a/runners/nodejs/src/proto/.keep b/runners/nodejs/src/proto/.keep new file mode 100644 index 0000000..e69de29 diff --git a/runners/nodejs/src/proto/README.md b/runners/nodejs/src/proto/README.md new file mode 100644 index 0000000..c4eac67 --- /dev/null +++ b/runners/nodejs/src/proto/README.md @@ -0,0 +1 @@ +The files in this directory are automatically generated and should not be updated by hand. diff --git a/runners/nodejs/src/runtime/index.ts b/runners/nodejs/src/runtime/index.ts index b8d009c..ffab57b 100644 --- a/runners/nodejs/src/runtime/index.ts +++ b/runners/nodejs/src/runtime/index.ts @@ -2,21 +2,17 @@ import { Server, ServerCredentials } from "@grpc/grpc-js"; import { ServerImplementation } from "./server"; import { RunnerService } from "../proto"; -// Get arguments. +/** The socket at which gRPC binds is decided by the orchestrator. */ const host = process.argv[2]; const port = process.argv[3]; -console.log(`gRPC targeting ${host}:${port}`); // Initialize the server. -console.log("Initializing server."); const server = new Server(); // Add the Runner service. -console.log("Adding Runner service."); server.addService(RunnerService, new ServerImplementation()); // Startup. -console.log("Starting server."); server.bindAsync( `${host}:${port}`, ServerCredentials.createInsecure(), @@ -24,7 +20,7 @@ server.bindAsync( if (error) { return console.error(error); } else { - console.log(`Server started on port ${port}.`); + console.log(`gRPC up and running (port=${port})`); } }, ); diff --git a/runners/nodejs/src/runtime/server.ts b/runners/nodejs/src/runtime/server.ts index 25814d6..dc19e5a 100644 --- a/runners/nodejs/src/runtime/server.ts +++ b/runners/nodejs/src/runtime/server.ts @@ -42,12 +42,15 @@ export class ServerImplementation implements RunnerServer { call: ServerUnaryCall, callback: sendUnaryData, ): void { + console.log("gRPC::prepareProcessor::invoke"); Runner.shared .prepareProcessor(call.request) .then(() => { + console.log("gRPC::prepareProcessor::success"); callback(null, {}); }) .catch((e) => { + console.log("gRPC::prepareProcessor::error"); callback(e, {}); }); } @@ -56,7 +59,9 @@ export class ServerImplementation implements RunnerServer { call: ServerUnaryCall, callback: sendUnaryData, ): void { + console.log("gRPC::prepareProcessor::invoke"); Runner.shared.exec().then(() => { + console.log("gRPC::prepareProcessor::success"); callback(null, {}); }); } diff --git a/runners/nodejs/src/std/transparent.ts b/runners/nodejs/src/std/transparent.ts index fd4ed05..7f6662b 100644 --- a/runners/nodejs/src/std/transparent.ts +++ b/runners/nodejs/src/std/transparent.ts @@ -2,6 +2,10 @@ import { Processor } from "../interfaces/processor"; import { Reader } from "../interfaces/reader"; import { Writer } from "../interfaces/writer"; +/** + * The Transparent processor reads data and transmits it directly to it's output, while also logging the data to the + * console. This processor is only used for debugging purposes. + */ export default class Transparent extends Processor { private readonly input = this.getArgument("input"); private readonly output = this.getArgument("output"); @@ -13,6 +17,7 @@ export default class Transparent extends Processor { if (!data) { break; } + console.log(data.toString()); this.output.write(data); } } diff --git a/src/main/kotlin/Main.kt b/src/main/kotlin/Main.kt index 55ca0dd..60a56bd 100644 --- a/src/main/kotlin/Main.kt +++ b/src/main/kotlin/Main.kt @@ -6,20 +6,22 @@ import kotlinx.coroutines.runBlocking import technology.idlab.parser.Parser fun main(args: Array) = runBlocking { - // Parse arguments. + /** + * At the moment, the only argument that the runtime accepts is the path to the pipeline + * declaration file. + */ if (args.size != 1) { println("Usage: jvm-runner ") exitProcess(0) } - // Configuration. + /** + * We start off by parsing the configuration file. This file contains the list of processors and + * stages that the runtime should prepare, as well as channel declarations. + */ val configPath = args[0] val config = File(configPath) val parser = Parser.create(config) - - // Initialize the processors. val processors = parser.processors() - - // Initialize the stages. val stages = parser.stages() } diff --git a/src/main/kotlin/Orchestrator.kt b/src/main/kotlin/Orchestrator.kt new file mode 100644 index 0000000..35dd426 --- /dev/null +++ b/src/main/kotlin/Orchestrator.kt @@ -0,0 +1,96 @@ +package technology.idlab + +import kotlin.concurrent.thread +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.runBlocking +import runner.Runner +import runner.impl.NodeRunner +import runner.jvm.JVMRunner +import technology.idlab.parser.intermediate.IRParameter +import technology.idlab.parser.intermediate.IRProcessor +import technology.idlab.parser.intermediate.IRStage +import technology.idlab.util.Log + +class Orchestrator(stages: Set) { + /** List of all processors in the pipeline. */ + private val processors = stages.map { it.processor }.toSet() + + /** + * A channel which listens to all incoming messages and distributes them according to the topology + * of the runners. + */ + private val channel = + Channel().also { + thread { + runBlocking { + for (payload in it) { + // Special URI for printing to the console. + if (payload.destinationURI == "print") { + println(payload.data.decodeToString()) + continue + } + + // Get the runner and send the message. + val runner = readers[payload.destinationURI] + runner!!.getIncomingChannel().send(payload) + } + } + } + } + + /** An exhaustive list of all runners. */ + private val jvmRunner by lazy { JVMRunner(channel) } + private val nodeRunner by lazy { NodeRunner(channel, 5000) } + private val runners = listOf(nodeRunner, jvmRunner) + + /** A map of all channel URIs and their readers. */ + private val readers = mutableMapOf() + + init { + /** Initialize the processors and stages in the runtimes. */ + runBlocking { + processors.forEach { processor -> prepare(processor) } + stages.forEach { stage -> prepare(stage) } + } + } + + /** Prepare a processor inside of it's corresponding runtime. */ + private suspend fun prepare(processor: IRProcessor) { + val runner = getRuntime(processor.target) + runner.prepare(processor) + } + + /** Prepare a stage inside of it's corresponding runtime. */ + private suspend fun prepare(stage: IRStage) { + // Get the corresponding runner. + val runner = getRuntime(stage.processor.target) + runner.prepare(stage) + + // Find all the readers in the stage. + val readers = + stage.processor.parameters.filter { it.type == IRParameter.Type.READER }.map { it.name } + + // Get their concrete URIs. + val uris = stage.arguments.filter { readers.contains(it.name) }.map { it.value[0] } + + // Add them as a channel targets. + uris.forEach { this.readers[it] = runner } + } + + /** Execute all stages in all the runtimes. */ + suspend fun exec() = coroutineScope { + Log.shared.info("Bringing all stages online.") + runners.map { async { it.exec() } }.forEach { it.await() } + Log.shared.info("All stages are online.") + } + + /** Get a lazy evaluated runner. */ + private fun getRuntime(target: Runner.Target): Runner { + return when (target) { + Runner.Target.JVM -> this.jvmRunner + Runner.Target.NODEJS -> this.nodeRunner + } + } +} diff --git a/src/main/kotlin/runner/Runner.kt b/src/main/kotlin/runner/Runner.kt index a544ee8..605e97b 100644 --- a/src/main/kotlin/runner/Runner.kt +++ b/src/main/kotlin/runner/Runner.kt @@ -5,7 +5,10 @@ import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log -abstract class Runner { +abstract class Runner( + /* Message which must be transmitted to the outside world. */ + protected val outgoing: Channel = Channel() +) { /** The state of a runtime. */ enum class Status { STARTING, @@ -50,9 +53,6 @@ abstract class Runner { /* Messages which are destined to a processor inside the runner. */ protected val incoming: Channel = Channel() - /* Message which must be transmitted to the outside world. */ - protected val outgoing: Channel = Channel() - /** Register and prepare a processor inside the runtime. */ abstract suspend fun prepare(processor: IRProcessor) diff --git a/src/main/kotlin/runner/impl/GRPCRunner.kt b/src/main/kotlin/runner/impl/GRPCRunner.kt index a2d1ced..3d1a150 100644 --- a/src/main/kotlin/runner/impl/GRPCRunner.kt +++ b/src/main/kotlin/runner/impl/GRPCRunner.kt @@ -8,6 +8,7 @@ import com.google.protobuf.ByteString import io.grpc.ManagedChannelBuilder import io.ktor.utils.io.errors.* import kotlin.concurrent.thread +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.runBlocking @@ -85,7 +86,11 @@ private fun IRProcessor.toGRPC(): GRPC.IRProcessor { * This runner has GRPC built-in, so the only configuration that an extending class needs to provide * is the host and port of the GRPC server, as well as actually booting the process. */ -abstract class GRPCRunner(host: String, protected val port: Int) : Runner() { +abstract class GRPCRunner( + outgoing: Channel = Channel(), + host: String, + protected val port: Int +) : Runner(outgoing) { /** Handle to the child process. */ private val process by lazy { createProcess() } @@ -162,7 +167,9 @@ abstract class GRPCRunner(host: String, protected val port: Int) : Runner() { } override suspend fun exec() { + Log.shared.debug("gRPC::exec::invoke") grpc.exec(empty) + Log.shared.debug("gRPC::exec::success") } override fun halt() { diff --git a/src/main/kotlin/runner/impl/NodeRunner.kt b/src/main/kotlin/runner/impl/NodeRunner.kt index 990e8a0..3ea1960 100644 --- a/src/main/kotlin/runner/impl/NodeRunner.kt +++ b/src/main/kotlin/runner/impl/NodeRunner.kt @@ -1,19 +1,24 @@ package runner.impl import java.io.File +import kotlinx.coroutines.channels.Channel import technology.idlab.runner.impl.GRPCRunner import technology.idlab.util.Log -class NodeRunner(port: Int) : GRPCRunner("localhost", port) { +class NodeRunner(outgoing: Channel = Channel(), port: Int) : + GRPCRunner(outgoing, "localhost", port) { override fun createProcess(): Process { // Configuration. - val directory = "/Users/jens/Developer/technology.idlab.jvm-runner/runners/nodejs/build/runtime" + val relative = "runners/nodejs/build/runtime" + val directory = File(".").resolve(relative) + Log.shared.debug("Node working directory: ${directory.canonicalPath}") + val command = listOf("node", "index.js", "localhost", port.toString()) Log.shared.info("Starting process: `${command.joinToString(" ")}`") // Initialize the process. val processBuilder = ProcessBuilder(command) - processBuilder.directory(File(directory)) + processBuilder.directory(directory) try { return processBuilder.start() } catch (e: Exception) { diff --git a/src/main/kotlin/runner/jvm/JVMRunner.kt b/src/main/kotlin/runner/jvm/JVMRunner.kt index 2bcd5df..20c68da 100644 --- a/src/main/kotlin/runner/jvm/JVMRunner.kt +++ b/src/main/kotlin/runner/jvm/JVMRunner.kt @@ -9,13 +9,16 @@ import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log -class JVMRunner : Runner() { +class JVMRunner(outgoing: Channel = Channel()) : Runner(outgoing) { private val processors = mutableMapOf>>() private val stages = mutableMapOf() /** Incoming messages are delegated to sub channels. These are mapped by their URI. */ private val readers = mutableMapOf>() + /** Keep track of all spawned threads. */ + private var threads = mutableListOf() + // Handle incoming messages. private val handler = thread { try { @@ -85,7 +88,9 @@ class JVMRunner : Runner() { } override suspend fun exec() { - this.stages.values.map { thread { it.exec() } }.map { it.join() } + Log.shared.info("Bringing JVM stages online.") + this.stages.values.forEach { this.threads.add(thread { it.exec() }) } + Log.shared.info("All stages are online.") } override suspend fun status(): Status { diff --git a/src/main/kotlin/runner/jvm/Reader.kt b/src/main/kotlin/runner/jvm/Reader.kt index 55f18f9..660a7d9 100644 --- a/src/main/kotlin/runner/jvm/Reader.kt +++ b/src/main/kotlin/runner/jvm/Reader.kt @@ -40,26 +40,7 @@ class Reader(private val channel: Channel) { } } - suspend fun push(bytes: ByteArray) { - channel.send(bytes) - } - - fun readSync(): Result { - val result = runBlocking { channel.receiveCatching() } - - // Check if the channel got closed. - if (result.isClosed) { - return Result.closed() - } - - // If an error occurred, the runner must handle it itself. - if (result.isFailure) { - Log.shared.fatal("Failed to read bytes") - } - - val bytes = result.getOrThrow() - return Result.success(bytes) - } + fun readSync(): Result = runBlocking { read() } suspend fun read(): Result { return try { @@ -71,8 +52,4 @@ class Reader(private val channel: Channel) { Log.shared.fatal(e) } } - - fun isClosed(): Boolean { - return channel.isClosedForSend - } } diff --git a/src/main/kotlin/runner/jvm/Writer.kt b/src/main/kotlin/runner/jvm/Writer.kt index 9bc100a..0303d9c 100644 --- a/src/main/kotlin/runner/jvm/Writer.kt +++ b/src/main/kotlin/runner/jvm/Writer.kt @@ -6,9 +6,7 @@ import runner.Runner import technology.idlab.util.Log class Writer(private val channel: Channel, private val destination: String) { - fun pushSync(value: ByteArray) { - runBlocking { channel.send(Runner.Payload(destination, value)) } - } + fun pushSync(value: ByteArray) = runBlocking { push(value) } fun push(value: ByteArray) { try { diff --git a/src/main/kotlin/std/Transparent.kt b/src/main/kotlin/std/Transparent.kt index db2baf5..b3a06e2 100644 --- a/src/main/kotlin/std/Transparent.kt +++ b/src/main/kotlin/std/Transparent.kt @@ -3,6 +3,7 @@ package technology.idlab.std import runner.jvm.Processor import runner.jvm.Reader import runner.jvm.Writer +import technology.idlab.util.Log class Transparent(args: Map) : Processor(args) { private val input = this.getArgument("input") @@ -16,6 +17,7 @@ class Transparent(args: Map) : Processor(args) { break } + Log.shared.info("Received: ${result.value}") output.pushSync(result.value) } } diff --git a/src/main/kotlin/util/Log.kt b/src/main/kotlin/util/Log.kt index 297a97a..c23e57a 100644 --- a/src/main/kotlin/util/Log.kt +++ b/src/main/kotlin/util/Log.kt @@ -7,6 +7,32 @@ import kotlin.Exception import technology.idlab.exception.RunnerException class Log private constructor() { + enum class Level { + INFO, + SEVERE, + FATAL, + DEBUG, + ; + + fun style(string: String): String { + return when (this) { + INFO -> string + SEVERE -> string + FATAL -> string + DEBUG -> "\u001B[34m${string}\u001B[0m" + } + } + + fun code(): String { + return when (this) { + INFO -> "INFO" + SEVERE -> "SEVERE" + FATAL -> "FATAL" + DEBUG -> "DEBUG" + } + } + } + init { val header = listOf( @@ -31,54 +57,55 @@ class Log private constructor() { println(separator) } - private fun print(message: String, level: String) { + private fun line(message: String, level: Level): String { val instant = Date().toInstant() val tz = instant.atZone(TimeZone.getDefault().toZoneId()) val iso = DateTimeFormatter.ISO_LOCAL_TIME val time = tz.format(iso) - val caller = Throwable().stackTrace[2] + val caller = Throwable().stackTrace[3] val name = "${caller.className.substringAfterLast(".")}::${caller.methodName}::${caller.lineNumber}" - val line = - listOf( - time.padEnd(12, '0'), - "[${Thread.currentThread().id}]".padEnd(6, ' '), - level.padEnd(7, ' '), - name.padEnd(50, ' '), - message, - ) - .joinToString(" ") + return listOf( + time.padEnd(12, '0'), + "[${Thread.currentThread().id}]".padEnd(6, ' '), + level.code().padEnd(7, ' '), + name.padEnd(50, ' '), + message, + ) + .joinToString(" ") + } - println(line) + private fun toConsole(message: String, level: Level) { + println(level.style(line(message, level))) } fun info(message: String) { - print(message, "INFO") + toConsole(message, Level.INFO) } fun severe(message: String) { - print(message, "SEVERE") + toConsole(message, Level.SEVERE) } fun fatal(message: String): Nothing { - print(message, "FATAL") + toConsole(message, Level.FATAL) throw RunnerException() } fun fatal(exception: Exception): Nothing { - print(exception.message.toString(), "FATAL") + toConsole(exception.message.toString(), Level.FATAL) throw RunnerException() } fun fatal(message: String, exception: Exception) { - print("$message - ${exception.message}") + toConsole("$message - ${exception.message}", Level.FATAL) throw RunnerException() } fun debug(message: String) { - print(message, "DEBUG") + toConsole(message, Level.DEBUG) } fun assert(condition: Boolean, message: String) { diff --git a/src/test/kotlin/OrchestratorTest.kt b/src/test/kotlin/OrchestratorTest.kt new file mode 100644 index 0000000..db85a6f --- /dev/null +++ b/src/test/kotlin/OrchestratorTest.kt @@ -0,0 +1,28 @@ +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlinx.coroutines.runBlocking +import processors.NodeTransparent +import processors.TappedReader +import processors.TappedWriter +import technology.idlab.Orchestrator + +class OrchestratorTest { + @Test + fun channelTest(): Unit = runBlocking { + val stages = + setOf( + TappedWriter.stage("in"), NodeTransparent.stage("in", "out"), TappedReader.stage("out")) + val orchestrator = Orchestrator(stages) + + // Bring pipeline online. + orchestrator.exec() + + // Send message into the pipeline. + val data = "Hello, World!".encodeToByteArray() + TappedWriter.input.send(data) + + // Check the result. + val result = TappedReader.output.receive() + assertEquals(data.decodeToString(), result.decodeToString()) + } +} diff --git a/src/test/kotlin/processors/NodeTransparent.kt b/src/test/kotlin/processors/NodeTransparent.kt new file mode 100644 index 0000000..e9467e8 --- /dev/null +++ b/src/test/kotlin/processors/NodeTransparent.kt @@ -0,0 +1,42 @@ +package processors + +import runner.Runner +import technology.idlab.parser.intermediate.IRArgument +import technology.idlab.parser.intermediate.IRParameter +import technology.idlab.parser.intermediate.IRProcessor +import technology.idlab.parser.intermediate.IRStage + +class NodeTransparent { + companion object { + val processor = + IRProcessor( + "transparent", + Runner.Target.NODEJS, + listOf( + IRParameter( + "input", + IRParameter.Type.READER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ), + IRParameter( + "output", + IRParameter.Type.WRITER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ), + ), + mapOf("import" to "../std/transparent.js"), + ) + + fun stage(channelInURI: String, channelOutURI: String): IRStage { + return IRStage( + "transparent_stage", + processor, + listOf( + IRArgument("input", listOf(channelInURI)), + IRArgument("output", listOf(channelOutURI))), + ) + } + } +} diff --git a/src/test/kotlin/processors/TappedReader.kt b/src/test/kotlin/processors/TappedReader.kt new file mode 100644 index 0000000..e4b67a3 --- /dev/null +++ b/src/test/kotlin/processors/TappedReader.kt @@ -0,0 +1,55 @@ +package processors + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking +import runner.Runner +import runner.jvm.Processor +import runner.jvm.Reader +import technology.idlab.parser.intermediate.IRArgument +import technology.idlab.parser.intermediate.IRParameter +import technology.idlab.parser.intermediate.IRProcessor +import technology.idlab.parser.intermediate.IRStage + +/** + * The TappedReader processor provides a convenient way to read data from the pipeline during + * testing. All incoming data will be written to a global channel, which can be used directly during + * testing to read data from. + */ +class TappedReader(args: Map) : Processor(args) { + /** The channel which is exposed to the pipeline. */ + private val input = this.getArgument("input") + + /** Continuously read data from the input and write it to the global channel. */ + override fun exec() = runBlocking { + while (true) { + val read = input.read() + output.send(read.value) + } + } + + companion object { + /** Global channel into which all data is dumped. */ + val output = Channel() + + /** Implementation of this processor as IR. */ + val processor = + IRProcessor( + "tapped_reader", + Runner.Target.JVM, + listOf( + IRParameter( + "input", + IRParameter.Type.READER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ), + ), + mapOf("class" to "processors.TappedReader"), + ) + + fun stage(channelURI: String): IRStage { + return IRStage( + "tapped_reader_stage", processor, listOf(IRArgument("input", listOf(channelURI)))) + } + } +} diff --git a/src/test/kotlin/processors/TappedWriter.kt b/src/test/kotlin/processors/TappedWriter.kt new file mode 100644 index 0000000..b4c7ac6 --- /dev/null +++ b/src/test/kotlin/processors/TappedWriter.kt @@ -0,0 +1,55 @@ +package processors + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking +import runner.Runner +import runner.jvm.Processor +import runner.jvm.Writer +import technology.idlab.parser.intermediate.IRArgument +import technology.idlab.parser.intermediate.IRParameter +import technology.idlab.parser.intermediate.IRProcessor +import technology.idlab.parser.intermediate.IRStage + +/** + * The TappedWriter processor provides a convenient way to write data into the pipeline during + * testing. All instances listen to a global channel, which can be used directly during testing to + * write date to. + */ +class TappedWriter(args: Map) : Processor(args) { + /** Writer which is exposed to the pipeline. */ + private val output = this.getArgument("output") + + /** Continuously read data from the global channel and write it to the output. */ + override fun exec() = runBlocking { + while (true) { + val read = input.receiveCatching() + output.pushSync(read.getOrNull()!!) + } + } + + companion object { + /** Global channel from which all data is read. */ + val input = Channel() + + /** Implementation of this processor as IR. */ + val processor = + IRProcessor( + "tapped_writer", + Runner.Target.JVM, + listOf( + IRParameter( + "output", + IRParameter.Type.WRITER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ), + ), + mapOf("class" to "processors.TappedWriter"), + ) + + fun stage(channelURI: String): IRStage { + return IRStage( + "tapped_writer_stage", processor, listOf(IRArgument("output", listOf(channelURI)))) + } + } +} diff --git a/src/test/kotlin/runner/impl/NodeRunnerTest.kt b/src/test/kotlin/runner/impl/NodeRunnerTest.kt index b6072b4..48c4547 100644 --- a/src/test/kotlin/runner/impl/NodeRunnerTest.kt +++ b/src/test/kotlin/runner/impl/NodeRunnerTest.kt @@ -9,6 +9,6 @@ class NodeRunnerTest : RunnerTest() { override val metadata: Map = mapOf("import" to "../std/transparent.js") override fun createRunner(): Runner { - return NodeRunner(5000) + return NodeRunner(port = 5000) } }