Skip to content

Commit

Permalink
feat: fixed integration with nodejs runner
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Jun 23, 2024
1 parent 52b6e39 commit 8d0e016
Show file tree
Hide file tree
Showing 21 changed files with 385 additions and 68 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,16 @@ jobs:
distribution: 'adopt'
cache: 'gradle'

- name: Set up NPM
uses: actions/setup-node@v2
with:
node-version: '21'

- name: Install NPM dependencies
run: npm install --prefix runners/nodejs

- name: Build NPM runner
run: npm run build --prefix runners/nodejs

- name: Run tests
run: ./gradlew test
Empty file added runners/nodejs/src/proto/.keep
Empty file.
1 change: 1 addition & 0 deletions runners/nodejs/src/proto/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The files in this directory are automatically generated and should not be updated by hand.
8 changes: 2 additions & 6 deletions runners/nodejs/src/runtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,25 @@ import { Server, ServerCredentials } from "@grpc/grpc-js";
import { ServerImplementation } from "./server";
import { RunnerService } from "../proto";

// Get arguments.
/** The socket at which gRPC binds is decided by the orchestrator. */
const host = process.argv[2];
const port = process.argv[3];
console.log(`gRPC targeting ${host}:${port}`);

// Initialize the server.
console.log("Initializing server.");
const server = new Server();

// Add the Runner service.
console.log("Adding Runner service.");
server.addService(RunnerService, new ServerImplementation());

// Startup.
console.log("Starting server.");
server.bindAsync(
`${host}:${port}`,
ServerCredentials.createInsecure(),
(error, port) => {
if (error) {
return console.error(error);
} else {
console.log(`Server started on port ${port}.`);
console.log(`gRPC up and running (port=${port})`);
}
},
);
5 changes: 5 additions & 0 deletions runners/nodejs/src/runtime/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ export class ServerImplementation implements RunnerServer {
call: ServerUnaryCall<IRProcessor, Empty>,
callback: sendUnaryData<Empty>,
): void {
console.log("gRPC::prepareProcessor::invoke");
Runner.shared
.prepareProcessor(call.request)
.then(() => {
console.log("gRPC::prepareProcessor::success");
callback(null, {});
})
.catch((e) => {
console.log("gRPC::prepareProcessor::error");
callback(e, {});
});
}
Expand All @@ -56,7 +59,9 @@ export class ServerImplementation implements RunnerServer {
call: ServerUnaryCall<Empty, Empty>,
callback: sendUnaryData<Empty>,
): void {
console.log("gRPC::prepareProcessor::invoke");
Runner.shared.exec().then(() => {
console.log("gRPC::prepareProcessor::success");
callback(null, {});
});
}
Expand Down
5 changes: 5 additions & 0 deletions runners/nodejs/src/std/transparent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { Processor } from "../interfaces/processor";
import { Reader } from "../interfaces/reader";
import { Writer } from "../interfaces/writer";

/**
* The Transparent processor reads data and transmits it directly to it's output, while also logging the data to the
* console. This processor is only used for debugging purposes.
*/
export default class Transparent extends Processor {
private readonly input = this.getArgument<Reader>("input");
private readonly output = this.getArgument<Writer>("output");
Expand All @@ -13,6 +17,7 @@ export default class Transparent extends Processor {
if (!data) {
break;
}
console.log(data.toString());
this.output.write(data);
}
}
Expand Down
14 changes: 8 additions & 6 deletions src/main/kotlin/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@ import kotlinx.coroutines.runBlocking
import technology.idlab.parser.Parser

fun main(args: Array<String>) = runBlocking {
// Parse arguments.
/**
* At the moment, the only argument that the runtime accepts is the path to the pipeline
* declaration file.
*/
if (args.size != 1) {
println("Usage: jvm-runner <config>")
exitProcess(0)
}

// Configuration.
/**
* We start off by parsing the configuration file. This file contains the list of processors and
* stages that the runtime should prepare, as well as channel declarations.
*/
val configPath = args[0]
val config = File(configPath)
val parser = Parser.create(config)

// Initialize the processors.
val processors = parser.processors()

// Initialize the stages.
val stages = parser.stages()
}
96 changes: 96 additions & 0 deletions src/main/kotlin/Orchestrator.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package technology.idlab

import kotlin.concurrent.thread
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import runner.Runner
import runner.impl.NodeRunner
import runner.jvm.JVMRunner
import technology.idlab.parser.intermediate.IRParameter
import technology.idlab.parser.intermediate.IRProcessor
import technology.idlab.parser.intermediate.IRStage
import technology.idlab.util.Log

class Orchestrator(stages: Set<IRStage>) {
/** List of all processors in the pipeline. */
private val processors = stages.map { it.processor }.toSet()

/**
* A channel which listens to all incoming messages and distributes them according to the topology
* of the runners.
*/
private val channel =
Channel<Runner.Payload>().also {
thread {
runBlocking {
for (payload in it) {
// Special URI for printing to the console.
if (payload.destinationURI == "print") {
println(payload.data.decodeToString())
continue
}

// Get the runner and send the message.
val runner = readers[payload.destinationURI]
runner!!.getIncomingChannel().send(payload)
}
}
}
}

/** An exhaustive list of all runners. */
private val jvmRunner by lazy { JVMRunner(channel) }
private val nodeRunner by lazy { NodeRunner(channel, 5000) }
private val runners = listOf(nodeRunner, jvmRunner)

/** A map of all channel URIs and their readers. */
private val readers = mutableMapOf<String, Runner>()

init {
/** Initialize the processors and stages in the runtimes. */
runBlocking {
processors.forEach { processor -> prepare(processor) }
stages.forEach { stage -> prepare(stage) }
}
}

/** Prepare a processor inside of it's corresponding runtime. */
private suspend fun prepare(processor: IRProcessor) {
val runner = getRuntime(processor.target)
runner.prepare(processor)
}

/** Prepare a stage inside of it's corresponding runtime. */
private suspend fun prepare(stage: IRStage) {
// Get the corresponding runner.
val runner = getRuntime(stage.processor.target)
runner.prepare(stage)

// Find all the readers in the stage.
val readers =
stage.processor.parameters.filter { it.type == IRParameter.Type.READER }.map { it.name }

// Get their concrete URIs.
val uris = stage.arguments.filter { readers.contains(it.name) }.map { it.value[0] }

// Add them as a channel targets.
uris.forEach { this.readers[it] = runner }
}

/** Execute all stages in all the runtimes. */
suspend fun exec() = coroutineScope {
Log.shared.info("Bringing all stages online.")
runners.map { async { it.exec() } }.forEach { it.await() }
Log.shared.info("All stages are online.")
}

/** Get a lazy evaluated runner. */
private fun getRuntime(target: Runner.Target): Runner {
return when (target) {
Runner.Target.JVM -> this.jvmRunner
Runner.Target.NODEJS -> this.nodeRunner
}
}
}
8 changes: 4 additions & 4 deletions src/main/kotlin/runner/Runner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import technology.idlab.parser.intermediate.IRProcessor
import technology.idlab.parser.intermediate.IRStage
import technology.idlab.util.Log

abstract class Runner {
abstract class Runner(
/* Message which must be transmitted to the outside world. */
protected val outgoing: Channel<Payload> = Channel()
) {
/** The state of a runtime. */
enum class Status {
STARTING,
Expand Down Expand Up @@ -50,9 +53,6 @@ abstract class Runner {
/* Messages which are destined to a processor inside the runner. */
protected val incoming: Channel<Payload> = Channel()

/* Message which must be transmitted to the outside world. */
protected val outgoing: Channel<Payload> = Channel()

/** Register and prepare a processor inside the runtime. */
abstract suspend fun prepare(processor: IRProcessor)

Expand Down
9 changes: 8 additions & 1 deletion src/main/kotlin/runner/impl/GRPCRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.google.protobuf.ByteString
import io.grpc.ManagedChannelBuilder
import io.ktor.utils.io.errors.*
import kotlin.concurrent.thread
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -85,7 +86,11 @@ private fun IRProcessor.toGRPC(): GRPC.IRProcessor {
* This runner has GRPC built-in, so the only configuration that an extending class needs to provide
* is the host and port of the GRPC server, as well as actually booting the process.
*/
abstract class GRPCRunner(host: String, protected val port: Int) : Runner() {
abstract class GRPCRunner(
outgoing: Channel<Payload> = Channel(),
host: String,
protected val port: Int
) : Runner(outgoing) {
/** Handle to the child process. */
private val process by lazy { createProcess() }

Expand Down Expand Up @@ -162,7 +167,9 @@ abstract class GRPCRunner(host: String, protected val port: Int) : Runner() {
}

override suspend fun exec() {
Log.shared.debug("gRPC::exec::invoke")
grpc.exec(empty)
Log.shared.debug("gRPC::exec::success")
}

override fun halt() {
Expand Down
11 changes: 8 additions & 3 deletions src/main/kotlin/runner/impl/NodeRunner.kt
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package runner.impl

import java.io.File
import kotlinx.coroutines.channels.Channel
import technology.idlab.runner.impl.GRPCRunner
import technology.idlab.util.Log

class NodeRunner(port: Int) : GRPCRunner("localhost", port) {
class NodeRunner(outgoing: Channel<Payload> = Channel(), port: Int) :
GRPCRunner(outgoing, "localhost", port) {
override fun createProcess(): Process {
// Configuration.
val directory = "/Users/jens/Developer/technology.idlab.jvm-runner/runners/nodejs/build/runtime"
val relative = "runners/nodejs/build/runtime"
val directory = File(".").resolve(relative)
Log.shared.debug("Node working directory: ${directory.canonicalPath}")

val command = listOf("node", "index.js", "localhost", port.toString())
Log.shared.info("Starting process: `${command.joinToString(" ")}`")

// Initialize the process.
val processBuilder = ProcessBuilder(command)
processBuilder.directory(File(directory))
processBuilder.directory(directory)
try {
return processBuilder.start()
} catch (e: Exception) {
Expand Down
9 changes: 7 additions & 2 deletions src/main/kotlin/runner/jvm/JVMRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import technology.idlab.parser.intermediate.IRProcessor
import technology.idlab.parser.intermediate.IRStage
import technology.idlab.util.Log

class JVMRunner : Runner() {
class JVMRunner(outgoing: Channel<Payload> = Channel()) : Runner(outgoing) {
private val processors = mutableMapOf<String, Pair<IRProcessor, Class<Processor>>>()
private val stages = mutableMapOf<String, Processor>()

/** Incoming messages are delegated to sub channels. These are mapped by their URI. */
private val readers = mutableMapOf<String, Channel<ByteArray>>()

/** Keep track of all spawned threads. */
private var threads = mutableListOf<Thread>()

// Handle incoming messages.
private val handler = thread {
try {
Expand Down Expand Up @@ -85,7 +88,9 @@ class JVMRunner : Runner() {
}

override suspend fun exec() {
this.stages.values.map { thread { it.exec() } }.map { it.join() }
Log.shared.info("Bringing JVM stages online.")
this.stages.values.forEach { this.threads.add(thread { it.exec() }) }
Log.shared.info("All stages are online.")
}

override suspend fun status(): Status {
Expand Down
25 changes: 1 addition & 24 deletions src/main/kotlin/runner/jvm/Reader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,7 @@ class Reader(private val channel: Channel<ByteArray>) {
}
}

suspend fun push(bytes: ByteArray) {
channel.send(bytes)
}

fun readSync(): Result {
val result = runBlocking { channel.receiveCatching() }

// Check if the channel got closed.
if (result.isClosed) {
return Result.closed()
}

// If an error occurred, the runner must handle it itself.
if (result.isFailure) {
Log.shared.fatal("Failed to read bytes")
}

val bytes = result.getOrThrow()
return Result.success(bytes)
}
fun readSync(): Result = runBlocking { read() }

suspend fun read(): Result {
return try {
Expand All @@ -71,8 +52,4 @@ class Reader(private val channel: Channel<ByteArray>) {
Log.shared.fatal(e)
}
}

fun isClosed(): Boolean {
return channel.isClosedForSend
}
}
4 changes: 1 addition & 3 deletions src/main/kotlin/runner/jvm/Writer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import runner.Runner
import technology.idlab.util.Log

class Writer(private val channel: Channel<Runner.Payload>, private val destination: String) {
fun pushSync(value: ByteArray) {
runBlocking { channel.send(Runner.Payload(destination, value)) }
}
fun pushSync(value: ByteArray) = runBlocking { push(value) }

fun push(value: ByteArray) {
try {
Expand Down
2 changes: 2 additions & 0 deletions src/main/kotlin/std/Transparent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package technology.idlab.std
import runner.jvm.Processor
import runner.jvm.Reader
import runner.jvm.Writer
import technology.idlab.util.Log

class Transparent(args: Map<String, Any>) : Processor(args) {
private val input = this.getArgument<Reader>("input")
Expand All @@ -16,6 +17,7 @@ class Transparent(args: Map<String, Any>) : Processor(args) {
break
}

Log.shared.info("Received: ${result.value}")
output.pushSync(result.value)
}
}
Expand Down
Loading

0 comments on commit 8d0e016

Please sign in to comment.