diff --git a/proto/index.proto b/proto/index.proto index bfa8fbb..b79ee4b 100644 --- a/proto/index.proto +++ b/proto/index.proto @@ -9,8 +9,7 @@ message ChannelData { } service Runner { - rpc prepareProcessor(IRProcessor) returns (Empty); - rpc prepareStage(IRStage) returns (Empty); + rpc load(IRStage) returns (Empty); rpc exec(Empty) returns (Empty); rpc channel(stream ChannelData) returns (stream ChannelData); } diff --git a/proto/intermediate.proto b/proto/intermediate.proto index e30af9e..ed1f889 100644 --- a/proto/intermediate.proto +++ b/proto/intermediate.proto @@ -32,17 +32,17 @@ message IRParameter { message IRProcessor { string uri = 1; - repeated IRParameter parameters = 2; + map parameters = 2; map metadata = 3; } message IRArgument { - string name = 1; + IRParameter parameter = 1; repeated string value = 2; } message IRStage { string uri = 1; - string processor_uri = 2; - repeated IRArgument arguments = 3; + IRProcessor processor = 2; + map arguments = 3; } diff --git a/runners/nodejs/src/proto/index.ts b/runners/nodejs/src/proto/index.ts index 7222541..5c30bdd 100644 --- a/runners/nodejs/src/proto/index.ts +++ b/runners/nodejs/src/proto/index.ts @@ -21,7 +21,7 @@ import { } from "@grpc/grpc-js"; import * as _m0 from "protobufjs/minimal"; import { Empty } from "./empty"; -import { IRProcessor, IRStage } from "./intermediate"; +import { IRStage } from "./intermediate"; export const protobufPackage = ""; @@ -116,19 +116,8 @@ export const ChannelData = { export type RunnerService = typeof RunnerService; export const RunnerService = { - prepareProcessor: { - path: "/Runner/prepareProcessor", - requestStream: false, - responseStream: false, - requestSerialize: (value: IRProcessor) => - Buffer.from(IRProcessor.encode(value).finish()), - requestDeserialize: (value: Buffer) => IRProcessor.decode(value), - responseSerialize: (value: Empty) => - Buffer.from(Empty.encode(value).finish()), - responseDeserialize: (value: Buffer) => Empty.decode(value), - }, - prepareStage: { - path: "/Runner/prepareStage", + load: { + path: "/Runner/load", requestStream: false, responseStream: false, requestSerialize: (value: IRStage) => @@ -163,38 +152,22 @@ export const RunnerService = { } as const; export interface RunnerServer extends UntypedServiceImplementation { - prepareProcessor: handleUnaryCall; - prepareStage: handleUnaryCall; + load: handleUnaryCall; exec: handleUnaryCall; channel: handleBidiStreamingCall; } export interface RunnerClient extends Client { - prepareProcessor( - request: IRProcessor, - callback: (error: ServiceError | null, response: Empty) => void, - ): ClientUnaryCall; - prepareProcessor( - request: IRProcessor, - metadata: Metadata, - callback: (error: ServiceError | null, response: Empty) => void, - ): ClientUnaryCall; - prepareProcessor( - request: IRProcessor, - metadata: Metadata, - options: Partial, - callback: (error: ServiceError | null, response: Empty) => void, - ): ClientUnaryCall; - prepareStage( + load( request: IRStage, callback: (error: ServiceError | null, response: Empty) => void, ): ClientUnaryCall; - prepareStage( + load( request: IRStage, metadata: Metadata, callback: (error: ServiceError | null, response: Empty) => void, ): ClientUnaryCall; - prepareStage( + load( request: IRStage, metadata: Metadata, options: Partial, diff --git a/runners/nodejs/src/proto/intermediate.ts b/runners/nodejs/src/proto/intermediate.ts index ca8a16d..14e4e12 100644 --- a/runners/nodejs/src/proto/intermediate.ts +++ b/runners/nodejs/src/proto/intermediate.ts @@ -165,24 +165,34 @@ export interface IRParameter { export interface IRProcessor { uri: string; - parameters: IRParameter[]; + parameters: { [key: string]: IRParameter }; metadata: { [key: string]: string }; } +export interface IRProcessor_ParametersEntry { + key: string; + value: IRParameter | undefined; +} + export interface IRProcessor_MetadataEntry { key: string; value: string; } export interface IRArgument { - name: string; + parameter: IRParameter | undefined; value: string[]; } export interface IRStage { uri: string; - processorUri: string; - arguments: IRArgument[]; + processor: IRProcessor | undefined; + arguments: { [key: string]: IRArgument }; +} + +export interface IRStage_ArgumentsEntry { + key: string; + value: IRArgument | undefined; } function createBaseIRParameter(): IRParameter { @@ -298,7 +308,7 @@ export const IRParameter = { }; function createBaseIRProcessor(): IRProcessor { - return { uri: "", parameters: [], metadata: {} }; + return { uri: "", parameters: {}, metadata: {} }; } export const IRProcessor = { @@ -309,9 +319,12 @@ export const IRProcessor = { if (message.uri !== "") { writer.uint32(10).string(message.uri); } - for (const v of message.parameters) { - IRParameter.encode(v!, writer.uint32(18).fork()).ldelim(); - } + Object.entries(message.parameters).forEach(([key, value]) => { + IRProcessor_ParametersEntry.encode( + { key: key as any, value }, + writer.uint32(18).fork(), + ).ldelim(); + }); Object.entries(message.metadata).forEach(([key, value]) => { IRProcessor_MetadataEntry.encode( { key: key as any, value }, @@ -341,7 +354,13 @@ export const IRProcessor = { break; } - message.parameters.push(IRParameter.decode(reader, reader.uint32())); + const entry2 = IRProcessor_ParametersEntry.decode( + reader, + reader.uint32(), + ); + if (entry2.value !== undefined) { + message.parameters[entry2.key] = entry2.value; + } continue; case 3: if (tag !== 26) { @@ -368,9 +387,14 @@ export const IRProcessor = { fromJSON(object: any): IRProcessor { return { uri: isSet(object.uri) ? globalThis.String(object.uri) : "", - parameters: globalThis.Array.isArray(object?.parameters) - ? object.parameters.map((e: any) => IRParameter.fromJSON(e)) - : [], + parameters: isObject(object.parameters) + ? Object.entries(object.parameters).reduce<{ + [key: string]: IRParameter; + }>((acc, [key, value]) => { + acc[key] = IRParameter.fromJSON(value); + return acc; + }, {}) + : {}, metadata: isObject(object.metadata) ? Object.entries(object.metadata).reduce<{ [key: string]: string }>( (acc, [key, value]) => { @@ -388,8 +412,14 @@ export const IRProcessor = { if (message.uri !== "") { obj.uri = message.uri; } - if (message.parameters?.length) { - obj.parameters = message.parameters.map((e) => IRParameter.toJSON(e)); + if (message.parameters) { + const entries = Object.entries(message.parameters); + if (entries.length > 0) { + obj.parameters = {}; + entries.forEach(([k, v]) => { + obj.parameters[k] = IRParameter.toJSON(v); + }); + } } if (message.metadata) { const entries = Object.entries(message.metadata); @@ -411,8 +441,14 @@ export const IRProcessor = { ): IRProcessor { const message = createBaseIRProcessor(); message.uri = object.uri ?? ""; - message.parameters = - object.parameters?.map((e) => IRParameter.fromPartial(e)) || []; + message.parameters = Object.entries(object.parameters ?? {}).reduce<{ + [key: string]: IRParameter; + }>((acc, [key, value]) => { + if (value !== undefined) { + acc[key] = IRParameter.fromPartial(value); + } + return acc; + }, {}); message.metadata = Object.entries(object.metadata ?? {}).reduce<{ [key: string]: string; }>((acc, [key, value]) => { @@ -425,6 +461,96 @@ export const IRProcessor = { }, }; +function createBaseIRProcessor_ParametersEntry(): IRProcessor_ParametersEntry { + return { key: "", value: undefined }; +} + +export const IRProcessor_ParametersEntry = { + encode( + message: IRProcessor_ParametersEntry, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + if (message.key !== "") { + writer.uint32(10).string(message.key); + } + if (message.value !== undefined) { + IRParameter.encode(message.value, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode( + input: _m0.Reader | Uint8Array, + length?: number, + ): IRProcessor_ParametersEntry { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRProcessor_ParametersEntry(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.key = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.value = IRParameter.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRProcessor_ParametersEntry { + return { + key: isSet(object.key) ? globalThis.String(object.key) : "", + value: isSet(object.value) + ? IRParameter.fromJSON(object.value) + : undefined, + }; + }, + + toJSON(message: IRProcessor_ParametersEntry): unknown { + const obj: any = {}; + if (message.key !== "") { + obj.key = message.key; + } + if (message.value !== undefined) { + obj.value = IRParameter.toJSON(message.value); + } + return obj; + }, + + create, I>>( + base?: I, + ): IRProcessor_ParametersEntry { + return IRProcessor_ParametersEntry.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRProcessor_ParametersEntry { + const message = createBaseIRProcessor_ParametersEntry(); + message.key = object.key ?? ""; + message.value = + object.value !== undefined && object.value !== null + ? IRParameter.fromPartial(object.value) + : undefined; + return message; + }, +}; + function createBaseIRProcessor_MetadataEntry(): IRProcessor_MetadataEntry { return { key: "", value: "" }; } @@ -511,7 +637,7 @@ export const IRProcessor_MetadataEntry = { }; function createBaseIRArgument(): IRArgument { - return { name: "", value: [] }; + return { parameter: undefined, value: [] }; } export const IRArgument = { @@ -519,8 +645,8 @@ export const IRArgument = { message: IRArgument, writer: _m0.Writer = _m0.Writer.create(), ): _m0.Writer { - if (message.name !== "") { - writer.uint32(10).string(message.name); + if (message.parameter !== undefined) { + IRParameter.encode(message.parameter, writer.uint32(10).fork()).ldelim(); } for (const v of message.value) { writer.uint32(18).string(v!); @@ -541,7 +667,7 @@ export const IRArgument = { break; } - message.name = reader.string(); + message.parameter = IRParameter.decode(reader, reader.uint32()); continue; case 2: if (tag !== 18) { @@ -561,7 +687,9 @@ export const IRArgument = { fromJSON(object: any): IRArgument { return { - name: isSet(object.name) ? globalThis.String(object.name) : "", + parameter: isSet(object.parameter) + ? IRParameter.fromJSON(object.parameter) + : undefined, value: globalThis.Array.isArray(object?.value) ? object.value.map((e: any) => globalThis.String(e)) : [], @@ -570,8 +698,8 @@ export const IRArgument = { toJSON(message: IRArgument): unknown { const obj: any = {}; - if (message.name !== "") { - obj.name = message.name; + if (message.parameter !== undefined) { + obj.parameter = IRParameter.toJSON(message.parameter); } if (message.value?.length) { obj.value = message.value; @@ -586,14 +714,17 @@ export const IRArgument = { object: I, ): IRArgument { const message = createBaseIRArgument(); - message.name = object.name ?? ""; + message.parameter = + object.parameter !== undefined && object.parameter !== null + ? IRParameter.fromPartial(object.parameter) + : undefined; message.value = object.value?.map((e) => e) || []; return message; }, }; function createBaseIRStage(): IRStage { - return { uri: "", processorUri: "", arguments: [] }; + return { uri: "", processor: undefined, arguments: {} }; } export const IRStage = { @@ -604,12 +735,15 @@ export const IRStage = { if (message.uri !== "") { writer.uint32(10).string(message.uri); } - if (message.processorUri !== "") { - writer.uint32(18).string(message.processorUri); - } - for (const v of message.arguments) { - IRArgument.encode(v!, writer.uint32(26).fork()).ldelim(); + if (message.processor !== undefined) { + IRProcessor.encode(message.processor, writer.uint32(18).fork()).ldelim(); } + Object.entries(message.arguments).forEach(([key, value]) => { + IRStage_ArgumentsEntry.encode( + { key: key as any, value }, + writer.uint32(26).fork(), + ).ldelim(); + }); return writer; }, @@ -633,14 +767,17 @@ export const IRStage = { break; } - message.processorUri = reader.string(); + message.processor = IRProcessor.decode(reader, reader.uint32()); continue; case 3: if (tag !== 26) { break; } - message.arguments.push(IRArgument.decode(reader, reader.uint32())); + const entry3 = IRStage_ArgumentsEntry.decode(reader, reader.uint32()); + if (entry3.value !== undefined) { + message.arguments[entry3.key] = entry3.value; + } continue; } if ((tag & 7) === 4 || tag === 0) { @@ -654,12 +791,17 @@ export const IRStage = { fromJSON(object: any): IRStage { return { uri: isSet(object.uri) ? globalThis.String(object.uri) : "", - processorUri: isSet(object.processorUri) - ? globalThis.String(object.processorUri) - : "", - arguments: globalThis.Array.isArray(object?.arguments) - ? object.arguments.map((e: any) => IRArgument.fromJSON(e)) - : [], + processor: isSet(object.processor) + ? IRProcessor.fromJSON(object.processor) + : undefined, + arguments: isObject(object.arguments) + ? Object.entries(object.arguments).reduce<{ + [key: string]: IRArgument; + }>((acc, [key, value]) => { + acc[key] = IRArgument.fromJSON(value); + return acc; + }, {}) + : {}, }; }, @@ -668,11 +810,17 @@ export const IRStage = { if (message.uri !== "") { obj.uri = message.uri; } - if (message.processorUri !== "") { - obj.processorUri = message.processorUri; + if (message.processor !== undefined) { + obj.processor = IRProcessor.toJSON(message.processor); } - if (message.arguments?.length) { - obj.arguments = message.arguments.map((e) => IRArgument.toJSON(e)); + if (message.arguments) { + const entries = Object.entries(message.arguments); + if (entries.length > 0) { + obj.arguments = {}; + entries.forEach(([k, v]) => { + obj.arguments[k] = IRArgument.toJSON(v); + }); + } } return obj; }, @@ -683,9 +831,108 @@ export const IRStage = { fromPartial, I>>(object: I): IRStage { const message = createBaseIRStage(); message.uri = object.uri ?? ""; - message.processorUri = object.processorUri ?? ""; - message.arguments = - object.arguments?.map((e) => IRArgument.fromPartial(e)) || []; + message.processor = + object.processor !== undefined && object.processor !== null + ? IRProcessor.fromPartial(object.processor) + : undefined; + message.arguments = Object.entries(object.arguments ?? {}).reduce<{ + [key: string]: IRArgument; + }>((acc, [key, value]) => { + if (value !== undefined) { + acc[key] = IRArgument.fromPartial(value); + } + return acc; + }, {}); + return message; + }, +}; + +function createBaseIRStage_ArgumentsEntry(): IRStage_ArgumentsEntry { + return { key: "", value: undefined }; +} + +export const IRStage_ArgumentsEntry = { + encode( + message: IRStage_ArgumentsEntry, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + if (message.key !== "") { + writer.uint32(10).string(message.key); + } + if (message.value !== undefined) { + IRArgument.encode(message.value, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode( + input: _m0.Reader | Uint8Array, + length?: number, + ): IRStage_ArgumentsEntry { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRStage_ArgumentsEntry(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.key = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.value = IRArgument.decode(reader, reader.uint32()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRStage_ArgumentsEntry { + return { + key: isSet(object.key) ? globalThis.String(object.key) : "", + value: isSet(object.value) + ? IRArgument.fromJSON(object.value) + : undefined, + }; + }, + + toJSON(message: IRStage_ArgumentsEntry): unknown { + const obj: any = {}; + if (message.key !== "") { + obj.key = message.key; + } + if (message.value !== undefined) { + obj.value = IRArgument.toJSON(message.value); + } + return obj; + }, + + create, I>>( + base?: I, + ): IRStage_ArgumentsEntry { + return IRStage_ArgumentsEntry.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRStage_ArgumentsEntry { + const message = createBaseIRStage_ArgumentsEntry(); + message.key = object.key ?? ""; + message.value = + object.value !== undefined && object.value !== null + ? IRArgument.fromPartial(object.value) + : undefined; return message; }, }; diff --git a/runners/nodejs/src/runtime/runner.ts b/runners/nodejs/src/runtime/runner.ts index a44b331..f0fa355 100644 --- a/runners/nodejs/src/runtime/runner.ts +++ b/runners/nodejs/src/runtime/runner.ts @@ -1,13 +1,7 @@ -import { - IRParameter, - IRParameterType, - IRProcessor, - IRStage, -} from "../proto/intermediate"; +import { IRParameterType, IRStage } from "../proto/intermediate"; import { ChannelData } from "../proto"; import { Subject, Subscription } from "rxjs"; import { Processor } from "../interfaces/processor"; -import { Constructor } from "./constructor"; import * as path from "node:path"; import { Reader } from "../interfaces/reader"; import { Writer } from "../interfaces/writer"; @@ -20,15 +14,8 @@ export class Runner { private readers: Map> = new Map(); /** Runtime config. */ - private processors: Map< - String, - { constructor: Constructor; definition: IRProcessor } - > = new Map(); private stages: Map = new Map(); - /** Executions as promises. */ - private readonly executions: Promise[] = []; - constructor() { this.incomingSubscription = this.incoming.subscribe((payload) => { const reader = this.readers.get(payload.destinationUri); @@ -41,43 +28,21 @@ export class Runner { }); } - async prepareProcessor(irProcessor: IRProcessor): Promise { - const absolutePath = path.resolve(irProcessor.metadata.import); - console.log(`Importing ${absolutePath}`); + async load(stage: IRStage): Promise { + /** Load the processor into Node.js */ + const absolutePath = path.resolve(stage.processor!.metadata.import); const processor = await import(absolutePath); - this.processors.set(irProcessor.uri, { - constructor: processor.default, - definition: irProcessor, - }); - } - - async prepareStage(stage: IRStage): Promise { - console.log( - `Preparing stage: \`${stage.uri}\` using ${stage.processorUri}`, - ); - // Retrieve the processor definition and constructor. - const entry = this.processors.get(stage.processorUri); - if (entry === null || entry === undefined) { - throw new Error(`Processor not found for stage ${stage.uri}`); - } - const { constructor, definition } = entry; - - // Retrieve parameters by name. - const parameters: Map = new Map(); - definition.parameters.forEach((param) => { - parameters.set(param.name, param); - }); + const constructor = processor.default; - // Parse args. + /** Parse the stage's arguments. */ const args: Map = new Map(); - stage.arguments.forEach((arg) => { - const param = parameters.get(arg.name)!; - if (param.type == IRParameterType.READER) { + Object.entries(stage.arguments).map(([key, arg]) => { + if (arg.parameter!.type == IRParameterType.READER) { const subject = new Subject(); const reader = new Reader(subject); this.readers.set(arg.value[0], subject); - args.set(param.name, reader); - } else if (param.type == IRParameterType.WRITER) { + args.set(arg.parameter!.name, reader); + } else if (arg.parameter!.type == IRParameterType.WRITER) { const subject = new Subject(); subject.subscribe((data) => { this.outgoing.next({ @@ -86,9 +51,11 @@ export class Runner { }); }); const writer = new Writer(subject); - args.set(param.name, writer); + args.set(arg.parameter!.name, writer); } else { - console.error(new Error(`Unsupported parameter type ${param.type}`)); + console.error( + new Error(`Unsupported parameter type ${arg.parameter!.type}`), + ); } }); @@ -103,16 +70,14 @@ export class Runner { async exec(): Promise { console.log("Executing stages."); this.stages.forEach((stage) => { - this.executions.push( - new Promise(() => { - try { - return stage.exec(); - } catch (e) { - console.error(e); - throw e; - } - }), - ); + new Promise(() => { + try { + return stage.exec(); + } catch (e) { + console.error(e); + throw e; + } + }); }); } diff --git a/runners/nodejs/src/runtime/server.ts b/runners/nodejs/src/runtime/server.ts index a63a346..1039ace 100644 --- a/runners/nodejs/src/runtime/server.ts +++ b/runners/nodejs/src/runtime/server.ts @@ -5,7 +5,7 @@ import { ServerUnaryCall, UntypedHandleCall, } from "@grpc/grpc-js"; -import { IRProcessor, IRStage } from "../proto/intermediate"; +import { IRStage } from "../proto/intermediate"; import { Empty } from "../proto/empty"; import { Runner } from "./runner"; @@ -26,12 +26,12 @@ export class ServerImplementation implements RunnerServer { }); } - prepareStage( + load( call: ServerUnaryCall, callback: sendUnaryData, ): void { Runner.shared - .prepareStage(call.request) + .load(call.request) .then(() => { callback(null, {}); }) @@ -40,23 +40,6 @@ export class ServerImplementation implements RunnerServer { }); } - prepareProcessor( - call: ServerUnaryCall, - callback: sendUnaryData, - ): void { - console.log("gRPC::prepareProcessor::invoke"); - Runner.shared - .prepareProcessor(call.request) - .then(() => { - console.log("gRPC::prepareProcessor::success"); - callback(null, {}); - }) - .catch((e) => { - console.log("gRPC::prepareProcessor::error"); - callback(e, {}); - }); - } - exec( call: ServerUnaryCall, callback: sendUnaryData, diff --git a/src/main/kotlin/Orchestrator.kt b/src/main/kotlin/Orchestrator.kt index 7534efc..7b218a6 100644 --- a/src/main/kotlin/Orchestrator.kt +++ b/src/main/kotlin/Orchestrator.kt @@ -10,18 +10,14 @@ import runner.Runner import runner.impl.NodeRunner import runner.jvm.JVMRunner import technology.idlab.parser.intermediate.IRParameter -import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log class Orchestrator(stages: Set) { - /** List of all processors in the pipeline. */ - private val processors = stages.map { it.processor }.toSet() - /** An exhaustive list of all runners. */ private val channel = Channel() - private val jvmRunner by lazy { JVMRunner(channel) } - private val nodeRunner by lazy { NodeRunner(channel, 5000) } + private val jvmRunner = JVMRunner(channel) + private val nodeRunner = NodeRunner(channel, 5000) private val runners = listOf(nodeRunner, jvmRunner) /** A map of all channel URIs and their readers. */ @@ -29,33 +25,20 @@ class Orchestrator(stages: Set) { init { /** Initialize the processors and stages in the runtimes. */ - runBlocking { - processors.forEach { processor -> prepare(processor) } - stages.forEach { stage -> prepare(stage) } - } - } - - /** Prepare a processor inside of it's corresponding runtime. */ - private suspend fun prepare(processor: IRProcessor) { - val runner = getRuntime(processor.target) - runner.prepare(processor) + runBlocking { stages.forEach { stage -> prepare(stage) } } } /** Prepare a stage inside of it's corresponding runtime. */ private suspend fun prepare(stage: IRStage) { // Get the corresponding runner. val runner = getRuntime(stage.processor.target) - runner.prepare(stage) + runner.load(stage) // Find all the readers in the stage. - val readers = - stage.processor.parameters.filter { it.type == IRParameter.Type.READER }.map { it.name } - - // Get their concrete URIs. - val uris = stage.arguments.filter { readers.contains(it.name) }.map { it.value[0] } - - // Add them as a channel targets. - uris.forEach { this.readers[it] = runner } + stage.arguments + .filter { it.value.parameter.type == IRParameter.Type.READER } + .mapValues { (_, argument) -> argument.value[0] } + .forEach { (_, uri) -> this.readers[uri] = runner } } /** Execute all stages in all the runtimes. */ diff --git a/src/main/kotlin/parser/impl/RDFParser.kt b/src/main/kotlin/parser/impl/RDFParser.kt index fb44c28..d07aa20 100644 --- a/src/main/kotlin/parser/impl/RDFParser.kt +++ b/src/main/kotlin/parser/impl/RDFParser.kt @@ -43,7 +43,7 @@ class RDFParser(file: File) : Parser() { // Parse parameters. val parameterBindings = mapOf("?processor" to uri) - val parameters = mutableListOf() + val parameters = mutableMapOf() model.query("/queries/parameters.sparql", parameterBindings) { query -> val path = query.get("path").asResource().toString().substringAfterLast("/") val datatype = query.get("datatype").asResource().toString() @@ -95,7 +95,7 @@ class RDFParser(file: File) : Parser() { } val parameter = IRParameter(path, type, presence, count) - parameters.add(parameter) + parameters[path] = parameter } // Parse metadata. @@ -137,7 +137,12 @@ class RDFParser(file: File) : Parser() { val args = builder.getOrPut(key) { mutableListOf() } args.add(value) } - val arguments = builder.map { (key, value) -> IRArgument(key, value) } + + val arguments = + builder.mapValues { (key, value) -> + val parameter = processor.parameters[key]!! + return@mapValues IRArgument(parameter, value) + } // Append as result. val stage = IRStage(uri, processor, arguments) diff --git a/src/main/kotlin/parser/impl/TomlParser.kt b/src/main/kotlin/parser/impl/TomlParser.kt index f5d6cdd..ba1de47 100644 --- a/src/main/kotlin/parser/impl/TomlParser.kt +++ b/src/main/kotlin/parser/impl/TomlParser.kt @@ -51,7 +51,7 @@ private fun TomlTable.toIRParameter(): IRParameter { return IRParameter(name, type, presence, count) } -private fun TomlTable.toIRArguments(): List { +private fun TomlTable.toIRArguments(parameters: Map): List { val results = mutableListOf() this.keySet().forEach { name -> @@ -64,10 +64,10 @@ private fun TomlTable.toIRArguments(): List { values.add(value) } - results.add(IRArgument(name, values)) + results.add(IRArgument(parameters[name]!!, values)) } else { val value = this.get(name).toString() - results.add(IRArgument(name, listOf(value))) + results.add(IRArgument(parameters[name]!!, listOf(value))) } } @@ -92,10 +92,10 @@ private fun TomlTable.toIRProcessor(uri: String): IRProcessor { // Parse the parameters. val parametersArray = this.getArray("parameters")!! - val result = mutableListOf() + val result = mutableMapOf() for (i in 0 until parametersArray.size()) { - val parameter = parametersArray.getTable(i) - result.add(parameter.toIRParameter()) + val parameter = parametersArray.getTable(i).toIRParameter() + result[parameter.name] = parameter } // Parse metadata. @@ -116,8 +116,8 @@ private fun TomlTable.toIRStage(processors: Map, uri: Strin val processor = processors[processorURI] ?: Log.shared.fatal("Unknown processor: $processorURI") // Parse arguments. - val arguments = this.getTable("arguments")?.toIRArguments() ?: emptyList() - return IRStage(uri, processor, arguments) + val arguments = this.getTable("arguments")?.toIRArguments(processor.parameters) ?: emptyList() + return IRStage(uri, processor, arguments.associateBy { it.parameter.name }) } class TomlParser(file: File) : Parser() { diff --git a/src/main/kotlin/parser/intermediate/IRArgument.kt b/src/main/kotlin/parser/intermediate/IRArgument.kt index bee649d..d8c316c 100644 --- a/src/main/kotlin/parser/intermediate/IRArgument.kt +++ b/src/main/kotlin/parser/intermediate/IRArgument.kt @@ -1,8 +1,8 @@ package technology.idlab.parser.intermediate data class IRArgument( - // Parameter name. - val name: String, + // Parameter. + val parameter: IRParameter, // Concrete but unparsed value. val value: List, ) diff --git a/src/main/kotlin/parser/intermediate/IRProcessor.kt b/src/main/kotlin/parser/intermediate/IRProcessor.kt index c814100..c85e025 100644 --- a/src/main/kotlin/parser/intermediate/IRProcessor.kt +++ b/src/main/kotlin/parser/intermediate/IRProcessor.kt @@ -8,7 +8,7 @@ class IRProcessor( // The destination of the processor. val target: Runner.Target, // Processor parameters. - val parameters: List = emptyList(), + val parameters: Map = emptyMap(), // Additional parameters. These may be used by the runner for any reason. val metadata: Map = emptyMap() ) diff --git a/src/main/kotlin/parser/intermediate/IRStage.kt b/src/main/kotlin/parser/intermediate/IRStage.kt index c08378b..2784216 100644 --- a/src/main/kotlin/parser/intermediate/IRStage.kt +++ b/src/main/kotlin/parser/intermediate/IRStage.kt @@ -6,5 +6,5 @@ data class IRStage( // The processor that the stage is associated with. val processor: IRProcessor, // Concrete but unparsed arguments for the stage. - val arguments: List = emptyList(), + val arguments: Map = emptyMap(), ) diff --git a/src/main/kotlin/runner/Runner.kt b/src/main/kotlin/runner/Runner.kt index f224392..cbb9c70 100644 --- a/src/main/kotlin/runner/Runner.kt +++ b/src/main/kotlin/runner/Runner.kt @@ -1,7 +1,6 @@ package runner import kotlinx.coroutines.channels.Channel -import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log @@ -41,11 +40,8 @@ abstract class Runner( /* Messages which are destined to a processor inside the runner. */ val toProcessors = Channel() - /** Register and prepare a processor inside the runtime. */ - abstract suspend fun prepare(processor: IRProcessor) - /** Register and prepare a stage inside the runtime. */ - abstract suspend fun prepare(stage: IRStage) + abstract suspend fun load(stage: IRStage) /** Start pipeline execution. */ abstract suspend fun exec() diff --git a/src/main/kotlin/runner/impl/GRPCRunner.kt b/src/main/kotlin/runner/impl/GRPCRunner.kt index 2bd1940..37275d2 100644 --- a/src/main/kotlin/runner/impl/GRPCRunner.kt +++ b/src/main/kotlin/runner/impl/GRPCRunner.kt @@ -17,6 +17,7 @@ import technology.idlab.parser.intermediate.IRParameter import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log +import technology.idlab.util.retries private val empty = Empty.getDefaultInstance() @@ -60,23 +61,23 @@ private fun IRParameter.toGRPC(): GRPC.IRParameter { private fun IRArgument.toGRPC(): GRPC.IRArgument { val builder = GRPC.IRArgument.newBuilder() - builder.setName(name) - builder.addAllValue(value) + builder.setParameter(this.parameter.toGRPC()) + builder.addAllValue(this.value) return builder.build() } private fun IRStage.toGRPC(): GRPC.IRStage { val builder = GRPC.IRStage.newBuilder() builder.setUri(uri) - builder.setProcessorUri(processor.uri) - builder.addAllArguments(arguments.map { it.toGRPC() }) + builder.setProcessor(this.processor.toGRPC()) + builder.putAllArguments(arguments.mapValues { it.value.toGRPC() }) return builder.build() } private fun IRProcessor.toGRPC(): GRPC.IRProcessor { val builder = GRPC.IRProcessor.newBuilder() builder.setUri(uri) - builder.addAllParameters(parameters.map { it.toGRPC() }) + builder.putAllParameters(parameters.mapValues { it.value.toGRPC() }) builder.putAllMetadata(metadata) return builder.build() } @@ -85,11 +86,17 @@ private fun IRProcessor.toGRPC(): GRPC.IRProcessor { * This runner has GRPC built-in, so the only configuration that an extending class needs to provide * is the host and port of the GRPC server, as well as actually booting the process. */ -abstract class GRPCRunner(fromProcessors: Channel, host: String, protected val port: Int) : - Runner(fromProcessors) { +abstract class GRPCRunner( + /** The channel to receive messages from the processors. */ + fromProcessors: Channel, + host: String, + /** The port of the GRPC server. */ + protected val port: Int +) : Runner(fromProcessors) { + /** Amount of retries before communication is considered failure. */ + private val retries = 5 /** Create a single stub for all communication. */ - // Initialize the GRPC stub. private val conn: ManagedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build() private val grpc: RunnerGrpcKt.RunnerCoroutineStub = RunnerGrpcKt.RunnerCoroutineStub(conn) @@ -102,22 +109,12 @@ abstract class GRPCRunner(fromProcessors: Channel, host: String, protec conn.shutdown() } - override suspend fun prepare(processor: IRProcessor) { - Log.shared.info("Preparing processor: `${processor.uri}`") - grpc.prepareProcessor(processor.toGRPC()) - Log.shared.info("Done preparing processor: `${processor.uri}`") - } - - override suspend fun prepare(stage: IRStage) { - Log.shared.info("Preparing stage: `${stage.uri}`") - grpc.prepareStage(stage.toGRPC()) - Log.shared.info("Done preparing stage: `${stage.uri}`") + override suspend fun load(stage: IRStage) { + retries(5, 1000) { grpc.load(stage.toGRPC()) } } override suspend fun exec() = coroutineScope { - Log.shared.debug("gRPC::exec::invoke") - grpc.exec(empty) - Log.shared.debug("gRPC::exec::success") + retries(5, 1000) { grpc.exec(empty) } // Create a flow for outgoing messages. val toGRPCProcessors = diff --git a/src/main/kotlin/runner/impl/NodeRunner.kt b/src/main/kotlin/runner/impl/NodeRunner.kt index 0acb523..d4d10a8 100644 --- a/src/main/kotlin/runner/impl/NodeRunner.kt +++ b/src/main/kotlin/runner/impl/NodeRunner.kt @@ -9,7 +9,7 @@ import technology.idlab.util.Log class NodeRunner(fromProcessors: Channel, port: Int) : GRPCRunner(fromProcessors, "localhost", port) { /** Handle to the child process. */ - private val process by lazy { createProcess() } + private val process = createProcess() init { // Add a shutdown hook to ensure that the process is killed when the JVM exits. diff --git a/src/main/kotlin/runner/jvm/JVMRunner.kt b/src/main/kotlin/runner/jvm/JVMRunner.kt index 3696044..371af69 100644 --- a/src/main/kotlin/runner/jvm/JVMRunner.kt +++ b/src/main/kotlin/runner/jvm/JVMRunner.kt @@ -8,65 +8,63 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout import org.jetbrains.kotlin.backend.common.push +import org.jetbrains.kotlin.utils.addToStdlib.ifFalse import runner.Runner import technology.idlab.parser.intermediate.IRParameter -import technology.idlab.parser.intermediate.IRProcessor import technology.idlab.parser.intermediate.IRStage import technology.idlab.util.Log class JVMRunner( fromProcessors: Channel, ) : Runner(fromProcessors) { - private val processors = mutableMapOf>>() + /** Map of all stages in the runner. */ private val stages = mutableMapOf() + + /** All stages are ran in their own job, for cancellation purposes we keep track of them. */ private val jobs: MutableList = mutableListOf() /** Incoming messages are delegated to sub channels. These are mapped by their URI. */ private val readers = mutableMapOf>() - override suspend fun prepare(processor: IRProcessor) { - val className = processor.metadata["class"] ?: Log.shared.fatal("Processor has no class key.") + override suspend fun load(stage: IRStage) { + /** Load the class into the JVM> */ + val className = + stage.processor.metadata["class"] ?: Log.shared.fatal("The processor has no class key set.") val clazz = Class.forName(className) as Class<*> + /** Check if instantiatable. */ if (!Processor::class.java.isAssignableFrom(clazz)) { Log.shared.fatal("Processor class does not extend Processor.") } - this.processors[processor.uri] = Pair(processor, clazz as Class) - } - - override suspend fun prepare(stage: IRStage) { - val processor = - processors[stage.processor.uri] - ?: Log.shared.fatal("Unknown processor: ${stage.processor.uri}") - val irArguments = stage.arguments.associateBy { it.name } - + /** Build the argument map. */ val arguments = mutableMapOf() - for (parameter in processor.first.parameters) { - val irArgument = irArguments[parameter.name] - - if (irArgument == null) { - if (parameter.presence == IRParameter.Presence.REQUIRED) { - Log.shared.fatal("Missing required argument: ${parameter.name}") - } - continue + for ((name, arg) in stage.arguments) { + /** Create concrete instances. */ + val concrete = arg.value.map { instantiate(arg.parameter.type, it) } + + /** + * If an array is expected, simply pass the value directly. Otherwise, pass the first + * variable. + */ + if (arg.parameter.count == IRParameter.Count.LIST) { + arguments[name] = concrete + } else { + assert(concrete.size == 1) + assert(arg.parameter.count == IRParameter.Count.SINGLE) + arguments[name] = concrete[0] } - - if (parameter.count == IRParameter.Count.SINGLE) { - if (irArgument.value.size != 1) { - Log.shared.fatal("Expected single value for argument: ${parameter.name}") - } - - val serialized = irArgument.value[0] - arguments[parameter.name] = instantiate(parameter.type, serialized) - continue - } - - arguments[parameter.name] = irArgument.value.map { instantiate(parameter.type, it) } } - val constructor = processor.second.getConstructor(Map::class.java) + /** Check if the non-optional arguments were set. */ + stage.processor.parameters + .filter { it.value.presence == IRParameter.Presence.REQUIRED } + .all { it.key in arguments.keys } + .ifFalse { Log.shared.fatal("Required argument not set.") } + + /** Initialize the stage with the new map. */ + val constructor = clazz.getConstructor(Map::class.java) this.stages[stage.uri] = constructor.newInstance(arguments) as Processor } diff --git a/src/main/kotlin/util/Retries.kt b/src/main/kotlin/util/Retries.kt new file mode 100644 index 0000000..76bc687 --- /dev/null +++ b/src/main/kotlin/util/Retries.kt @@ -0,0 +1,17 @@ +package technology.idlab.util + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay + +suspend fun retries(times: Int, milliseconds: Long = 1000, block: suspend () -> T): T = + coroutineScope { + for (i in 0 until times) { + try { + return@coroutineScope block() + } catch (e: Exception) { + delay(milliseconds) + } + } + + Log.shared.fatal("Maximum retries exceeded.") + } diff --git a/src/test/kotlin/parser/ParserTest.kt b/src/test/kotlin/parser/ParserTest.kt index 7b89e2d..27c8b2c 100644 --- a/src/test/kotlin/parser/ParserTest.kt +++ b/src/test/kotlin/parser/ParserTest.kt @@ -31,13 +31,13 @@ abstract class ParserTest { assertEquals(2, alpha.parameters.size, "Alpha processor should have two parameters.") // Check its arguments. - val one = alpha.parameters.find { it.name == "one" } + val one = alpha.parameters["one"] assertNotNull(one, "Parameter one should exist.") assertEquals(IRParameter.Type.STRING, one.type, "Parameter one should be of type string.") assertEquals(IRParameter.Count.SINGLE, one.count, "Parameter one should be a single value.") assertEquals(IRParameter.Presence.REQUIRED, one.presence, "Parameter one should be required.") - val two = alpha.parameters.find { it.name == "two" } + val two = alpha.parameters["two"] assertNotNull(two, "Parameter two should exist.") assertEquals(IRParameter.Type.INT, two.type, "Parameter two should be of type integer.") assertEquals(IRParameter.Count.LIST, two.count, "Parameter two should be an array.") @@ -55,13 +55,13 @@ abstract class ParserTest { alphaOne.processor.uri.endsWith("alpha"), "alpha_one stage should use the alpha processor.") // Parse first argument. - val one = alphaOne.arguments.find { it.name == "one" } + val one = alphaOne.arguments["one"] assertNotNull(one, "alpha_one::one should exist.") assertEquals(1, one.value.size, "alpha_one::one should have one value.") assertEquals("Hello, World!", one.value[0], "alpha_one::one should be 'Hello, World!'.") // Parse second argument. - val two = alphaOne.arguments.find { it.name == "two" } + val two = alphaOne.arguments["two"] assertNull(two, "alpha_one::two should not exist.") } } diff --git a/src/test/kotlin/processors/NodeTransparent.kt b/src/test/kotlin/processors/NodeTransparent.kt index ada09a0..dcc8ce7 100644 --- a/src/test/kotlin/processors/NodeTransparent.kt +++ b/src/test/kotlin/processors/NodeTransparent.kt @@ -12,19 +12,21 @@ class NodeTransparent { IRProcessor( "transparent", Runner.Target.NODEJS, - listOf( - IRParameter( - "input", - IRParameter.Type.READER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, - ), - IRParameter( - "output", - IRParameter.Type.WRITER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, - ), + mapOf( + "input" to + IRParameter( + "input", + IRParameter.Type.READER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ), + "output" to + IRParameter( + "output", + IRParameter.Type.WRITER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ), ), mapOf("import" to "../std/transparent.js"), ) @@ -33,9 +35,9 @@ class NodeTransparent { return IRStage( "transparent_stage", processor, - listOf( - IRArgument("input", listOf(channelInURI)), - IRArgument("output", listOf(channelOutURI))), + mapOf( + "input" to IRArgument(processor.parameters["input"]!!, listOf(channelInURI)), + "output" to IRArgument(processor.parameters["output"]!!, listOf(channelOutURI))), ) } } diff --git a/src/test/kotlin/processors/TappedReader.kt b/src/test/kotlin/processors/TappedReader.kt index 16665ea..b889606 100644 --- a/src/test/kotlin/processors/TappedReader.kt +++ b/src/test/kotlin/processors/TappedReader.kt @@ -34,20 +34,23 @@ class TappedReader(args: Map) : Processor(args) { IRProcessor( "tapped_reader", Runner.Target.JVM, - listOf( - IRParameter( - "input", - IRParameter.Type.READER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, - ), + mapOf( + "input" to + IRParameter( + "input", + IRParameter.Type.READER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ), ), mapOf("class" to "processors.TappedReader"), ) fun stage(channelURI: String): IRStage { return IRStage( - "tapped_reader_stage", processor, listOf(IRArgument("input", listOf(channelURI)))) + "tapped_reader_stage", + processor, + mapOf("input" to IRArgument(processor.parameters["input"]!!, listOf(channelURI)))) } } } diff --git a/src/test/kotlin/processors/TappedWriter.kt b/src/test/kotlin/processors/TappedWriter.kt index 1922700..a6ac0e5 100644 --- a/src/test/kotlin/processors/TappedWriter.kt +++ b/src/test/kotlin/processors/TappedWriter.kt @@ -34,20 +34,23 @@ class TappedWriter(args: Map) : Processor(args) { IRProcessor( "tapped_writer", Runner.Target.JVM, - listOf( - IRParameter( - "output", - IRParameter.Type.WRITER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, - ), + mapOf( + "output" to + IRParameter( + "output", + IRParameter.Type.WRITER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ), ), mapOf("class" to "processors.TappedWriter"), ) fun stage(channelURI: String): IRStage { return IRStage( - "tapped_writer_stage", processor, listOf(IRArgument("output", listOf(channelURI)))) + "tapped_writer_stage", + processor, + mapOf("output" to IRArgument(processor.parameters["output"]!!, listOf(channelURI)))) } } } diff --git a/src/test/kotlin/runner/RunnerTest.kt b/src/test/kotlin/runner/RunnerTest.kt index 5ffb183..06206b5 100644 --- a/src/test/kotlin/runner/RunnerTest.kt +++ b/src/test/kotlin/runner/RunnerTest.kt @@ -22,17 +22,9 @@ abstract class RunnerTest { @AfterEach fun allowGracefulShutdown() = runBlocking { delay(2000) } @Test - fun prepareProcessorTest() = runBlocking { + fun loadTest() = runBlocking { val runner = createRunner() - runner.prepare(createProcessor()) - runner.exit() - } - - @Test - fun prepareStageTest() = runBlocking { - val runner = createRunner() - runner.prepare(createProcessor()) - runner.prepare(createStage()) + runner.load(createStage()) runner.exit() } @@ -41,8 +33,7 @@ abstract class RunnerTest { val runner = createRunner() // Prepare the runner. - runner.prepare(createProcessor()) - runner.prepare(createStage()) + runner.load(createStage()) // Start the runner. val job = launch { runner.exec() } @@ -62,23 +53,29 @@ abstract class RunnerTest { runner.exit() } + private val paramInput = + IRParameter( + "input", + IRParameter.Type.READER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ) + + private val paramOutput = + IRParameter( + "output", + IRParameter.Type.WRITER, + IRParameter.Presence.REQUIRED, + IRParameter.Count.SINGLE, + ) + private fun createProcessor(): IRProcessor { return IRProcessor( "transparent", this.target, - listOf( - IRParameter( - "input", - IRParameter.Type.READER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, - ), - IRParameter( - "output", - IRParameter.Type.WRITER, - IRParameter.Presence.REQUIRED, - IRParameter.Count.SINGLE, - ), + mapOf( + "input" to this.paramInput, + "output" to this.paramOutput, ), this.metadata, ) @@ -88,9 +85,9 @@ abstract class RunnerTest { return IRStage( "transparent_stage", this.createProcessor(), - listOf( - IRArgument("input", listOf("channel_in_uri")), - IRArgument("output", listOf("channel_out_uri"))), + mapOf( + "input" to IRArgument(paramInput, listOf("channel_in_uri")), + "output" to IRArgument(paramOutput, listOf("channel_out_uri"))), ) } }