From 863c27ec019b8c31cebac647fbec876bada22e8e Mon Sep 17 00:00:00 2001 From: Jens Pots Date: Thu, 20 Jun 2024 18:03:28 +0200 Subject: [PATCH] feat: updated nodejs runner structure --- runners/nodejs/README.md | 12 +- runners/nodejs/src/index.ts | 6 - runners/nodejs/src/proto/empty.ts | 83 ++ runners/nodejs/src/proto/index.ts | 294 ++++++ runners/nodejs/src/proto/intermediate.ts | 725 +++++++++++++++ runners/nodejs/src/runtime/index.ts | 16 +- runners/nodejs/src/runtime/resolve.ts | 17 - runners/nodejs/src/runtime/runner.ts | 1041 +--------------------- runners/nodejs/src/runtime/server.ts | 118 +-- 9 files changed, 1151 insertions(+), 1161 deletions(-) delete mode 100644 runners/nodejs/src/index.ts create mode 100644 runners/nodejs/src/proto/empty.ts create mode 100644 runners/nodejs/src/proto/index.ts create mode 100644 runners/nodejs/src/proto/intermediate.ts delete mode 100644 runners/nodejs/src/runtime/resolve.ts diff --git a/runners/nodejs/README.md b/runners/nodejs/README.md index f75c0d3..9dfd789 100644 --- a/runners/nodejs/README.md +++ b/runners/nodejs/README.md @@ -9,10 +9,10 @@ In this directory, a runner and processor interface is defined for use with Node We use `proto-ts` to generate TypeScript interfaces from the protocol buffer definitions. Run the following command to generate the source code. ```shell -protoc \ - --plugin=./node_modules/.bin/protoc-gen-ts_proto \ - --ts_proto_out=./src \ - --ts_proto_opt=outputServices=grpc-js \ - --proto-path=.. \ - ./index.proto + protoc \ + --plugin=./node_modules/.bin/protoc-gen-ts_proto \ + --ts_proto_out=./src/proto \ + --ts_proto_opt=outputServices=grpc-js \ + --proto_path=./../../proto \ + index.proto ``` diff --git a/runners/nodejs/src/index.ts b/runners/nodejs/src/index.ts deleted file mode 100644 index d31f25f..0000000 --- a/runners/nodejs/src/index.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { Processor } from "./interfaces/processor"; -import { Reader } from "./interfaces/reader"; -import { Writer } from "./interfaces/writer"; -import { RunnerError } from "./error"; - -export { Processor, Reader, Writer, RunnerError }; diff --git a/runners/nodejs/src/proto/empty.ts b/runners/nodejs/src/proto/empty.ts new file mode 100644 index 0000000..0c6be95 --- /dev/null +++ b/runners/nodejs/src/proto/empty.ts @@ -0,0 +1,83 @@ +// Code generated by protoc-gen-ts_proto. DO NOT EDIT. +// versions: +// protoc-gen-ts_proto v1.180.0 +// protoc v5.27.0 +// source: empty.proto + +/* eslint-disable */ +import * as _m0 from "protobufjs/minimal"; + +export const protobufPackage = ""; + +/** A message that represents nothing. */ +export interface Empty {} + +function createBaseEmpty(): Empty { + return {}; +} + +export const Empty = { + encode(_: Empty, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): Empty { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseEmpty(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(_: any): Empty { + return {}; + }, + + toJSON(_: Empty): unknown { + const obj: any = {}; + return obj; + }, + + create, I>>(base?: I): Empty { + return Empty.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(_: I): Empty { + const message = createBaseEmpty(); + return message; + }, +}; + +type Builtin = + | Date + | Function + | Uint8Array + | string + | number + | boolean + | undefined; + +export type DeepPartial = T extends Builtin + ? T + : T extends globalThis.Array + ? globalThis.Array> + : T extends ReadonlyArray + ? ReadonlyArray> + : T extends {} + ? { [K in keyof T]?: DeepPartial } + : Partial; + +type KeysOfUnion = T extends T ? keyof T : never; +export type Exact = P extends Builtin + ? P + : P & { [K in keyof P]: Exact } & { + [K in Exclude>]: never; + }; diff --git a/runners/nodejs/src/proto/index.ts b/runners/nodejs/src/proto/index.ts new file mode 100644 index 0000000..7222541 --- /dev/null +++ b/runners/nodejs/src/proto/index.ts @@ -0,0 +1,294 @@ +// Code generated by protoc-gen-ts_proto. DO NOT EDIT. +// versions: +// protoc-gen-ts_proto v1.180.0 +// protoc v5.27.0 +// source: index.proto + +/* eslint-disable */ +import { + type CallOptions, + ChannelCredentials, + Client, + ClientDuplexStream, + type ClientOptions, + type ClientUnaryCall, + handleBidiStreamingCall, + type handleUnaryCall, + makeGenericClientConstructor, + Metadata, + type ServiceError, + type UntypedServiceImplementation, +} from "@grpc/grpc-js"; +import * as _m0 from "protobufjs/minimal"; +import { Empty } from "./empty"; +import { IRProcessor, IRStage } from "./intermediate"; + +export const protobufPackage = ""; + +export interface ChannelData { + destinationUri: string; + data: Uint8Array; +} + +function createBaseChannelData(): ChannelData { + return { destinationUri: "", data: new Uint8Array(0) }; +} + +export const ChannelData = { + encode( + message: ChannelData, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + if (message.destinationUri !== "") { + writer.uint32(10).string(message.destinationUri); + } + if (message.data.length !== 0) { + writer.uint32(18).bytes(message.data); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): ChannelData { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseChannelData(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.destinationUri = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.data = reader.bytes(); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): ChannelData { + return { + destinationUri: isSet(object.destinationUri) + ? globalThis.String(object.destinationUri) + : "", + data: isSet(object.data) + ? bytesFromBase64(object.data) + : new Uint8Array(0), + }; + }, + + toJSON(message: ChannelData): unknown { + const obj: any = {}; + if (message.destinationUri !== "") { + obj.destinationUri = message.destinationUri; + } + if (message.data.length !== 0) { + obj.data = base64FromBytes(message.data); + } + return obj; + }, + + create, I>>(base?: I): ChannelData { + return ChannelData.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): ChannelData { + const message = createBaseChannelData(); + message.destinationUri = object.destinationUri ?? ""; + message.data = object.data ?? new Uint8Array(0); + return message; + }, +}; + +export type RunnerService = typeof RunnerService; +export const RunnerService = { + prepareProcessor: { + path: "/Runner/prepareProcessor", + requestStream: false, + responseStream: false, + requestSerialize: (value: IRProcessor) => + Buffer.from(IRProcessor.encode(value).finish()), + requestDeserialize: (value: Buffer) => IRProcessor.decode(value), + responseSerialize: (value: Empty) => + Buffer.from(Empty.encode(value).finish()), + responseDeserialize: (value: Buffer) => Empty.decode(value), + }, + prepareStage: { + path: "/Runner/prepareStage", + requestStream: false, + responseStream: false, + requestSerialize: (value: IRStage) => + Buffer.from(IRStage.encode(value).finish()), + requestDeserialize: (value: Buffer) => IRStage.decode(value), + responseSerialize: (value: Empty) => + Buffer.from(Empty.encode(value).finish()), + responseDeserialize: (value: Buffer) => Empty.decode(value), + }, + exec: { + path: "/Runner/exec", + requestStream: false, + responseStream: false, + requestSerialize: (value: Empty) => + Buffer.from(Empty.encode(value).finish()), + requestDeserialize: (value: Buffer) => Empty.decode(value), + responseSerialize: (value: Empty) => + Buffer.from(Empty.encode(value).finish()), + responseDeserialize: (value: Buffer) => Empty.decode(value), + }, + channel: { + path: "/Runner/channel", + requestStream: true, + responseStream: true, + requestSerialize: (value: ChannelData) => + Buffer.from(ChannelData.encode(value).finish()), + requestDeserialize: (value: Buffer) => ChannelData.decode(value), + responseSerialize: (value: ChannelData) => + Buffer.from(ChannelData.encode(value).finish()), + responseDeserialize: (value: Buffer) => ChannelData.decode(value), + }, +} as const; + +export interface RunnerServer extends UntypedServiceImplementation { + prepareProcessor: handleUnaryCall; + prepareStage: handleUnaryCall; + exec: handleUnaryCall; + channel: handleBidiStreamingCall; +} + +export interface RunnerClient extends Client { + prepareProcessor( + request: IRProcessor, + callback: (error: ServiceError | null, response: Empty) => void, + ): ClientUnaryCall; + prepareProcessor( + request: IRProcessor, + metadata: Metadata, + callback: (error: ServiceError | null, response: Empty) => void, + ): ClientUnaryCall; + prepareProcessor( + request: IRProcessor, + metadata: Metadata, + options: Partial, + callback: (error: ServiceError | null, response: Empty) => void, + ): ClientUnaryCall; + prepareStage( + request: IRStage, + callback: (error: ServiceError | null, response: Empty) => void, + ): ClientUnaryCall; + prepareStage( + request: IRStage, + metadata: Metadata, + callback: (error: ServiceError | null, response: Empty) => void, + ): ClientUnaryCall; + prepareStage( + request: IRStage, + metadata: Metadata, + options: Partial, + callback: (error: ServiceError | null, response: Empty) => void, + ): ClientUnaryCall; + exec( + request: Empty, + callback: (error: ServiceError | null, response: Empty) => void, + ): ClientUnaryCall; + exec( + request: Empty, + metadata: Metadata, + callback: (error: ServiceError | null, response: Empty) => void, + ): ClientUnaryCall; + exec( + request: Empty, + metadata: Metadata, + options: Partial, + callback: (error: ServiceError | null, response: Empty) => void, + ): ClientUnaryCall; + channel(): ClientDuplexStream; + channel( + options: Partial, + ): ClientDuplexStream; + channel( + metadata: Metadata, + options?: Partial, + ): ClientDuplexStream; +} + +export const RunnerClient = makeGenericClientConstructor( + RunnerService, + "Runner", +) as unknown as { + new ( + address: string, + credentials: ChannelCredentials, + options?: Partial, + ): RunnerClient; + service: typeof RunnerService; + serviceName: string; +}; + +function bytesFromBase64(b64: string): Uint8Array { + if ((globalThis as any).Buffer) { + return Uint8Array.from(globalThis.Buffer.from(b64, "base64")); + } else { + const bin = globalThis.atob(b64); + const arr = new Uint8Array(bin.length); + for (let i = 0; i < bin.length; ++i) { + arr[i] = bin.charCodeAt(i); + } + return arr; + } +} + +function base64FromBytes(arr: Uint8Array): string { + if ((globalThis as any).Buffer) { + return globalThis.Buffer.from(arr).toString("base64"); + } else { + const bin: string[] = []; + arr.forEach((byte) => { + bin.push(globalThis.String.fromCharCode(byte)); + }); + return globalThis.btoa(bin.join("")); + } +} + +type Builtin = + | Date + | Function + | Uint8Array + | string + | number + | boolean + | undefined; + +export type DeepPartial = T extends Builtin + ? T + : T extends globalThis.Array + ? globalThis.Array> + : T extends ReadonlyArray + ? ReadonlyArray> + : T extends {} + ? { [K in keyof T]?: DeepPartial } + : Partial; + +type KeysOfUnion = T extends T ? keyof T : never; +export type Exact = P extends Builtin + ? P + : P & { [K in keyof P]: Exact } & { + [K in Exclude>]: never; + }; + +function isSet(value: any): boolean { + return value !== null && value !== undefined; +} diff --git a/runners/nodejs/src/proto/intermediate.ts b/runners/nodejs/src/proto/intermediate.ts new file mode 100644 index 0000000..ca8a16d --- /dev/null +++ b/runners/nodejs/src/proto/intermediate.ts @@ -0,0 +1,725 @@ +// Code generated by protoc-gen-ts_proto. DO NOT EDIT. +// versions: +// protoc-gen-ts_proto v1.180.0 +// protoc v5.27.0 +// source: intermediate.proto + +/* eslint-disable */ +import * as _m0 from "protobufjs/minimal"; + +export const protobufPackage = ""; + +export enum IRParameterType { + BOOLEAN = 0, + BYTE = 1, + DATE = 2, + DOUBLE = 3, + FLOAT = 4, + INT = 5, + LONG = 6, + STRING = 7, + WRITER = 8, + READER = 9, + UNRECOGNIZED = -1, +} + +export function iRParameterTypeFromJSON(object: any): IRParameterType { + switch (object) { + case 0: + case "BOOLEAN": + return IRParameterType.BOOLEAN; + case 1: + case "BYTE": + return IRParameterType.BYTE; + case 2: + case "DATE": + return IRParameterType.DATE; + case 3: + case "DOUBLE": + return IRParameterType.DOUBLE; + case 4: + case "FLOAT": + return IRParameterType.FLOAT; + case 5: + case "INT": + return IRParameterType.INT; + case 6: + case "LONG": + return IRParameterType.LONG; + case 7: + case "STRING": + return IRParameterType.STRING; + case 8: + case "WRITER": + return IRParameterType.WRITER; + case 9: + case "READER": + return IRParameterType.READER; + case -1: + case "UNRECOGNIZED": + default: + return IRParameterType.UNRECOGNIZED; + } +} + +export function iRParameterTypeToJSON(object: IRParameterType): string { + switch (object) { + case IRParameterType.BOOLEAN: + return "BOOLEAN"; + case IRParameterType.BYTE: + return "BYTE"; + case IRParameterType.DATE: + return "DATE"; + case IRParameterType.DOUBLE: + return "DOUBLE"; + case IRParameterType.FLOAT: + return "FLOAT"; + case IRParameterType.INT: + return "INT"; + case IRParameterType.LONG: + return "LONG"; + case IRParameterType.STRING: + return "STRING"; + case IRParameterType.WRITER: + return "WRITER"; + case IRParameterType.READER: + return "READER"; + case IRParameterType.UNRECOGNIZED: + default: + return "UNRECOGNIZED"; + } +} + +export enum IRParameterPresence { + OPTIONAL = 0, + REQUIRED = 1, + UNRECOGNIZED = -1, +} + +export function iRParameterPresenceFromJSON(object: any): IRParameterPresence { + switch (object) { + case 0: + case "OPTIONAL": + return IRParameterPresence.OPTIONAL; + case 1: + case "REQUIRED": + return IRParameterPresence.REQUIRED; + case -1: + case "UNRECOGNIZED": + default: + return IRParameterPresence.UNRECOGNIZED; + } +} + +export function iRParameterPresenceToJSON(object: IRParameterPresence): string { + switch (object) { + case IRParameterPresence.OPTIONAL: + return "OPTIONAL"; + case IRParameterPresence.REQUIRED: + return "REQUIRED"; + case IRParameterPresence.UNRECOGNIZED: + default: + return "UNRECOGNIZED"; + } +} + +export enum IRParameterCount { + SINGLE = 0, + LIST = 1, + UNRECOGNIZED = -1, +} + +export function iRParameterCountFromJSON(object: any): IRParameterCount { + switch (object) { + case 0: + case "SINGLE": + return IRParameterCount.SINGLE; + case 1: + case "LIST": + return IRParameterCount.LIST; + case -1: + case "UNRECOGNIZED": + default: + return IRParameterCount.UNRECOGNIZED; + } +} + +export function iRParameterCountToJSON(object: IRParameterCount): string { + switch (object) { + case IRParameterCount.SINGLE: + return "SINGLE"; + case IRParameterCount.LIST: + return "LIST"; + case IRParameterCount.UNRECOGNIZED: + default: + return "UNRECOGNIZED"; + } +} + +export interface IRParameter { + name: string; + type: IRParameterType; + presence: IRParameterPresence; + count: IRParameterCount; +} + +export interface IRProcessor { + uri: string; + parameters: IRParameter[]; + metadata: { [key: string]: string }; +} + +export interface IRProcessor_MetadataEntry { + key: string; + value: string; +} + +export interface IRArgument { + name: string; + value: string[]; +} + +export interface IRStage { + uri: string; + processorUri: string; + arguments: IRArgument[]; +} + +function createBaseIRParameter(): IRParameter { + return { name: "", type: 0, presence: 0, count: 0 }; +} + +export const IRParameter = { + encode( + message: IRParameter, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + if (message.name !== "") { + writer.uint32(10).string(message.name); + } + if (message.type !== 0) { + writer.uint32(16).int32(message.type); + } + if (message.presence !== 0) { + writer.uint32(24).int32(message.presence); + } + if (message.count !== 0) { + writer.uint32(32).int32(message.count); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): IRParameter { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRParameter(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.name = reader.string(); + continue; + case 2: + if (tag !== 16) { + break; + } + + message.type = reader.int32() as any; + continue; + case 3: + if (tag !== 24) { + break; + } + + message.presence = reader.int32() as any; + continue; + case 4: + if (tag !== 32) { + break; + } + + message.count = reader.int32() as any; + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRParameter { + return { + name: isSet(object.name) ? globalThis.String(object.name) : "", + type: isSet(object.type) ? iRParameterTypeFromJSON(object.type) : 0, + presence: isSet(object.presence) + ? iRParameterPresenceFromJSON(object.presence) + : 0, + count: isSet(object.count) ? iRParameterCountFromJSON(object.count) : 0, + }; + }, + + toJSON(message: IRParameter): unknown { + const obj: any = {}; + if (message.name !== "") { + obj.name = message.name; + } + if (message.type !== 0) { + obj.type = iRParameterTypeToJSON(message.type); + } + if (message.presence !== 0) { + obj.presence = iRParameterPresenceToJSON(message.presence); + } + if (message.count !== 0) { + obj.count = iRParameterCountToJSON(message.count); + } + return obj; + }, + + create, I>>(base?: I): IRParameter { + return IRParameter.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRParameter { + const message = createBaseIRParameter(); + message.name = object.name ?? ""; + message.type = object.type ?? 0; + message.presence = object.presence ?? 0; + message.count = object.count ?? 0; + return message; + }, +}; + +function createBaseIRProcessor(): IRProcessor { + return { uri: "", parameters: [], metadata: {} }; +} + +export const IRProcessor = { + encode( + message: IRProcessor, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + if (message.uri !== "") { + writer.uint32(10).string(message.uri); + } + for (const v of message.parameters) { + IRParameter.encode(v!, writer.uint32(18).fork()).ldelim(); + } + Object.entries(message.metadata).forEach(([key, value]) => { + IRProcessor_MetadataEntry.encode( + { key: key as any, value }, + writer.uint32(26).fork(), + ).ldelim(); + }); + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): IRProcessor { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRProcessor(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.uri = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.parameters.push(IRParameter.decode(reader, reader.uint32())); + continue; + case 3: + if (tag !== 26) { + break; + } + + const entry3 = IRProcessor_MetadataEntry.decode( + reader, + reader.uint32(), + ); + if (entry3.value !== undefined) { + message.metadata[entry3.key] = entry3.value; + } + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRProcessor { + return { + uri: isSet(object.uri) ? globalThis.String(object.uri) : "", + parameters: globalThis.Array.isArray(object?.parameters) + ? object.parameters.map((e: any) => IRParameter.fromJSON(e)) + : [], + metadata: isObject(object.metadata) + ? Object.entries(object.metadata).reduce<{ [key: string]: string }>( + (acc, [key, value]) => { + acc[key] = String(value); + return acc; + }, + {}, + ) + : {}, + }; + }, + + toJSON(message: IRProcessor): unknown { + const obj: any = {}; + if (message.uri !== "") { + obj.uri = message.uri; + } + if (message.parameters?.length) { + obj.parameters = message.parameters.map((e) => IRParameter.toJSON(e)); + } + if (message.metadata) { + const entries = Object.entries(message.metadata); + if (entries.length > 0) { + obj.metadata = {}; + entries.forEach(([k, v]) => { + obj.metadata[k] = v; + }); + } + } + return obj; + }, + + create, I>>(base?: I): IRProcessor { + return IRProcessor.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRProcessor { + const message = createBaseIRProcessor(); + message.uri = object.uri ?? ""; + message.parameters = + object.parameters?.map((e) => IRParameter.fromPartial(e)) || []; + message.metadata = Object.entries(object.metadata ?? {}).reduce<{ + [key: string]: string; + }>((acc, [key, value]) => { + if (value !== undefined) { + acc[key] = globalThis.String(value); + } + return acc; + }, {}); + return message; + }, +}; + +function createBaseIRProcessor_MetadataEntry(): IRProcessor_MetadataEntry { + return { key: "", value: "" }; +} + +export const IRProcessor_MetadataEntry = { + encode( + message: IRProcessor_MetadataEntry, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + if (message.key !== "") { + writer.uint32(10).string(message.key); + } + if (message.value !== "") { + writer.uint32(18).string(message.value); + } + return writer; + }, + + decode( + input: _m0.Reader | Uint8Array, + length?: number, + ): IRProcessor_MetadataEntry { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRProcessor_MetadataEntry(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.key = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.value = reader.string(); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRProcessor_MetadataEntry { + return { + key: isSet(object.key) ? globalThis.String(object.key) : "", + value: isSet(object.value) ? globalThis.String(object.value) : "", + }; + }, + + toJSON(message: IRProcessor_MetadataEntry): unknown { + const obj: any = {}; + if (message.key !== "") { + obj.key = message.key; + } + if (message.value !== "") { + obj.value = message.value; + } + return obj; + }, + + create, I>>( + base?: I, + ): IRProcessor_MetadataEntry { + return IRProcessor_MetadataEntry.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRProcessor_MetadataEntry { + const message = createBaseIRProcessor_MetadataEntry(); + message.key = object.key ?? ""; + message.value = object.value ?? ""; + return message; + }, +}; + +function createBaseIRArgument(): IRArgument { + return { name: "", value: [] }; +} + +export const IRArgument = { + encode( + message: IRArgument, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + if (message.name !== "") { + writer.uint32(10).string(message.name); + } + for (const v of message.value) { + writer.uint32(18).string(v!); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): IRArgument { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRArgument(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.name = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.value.push(reader.string()); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRArgument { + return { + name: isSet(object.name) ? globalThis.String(object.name) : "", + value: globalThis.Array.isArray(object?.value) + ? object.value.map((e: any) => globalThis.String(e)) + : [], + }; + }, + + toJSON(message: IRArgument): unknown { + const obj: any = {}; + if (message.name !== "") { + obj.name = message.name; + } + if (message.value?.length) { + obj.value = message.value; + } + return obj; + }, + + create, I>>(base?: I): IRArgument { + return IRArgument.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>( + object: I, + ): IRArgument { + const message = createBaseIRArgument(); + message.name = object.name ?? ""; + message.value = object.value?.map((e) => e) || []; + return message; + }, +}; + +function createBaseIRStage(): IRStage { + return { uri: "", processorUri: "", arguments: [] }; +} + +export const IRStage = { + encode( + message: IRStage, + writer: _m0.Writer = _m0.Writer.create(), + ): _m0.Writer { + if (message.uri !== "") { + writer.uint32(10).string(message.uri); + } + if (message.processorUri !== "") { + writer.uint32(18).string(message.processorUri); + } + for (const v of message.arguments) { + IRArgument.encode(v!, writer.uint32(26).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): IRStage { + const reader = + input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseIRStage(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.uri = reader.string(); + continue; + case 2: + if (tag !== 18) { + break; + } + + message.processorUri = reader.string(); + continue; + case 3: + if (tag !== 26) { + break; + } + + message.arguments.push(IRArgument.decode(reader, reader.uint32())); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): IRStage { + return { + uri: isSet(object.uri) ? globalThis.String(object.uri) : "", + processorUri: isSet(object.processorUri) + ? globalThis.String(object.processorUri) + : "", + arguments: globalThis.Array.isArray(object?.arguments) + ? object.arguments.map((e: any) => IRArgument.fromJSON(e)) + : [], + }; + }, + + toJSON(message: IRStage): unknown { + const obj: any = {}; + if (message.uri !== "") { + obj.uri = message.uri; + } + if (message.processorUri !== "") { + obj.processorUri = message.processorUri; + } + if (message.arguments?.length) { + obj.arguments = message.arguments.map((e) => IRArgument.toJSON(e)); + } + return obj; + }, + + create, I>>(base?: I): IRStage { + return IRStage.fromPartial(base ?? ({} as any)); + }, + fromPartial, I>>(object: I): IRStage { + const message = createBaseIRStage(); + message.uri = object.uri ?? ""; + message.processorUri = object.processorUri ?? ""; + message.arguments = + object.arguments?.map((e) => IRArgument.fromPartial(e)) || []; + return message; + }, +}; + +type Builtin = + | Date + | Function + | Uint8Array + | string + | number + | boolean + | undefined; + +export type DeepPartial = T extends Builtin + ? T + : T extends globalThis.Array + ? globalThis.Array> + : T extends ReadonlyArray + ? ReadonlyArray> + : T extends {} + ? { [K in keyof T]?: DeepPartial } + : Partial; + +type KeysOfUnion = T extends T ? keyof T : never; +export type Exact = P extends Builtin + ? P + : P & { [K in keyof P]: Exact } & { + [K in Exclude>]: never; + }; + +function isObject(value: any): boolean { + return typeof value === "object" && value !== null; +} + +function isSet(value: any): boolean { + return value !== null && value !== undefined; +} diff --git a/runners/nodejs/src/runtime/index.ts b/runners/nodejs/src/runtime/index.ts index 43a74de..11efb11 100644 --- a/runners/nodejs/src/runtime/index.ts +++ b/runners/nodejs/src/runtime/index.ts @@ -1,20 +1,12 @@ import { Server, ServerCredentials } from "@grpc/grpc-js"; import { ServerImplementation } from "./server"; -import { RunnerService } from "./runner"; +import { RunnerService } from "../proto"; // Initialize the server. const server = new Server(); + +// Add the Runner service. server.addService(RunnerService, new ServerImplementation()); // Startup. -server.bindAsync( - "0.0.0.0:50051", - ServerCredentials.createInsecure(), - (err, port) => { - if (err) { - console.error(err); - } else { - console.log(`Server listening on ${port}`); - } - }, -); +server.bindAsync("0.0.0.0:50051", ServerCredentials.createInsecure(), () => {}); diff --git a/runners/nodejs/src/runtime/resolve.ts b/runners/nodejs/src/runtime/resolve.ts deleted file mode 100644 index a95e081..0000000 --- a/runners/nodejs/src/runtime/resolve.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { Processor } from "../interfaces/processor"; -import { Constructor } from "./constructor"; - -export async function resolve(): Promise> { - // Define variables. - const src = - "/Users/jens/Developer/technology.idlab.jvm-runner/examples/processors/logger-ts/build/index.js"; - const packageName = ""; - const className = "Logger"; - - // Read file from disk. - const module = await import(src); - const constructor = module[className]; - - // Return result. - return constructor as Constructor; -} diff --git a/runners/nodejs/src/runtime/runner.ts b/runners/nodejs/src/runtime/runner.ts index 7fd3e6d..14baa69 100644 --- a/runners/nodejs/src/runtime/runner.ts +++ b/runners/nodejs/src/runtime/runner.ts @@ -1,1037 +1,22 @@ -// Code generated by protoc-gen-ts_proto. DO NOT EDIT. -// versions: -// protoc-gen-ts_proto v1.180.0 -// protoc v5.27.0 -// source: proto/index.proto +import { IRProcessor } from "../proto/intermediate"; +import { ChannelData } from "../proto"; +import { Subject } from "rxjs"; -/* eslint-disable */ -import { - type CallOptions, - ChannelCredentials, - Client, - ClientDuplexStream, - type ClientOptions, - type ClientUnaryCall, - handleBidiStreamingCall, - type handleUnaryCall, - makeGenericClientConstructor, - Metadata, - type ServiceError, - type UntypedServiceImplementation, -} from "@grpc/grpc-js"; -import * as _m0 from "protobufjs/minimal"; +export class Runner { + public incoming = new Subject(); + public outgoing = new Subject(); -export const protobufPackage = ""; - -export enum ArgumentType { - STRING = 0, - INT = 1, - FLOAT = 2, - BOOL = 3, - WRITER = 4, - READER = 5, - UNRECOGNIZED = -1, -} - -export function argumentTypeFromJSON(object: any): ArgumentType { - switch (object) { - case 0: - case "STRING": - return ArgumentType.STRING; - case 1: - case "INT": - return ArgumentType.INT; - case 2: - case "FLOAT": - return ArgumentType.FLOAT; - case 3: - case "BOOL": - return ArgumentType.BOOL; - case 4: - case "WRITER": - return ArgumentType.WRITER; - case 5: - case "READER": - return ArgumentType.READER; - case -1: - case "UNRECOGNIZED": - default: - return ArgumentType.UNRECOGNIZED; + prepareProcessor(processor: IRProcessor): void { + throw new Error("Method not implemented"); } -} -export function argumentTypeToJSON(object: ArgumentType): string { - switch (object) { - case ArgumentType.STRING: - return "STRING"; - case ArgumentType.INT: - return "INT"; - case ArgumentType.FLOAT: - return "FLOAT"; - case ArgumentType.BOOL: - return "BOOL"; - case ArgumentType.WRITER: - return "WRITER"; - case ArgumentType.READER: - return "READER"; - case ArgumentType.UNRECOGNIZED: - default: - return "UNRECOGNIZED"; + prepareStage(stage: IRProcessor): void { + throw new Error("Method not implemented"); } -} - -export interface Void {} - -export interface Processor { - uri: string; - arguments: { [key: string]: ArgumentType }; - metadata: { [key: string]: string }; -} - -export interface Processor_ArgumentsEntry { - key: string; - value: ArgumentType; -} - -export interface Processor_MetadataEntry { - key: string; - value: string; -} - -export interface Argument { - type: ArgumentType; - value: Uint8Array; -} - -export interface Stage { - uri: string; - processorUri: string; - arguments: { [key: string]: Argument }; -} - -export interface Stage_ArgumentsEntry { - key: string; - value: Argument | undefined; -} - -export interface Payload { - channelUri: string; - data: Uint8Array; -} - -function createBaseVoid(): Void { - return {}; -} - -export const Void = { - encode(_: Void, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { - return writer; - }, - - decode(input: _m0.Reader | Uint8Array, length?: number): Void { - const reader = - input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBaseVoid(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - fromJSON(_: any): Void { - return {}; - }, - - toJSON(_: Void): unknown { - const obj: any = {}; - return obj; - }, - - create, I>>(base?: I): Void { - return Void.fromPartial(base ?? ({} as any)); - }, - fromPartial, I>>(_: I): Void { - const message = createBaseVoid(); - return message; - }, -}; - -function createBaseProcessor(): Processor { - return { uri: "", arguments: {}, metadata: {} }; -} - -export const Processor = { - encode( - message: Processor, - writer: _m0.Writer = _m0.Writer.create(), - ): _m0.Writer { - if (message.uri !== "") { - writer.uint32(10).string(message.uri); - } - Object.entries(message.arguments).forEach(([key, value]) => { - Processor_ArgumentsEntry.encode( - { key: key as any, value }, - writer.uint32(18).fork(), - ).ldelim(); - }); - Object.entries(message.metadata).forEach(([key, value]) => { - Processor_MetadataEntry.encode( - { key: key as any, value }, - writer.uint32(26).fork(), - ).ldelim(); - }); - return writer; - }, - - decode(input: _m0.Reader | Uint8Array, length?: number): Processor { - const reader = - input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBaseProcessor(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 10) { - break; - } - - message.uri = reader.string(); - continue; - case 2: - if (tag !== 18) { - break; - } - - const entry2 = Processor_ArgumentsEntry.decode( - reader, - reader.uint32(), - ); - if (entry2.value !== undefined) { - message.arguments[entry2.key] = entry2.value; - } - continue; - case 3: - if (tag !== 26) { - break; - } - - const entry3 = Processor_MetadataEntry.decode( - reader, - reader.uint32(), - ); - if (entry3.value !== undefined) { - message.metadata[entry3.key] = entry3.value; - } - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - fromJSON(object: any): Processor { - return { - uri: isSet(object.uri) ? globalThis.String(object.uri) : "", - arguments: isObject(object.arguments) - ? Object.entries(object.arguments).reduce<{ - [key: string]: ArgumentType; - }>((acc, [key, value]) => { - acc[key] = argumentTypeFromJSON(value); - return acc; - }, {}) - : {}, - metadata: isObject(object.metadata) - ? Object.entries(object.metadata).reduce<{ [key: string]: string }>( - (acc, [key, value]) => { - acc[key] = String(value); - return acc; - }, - {}, - ) - : {}, - }; - }, - - toJSON(message: Processor): unknown { - const obj: any = {}; - if (message.uri !== "") { - obj.uri = message.uri; - } - if (message.arguments) { - const entries = Object.entries(message.arguments); - if (entries.length > 0) { - obj.arguments = {}; - entries.forEach(([k, v]) => { - obj.arguments[k] = argumentTypeToJSON(v); - }); - } - } - if (message.metadata) { - const entries = Object.entries(message.metadata); - if (entries.length > 0) { - obj.metadata = {}; - entries.forEach(([k, v]) => { - obj.metadata[k] = v; - }); - } - } - return obj; - }, - - create, I>>(base?: I): Processor { - return Processor.fromPartial(base ?? ({} as any)); - }, - fromPartial, I>>( - object: I, - ): Processor { - const message = createBaseProcessor(); - message.uri = object.uri ?? ""; - message.arguments = Object.entries(object.arguments ?? {}).reduce<{ - [key: string]: ArgumentType; - }>((acc, [key, value]) => { - if (value !== undefined) { - acc[key] = value as ArgumentType; - } - return acc; - }, {}); - message.metadata = Object.entries(object.metadata ?? {}).reduce<{ - [key: string]: string; - }>((acc, [key, value]) => { - if (value !== undefined) { - acc[key] = globalThis.String(value); - } - return acc; - }, {}); - return message; - }, -}; - -function createBaseProcessor_ArgumentsEntry(): Processor_ArgumentsEntry { - return { key: "", value: 0 }; -} - -export const Processor_ArgumentsEntry = { - encode( - message: Processor_ArgumentsEntry, - writer: _m0.Writer = _m0.Writer.create(), - ): _m0.Writer { - if (message.key !== "") { - writer.uint32(10).string(message.key); - } - if (message.value !== 0) { - writer.uint32(16).int32(message.value); - } - return writer; - }, - - decode( - input: _m0.Reader | Uint8Array, - length?: number, - ): Processor_ArgumentsEntry { - const reader = - input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBaseProcessor_ArgumentsEntry(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 10) { - break; - } - - message.key = reader.string(); - continue; - case 2: - if (tag !== 16) { - break; - } - - message.value = reader.int32() as any; - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - fromJSON(object: any): Processor_ArgumentsEntry { - return { - key: isSet(object.key) ? globalThis.String(object.key) : "", - value: isSet(object.value) ? argumentTypeFromJSON(object.value) : 0, - }; - }, - - toJSON(message: Processor_ArgumentsEntry): unknown { - const obj: any = {}; - if (message.key !== "") { - obj.key = message.key; - } - if (message.value !== 0) { - obj.value = argumentTypeToJSON(message.value); - } - return obj; - }, - - create, I>>( - base?: I, - ): Processor_ArgumentsEntry { - return Processor_ArgumentsEntry.fromPartial(base ?? ({} as any)); - }, - fromPartial, I>>( - object: I, - ): Processor_ArgumentsEntry { - const message = createBaseProcessor_ArgumentsEntry(); - message.key = object.key ?? ""; - message.value = object.value ?? 0; - return message; - }, -}; - -function createBaseProcessor_MetadataEntry(): Processor_MetadataEntry { - return { key: "", value: "" }; -} - -export const Processor_MetadataEntry = { - encode( - message: Processor_MetadataEntry, - writer: _m0.Writer = _m0.Writer.create(), - ): _m0.Writer { - if (message.key !== "") { - writer.uint32(10).string(message.key); - } - if (message.value !== "") { - writer.uint32(18).string(message.value); - } - return writer; - }, - - decode( - input: _m0.Reader | Uint8Array, - length?: number, - ): Processor_MetadataEntry { - const reader = - input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBaseProcessor_MetadataEntry(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 10) { - break; - } - - message.key = reader.string(); - continue; - case 2: - if (tag !== 18) { - break; - } - - message.value = reader.string(); - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - fromJSON(object: any): Processor_MetadataEntry { - return { - key: isSet(object.key) ? globalThis.String(object.key) : "", - value: isSet(object.value) ? globalThis.String(object.value) : "", - }; - }, - - toJSON(message: Processor_MetadataEntry): unknown { - const obj: any = {}; - if (message.key !== "") { - obj.key = message.key; - } - if (message.value !== "") { - obj.value = message.value; - } - return obj; - }, - create, I>>( - base?: I, - ): Processor_MetadataEntry { - return Processor_MetadataEntry.fromPartial(base ?? ({} as any)); - }, - fromPartial, I>>( - object: I, - ): Processor_MetadataEntry { - const message = createBaseProcessor_MetadataEntry(); - message.key = object.key ?? ""; - message.value = object.value ?? ""; - return message; - }, -}; - -function createBaseArgument(): Argument { - return { type: 0, value: new Uint8Array(0) }; -} - -export const Argument = { - encode( - message: Argument, - writer: _m0.Writer = _m0.Writer.create(), - ): _m0.Writer { - if (message.type !== 0) { - writer.uint32(8).int32(message.type); - } - if (message.value.length !== 0) { - writer.uint32(18).bytes(message.value); - } - return writer; - }, - - decode(input: _m0.Reader | Uint8Array, length?: number): Argument { - const reader = - input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBaseArgument(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 8) { - break; - } - - message.type = reader.int32() as any; - continue; - case 2: - if (tag !== 18) { - break; - } - - message.value = reader.bytes(); - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - fromJSON(object: any): Argument { - return { - type: isSet(object.type) ? argumentTypeFromJSON(object.type) : 0, - value: isSet(object.value) - ? bytesFromBase64(object.value) - : new Uint8Array(0), - }; - }, - - toJSON(message: Argument): unknown { - const obj: any = {}; - if (message.type !== 0) { - obj.type = argumentTypeToJSON(message.type); - } - if (message.value.length !== 0) { - obj.value = base64FromBytes(message.value); - } - return obj; - }, - - create, I>>(base?: I): Argument { - return Argument.fromPartial(base ?? ({} as any)); - }, - fromPartial, I>>(object: I): Argument { - const message = createBaseArgument(); - message.type = object.type ?? 0; - message.value = object.value ?? new Uint8Array(0); - return message; - }, -}; - -function createBaseStage(): Stage { - return { uri: "", processorUri: "", arguments: {} }; -} - -export const Stage = { - encode(message: Stage, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { - if (message.uri !== "") { - writer.uint32(10).string(message.uri); - } - if (message.processorUri !== "") { - writer.uint32(18).string(message.processorUri); - } - Object.entries(message.arguments).forEach(([key, value]) => { - Stage_ArgumentsEntry.encode( - { key: key as any, value }, - writer.uint32(26).fork(), - ).ldelim(); - }); - return writer; - }, - - decode(input: _m0.Reader | Uint8Array, length?: number): Stage { - const reader = - input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBaseStage(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 10) { - break; - } - - message.uri = reader.string(); - continue; - case 2: - if (tag !== 18) { - break; - } - - message.processorUri = reader.string(); - continue; - case 3: - if (tag !== 26) { - break; - } - - const entry3 = Stage_ArgumentsEntry.decode(reader, reader.uint32()); - if (entry3.value !== undefined) { - message.arguments[entry3.key] = entry3.value; - } - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - fromJSON(object: any): Stage { - return { - uri: isSet(object.uri) ? globalThis.String(object.uri) : "", - processorUri: isSet(object.processorUri) - ? globalThis.String(object.processorUri) - : "", - arguments: isObject(object.arguments) - ? Object.entries(object.arguments).reduce<{ [key: string]: Argument }>( - (acc, [key, value]) => { - acc[key] = Argument.fromJSON(value); - return acc; - }, - {}, - ) - : {}, - }; - }, - - toJSON(message: Stage): unknown { - const obj: any = {}; - if (message.uri !== "") { - obj.uri = message.uri; - } - if (message.processorUri !== "") { - obj.processorUri = message.processorUri; - } - if (message.arguments) { - const entries = Object.entries(message.arguments); - if (entries.length > 0) { - obj.arguments = {}; - entries.forEach(([k, v]) => { - obj.arguments[k] = Argument.toJSON(v); - }); - } - } - return obj; - }, - - create, I>>(base?: I): Stage { - return Stage.fromPartial(base ?? ({} as any)); - }, - fromPartial, I>>(object: I): Stage { - const message = createBaseStage(); - message.uri = object.uri ?? ""; - message.processorUri = object.processorUri ?? ""; - message.arguments = Object.entries(object.arguments ?? {}).reduce<{ - [key: string]: Argument; - }>((acc, [key, value]) => { - if (value !== undefined) { - acc[key] = Argument.fromPartial(value); - } - return acc; - }, {}); - return message; - }, -}; - -function createBaseStage_ArgumentsEntry(): Stage_ArgumentsEntry { - return { key: "", value: undefined }; -} - -export const Stage_ArgumentsEntry = { - encode( - message: Stage_ArgumentsEntry, - writer: _m0.Writer = _m0.Writer.create(), - ): _m0.Writer { - if (message.key !== "") { - writer.uint32(10).string(message.key); - } - if (message.value !== undefined) { - Argument.encode(message.value, writer.uint32(18).fork()).ldelim(); - } - return writer; - }, - - decode( - input: _m0.Reader | Uint8Array, - length?: number, - ): Stage_ArgumentsEntry { - const reader = - input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBaseStage_ArgumentsEntry(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 10) { - break; - } - - message.key = reader.string(); - continue; - case 2: - if (tag !== 18) { - break; - } - - message.value = Argument.decode(reader, reader.uint32()); - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - fromJSON(object: any): Stage_ArgumentsEntry { - return { - key: isSet(object.key) ? globalThis.String(object.key) : "", - value: isSet(object.value) ? Argument.fromJSON(object.value) : undefined, - }; - }, - - toJSON(message: Stage_ArgumentsEntry): unknown { - const obj: any = {}; - if (message.key !== "") { - obj.key = message.key; - } - if (message.value !== undefined) { - obj.value = Argument.toJSON(message.value); - } - return obj; - }, - - create, I>>( - base?: I, - ): Stage_ArgumentsEntry { - return Stage_ArgumentsEntry.fromPartial(base ?? ({} as any)); - }, - fromPartial, I>>( - object: I, - ): Stage_ArgumentsEntry { - const message = createBaseStage_ArgumentsEntry(); - message.key = object.key ?? ""; - message.value = - object.value !== undefined && object.value !== null - ? Argument.fromPartial(object.value) - : undefined; - return message; - }, -}; - -function createBasePayload(): Payload { - return { channelUri: "", data: new Uint8Array(0) }; -} - -export const Payload = { - encode( - message: Payload, - writer: _m0.Writer = _m0.Writer.create(), - ): _m0.Writer { - if (message.channelUri !== "") { - writer.uint32(10).string(message.channelUri); - } - if (message.data.length !== 0) { - writer.uint32(18).bytes(message.data); - } - return writer; - }, - - decode(input: _m0.Reader | Uint8Array, length?: number): Payload { - const reader = - input instanceof _m0.Reader ? input : _m0.Reader.create(input); - let end = length === undefined ? reader.len : reader.pos + length; - const message = createBasePayload(); - while (reader.pos < end) { - const tag = reader.uint32(); - switch (tag >>> 3) { - case 1: - if (tag !== 10) { - break; - } - - message.channelUri = reader.string(); - continue; - case 2: - if (tag !== 18) { - break; - } - - message.data = reader.bytes(); - continue; - } - if ((tag & 7) === 4 || tag === 0) { - break; - } - reader.skipType(tag & 7); - } - return message; - }, - - fromJSON(object: any): Payload { - return { - channelUri: isSet(object.channelUri) - ? globalThis.String(object.channelUri) - : "", - data: isSet(object.data) - ? bytesFromBase64(object.data) - : new Uint8Array(0), - }; - }, - - toJSON(message: Payload): unknown { - const obj: any = {}; - if (message.channelUri !== "") { - obj.channelUri = message.channelUri; - } - if (message.data.length !== 0) { - obj.data = base64FromBytes(message.data); - } - return obj; - }, - - create, I>>(base?: I): Payload { - return Payload.fromPartial(base ?? ({} as any)); - }, - fromPartial, I>>(object: I): Payload { - const message = createBasePayload(); - message.channelUri = object.channelUri ?? ""; - message.data = object.data ?? new Uint8Array(0); - return message; - }, -}; - -export type RunnerService = typeof RunnerService; -export const RunnerService = { - prepareProcessor: { - path: "/Runner/prepareProcessor", - requestStream: false, - responseStream: false, - requestSerialize: (value: Processor) => - Buffer.from(Processor.encode(value).finish()), - requestDeserialize: (value: Buffer) => Processor.decode(value), - responseSerialize: (value: Void) => - Buffer.from(Void.encode(value).finish()), - responseDeserialize: (value: Buffer) => Void.decode(value), - }, - prepareStage: { - path: "/Runner/prepareStage", - requestStream: false, - responseStream: false, - requestSerialize: (value: Stage) => - Buffer.from(Stage.encode(value).finish()), - requestDeserialize: (value: Buffer) => Stage.decode(value), - responseSerialize: (value: Void) => - Buffer.from(Void.encode(value).finish()), - responseDeserialize: (value: Buffer) => Void.decode(value), - }, - exec: { - path: "/Runner/exec", - requestStream: false, - responseStream: false, - requestSerialize: (value: Void) => Buffer.from(Void.encode(value).finish()), - requestDeserialize: (value: Buffer) => Void.decode(value), - responseSerialize: (value: Void) => - Buffer.from(Void.encode(value).finish()), - responseDeserialize: (value: Buffer) => Void.decode(value), - }, - channel: { - path: "/Runner/channel", - requestStream: true, - responseStream: true, - requestSerialize: (value: Payload) => - Buffer.from(Payload.encode(value).finish()), - requestDeserialize: (value: Buffer) => Payload.decode(value), - responseSerialize: (value: Payload) => - Buffer.from(Payload.encode(value).finish()), - responseDeserialize: (value: Buffer) => Payload.decode(value), - }, -} as const; - -export interface RunnerServer extends UntypedServiceImplementation { - prepareProcessor: handleUnaryCall; - prepareStage: handleUnaryCall; - exec: handleUnaryCall; - channel: handleBidiStreamingCall; -} - -export interface RunnerClient extends Client { - prepareProcessor( - request: Processor, - callback: (error: ServiceError | null, response: Void) => void, - ): ClientUnaryCall; - prepareProcessor( - request: Processor, - metadata: Metadata, - callback: (error: ServiceError | null, response: Void) => void, - ): ClientUnaryCall; - prepareProcessor( - request: Processor, - metadata: Metadata, - options: Partial, - callback: (error: ServiceError | null, response: Void) => void, - ): ClientUnaryCall; - prepareStage( - request: Stage, - callback: (error: ServiceError | null, response: Void) => void, - ): ClientUnaryCall; - prepareStage( - request: Stage, - metadata: Metadata, - callback: (error: ServiceError | null, response: Void) => void, - ): ClientUnaryCall; - prepareStage( - request: Stage, - metadata: Metadata, - options: Partial, - callback: (error: ServiceError | null, response: Void) => void, - ): ClientUnaryCall; - exec( - request: Void, - callback: (error: ServiceError | null, response: Void) => void, - ): ClientUnaryCall; - exec( - request: Void, - metadata: Metadata, - callback: (error: ServiceError | null, response: Void) => void, - ): ClientUnaryCall; - exec( - request: Void, - metadata: Metadata, - options: Partial, - callback: (error: ServiceError | null, response: Void) => void, - ): ClientUnaryCall; - channel(): ClientDuplexStream; - channel(options: Partial): ClientDuplexStream; - channel( - metadata: Metadata, - options?: Partial, - ): ClientDuplexStream; -} - -export const RunnerClient = makeGenericClientConstructor( - RunnerService, - "Runner", -) as unknown as { - new ( - address: string, - credentials: ChannelCredentials, - options?: Partial, - ): RunnerClient; - service: typeof RunnerService; - serviceName: string; -}; - -function bytesFromBase64(b64: string): Uint8Array { - if ((globalThis as any).Buffer) { - return Uint8Array.from(globalThis.Buffer.from(b64, "base64")); - } else { - const bin = globalThis.atob(b64); - const arr = new Uint8Array(bin.length); - for (let i = 0; i < bin.length; ++i) { - arr[i] = bin.charCodeAt(i); - } - return arr; - } -} - -function base64FromBytes(arr: Uint8Array): string { - if ((globalThis as any).Buffer) { - return globalThis.Buffer.from(arr).toString("base64"); - } else { - const bin: string[] = []; - arr.forEach((byte) => { - bin.push(globalThis.String.fromCharCode(byte)); - }); - return globalThis.btoa(bin.join("")); + exec(): void { + throw new Error("Method not implemented"); } -} - -type Builtin = - | Date - | Function - | Uint8Array - | string - | number - | boolean - | undefined; - -export type DeepPartial = T extends Builtin - ? T - : T extends globalThis.Array - ? globalThis.Array> - : T extends ReadonlyArray - ? ReadonlyArray> - : T extends {} - ? { [K in keyof T]?: DeepPartial } - : Partial; - -type KeysOfUnion = T extends T ? keyof T : never; -export type Exact = P extends Builtin - ? P - : P & { [K in keyof P]: Exact } & { - [K in Exclude>]: never; - }; - -function isObject(value: any): boolean { - return typeof value === "object" && value !== null; -} -function isSet(value: any): boolean { - return value !== null && value !== undefined; + static shared = new Runner(); } diff --git a/runners/nodejs/src/runtime/server.ts b/runners/nodejs/src/runtime/server.ts index 9820433..8a344ad 100644 --- a/runners/nodejs/src/runtime/server.ts +++ b/runners/nodejs/src/runtime/server.ts @@ -1,122 +1,56 @@ -import { - ArgumentType, - Payload, - Processor as AbstractProcessor, - RunnerServer, - Stage, - Void, -} from "./runner"; +import { ChannelData, RunnerServer } from "../proto"; import { sendUnaryData, ServerDuplexStream, ServerUnaryCall, UntypedHandleCall, } from "@grpc/grpc-js"; - -import { Processor } from "../interfaces/processor"; -import { Observable, Subject } from "rxjs"; -import { Writer } from "../interfaces/writer"; -import { RunnerError } from "../error"; -import { Reader } from "../interfaces/reader"; -import { Constructor } from "./constructor"; -import { resolve } from "./resolve"; - -const processors: Map> = new Map(); -const stages: Map = new Map(); -const readers: Map> = new Map(); -const writers: Map> = new Map(); +import { IRProcessor, IRStage } from "../proto/intermediate"; +import { Empty } from "../proto/empty"; +import { Runner } from "./runner"; export class ServerImplementation implements RunnerServer { [name: string]: UntypedHandleCall; - channel(call: ServerDuplexStream): void { + channel(call: ServerDuplexStream): void { // On incoming data, call the appropriate reader. - call.on("data", function (payload: Payload) { - const reader = readers.get(payload.channelUri)!; - reader.next(payload.data); + call.on("data", function (payload: ChannelData) { + Runner.shared.incoming.next(payload); }); - // On outgoing data, create a payload with the corresponding writer. - for (const [uri, writer] of writers) { - writer.subscribe((value) => { - call.write({ data: value, channelUri: uri }); - }); - } - - // On end, throw an error. - call.on("end", function () { - throw RunnerError.unexpectedBehaviour(); + // On outgoing data, propagate to gRPC. + Runner.shared.outgoing.subscribe((payload) => { + call.write(payload); }); } prepareStage( - call: ServerUnaryCall, - callback: sendUnaryData, + call: ServerUnaryCall, + callback: sendUnaryData, ): void { - call.on("data", function (stage: Stage): void { - initStage(stage); + call.on("data", (stage) => { + Runner.shared.prepareStage(stage); callback(null, {}); }); } prepareProcessor( - call: ServerUnaryCall, - callback: sendUnaryData, + call: ServerUnaryCall, + callback: sendUnaryData, ): void { - call.on("data", function (processor: AbstractProcessor): void { - resolve().then((constructor) => { - processors.set(processor.uri, constructor); - callback(null, {}); - }); + call.on("data", (processor) => { + Runner.shared.prepareProcessor(processor); + callback(null, {}); }); } - exec(call: ServerUnaryCall, callback: sendUnaryData): void { - call.on("data", function (): void { - processors.forEach((processor) => { - new processor().exec(); - }); + exec( + call: ServerUnaryCall, + callback: sendUnaryData, + ): void { + call.on("data", () => { + callback(null, {}); + Runner.shared.exec(); }); } } - -export function initStage(stage: Stage): void { - const abstractArgs = Object.entries(stage.arguments); - const parsedArgs = new Map(); - - for (const [name, argument] of abstractArgs) { - if (argument.type == ArgumentType.READER) { - const subject = new Subject(); - const reader = new Reader(subject); - const uri = argument.value.toString(); - readers.set(uri, subject); - parsedArgs.set(name, reader); - continue; - } - - if (argument.type == ArgumentType.WRITER) { - const observer = { - next: (value: Uint8Array) => { - console.log("Next:", value); - }, - error: () => { - RunnerError.channelError(); - }, - complete: () => { - RunnerError.unexpectedBehaviour(); - }, - }; - - const writer = new Writer(observer); - parsedArgs.set(name, writer); - continue; - } - - // If the argument is not a reader or writer, it is a value. - parsedArgs.set(name, argument.value); - } - - // Initialize the new stage. - const constructor = processors.get(stage.processorUri)!; - stages.set(stage.uri, new constructor(parsedArgs)); -}