Skip to content

Commit

Permalink
feat: nodejs runner implementation basics
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Jun 21, 2024
1 parent d7603b6 commit 52b6e39
Show file tree
Hide file tree
Showing 14 changed files with 373 additions and 104 deletions.
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ dependencies {
implementation("org.apache.jena:apache-jena-libs:5.0.0")
implementation("org.apache.jena:jena-arq:5.0.0")

// Hide SLF4J warnings.
implementation("org.slf4j:slf4j-nop:2.0.7")

// Initialize testing.
testImplementation("org.jetbrains.kotlin:kotlin-test")
}
Expand Down
20 changes: 19 additions & 1 deletion runners/nodejs/src/runtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,29 @@ import { Server, ServerCredentials } from "@grpc/grpc-js";
import { ServerImplementation } from "./server";
import { RunnerService } from "../proto";

// Get arguments.
const host = process.argv[2];
const port = process.argv[3];
console.log(`gRPC targeting ${host}:${port}`);

// Initialize the server.
console.log("Initializing server.");
const server = new Server();

// Add the Runner service.
console.log("Adding Runner service.");
server.addService(RunnerService, new ServerImplementation());

// Startup.
server.bindAsync("0.0.0.0:50051", ServerCredentials.createInsecure(), () => {});
console.log("Starting server.");
server.bindAsync(
`${host}:${port}`,
ServerCredentials.createInsecure(),
(error, port) => {
if (error) {
return console.error(error);
} else {
console.log(`Server started on port ${port}.`);
}
},
);
120 changes: 112 additions & 8 deletions runners/nodejs/src/runtime/runner.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,125 @@
import { IRProcessor } from "../proto/intermediate";
import {
IRParameter,
IRParameterType,
IRProcessor,
IRStage,
} from "../proto/intermediate";
import { ChannelData } from "../proto";
import { Subject } from "rxjs";
import { Subject, Subscription } from "rxjs";
import { Processor } from "../interfaces/processor";
import { Constructor } from "./constructor";
import * as path from "node:path";
import { Reader } from "../interfaces/reader";
import { Writer } from "../interfaces/writer";

export class Runner {
/** Channels. */
public incoming = new Subject<ChannelData>();
public outgoing = new Subject<ChannelData>();
private incomingSubscription: Subscription;
private readers: Map<String, Subject<Uint8Array>> = new Map();

prepareProcessor(processor: IRProcessor): void {
throw new Error("Method not implemented");
/** Runtime config. */
private processors: Map<
String,
{ constructor: Constructor<Processor>; definition: IRProcessor }
> = new Map();
private stages: Map<String, Processor> = new Map();

/** Executions as promises. */
private readonly executions: Promise<void>[] = [];

constructor() {
this.incomingSubscription = this.incoming.subscribe((payload) => {
console.log(`Incoming payload: ${payload.destinationUri}`);
const reader = this.readers.get(payload.destinationUri);
if (!reader) {
throw new Error(
`Reader not found for payload ${payload.destinationUri}`,
);
}
reader.next(payload.data);
});
}

async prepareProcessor(irProcessor: IRProcessor): Promise<void> {
const absolutePath = path.resolve(irProcessor.metadata.import);
console.log(`Importing ${absolutePath}`);
const processor = await import(absolutePath);
this.processors.set(irProcessor.uri, {
constructor: processor.default,
definition: irProcessor,
});
}

async prepareStage(stage: IRStage): Promise<void> {
console.log(
`Preparing stage: \`${stage.uri}\` using ${stage.processorUri}`,
);
// Retrieve the processor definition and constructor.
const entry = this.processors.get(stage.processorUri);
if (entry === null || entry === undefined) {
throw new Error(`Processor not found for stage ${stage.uri}`);
}
const { constructor, definition } = entry;

// Retrieve parameters by name.
const parameters: Map<String, IRParameter> = new Map();
definition.parameters.forEach((param) => {
parameters.set(param.name, param);
});

// Parse args.
const args: Map<String, unknown> = new Map();
stage.arguments.forEach((arg) => {
const param = parameters.get(arg.name)!;
if (param.type == IRParameterType.READER) {
const subject = new Subject<Uint8Array>();
const reader = new Reader(subject);
this.readers.set(arg.value[0], subject);
args.set(param.name, reader);
} else if (param.type == IRParameterType.WRITER) {
const subject = new Subject<Uint8Array>();
subject.subscribe((data) => {
this.outgoing.next({
destinationUri: arg.value[0],
data: data,
});
});
const writer = new Writer(subject);
args.set(param.name, writer);
} else {
console.error(new Error(`Unsupported parameter type ${param.type}`));
}
});

try {
const processorInstance = new constructor(args);
this.stages.set(stage.uri, processorInstance);
} catch (e) {
console.error(e);
}
}

prepareStage(stage: IRProcessor): void {
throw new Error("Method not implemented");
async exec(): Promise<void> {
console.log("Executing stages.");
this.stages.forEach((stage) => {
this.executions.push(
new Promise(() => {
try {
return stage.exec();
} catch (e) {
console.error(e);
throw e;
}
}),
);
});
}

exec(): void {
throw new Error("Method not implemented");
async halt(): Promise<void> {
console.log("Halting stages.");
this.incomingSubscription.unsubscribe();
}

static shared = new Runner();
Expand Down
27 changes: 17 additions & 10 deletions runners/nodejs/src/runtime/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,36 @@ export class ServerImplementation implements RunnerServer {
call: ServerUnaryCall<IRStage, Empty>,
callback: sendUnaryData<Empty>,
): void {
call.on("data", (stage) => {
Runner.shared.prepareStage(stage);
callback(null, {});
});
Runner.shared
.prepareStage(call.request)
.then(() => {
callback(null, {});
})
.catch((e) => {
callback(e, {});
});
}

prepareProcessor(
call: ServerUnaryCall<IRProcessor, Empty>,
callback: sendUnaryData<Empty>,
): void {
call.on("data", (processor) => {
Runner.shared.prepareProcessor(processor);
callback(null, {});
});
Runner.shared
.prepareProcessor(call.request)
.then(() => {
callback(null, {});
})
.catch((e) => {
callback(e, {});
});
}

exec(
call: ServerUnaryCall<Empty, Empty>,
callback: sendUnaryData<Empty>,
): void {
call.on("data", () => {
Runner.shared.exec().then(() => {
callback(null, {});
Runner.shared.exec();
});
}
}
19 changes: 19 additions & 0 deletions runners/nodejs/src/std/transparent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Processor } from "../interfaces/processor";
import { Reader } from "../interfaces/reader";
import { Writer } from "../interfaces/writer";

export default class Transparent extends Processor {
private readonly input = this.getArgument<Reader>("input");
private readonly output = this.getArgument<Writer>("output");

async exec(): Promise<void> {
// eslint-disable-next-line no-constant-condition
while (true) {
const data = await this.input.read();
if (!data) {
break;
}
this.output.write(data);
}
}
}
2 changes: 1 addition & 1 deletion runners/nodejs/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/* Visit https://aka.ms/tsconfig to read more about this file */

/* Projects */
"incremental": true /* Save .tsbuildinfo files to allow for incremental compilation of projects. */,
"incremental": false /* Save .tsbuildinfo files to allow for incremental compilation of projects. */,
// "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */
// "tsBuildInfoFile": "./.tsbuildinfo", /* Specify the path to .tsbuildinfo incremental compilation file. */
// "disableSourceOfProjectReferenceRedirect": true, /* Disable preferring source files instead of declaration files when referencing composite projects. */
Expand Down
25 changes: 1 addition & 24 deletions src/main/kotlin/runner/Runner.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package runner

import kotlin.concurrent.thread
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import technology.idlab.parser.intermediate.IRProcessor
import technology.idlab.parser.intermediate.IRStage
import technology.idlab.util.Log
Expand Down Expand Up @@ -55,24 +53,6 @@ abstract class Runner {
/* Message which must be transmitted to the outside world. */
protected val outgoing: Channel<Payload> = Channel()

/** Incoming messages are delegated to sub channels. These are mapped by their URI. */
protected val readers = mutableMapOf<String, Channel<ByteArray>>()

// Handle incoming messages.
private val handler = thread {
runBlocking {
for (message in this@Runner.incoming) {
// Get the reader.
val reader =
readers[message.destinationURI]
?: Log.shared.fatal("Unknown reader: ${message.destinationURI}")

// Push data to the reader.
reader.send(message.data)
}
}
}

/** Register and prepare a processor inside the runtime. */
abstract suspend fun prepare(processor: IRProcessor)

Expand All @@ -86,10 +66,7 @@ abstract class Runner {
abstract suspend fun status(): Status

/** Halt the execution of the runtime and release all resources. */
open fun halt() {
// TODO: Propagate halting signal to processors.
handler.interrupt()
}
abstract fun halt()

fun getIncomingChannel(): Channel<Payload> {
return incoming
Expand Down
Loading

0 comments on commit 52b6e39

Please sign in to comment.