diff --git a/build.gradle.kts b/build.gradle.kts index cc14a86..2ae991d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -109,6 +109,9 @@ dependencies { implementation("org.apache.jena:apache-jena-libs:5.0.0") implementation("org.apache.jena:jena-arq:5.0.0") + // Hide SLF4J warnings. + implementation("org.slf4j:slf4j-nop:2.0.7") + // Initialize testing. testImplementation("org.jetbrains.kotlin:kotlin-test") } diff --git a/runners/nodejs/src/runtime/index.ts b/runners/nodejs/src/runtime/index.ts index 11efb11..b8d009c 100644 --- a/runners/nodejs/src/runtime/index.ts +++ b/runners/nodejs/src/runtime/index.ts @@ -2,11 +2,29 @@ import { Server, ServerCredentials } from "@grpc/grpc-js"; import { ServerImplementation } from "./server"; import { RunnerService } from "../proto"; +// Get arguments. +const host = process.argv[2]; +const port = process.argv[3]; +console.log(`gRPC targeting ${host}:${port}`); + // Initialize the server. +console.log("Initializing server."); const server = new Server(); // Add the Runner service. +console.log("Adding Runner service."); server.addService(RunnerService, new ServerImplementation()); // Startup. -server.bindAsync("0.0.0.0:50051", ServerCredentials.createInsecure(), () => {}); +console.log("Starting server."); +server.bindAsync( + `${host}:${port}`, + ServerCredentials.createInsecure(), + (error, port) => { + if (error) { + return console.error(error); + } else { + console.log(`Server started on port ${port}.`); + } + }, +); diff --git a/runners/nodejs/src/runtime/runner.ts b/runners/nodejs/src/runtime/runner.ts index 14baa69..0d602c8 100644 --- a/runners/nodejs/src/runtime/runner.ts +++ b/runners/nodejs/src/runtime/runner.ts @@ -1,21 +1,125 @@ -import { IRProcessor } from "../proto/intermediate"; +import { + IRParameter, + IRParameterType, + IRProcessor, + IRStage, +} from "../proto/intermediate"; import { ChannelData } from "../proto"; -import { Subject } from "rxjs"; +import { Subject, Subscription } from "rxjs"; +import { Processor } from "../interfaces/processor"; +import { Constructor } from "./constructor"; +import * as path from "node:path"; +import { Reader } from "../interfaces/reader"; +import { Writer } from "../interfaces/writer"; export class Runner { + /** Channels. */ public incoming = new Subject(); public outgoing = new Subject(); + private incomingSubscription: Subscription; + private readers: Map> = new Map(); - prepareProcessor(processor: IRProcessor): void { - throw new Error("Method not implemented"); + /** Runtime config. */ + private processors: Map< + String, + { constructor: Constructor; definition: IRProcessor } + > = new Map(); + private stages: Map = new Map(); + + /** Executions as promises. */ + private readonly executions: Promise[] = []; + + constructor() { + this.incomingSubscription = this.incoming.subscribe((payload) => { + console.log(`Incoming payload: ${payload.destinationUri}`); + const reader = this.readers.get(payload.destinationUri); + if (!reader) { + throw new Error( + `Reader not found for payload ${payload.destinationUri}`, + ); + } + reader.next(payload.data); + }); + } + + async prepareProcessor(irProcessor: IRProcessor): Promise { + const absolutePath = path.resolve(irProcessor.metadata.import); + console.log(`Importing ${absolutePath}`); + const processor = await import(absolutePath); + this.processors.set(irProcessor.uri, { + constructor: processor.default, + definition: irProcessor, + }); + } + + async prepareStage(stage: IRStage): Promise { + console.log( + `Preparing stage: \`${stage.uri}\` using ${stage.processorUri}`, + ); + // Retrieve the processor definition and constructor. + const entry = this.processors.get(stage.processorUri); + if (entry === null || entry === undefined) { + throw new Error(`Processor not found for stage ${stage.uri}`); + } + const { constructor, definition } = entry; + + // Retrieve parameters by name. + const parameters: Map = new Map(); + definition.parameters.forEach((param) => { + parameters.set(param.name, param); + }); + + // Parse args. + const args: Map = new Map(); + stage.arguments.forEach((arg) => { + const param = parameters.get(arg.name)!; + if (param.type == IRParameterType.READER) { + const subject = new Subject(); + const reader = new Reader(subject); + this.readers.set(arg.value[0], subject); + args.set(param.name, reader); + } else if (param.type == IRParameterType.WRITER) { + const subject = new Subject(); + subject.subscribe((data) => { + this.outgoing.next({ + destinationUri: arg.value[0], + data: data, + }); + }); + const writer = new Writer(subject); + args.set(param.name, writer); + } else { + console.error(new Error(`Unsupported parameter type ${param.type}`)); + } + }); + + try { + const processorInstance = new constructor(args); + this.stages.set(stage.uri, processorInstance); + } catch (e) { + console.error(e); + } } - prepareStage(stage: IRProcessor): void { - throw new Error("Method not implemented"); + async exec(): Promise { + console.log("Executing stages."); + this.stages.forEach((stage) => { + this.executions.push( + new Promise(() => { + try { + return stage.exec(); + } catch (e) { + console.error(e); + throw e; + } + }), + ); + }); } - exec(): void { - throw new Error("Method not implemented"); + async halt(): Promise { + console.log("Halting stages."); + this.incomingSubscription.unsubscribe(); } static shared = new Runner(); diff --git a/runners/nodejs/src/runtime/server.ts b/runners/nodejs/src/runtime/server.ts index 8a344ad..25814d6 100644 --- a/runners/nodejs/src/runtime/server.ts +++ b/runners/nodejs/src/runtime/server.ts @@ -28,29 +28,36 @@ export class ServerImplementation implements RunnerServer { call: ServerUnaryCall, callback: sendUnaryData, ): void { - call.on("data", (stage) => { - Runner.shared.prepareStage(stage); - callback(null, {}); - }); + Runner.shared + .prepareStage(call.request) + .then(() => { + callback(null, {}); + }) + .catch((e) => { + callback(e, {}); + }); } prepareProcessor( call: ServerUnaryCall, callback: sendUnaryData, ): void { - call.on("data", (processor) => { - Runner.shared.prepareProcessor(processor); - callback(null, {}); - }); + Runner.shared + .prepareProcessor(call.request) + .then(() => { + callback(null, {}); + }) + .catch((e) => { + callback(e, {}); + }); } exec( call: ServerUnaryCall, callback: sendUnaryData, ): void { - call.on("data", () => { + Runner.shared.exec().then(() => { callback(null, {}); - Runner.shared.exec(); }); } } diff --git a/runners/nodejs/src/std/transparent.ts b/runners/nodejs/src/std/transparent.ts new file mode 100644 index 0000000..fd4ed05 --- /dev/null +++ b/runners/nodejs/src/std/transparent.ts @@ -0,0 +1,19 @@ +import { Processor } from "../interfaces/processor"; +import { Reader } from "../interfaces/reader"; +import { Writer } from "../interfaces/writer"; + +export default class Transparent extends Processor { + private readonly input = this.getArgument("input"); + private readonly output = this.getArgument("output"); + + async exec(): Promise { + // eslint-disable-next-line no-constant-condition + while (true) { + const data = await this.input.read(); + if (!data) { + break; + } + this.output.write(data); + } + } +} diff --git a/runners/nodejs/tsconfig.json b/runners/nodejs/tsconfig.json index 4b8ef0c..edf5b01 100644 --- a/runners/nodejs/tsconfig.json +++ b/runners/nodejs/tsconfig.json @@ -3,7 +3,7 @@ /* Visit https://aka.ms/tsconfig to read more about this file */ /* Projects */ - "incremental": true /* Save .tsbuildinfo files to allow for incremental compilation of projects. */, + "incremental": false /* Save .tsbuildinfo files to allow for incremental compilation of projects. */, // "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */ // "tsBuildInfoFile": "./.tsbuildinfo", /* Specify the path to .tsbuildinfo incremental compilation file. */ // "disableSourceOfProjectReferenceRedirect": true, /* Disable preferring source files instead of declaration files when referencing composite projects. */ diff --git a/src/main/kotlin/runner/Runner.kt b/src/main/kotlin/runner/Runner.kt index 76459dd..a544ee8 100644 --- a/src/main/kotlin/runner/Runner.kt +++ b/src/main/kotlin/runner/Runner.kt @@ -1,8 +1,6 @@ package runner -import kotlin.concurrent.thread import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.runBlocking import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log @@ -55,24 +53,6 @@ abstract class Runner { /* Message which must be transmitted to the outside world. */ protected val outgoing: Channel = Channel() - /** Incoming messages are delegated to sub channels. These are mapped by their URI. */ - protected val readers = mutableMapOf>() - - // Handle incoming messages. - private val handler = thread { - runBlocking { - for (message in this@Runner.incoming) { - // Get the reader. - val reader = - readers[message.destinationURI] - ?: Log.shared.fatal("Unknown reader: ${message.destinationURI}") - - // Push data to the reader. - reader.send(message.data) - } - } - } - /** Register and prepare a processor inside the runtime. */ abstract suspend fun prepare(processor: IRProcessor) @@ -86,10 +66,7 @@ abstract class Runner { abstract suspend fun status(): Status /** Halt the execution of the runtime and release all resources. */ - open fun halt() { - // TODO: Propagate halting signal to processors. - handler.interrupt() - } + abstract fun halt() fun getIncomingChannel(): Channel { return incoming diff --git a/src/main/kotlin/runner/impl/GRPCRunner.kt b/src/main/kotlin/runner/impl/GRPCRunner.kt index 8e2d1da..a2d1ced 100644 --- a/src/main/kotlin/runner/impl/GRPCRunner.kt +++ b/src/main/kotlin/runner/impl/GRPCRunner.kt @@ -6,14 +6,17 @@ import Intermediate as GRPC import RunnerGrpcKt import com.google.protobuf.ByteString import io.grpc.ManagedChannelBuilder +import io.ktor.utils.io.errors.* +import kotlin.concurrent.thread import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map +import kotlinx.coroutines.runBlocking import runner.Runner import technology.idlab.parser.intermediate.IRArgument import technology.idlab.parser.intermediate.IRParameter import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage +import technology.idlab.util.Log private val empty = Empty.getDefaultInstance() @@ -82,16 +85,38 @@ private fun IRProcessor.toGRPC(): GRPC.IRProcessor { * This runner has GRPC built-in, so the only configuration that an extending class needs to provide * is the host and port of the GRPC server, as well as actually booting the process. */ -abstract class GRPCRunner(host: String, port: Int) : Runner() { +abstract class GRPCRunner(host: String, protected val port: Int) : Runner() { /** Handle to the child process. */ - abstract val process: Process + private val process by lazy { createProcess() } /** Create a single stub for all communication. */ private val grpc: RunnerGrpcKt.RunnerCoroutineStub private val parseIncoming: Flow - private val parseOutgoing: Flow + private val parseOutgoing: Flow init { + // Add a shutdown hook to ensure that the process is killed when the JVM exits. + Runtime.getRuntime().addShutdownHook(Thread { process.destroyForcibly() }) + + // Get the command that was used to start the process. + val command = + this.process.info().command().orElseThrow { Log.shared.fatal("Failed to start process.") } + + // Pipe all process output to the logger. + thread { + val stream = process.inputStream.bufferedReader() + for (line in stream.lines()) { + Log.shared.runtime(command, line) + } + } + + thread { + val stream = process.errorStream.bufferedReader() + for (line in stream.lines()) { + Log.shared.runtimeFatal(command, line) + } + } + // Initialize the GRPC stub. val connection = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build() grpc = RunnerGrpcKt.RunnerCoroutineStub(connection) @@ -100,6 +125,7 @@ abstract class GRPCRunner(host: String, port: Int) : Runner() { parseIncoming = flow { for (message in this@GRPCRunner.incoming) { + Log.shared.info("Sending message to runner with URI: `${message.destinationURI}`") val builder = GRPCChannelData.newBuilder() builder.setDestinationUri(message.destinationURI) builder.setData(ByteString.copyFrom(message.data)) @@ -108,19 +134,31 @@ abstract class GRPCRunner(host: String, port: Int) : Runner() { } // Emit outgoing messages. - parseOutgoing = - grpc.channel(parseIncoming).map { - val message = Runner.Payload(it.destinationUri, it.data.toByteArray()) - this.outgoing.send(message) + parseOutgoing = grpc.channel(parseIncoming) + + thread { + runBlocking { + parseOutgoing.collect { + val message = Payload(it.destinationUri, it.data.toByteArray()) + Log.shared.info("Received message from runner with URI: `${message.destinationURI}`") + this@GRPCRunner.outgoing.send(message) } + } + } } + abstract fun createProcess(): Process + override suspend fun prepare(processor: IRProcessor) { + Log.shared.info("Preparing processor: `${processor.uri}`") grpc.prepareProcessor(processor.toGRPC()) + Log.shared.info("Done preparing processor: `${processor.uri}`") } override suspend fun prepare(stage: IRStage) { + Log.shared.info("Preparing stage: `${stage.uri}`") grpc.prepareStage(stage.toGRPC()) + Log.shared.info("Done preparing stage: `${stage.uri}`") } override suspend fun exec() { @@ -128,7 +166,6 @@ abstract class GRPCRunner(host: String, port: Int) : Runner() { } override fun halt() { - super.halt() process.destroy() } diff --git a/src/main/kotlin/runner/impl/NodeRunner.kt b/src/main/kotlin/runner/impl/NodeRunner.kt index f31827a..990e8a0 100644 --- a/src/main/kotlin/runner/impl/NodeRunner.kt +++ b/src/main/kotlin/runner/impl/NodeRunner.kt @@ -2,18 +2,22 @@ package runner.impl import java.io.File import technology.idlab.runner.impl.GRPCRunner +import technology.idlab.util.Log -class NodeRunner : GRPCRunner("localhost", 50051) { - override val process: Process - - init { +class NodeRunner(port: Int) : GRPCRunner("localhost", port) { + override fun createProcess(): Process { // Configuration. - val directory = "/Users/jens/Developer/technology.idlab.jvm-runner/lib/nodejs/build/runtime" - val command = listOf("node", "index.js") + val directory = "/Users/jens/Developer/technology.idlab.jvm-runner/runners/nodejs/build/runtime" + val command = listOf("node", "index.js", "localhost", port.toString()) + Log.shared.info("Starting process: `${command.joinToString(" ")}`") // Initialize the process. val processBuilder = ProcessBuilder(command) processBuilder.directory(File(directory)) - process = processBuilder.start() + try { + return processBuilder.start() + } catch (e: Exception) { + Log.shared.fatal("Failed to start process.") + } } } diff --git a/src/main/kotlin/runner/jvm/JVMRunner.kt b/src/main/kotlin/runner/jvm/JVMRunner.kt index 7c209b4..2bcd5df 100644 --- a/src/main/kotlin/runner/jvm/JVMRunner.kt +++ b/src/main/kotlin/runner/jvm/JVMRunner.kt @@ -2,6 +2,7 @@ package runner.jvm import kotlin.concurrent.thread import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking import runner.Runner import technology.idlab.parser.intermediate.IRParameter import technology.idlab.parser.intermediate.IRProcessor @@ -12,6 +13,31 @@ class JVMRunner : Runner() { private val processors = mutableMapOf>>() private val stages = mutableMapOf() + /** Incoming messages are delegated to sub channels. These are mapped by their URI. */ + private val readers = mutableMapOf>() + + // Handle incoming messages. + private val handler = thread { + try { + runBlocking { + while (true) { + // Get the next message, check if it is valid. + val message = this@JVMRunner.incoming.receiveCatching() + if (message.isClosed || message.isFailure) { + break + } + val payload = message.getOrNull()!! + + // Get the reader. + val reader = readers[payload.destinationURI]!! + reader.send(payload.data) + } + } + } catch (e: InterruptedException) { + Log.shared.severe("Message handler thread interrupted.") + } + } + override suspend fun prepare(processor: IRProcessor) { val className = processor.metadata["class"] ?: Log.shared.fatal("Processor has no class key.") val clazz = Class.forName(className) as Class<*> @@ -84,4 +110,8 @@ class JVMRunner : Runner() { } } } + + override fun halt() { + handler.interrupt() + } } diff --git a/src/main/kotlin/util/Log.kt b/src/main/kotlin/util/Log.kt index 2b1d1b3..297a97a 100644 --- a/src/main/kotlin/util/Log.kt +++ b/src/main/kotlin/util/Log.kt @@ -87,6 +87,45 @@ class Log private constructor() { } } + fun runtime(runtime: String, message: String) { + val instant = Date().toInstant() + val tz = instant.atZone(TimeZone.getDefault().toZoneId()) + val iso = DateTimeFormatter.ISO_LOCAL_TIME + val time = tz.format(iso) + + val line = + listOf( + time.padEnd(12, '0'), + "[${Thread.currentThread().id}]".padEnd(6, ' '), + "INFO".padEnd(7, ' '), + runtime.padEnd(50, ' '), + message, + ) + .joinToString(" ") + + println(line) + } + + fun runtimeFatal(runtime: String, message: String): Nothing { + val instant = Date().toInstant() + val tz = instant.atZone(TimeZone.getDefault().toZoneId()) + val iso = DateTimeFormatter.ISO_LOCAL_TIME + val time = tz.format(iso) + + val line = + listOf( + time.padEnd(12, '0'), + "[${Thread.currentThread().id}]".padEnd(6, ' '), + "FATAL".padEnd(7, ' '), + runtime.padEnd(50, ' '), + message, + ) + .joinToString(" ") + + println(line) + throw RunnerException() + } + companion object { val shared = Log() } diff --git a/src/test/kotlin/runner/jvm/JVMRunnerTest.kt b/src/test/kotlin/runner/RunnerTest.kt similarity index 70% rename from src/test/kotlin/runner/jvm/JVMRunnerTest.kt rename to src/test/kotlin/runner/RunnerTest.kt index 3cf3fa4..6ce8d76 100644 --- a/src/test/kotlin/runner/jvm/JVMRunnerTest.kt +++ b/src/test/kotlin/runner/RunnerTest.kt @@ -1,68 +1,40 @@ -package runner.jvm +package runner import kotlin.concurrent.thread import kotlin.test.Test import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach -import runner.Runner import technology.idlab.parser.intermediate.IRArgument import technology.idlab.parser.intermediate.IRParameter import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage -private val processor = - IRProcessor( - "transparent", - Runner.Target.JVM, - listOf( - IRParameter( - "input", - IRParameter.Type.READER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, - ), - IRParameter( - "output", - IRParameter.Type.WRITER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, - ), - ), - mapOf("class" to "technology.idlab.std.Transparent")) - -private val stage = - IRStage( - "transparent_stage", - processor, - listOf( - IRArgument("input", listOf("channel_in_uri")), - IRArgument("output", listOf("channel_out_uri"))), - ) - -class JVMRunnerTest { +abstract class RunnerTest { + abstract val target: Runner.Target + abstract val metadata: Map - private var runner = JVMRunner() + abstract fun createRunner(): Runner - @BeforeEach - fun reset() { - runner = JVMRunner() + @Test + fun prepareProcessorTest() = runBlocking { + val runner = createRunner() + runner.prepare(createProcessor()) } - @Test fun prepareProcessorTest() = runBlocking { runner.prepare(processor) } - @Test fun prepareStageTest() = runBlocking { - runner.prepare(processor) - runner.prepare(stage) + val runner = createRunner() + runner.prepare(createProcessor()) + runner.prepare(createStage()) } @Test fun channelTest(): Unit = runBlocking { + val runner = createRunner() try { // Prepare the runner. - runner.prepare(processor) - runner.prepare(stage) + runner.prepare(createProcessor()) + runner.prepare(createStage()) // Execute the runner. val execution = thread { @@ -82,7 +54,7 @@ class JVMRunnerTest { val outgoing = runner.getOutgoingChannel() val result = outgoing.receive() assertEquals("channel_out_uri", result.destinationURI) - assertEquals(data, result.data) + assertEquals(data.decodeToString(), result.data.decodeToString()) // Halt the runner. runner.halt() @@ -91,4 +63,36 @@ class JVMRunnerTest { // Ignore. } } + + private fun createProcessor(): IRProcessor { + return IRProcessor( + "transparent", + this.target, + listOf( + IRParameter( + "input", + IRParameter.Type.READER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ), + IRParameter( + "output", + IRParameter.Type.WRITER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ), + ), + this.metadata, + ) + } + + private fun createStage(): IRStage { + return IRStage( + "transparent_stage", + this.createProcessor(), + listOf( + IRArgument("input", listOf("channel_in_uri")), + IRArgument("output", listOf("channel_out_uri"))), + ) + } } diff --git a/src/test/kotlin/runner/impl/JVMRunnerTest.kt b/src/test/kotlin/runner/impl/JVMRunnerTest.kt new file mode 100644 index 0000000..930568a --- /dev/null +++ b/src/test/kotlin/runner/impl/JVMRunnerTest.kt @@ -0,0 +1,13 @@ +package runner.impl + +import runner.Runner +import runner.RunnerTest +import runner.jvm.JVMRunner + +class JVMRunnerTest : RunnerTest() { + override val target: Runner.Target = Runner.Target.JVM + + override val metadata: Map = mapOf("class" to "technology.idlab.std.Transparent") + + override fun createRunner(): Runner = JVMRunner() +} diff --git a/src/test/kotlin/runner/impl/NodeRunnerTest.kt b/src/test/kotlin/runner/impl/NodeRunnerTest.kt new file mode 100644 index 0000000..b6072b4 --- /dev/null +++ b/src/test/kotlin/runner/impl/NodeRunnerTest.kt @@ -0,0 +1,14 @@ +package runner.impl + +import runner.Runner +import runner.RunnerTest + +class NodeRunnerTest : RunnerTest() { + override val target: Runner.Target = Runner.Target.NODEJS + + override val metadata: Map = mapOf("import" to "../std/transparent.js") + + override fun createRunner(): Runner { + return NodeRunner(5000) + } +}