diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index cc0744bb..00000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "deno.enable": true, - "deno.unstable": true, - "deno.lint": true -} \ No newline at end of file diff --git a/compiler/compiler.ts b/compiler/compiler.ts index b2d9071e..4508db1e 100644 --- a/compiler/compiler.ts +++ b/compiler/compiler.ts @@ -298,7 +298,7 @@ export class Compiler { static SIGN_DEFAULT = true; // can be changed - static BIG_BANG_TIME = new Date(2022, 0, 22, 0, 0, 0, 0).getTime() // 1642806000000 + static BIG_BANG_TIME = Date.UTC(2024, 0, 0, 0, 0, 0, 0) static MAX_INT_32 = 2_147_483_647; static MIN_INT_32 = -2_147_483_648; @@ -416,12 +416,6 @@ export class Compiler { return parseInt(binary, 2); } - /** Set TTL of header of existing block */ - public static setHeaderTTL(dx_block:ArrayBuffer, ttl:number):ArrayBuffer { - const uint8 = new Uint8Array(dx_block); - uint8[4] = ttl; - return uint8.buffer; - } // get sender from header public static extractHeaderSender(dx_block: ArrayBuffer, last_byte?:[number], _appspace_byte = true, _start = 8): Endpoint|undefined { @@ -606,7 +600,7 @@ export class Compiler { } /** Add a header to a Datex block */ - public static DEFAULT_TTL = 64; + public static DEFAULT_TTL = 10; private static device_types = { "default": 0, @@ -783,8 +777,8 @@ export class Compiler { // ROUTING HEADER ///////////////////////////////////////////////// // ttl pre_header_uint8[i++] = __routing_ttl; - // priority - pre_header_uint8[i++] = __routing_prio; + // initial ttl (originally: prio, currently unused) + pre_header_uint8[i++] = __routing_ttl; //__routing_prio; // signed = 1, encrypted+signed = 2, encrypted = 3, others = 0 pre_header_uint8[i++] = sign && !encrypt ? 1: (sign && encrypt ? 2 : (!sign && encrypt ? 3 : 0)); @@ -2188,12 +2182,17 @@ export class Compiler { const pointer_origin = (id_buffer[0]==BinaryCode.ENDPOINT || id_buffer[0]==BinaryCode.PERSON_ALIAS || id_buffer[0]==BinaryCode.INSTITUTION_ALIAS) ? Target.get(id_buffer.slice(1,19), id_buffer.slice(19,21), id_buffer[0]) : null; const singleReceiver = - SCOPE.options.to instanceof Endpoint || - ( - SCOPE.options.to instanceof Disjunction && - SCOPE.options.to.size == 1 && - [...SCOPE.options.to][0] instanceof Endpoint - ) + SCOPE.options.to instanceof Endpoint ? + SCOPE.options.to : + ( + ( + SCOPE.options.to instanceof Disjunction && + SCOPE.options.to.size == 1 && + [...SCOPE.options.to][0] instanceof Endpoint + ) ? + [...SCOPE.options.to][0] as Endpoint : + null + ) if ( pointer_origin && @@ -2201,7 +2200,7 @@ export class Compiler { action_type == ACTION_TYPE.GET && // is get Runtime.endpoint.equals(pointer_origin) && // is own pointer SCOPE.options.to != Runtime.endpoint && // not sending to self - !Pointer.get(id)?.subscribers?.has(singleReceiver) // receiver is subscribed to pointer - assume it already has the current pointer value + !(singleReceiver && Pointer.get(id)?.subscribers?.has(singleReceiver)) // receiver is subscribed to pointer - assume it already has the current pointer value ) { return Compiler.builder.addPreemptivePointer(SCOPE, id) } @@ -2259,10 +2258,11 @@ export class Compiler { // preemptive value already exists and was not yet initialized in scope if (ptr?.value_initialized && !alreadyInitializing) { + parentScope.preemptive_pointers.set(normalized_id, SCOPE); Compiler.builder.handleRequiredBufferSize(SCOPE.b_index+1, SCOPE); SCOPE.uint8[SCOPE.b_index++] = BinaryCode.SUBSCOPE_START; - Compiler.builder.addPointerNormal(SCOPE, id, ACTION_TYPE.INIT, undefined, true, ptr.val, (ptr.force_local_transform && ptr.transform_scope) ? ptr.transform_scope : undefined); // sync + Compiler.builder.addPointerNormal(SCOPE, id, ACTION_TYPE.INIT, undefined, true, ptr, (ptr.force_local_transform && ptr.transform_scope) ? ptr.transform_scope : undefined); // sync Compiler.builder.handleRequiredBufferSize(SCOPE.b_index+1, SCOPE); SCOPE.uint8[SCOPE.b_index++] = BinaryCode.CLOSE_AND_STORE; Compiler.builder.addPointerNormal(SCOPE, id, ACTION_TYPE.GET, undefined, undefined, ptr.val); // sync @@ -2315,7 +2315,14 @@ export class Compiler { if (Runtime.OPTIONS.PROTECT_POINTERS) { const receiver = Compiler.builder.getScopeReceiver(SCOPE); if (receiver !== Runtime.endpoint) { - p.grantAccessTo(receiver) + if (receiver instanceof Endpoint) p.grantAccessTo(receiver) + else if (receiver instanceof Disjunction) { + for (const endpoint of receiver) { + if (endpoint instanceof Endpoint) p.grantAccessTo(endpoint) + else logger.error("Can't grant access to receiver:" + endpoint); + } + } + else logger.error("Can't grant access to receivers:" + receiver); } } @@ -2324,7 +2331,7 @@ export class Compiler { Compiler.builder.insertExtractedVariable(SCOPE, BinaryCode.POINTER, buffer2hex(p.id_buffer)); } // add normally - else return Compiler.builder.addPointerByID (SCOPE, p.id_buffer, action_type, action_specifier, undefined, NOT_EXISTING) + else return Compiler.builder.addPointerByID(SCOPE, p.id_buffer, action_type, action_specifier, undefined, NOT_EXISTING) }, // add @@ -2751,6 +2758,8 @@ export class Compiler { if (value?.[DX_REPLACE]) value = value[DX_REPLACE]; + const indirectReferencePtr = (value instanceof Pointer && value.indirectReference) ? true : false; + // make sure normal pointers are collapsed (ignore error if uninitialized pointer is passed in) try { value = Ref.collapseValue(value); @@ -2808,7 +2817,7 @@ export class Compiler { const start_index = Compiler.builder.getDynamicIndex(SCOPE.b_index, SCOPE); // add original value to inserted values map (only if useful, exclude short values like boolean and null) - if (!(SCOPE.options.no_duplicate_value_optimization && (typeof value == "bigint" || typeof value == "number" || typeof value == "string")) && value!==VOID && + if (!indirectReferencePtr && !(SCOPE.options.no_duplicate_value_optimization && (typeof value == "bigint" || typeof value == "number" || typeof value == "string")) && value!==VOID && value !==null && typeof value != "boolean" && !((typeof value == "bigint" || typeof value == "number") && value<=Compiler.MAX_INT_32 && value>=Compiler.MIN_INT_32) @@ -2845,7 +2854,6 @@ export class Compiler { const option_collapse = SCOPE.options.collapse_pointers && !(SCOPE.options.keep_external_pointers && value instanceof Pointer && !value.is_origin); const no_proxify = value instanceof Ref && (((value instanceof Pointer && value.is_anonymous) || option_collapse) || skip_first_collapse); - // proxify pointer exceptions: if (no_proxify) { @@ -2860,6 +2868,13 @@ export class Compiler { return; } + // indirect reference pointer + if (indirectReferencePtr) { + SCOPE.options._first_insert_done = true; + Compiler.builder.insert(Ref.collapseValue(value, true), SCOPE, is_root, parents, unassigned_children); + return; + } + value = value.val; // don't proxify anonymous pointers or serialize ptr // add $$ operator, not if no_create_pointers enabled or skip_first_collapse diff --git a/datex_all.ts b/datex_all.ts index a1f90d7b..3e42188f 100644 --- a/datex_all.ts +++ b/datex_all.ts @@ -23,7 +23,6 @@ export * from "./compiler/unit_codes.ts"; // network -export * from "./network/client.ts"; export * from "./network/supranet.ts"; export * from "./network/network_utils.ts"; export * from "./network/unyt.ts"; @@ -61,8 +60,8 @@ export * from "./types/tuple.ts"; export * from "./types/type.ts"; export * from "./types/quantity.ts"; export * from "./types/time.ts"; -export * from "./types/storage_map.ts"; -export * from "./types/storage_set.ts"; +export * from "./types/storage-map.ts"; +export * from "./types/storage-set.ts"; export * from "./types/struct.ts"; // polyfills diff --git a/datex_short.ts b/datex_short.ts index b1685d16..0b782493 100644 --- a/datex_short.ts +++ b/datex_short.ts @@ -14,6 +14,8 @@ import { eternals, getLazyEternal, waitingEternals, waitingLazyEternals } from " import {instance} from "./js_adapter/js_class_adapter.ts"; import { client_type } from "./utils/constants.ts"; +import { communicationHub } from "./network/communication-hub.ts"; +import { MessageLogger } from "./utils/message_logger.ts"; export {instance} from "./js_adapter/js_class_adapter.ts"; declare global { @@ -139,7 +141,7 @@ function _datex(dx:string|TemplateStringsArray|PrecompiledDXB, data?:unknown[], // local execution if (to === Runtime.endpoint) return Runtime.executeDatexLocally(dx, data, {plugins, sign, encrypt}, context_location ? new URL(context_location.toString()) : undefined); // remote execution - else return Runtime.datexOut([dx, data, {plugins, sign, encrypt, context_location: context_location ? new URL(context_location.toString()) : undefined}], typeof to == "string" ? f(to) : to, undefined, undefined, undefined, undefined, undefined, undefined, timeout); + else return Runtime.datexOut([dx, data, {plugins, sign, encrypt, context_location: context_location ? new URL(context_location.toString()) : undefined}], typeof to == "string" ? f(to) : to, undefined, undefined, undefined, undefined, undefined, timeout); } @@ -525,11 +527,29 @@ export function printTrace(endpoint: string|Endpoint) { endpoint = typeof endpoint == "string" ? Target.get(endpoint) as Endpoint : endpoint; return endpoint.printTrace() } - type printTraceT = typeof printTrace; + +export function printComStatus() { + return communicationHub.printStatus() +} +type printComStatusT = typeof printComStatus; + +export function enableMessageLogger(showRedirectMessages?: boolean) { + return MessageLogger.enable(showRedirectMessages) +} +type enableMessageLoggerT = typeof enableMessageLogger; + +export function disableMessageLogger() { + return MessageLogger.disable() +} +type disableMessageLoggerT = typeof disableMessageLogger; + declare global { const printTrace: printTraceT; + const printComStatus: printComStatusT; const printSnapshot: typeof Storage.printSnapshot + const enableMessageLogger: enableMessageLoggerT; + const disableMessageLogger: disableMessageLoggerT; } @@ -658,6 +678,12 @@ globalThis.f = f; // @ts-ignore globalThis.printTrace = printTrace; // @ts-ignore +globalThis.printComStatus = printComStatus; +// @ts-ignore globalThis.printSnapshot = Storage.printSnapshot.bind(Storage); // @ts-ignore +globalThis.enableMessageLogger = enableMessageLogger; +// @ts-ignore +globalThis.disableMessageLogger = disableMessageLogger; +// @ts-ignore globalThis.props = props; \ No newline at end of file diff --git a/docs/manual/01 Introduction.md b/docs/manual/01 Introduction.md index 5cec87e9..58d517fc 100644 --- a/docs/manual/01 Introduction.md +++ b/docs/manual/01 Introduction.md @@ -2,7 +2,7 @@ > [!WARNING] > The current implementation of the DATEX JavaScript Library is still a beta version. -> We are activly working on a new stable [Rust implementation](https://github.com/unyt-org/datex-core) that will be much more performant. +> We are actively working on a new stable [Rust implementation](https://github.com/unyt-org/datex-core) that will be much more performant. The DATEX JavaScript Library (*DATEX JS*) enables fine-grained reactivity with cross-device processing including data synchronisation. diff --git a/functions.ts b/functions.ts index b5bab2d1..7478ad24 100644 --- a/functions.ts +++ b/functions.ts @@ -332,7 +332,7 @@ export function toggle(value:RefLike, if_true:T, if_false:T): Minima always ( if (${Runtime.valueToDatexString(value)}) (${Runtime.valueToDatexString(if_true)}) else (${Runtime.valueToDatexString(if_false)}) - )`*/).js_value; + )`*/); } diff --git a/iframes/iframe-com-interface.ts b/iframes/iframe-com-interface.ts deleted file mode 100644 index 8bf8128d..00000000 --- a/iframes/iframe-com-interface.ts +++ /dev/null @@ -1,100 +0,0 @@ -import { Datex } from "../mod.ts"; -import InterfaceManager, { CommonInterface } from "../network/client.ts"; -import { Target } from "../types/addressing.ts"; - -type ParentDocument = [Window] & {postMessage:(data:unknown,origin:string)=>void}; - -/** - * Creates a direct DATEX communication channel with an iframe. - * Important: The iframe must have impotred the "./iframe-init.ts" module - */ -export class IFrameCommunicationInterface extends CommonInterface<[HTMLIFrameElement|ParentDocument]> { - - declare iframe?: HTMLIFrameElement - declare parentDocument?: ParentDocument - declare otherOrigin: string - - override in = true; - override out = true; - override global = false; - override authorization_required = false; // don't connect with public keys - override type = "iframe"; - - protected async connect() { - - if (this.initial_arguments[0] instanceof HTMLIFrameElement) { - this.iframe = this.initial_arguments[0]; - - // init iframe - this.iframe.setAttribute("sandbox", "allow-scripts allow-same-origin allow-popups allow-popups-to-escape-sandbox") - this.otherOrigin = new URL(this.iframe.src).origin; - this.logger.info("initializing as parent window, iframe origin: " + this.otherOrigin) - - if (this.iframe.contentDocument && this.iframe.contentDocument.readyState !== "complete") { - await new Promise(resolve => this.iframe!.addEventListener("load", resolve)); - } - } - // is a parent document with a window - else if (this.initial_arguments[0]) { - this.parentDocument = this.initial_arguments[0] - this.otherOrigin = new URL(document.referrer).origin; - - this.logger.info("initializing as iframe, parent window origin: " + this.otherOrigin) - - } - else { - this.logger.error("no IFrame or Window provided for IFrameCommunicationInterface"); - return false; - } - globalThis.addEventListener("message", this.onReceive); - - // if in ifram: send INIT to parent immediately - if (this.parentDocument) - this.sendInit(); - - return true; - } - - onReceive = (event: MessageEvent) => { - - if (event.source === this.other) { - const data = event.data; - if (data instanceof ArrayBuffer) { - InterfaceManager.handleReceiveBlock(data, this.endpoint, this); - } - else if (data?.type == "INIT" && !this.endpoint) { - this.endpoint = Target.get(data.endpoint) as Datex.Endpoint; - - // if in parent: send INIT to iframe after initialized - if (this.iframe) this.sendInit(); - } - } - - } - - private sendInit() { - this.other.postMessage({type:"INIT", endpoint:Datex.Runtime.endpoint.toString()}, this.otherOrigin); - } - - public override disconnect() { - globalThis.removeEventListener("message", this.onReceive); - } - - get other() { - return (this.parentDocument??this.iframe?.contentWindow)! - } - - // FIXME - i = 0; - protected sendBlock(datex: ArrayBuffer) { - if (this.i > 1000) - return; - if (new Uint8Array(datex.slice(0, 2)).toString() === "1,100") - this.i++; - this.other.postMessage(datex, this.otherOrigin); - } - -} - -// register worker interface immediately -InterfaceManager.registerInterface("iframe", IFrameCommunicationInterface); \ No newline at end of file diff --git a/iframes/iframe-init.ts b/iframes/iframe-init.ts index a18e94d5..5aefa7ea 100644 --- a/iframes/iframe-init.ts +++ b/iframes/iframe-init.ts @@ -4,7 +4,11 @@ */ import { Datex } from "../datex.ts"; -import "./iframe-com-interface.ts"; +import { communicationHub } from "../network/communication-hub.ts"; +import { WindowInterface } from "../network/communication-interfaces/window-interface.ts"; -await Datex.Supranet.connect(); -await Datex.InterfaceManager.connect("iframe", undefined, [parent]) \ No newline at end of file +await Datex.Supranet.init(); +const windowInterface = WindowInterface.createParentInterface(window.parent) +// use as default interface only if no other default interface active +const useAsDefaultInterface = !communicationHub.defaultSocket +await communicationHub.addInterface(windowInterface, useAsDefaultInterface) \ No newline at end of file diff --git a/init.ts b/init.ts index 9629e810..748887ab 100644 --- a/init.ts +++ b/init.ts @@ -3,7 +3,7 @@ import { Pointer } from "./runtime/pointers.ts"; import { LOCAL_ENDPOINT } from "./types/addressing.ts"; import { client_type } from "./utils/constants.ts"; import { Storage, registerStorageAsPointerSource } from "./runtime/storage.ts"; -import { logger } from "./utils/global_values.ts"; +import { cwdURL, logger } from "./utils/global_values.ts"; import { IndexedDBStorageLocation } from "./runtime/storage-locations/indexed-db.ts"; import { LocalStorageLocation } from "./runtime/storage-locations/local-storage.ts"; import { DenoKVStorageLocation } from "./runtime/storage-locations/deno-kv.ts"; @@ -11,7 +11,10 @@ import { loadEternalValues } from "./utils/eternals.ts"; import { DX_BOUND_LOCAL_SLOT } from "./runtime/constants.ts"; import { verboseArg } from "./utils/logger.ts"; import { MessageLogger } from "./utils/message_logger.ts"; - +import { Path } from "./utils/path.ts"; +import { communicationHub } from "./network/communication-hub.ts"; +import { LocalLoopbackInterface } from "./network/communication-interfaces/local-loopback-interface.ts"; +import { Crypto } from "./runtime/crypto.ts"; /** * Runtime init (sets ENV, storage, endpoint, ...) @@ -20,11 +23,42 @@ export async function init() { // register DatexStorage as pointer source registerStorageAsPointerSource(); - // default storage config: + // bind communication hub handlers to runtime + communicationHub.handler.init() + communicationHub.handler.setDatexInHandler(Runtime.datexIn.bind(Runtime)) + Runtime.setDatexOutHandler(communicationHub.handler.datexOut.bind(communicationHub.handler)) + await communicationHub.addInterface(new LocalLoopbackInterface()) + + // @ts-ignore NO_INIT if (!globalThis.NO_INIT) { + + // custom storage module (storage.ts next to .dx config) + let storageInitModule: Path|undefined if (client_type == "browser") { + // TODO: handle storage.ts URL in browser + // storageInitModule = new URL('/storage.ts', globalThis.location.href) + } + else if (client_type == "deno") { + // TODO: dynamic storage.ts location - use uix path backend/storage.ts as workaround + storageInitModule = new Path('./backend/storage.ts', cwdURL) + } + + if (await storageInitModule?.fsExists()) { + logger.info("Initializing custom storage configuration (" + storageInitModule!.normal_pathname + ")") + try { + await import(storageInitModule!.normal_pathname); + } + catch (e) { + console.error(e) + } + + if (Storage.locations.size === 0) + logger.warn(`No storage location was added in storage.ts - cannot store persistent data!`) + } + + else if (client_type == "browser") { await Storage.addLocation(new IndexedDBStorageLocation(), { modes: [Storage.Mode.SAVE_ON_CHANGE, Storage.Mode.SAVE_PERIODICALLY], primary: true @@ -53,6 +87,7 @@ export async function init() { } + // listen for endpoint changes Runtime.onEndpointChanged((endpoint) => { Pointer.pointer_prefix = endpoint.getPointerPrefix(); @@ -76,8 +111,8 @@ export async function init() { await Runtime.precompile(); // set Runtime ENV (not persistent if globalThis.NO_INIT) - Runtime.ENV = globalThis.NO_INIT ? getDefaultEnv() : await Storage.loadOrCreate("Datex.Runtime.ENV", getDefaultEnv); - Runtime.ENV[DX_BOUND_LOCAL_SLOT] = "env" + Runtime.ENV = (globalThis as any).NO_INIT ? getDefaultEnv() : await Storage.loadOrCreate("Datex.Runtime.ENV", getDefaultEnv); + (Runtime.ENV as any)[DX_BOUND_LOCAL_SLOT] = "env" // workaround, should never happen if (!Runtime.ENV) { @@ -101,8 +136,8 @@ export async function init() { function getDefaultEnv() { return { - LANG: globalThis.localStorage?.lang ?? globalThis?.navigator?.language?.split("-")[0]?.split("_")[0] ?? 'en', - DATEX_VERSION: null + LANG: globalThis.localStorage?.lang as string ?? globalThis?.navigator?.language?.split("-")[0]?.split("_")[0] ?? 'en', + DATEX_VERSION: "" } } @@ -111,7 +146,7 @@ export async function init() { Runtime.persistent_memory = (await Storage.loadOrCreate("Datex.Runtime.MEMORY", ()=>new Map())).setAutoDefault(Object); - if (!globalThis.NO_INIT) { + if (!(globalThis as any).NO_INIT) { Runtime.init(); // @ts-ignore @@ -126,5 +161,15 @@ export async function init() { if (!globalThis.NO_INIT) await loadEternalValues(); // enables message logger when running with -v - if (verboseArg) MessageLogger.enable() + if (verboseArg) MessageLogger.enable(); + + if (client_type == "deno") { + const { clear } = await import("./utils/args.ts"); + if (clear) { + await Storage.clearAndReload(); + } + } + + // init cleanup interval to remove crypto entries (endpoint keyss) + Crypto.initCleanup(); } diff --git a/js_adapter/js_class_adapter.ts b/js_adapter/js_class_adapter.ts index 28611c67..9d92b80c 100644 --- a/js_adapter/js_class_adapter.ts +++ b/js_adapter/js_class_adapter.ts @@ -588,7 +588,7 @@ function normalizeType(type:Type|string, allowTypeParams = true, defaultNamespac return type } else { - console.log(type) + console.error("invalid type",type) throw new Error("Invalid type") } } diff --git a/network/client.ts b/network/client.ts deleted file mode 100644 index 36a58e81..00000000 --- a/network/client.ts +++ /dev/null @@ -1,894 +0,0 @@ -/** - ╔══════════════════════════════════════════════════════════════════════════════════════╗ - ║ Datex Client ║ - ╠══════════════════════════════════════════════════════════════════════════════════════╣ - ║ Default JS client for datex protocol (support WebSockets, Get Requests) ║ - ║ Visit https://docs.unyt.cc/datex for more information ║ - ╠═════════════════════════════════════════╦════════════════════════════════════════════╣ - ║ © 2020 unyt.org ║ ║ - ╚═════════════════════════════════════════╩════════════════════════════════════════════╝ - */ - -import { Logger } from "../utils/logger.ts"; -import { Runtime } from "../runtime/runtime.ts"; - -import { Compiler } from "../compiler/compiler.ts"; -import { Endpoint, LOCAL_ENDPOINT, Target } from "../types/addressing.ts"; -import { NetworkError } from "../types/errors.ts"; -import type { dxb_header } from "../utils/global_types.ts"; -import { client_type } from "../utils/constants.ts"; -import { Disjunction } from "../types/logic.ts"; -import { Pointer } from "../runtime/pointers.ts"; -import { logger } from "../utils/global_values.ts"; - - - - -// general interface for all "datex interfaces" (client or server/router) -export interface ComInterface { - type: string - description?: string - persistent?: boolean // can be disconnected? - endpoint?: Endpoint // connected directly to a single endpoint - endpoints?: Set // multiple endpoints - is_bidirectional_hub?: boolean, // allow the same block to go in and out eg a -> this interface -> this runtime -> this interface again -> b - immediate?: boolean // can send immediately (eg. for local interfaces, workers) - isEqualSource?:(source: Partial, to: Endpoint) => boolean - in: boolean // can receive data - out: boolean // can send data - global?: boolean // has a connection to the global network, use as a default interface if possible - send: (datex:ArrayBuffer, to?: Target)=>Promise|void - disconnect: ()=>void|Promise -} - - -/** common class for all client interfaces (WebSockets, TCP Sockets, GET Requests, ...)*/ -export abstract class CommonInterface implements ComInterface { - - // endpoint interface mapping - protected static endpoint_connection_points = new Map>(); - protected static indirect_endpoint_connection_points = new Map>(); - protected static virtual_endpoint_connection_points = new Map>(); - - // DIRECT (direct end-to-end connection) - - public static addInterfaceForEndpoint(endpoint:Endpoint, com_interface:ComInterface) { - if (!this.endpoint_connection_points.has(endpoint)) this.endpoint_connection_points.set(endpoint, new Set()); - this.endpoint_connection_points.get(endpoint)!.add(com_interface); - // trigger listeners - for (const handler of this._endpointRegisteredHandlers.get(com_interface.type)??[]) handler(endpoint) - } - // does an endpoint have an explicit (direct) interface on this endpoint - public static hasDirectInterfaceForEndpoint(endpoint:Endpoint): boolean { - return this.hasEndpoint(this.endpoint_connection_points, endpoint) && this.getEndpoint(this.endpoint_connection_points, endpoint)?.size != 0; - } - // get a list of all currently available direct interfaces for an endpoint - public static getInterfacesForEndpoint(endpoint:Endpoint, interface_type?:string) { - return this.getEndpoint(this.endpoint_connection_points, endpoint) || new Set(); - } - - /** - * returns true if has endpoint or instance endpoint of endpoint - */ - private static hasEndpoint(set:Map|Set, endpoint:Target) { - if (set.has(endpoint)) return true; - else { - for (const e of set.keys()) { - if (e instanceof Endpoint && e.main === endpoint) return true; - } - } - return false; - } - - /** - * returns interface for endpoint or matching instance endpoint of endpoint - */ - private static getEndpoint(set:Map>, endpoint:Target) { - if (set.has(endpoint)) return set.get(endpoint); - else { - for (const [e, interf] of set.entries()) { - if (e instanceof Endpoint && e.main === endpoint) return interf; - } - } - } - - - private static _endpointRegisteredHandlers = new Mapvoid>>(); - public static onEndpointRegistered(interface_type: string, handler: (e:Endpoint)=>void) { - if (!this._endpointRegisteredHandlers.has(interface_type)) this._endpointRegisteredHandlers.set(interface_type, new Set()) - this._endpointRegisteredHandlers.get(interface_type)!.add(handler) - } - - - // INDIRECT (connected via a node) - - public static addIndirectInterfaceForEndpoint(endpoint:Target, com_interface:ComInterface) { - if (!this.indirect_endpoint_connection_points.has(endpoint)) this.indirect_endpoint_connection_points.set(endpoint, new Set()); - this.indirect_endpoint_connection_points.get(endpoint)!.add(com_interface); - } - // is an endpoint reachable via a specific endpoint (indirectly) - public static isEndpointReachableViaInterface(endpoint:Target):boolean { - return this.hasEndpoint(this.indirect_endpoint_connection_points, endpoint) && this.indirect_endpoint_connection_points.get(endpoint)?.size != 0; - } - // get a list of all currently available indirect interfaces for an endpoint - public static getIndirectInterfacesForEndpoint(endpoint:Target, interface_type?:string) { - return this.getEndpoint(this.indirect_endpoint_connection_points, endpoint) || new Set(); - } - - - - // VIRTUAL (just a relay connection, ignore for rooting) - - public static addVirtualInterfaceForEndpoint(endpoint:Target, com_interface:ComInterface) { - if (!this.virtual_endpoint_connection_points.has(endpoint)) this.virtual_endpoint_connection_points.set(endpoint, new Set()); - this.virtual_endpoint_connection_points.get(endpoint)!.add(com_interface); - } - // get a list of all currently available virtual interfaces for an endpoint - public static getVirtualInterfacesForEndpoint(endpoint:Target, interface_type?:string) { - return this.getEndpoint(this.virtual_endpoint_connection_points, endpoint) || new Set(); - } - - - - - // get a list of all currently available direct interfaces - public static getDirectInterfaces(): Set{ - let all = new Set(); - for (let e of this.endpoint_connection_points) { - for (let interf of e[1]) { - all.add(interf); - } - } - return all; - } - - public static resetEndpointConnectionPoints(){ - this.endpoint_connection_points.clear(); - this.indirect_endpoint_connection_points.clear() - } - - protected logger:Logger; - - // use this per default for all outgoing datex requests - public static default_interface:ComInterface; - public static proxy_interface?:ComInterface; - - public type = "local" - public persistent = false; // interface can be disconnected (per default) - public authorization_required = true; // connect with public keys per default - private _endpoint?:Endpoint; - public endpoints = new Set(); - public virtual = false; // only a relayed connection, don't use for DATEX rooting - - public in = true - public out = true - public global = true - public immediate = false - - public get endpoint() { - return this._endpoint - } - - public set endpoint(endpoint: Endpoint|undefined) { - if (endpoint === this._endpoint) return; - this.logger.debug("updated endpoint to " + endpoint); - this._endpoint = endpoint; - this.updateEndpoint(); - this.onEndpointSet?.(endpoint); - } - - private updateEndpoint() { - if (this.endpoint) { - if (this.virtual) CommonInterface.addVirtualInterfaceForEndpoint(this.endpoint, this); - else CommonInterface.addInterfaceForEndpoint(this.endpoint, this); - } - } - - private onEndpointSet?: Function - - - protected declare initial_arguments:Args - - constructor(endpoint:Endpoint) { - this.logger = new Logger(this.constructor.name); - this._endpoint = endpoint; - } - - // initialize - async init(...args:Args):Promise { - this.initial_arguments = args; - - this.connected = await this.connect(); - if (this.connected) { - this.updateEndpoint(); - // immediately consider endpoint as online - if (this.endpoint && this.immediate) { - this.endpoint.setOnline(true) - // don't trigger subscription cleanup once first HELLO message is received - this.endpoint.ignoreHello = true; - } - } - return this.connected; - } - - // create 'connection' - protected abstract connect():Promise|boolean - - // handle connection changes - static CONNECTED = Symbol("connected") - - set connected(connected:boolean) { - if (this.connected === connected) return; - this[CommonInterface.CONNECTED] = connected - if (!connected) InterfaceManager.handleInterfaceDisconnect(this); - else InterfaceManager.handleInterfaceConnect(this); - } - get connected() {return this[CommonInterface.CONNECTED]?true:false} - - protected connecting = false; - protected reconnecting = false; - - protected reconnect():Promise|boolean { - if (this.connected) this.connected = false; // (still) not connected - - if (this.reconnecting) return false; - this.reconnecting = true; - this.logger.info("trying to reconnnect...") - return new Promise(resolve=>{ - setTimeout(async ()=>{ - this.reconnecting = false; - const connected = await this.connect(); - this.connected = connected; - resolve(connected); - }, 3000); - }) - } - - public disconnect(){ - this.logger.info("Disconnecting interface: " + this.type) - } - - protected async onConnected(){ - } - - /** implement how to send a message to the server*/ - protected abstract sendBlock(datex:ArrayBuffer): void - - - protected addEndpoint(endpoint:Endpoint) { - this.endpoints.add(endpoint); - CommonInterface.addInterfaceForEndpoint(endpoint, this); - } - - //private datex_generators = new Set>(); - - - /** called from outside for requests */ - public async send(datex:ArrayBuffer):Promise|void { - await this.sendBlock(datex); - } -} - - -/** HTTP interface */ -// @deprecated -class HttpClientInterface extends CommonInterface { - - override type = "http" - override authorization_required = false; // don't connect with public keys - override in = false - override out = true - override global = false - - async connect() { - return true; - } - - async sendBlock(datex:ArrayBuffer){ - let res = await (await fetch("https://"+this.endpoint+"/http/"+fixedEncodeURIComponent("...todo..."))).text(); - } -} - - -/** 'Local' interface */ -export class LocalClientInterface extends CommonInterface { - - override type = "local" - override persistent = true; // cannot be disconnected - override authorization_required = false; // don't connect with public keys - override in = true - override out = true - override global = false - - async connect(){ - return true; - } - - datex_in_handler = Runtime.getDatexInputHandler(); - - async sendBlock(datex:ArrayBuffer){ - this.datex_in_handler(datex, Runtime.endpoint); - } -} - -/** 'Relayed' interface */ -export class RelayedClientInterface extends CommonInterface { - - override type = "relayed" - override authorization_required = false; // don't connect with public keys - override in = true - override out = true - override global = false - override virtual = true - - async connect(){ - return true; - } - - async sendBlock(datex:ArrayBuffer){ - this.logger.error("invalid") - } - - public override send(datex:ArrayBuffer) { - InterfaceManager.send(datex, this.endpoint); - } -} - - -/** 'Bluetooth' interface */ -export class BluetoothClientInterface extends CommonInterface { - - override type = "bluetooth" - override authorization_required = false; // don't connect with public keys - override in = true - override out = true - override global = false - - connect(){ - console.log("connecting to bluetooth", this.initial_arguments); - - - - return true; - } - - sendBlock(datex:ArrayBuffer){ - console.log("bluetooth send block", datex) - } -} - -/** 'Serial' interface (USB, ...) */ -export class SerialClientInterface extends CommonInterface<[any, number, number]> { - - override type = "serial" - override authorization_required = false; // don't connect with public keys - override in = true - override out = true - override global = false - - private baudRate = 9600; - private bufferSize = 255; - - private port?: any - private writer: any - - - async connect(){ - - if (!this.initial_arguments[0]) return false; // no port provided - - if (this.initial_arguments[0]) this.port = this.initial_arguments[0] - if (this.initial_arguments[1]) this.baudRate = this.initial_arguments[1] - if (this.initial_arguments[2]) this.bufferSize = this.initial_arguments[2] - - await this.port.open({ baudRate: this.baudRate, bufferSize:this.bufferSize}); - - this.in = this.port.readable; - this.in = this.port.writable; - - - (async ()=>{ - while (this.port.readable) { - const reader = this.port.readable.getReader(); - - await InterfaceManager.handleReceiveContinuosStream(reader, this.endpoint, this); - } - })() - - if (this.port.writable) { - this.writer = this.port.writable.getWriter(); - } - - return true; - } - - public override async disconnect() { - super.disconnect(); - await this.port.close() - } - - async sendBlock(datex:ArrayBuffer){ - return this.writer.write(datex); - } -} - - - - -/** Websocket stream interface */ -class WebsocketStreamClientInterface extends CommonInterface { - - override type = "websocketstream" - override in = true - override out = true - - override global = true - - public host:string - private stream_writer - private wss - - closed = false; - - private is_node_js - - override async init() { - - this.host = this.endpoint.getInterfaceChannelInfo("websocketstream"); - if (!this.host) return false; - - return super.init(); - } - - - protected async connect():Promise { - if (this.closed) return false; - if (this.connecting) return false; - - - if (!WebSocketStream) return false; - - this.connecting = true; - - try { - this.wss = new WebSocketStream("wss://"+this.host, {protocols: ['datex']}); - - (async ()=>{ - try { - const {code, reason} = await this.wss.closed; - this.logger.error("connection closed"); - this.connecting = false; - this.reconnect(); - } catch(e) { - console.log(e); - this.reconnect(); - } - })(); - - // connect, get reader and writer - let x = await this.wss.connection; - const {readable, writable} = x; - const reader = readable.getReader(); - this.stream_writer = writable.getWriter(); - - (async ()=>{ - try { - while (true) { - const {value, done} = await reader.read(); - if (done) { - this.logger.error("stream done") - break; - } - InterfaceManager.handleReceiveBlock(value, this.endpoint, this); - } - } catch (e) { - this.logger.error("connection error: " + "wss://"+this.host); - this.connecting = false; - this.reconnect(); - } - })(); - - this.connecting = false; - return true - } - - catch (e) { - this.logger.error("connection error:" + "wss://"+this.host); - this.connecting = false; - return this.reconnect(); - } - - } - - async sendBlock(block:ArrayBuffer) { - try { - if (this.is_node_js) this.stream_writer.write(new Uint8Array(block)); - else await this.stream_writer.write(block); - } catch(e) { - console.log(e); - throw new NetworkError("No connection"); - } - } - - public override disconnect(){ - super.disconnect(); - this.wss.close() - this.closed = true; - } -} - -/** Websocket interface */ -class WebsocketClientInterface extends CommonInterface { - - public host:string - - private socket?:WebSocket; - - override in = true - override out = true - override type = "websocket" - - get description() { - return `${this.protocol}://${this.host}` - } - - private protocol:'ws'|'wss' = 'wss'; // use wss or ws - private is_first_try = true - - - closed = false; - - override async init() { - - const host = this.endpoint.getInterfaceChannelInfo("websocket"); - if (host instanceof URL) { - if (host.protocol == "http:") this.protocol = "ws"; // assume ws as websocket protocol, if host is http - this.host = host.host; // convert https://xy -> xy - } - else this.host = host; - - if (!this.host) return false; - - return super.init(); - } - - protected connect():Promise|boolean { - - // @ts-ignore navigator api, no internet connection - if (client_type == "browser" && !navigator.onLine) { - this.logger.error("No connected interface for supranet connection available") - return false; - } - - if (this.closed) return false; - if (this.connecting) return false; - this.connecting = true; - - try { - this.socket = new WebSocket(`${this.protocol}://${this.host}`); - this.socket.binaryType = 'arraybuffer'; - - return new Promise(resolve=>{ - // Connection opened - this.socket!.addEventListener('open', () => { - this.connecting = false; - if (this.protocol == 'ws' && !this.host.match(/localhost(:\d+)?/)) this.logger.warn(`unsecure websocket connection to ${this.host}`) - resolve(true); - }); - - - // this.socket.addEventListener('close', (event) => { - // this.connecting = false; - // this.logger.error("connection closed"); - // this.reconnect(); - // }); - this.socket!.addEventListener('error', async (event) => { - this.connecting = false; - if (this.is_first_try && !globalThis.location?.href.startsWith("https://")) this.protocol = 'ws' - else { - this.protocol = 'wss' - this.logger.error("connection error:" + `${this.protocol}://${this.host}`); - } - - this.is_first_try = false; - resolve(await this.reconnect()); - }); - - this.socket!.addEventListener('message', (event:any) => { - InterfaceManager.handleReceiveBlock(event.data, this.endpoint, this); - }); - - }) - } - - catch (e) { - this.logger.error("connection error:" + "wss://"+this.host); - this.connecting = false; - return this.reconnect(); - } - - } - - async sendBlock(block:ArrayBuffer) { - if (!this.socket) throw "no socket connected"; - // web socket not connected, try reconnect - if (this.socket.readyState != 1) { - await this.reconnect() - } - try { - this.socket.send(block); - } catch { - logger.warn("no connection, trying to send block again in 5s"); - setTimeout(() => { - this.sendBlock(block) - }, 5000) - } - } - - public override disconnect(){ - super.disconnect(); - this.socket?.close() - this.closed = true; - } -} - - - -export class InterfaceManager { - - static logger = new Logger("DATEX Interface Manager"); - - static datex_in_handler: (dxb: ArrayBuffer|ReadableStreamDefaultReader | {dxb: ArrayBuffer|ReadableStreamDefaultReader; variables?: any; header_callback?: (header: dxb_header) => void, new_endpoint_callback?: (endpoint: Endpoint) => void}, last_endpoint:Endpoint, source?:ComInterface) => Promise - - static local_interface:LocalClientInterface; - - static interfaces = new Map(); - - // register new DatexCommonInterface - static registerInterface(channel_type:string, interf:typeof CommonInterface) { - this.interfaces.set(channel_type,interf); - } - - static receive_listeners = new Set(); - static new_interface_listeners = new Set(); - static interface_connected_listeners = new Set(); - static interface_disconnected_listeners = new Set(); - - static active_interfaces: Set - - - static handleReceiveBlock(dxb:ArrayBuffer, last_endpoint?:Endpoint, source?:ComInterface, header_callback?: (header: dxb_header) => void, new_endpoint_callback?: (endpoint: Endpoint) => void){ - if (header_callback || new_endpoint_callback) this.datex_in_handler({dxb, header_callback, new_endpoint_callback}, last_endpoint, source); - else this.datex_in_handler(dxb, last_endpoint, source); - } - - static handleReceiveContinuosStream(reader:ReadableStreamDefaultReader, last_endpoint?:Endpoint, source?:ComInterface, header_callback?: (header: dxb_header) => void, new_endpoint_callback?: (endpoint: Endpoint) => void) { - if (header_callback || new_endpoint_callback) return this.datex_in_handler({dxb:reader, header_callback, new_endpoint_callback}, last_endpoint, source); - else return this.datex_in_handler(reader, last_endpoint, source); - } - - static addReceiveListener(listen:(datex:ArrayBuffer)=>void){ - this.receive_listeners.add(listen); - } - - // connect to datex runtime (in/out) - private static initialized = false; - private static enabled = false; - - public static async init() { - if (this.initialized) return; - - this.initialized = true; - this.datex_in_handler = Runtime.getDatexInputHandler((sid, scope)=>{ - for (let p of this.receive_listeners) p(scope); - }); - - if (!this.active_interfaces) this.active_interfaces = Pointer.createOrGet(new Set()).js_value; - } - - - // datex out is now redirected to this interface - public static enable(){ - if (this.enabled) return; - Runtime.setDatexOut(InterfaceManager.send); - this.enabled = true; - } - - - static onNewInterface(listener:(interf:ComInterface)=>void) { - this.new_interface_listeners.add(listener); - } - - static onInterfaceConnected(listener:(interf:ComInterface)=>void) { - this.interface_connected_listeners.add(listener); - } - static onInterfaceDisconnected(listener:(interf:ComInterface)=>void) { - this.interface_disconnected_listeners.add(listener); - } - - static async enableLocalInterface(endpoint:Endpoint=Runtime.endpoint):Promise { - if (this.local_interface) return; - this.local_interface = new LocalClientInterface(endpoint); - // init requested interface - let res = await this.local_interface.init(); - if (res) this.addInterface(this.local_interface); - return this.local_interface - } - - static async disconnect(){ - CommonInterface.default_interface = null; - CommonInterface.resetEndpointConnectionPoints(); - - for (const interf of this.active_interfaces || []) { - await interf.disconnect() - this.active_interfaces.delete(interf); - } - } - - // create a new connection with a interface type (e.g. websocket, relayed...) - static async connect(channel_type:string, endpoint?:Endpoint, init_args?:any[], set_as_default_interface = true, callback?: (interf: ComInterface, connected: boolean)=>void):Promise { - this.logger.debug("connecting via interface: " + channel_type); - - await this.init(); - - // get requested interface - const interface_class = (InterfaceManager.interfaces.get(channel_type)); - if (!interface_class) throw "Channel type not found: " + channel_type; - const c_interface:CommonInterface = new interface_class(endpoint); - // this.logger.success("new interface: " + channel_type) - // init requested interface - const res = await c_interface.init(...(init_args||[])); - if (res) this.addInterface(c_interface); - - // set as new default interface? - local or relayed create a feedback loop, dont use webrtc as default interface - if (set_as_default_interface && c_interface.global) CommonInterface.default_interface = c_interface - - this.enable(); - - callback?.(c_interface, res); - - return res - } - - // add an existing interface to the interface list - static addInterface(i: ComInterface) { - if (!this.active_interfaces) this.active_interfaces = Pointer.createOrGet(new Set()).js_value; - for (const l of this.new_interface_listeners) l(i) - this.active_interfaces.add(i) - } - // remove an interface from the list - static async removeInterface(i: ComInterface) { - if (!this.active_interfaces) this.active_interfaces = Pointer.createOrGet(new Set()).js_value; - this.active_interfaces.delete(i) - - for (const interfaces of CommonInterface.endpoint_connection_points.values()) { - interfaces.delete(i) - } - - for (const interfaces of CommonInterface.indirect_endpoint_connection_points.values()) { - interfaces.delete(i) - } - for (const interfaces of CommonInterface.virtual_endpoint_connection_points.values()) { - interfaces.delete(i) - } - - await i.disconnect() - } - - // disconnected - static handleInterfaceDisconnect(i: ComInterface){ - for (const l of this.interface_disconnected_listeners) l(i) - } - // (re)connected - static handleInterfaceConnect(i: ComInterface){ - for (let l of this.interface_connected_listeners) l(i) - } - - /** main method to call send */ - // TODO: replace to with Disjunction - static async send(datex:ArrayBuffer, to:Endpoint, flood = false, source?:ComInterface) { - - if (!InterfaceManager.checkRedirectPermission(to)) { - logger.error(to + " has no redirect permission") - return; - } - - // flooding instead of sending to a receiver - if (flood) { - return InterfaceManager.flood(datex, to); - } - - // currently only sending to one target at a time here! TODO: improve - const addressed_datex = Compiler.updateHeaderReceiver(datex, new Disjunction(to))!; // set right receiver - - // is self - if (to instanceof Endpoint && (Runtime.endpoint.equals(to) || Runtime.endpoint.main.equals(to) || to === LOCAL_ENDPOINT)) { - await InterfaceManager.datex_in_handler(addressed_datex, Runtime.endpoint); - return; - } - - // default interface - let comInterface: ComInterface = CommonInterface.default_interface; - - // send via direct connection - if (CommonInterface.hasDirectInterfaceForEndpoint(to)) { - comInterface = [...CommonInterface.getInterfacesForEndpoint(to)][0]; // send to first available interface (todo) - } - // send via indirect connection - else if (CommonInterface.isEndpointReachableViaInterface(to)) { - comInterface = [...CommonInterface.getIndirectInterfacesForEndpoint(to)][0]; // send to first available interface (todo) - } - - // error: no default interface, no other interface found - if (!comInterface) { - return InterfaceManager.handleNoRedirectFound(to); - } - - // prefere proxy_interface - if (CommonInterface.proxy_interface) comInterface = CommonInterface.proxy_interface; - - // error: loopback - if (source && !source?.is_bidirectional_hub && (source == comInterface || comInterface.isEqualSource?.(source, to))) { - // fallback to default interface - if (CommonInterface.default_interface && comInterface !== CommonInterface.default_interface) comInterface = CommonInterface.default_interface; - // fallback to proxy interface - if (CommonInterface.proxy_interface && comInterface !== CommonInterface.proxy_interface) comInterface = CommonInterface.proxy_interface; - else return InterfaceManager.handleNoRedirectFound(to); - } - - // send - logger.debug("sending to " + to + " (interface " + comInterface.type + " / " + comInterface.endpoint + ")"); - return await comInterface.send(addressed_datex, to); // send to first available interface (todo) - } - - // flood to all currently directly connected nodes (only nodes!) - static flood(datex: ArrayBuffer, exclude: Target) { - const exclude_endpoints = new Set([exclude]); - - // iterate over all active endpoints - for (const interf of CommonInterface.proxy_interface ? [CommonInterface.proxy_interface]: this.active_interfaces) { - if (interf.endpoint && !exclude_endpoints.has(interf.endpoint) && !interf.endpoint.equals(Runtime.endpoint)) { - exclude_endpoints.add(interf.endpoint); - // console.log("FLOOD > " + interf.endpoint) - interf.send(datex, interf.endpoint); - } - for (const endpoint of interf.endpoints??[]){ - if (!exclude_endpoints.has(endpoint) && !interf.endpoint?.equals(Runtime.endpoint)) { - exclude_endpoints.add(endpoint); - // console.log("FLOOD > " + endpoint) - interf.send(datex, endpoint); - } - } - } - } - - // can be overwritten for clients - static checkRedirectPermission(receiver: Target){ - return true; // allow all redirects per default - } - - static handleNoRedirectFound(receiver:Target){ - throw new NetworkError("Cannot find route to endpoint " + receiver); - } -} - - -// register interfaces -InterfaceManager.registerInterface("websocketstream", WebsocketStreamClientInterface); -InterfaceManager.registerInterface("websocket", WebsocketClientInterface); -InterfaceManager.registerInterface("local", LocalClientInterface); -InterfaceManager.registerInterface("relayed", RelayedClientInterface); -InterfaceManager.registerInterface("bluetooth", BluetoothClientInterface); -InterfaceManager.registerInterface("serial", SerialClientInterface); - -globalThis.DatexInterfaceManager = InterfaceManager; -globalThis.DatexCommonInterface = CommonInterface; - -export default InterfaceManager; - -function fixedEncodeURIComponent(str:string) { - return encodeURIComponent(str).replace(/[!'()*]/g, function(c) { - return '%' + c.charCodeAt(0).toString(16); - }); -} - diff --git a/network/communication-hub.ts b/network/communication-hub.ts new file mode 100644 index 00000000..8f3afe35 --- /dev/null +++ b/network/communication-hub.ts @@ -0,0 +1,620 @@ +import { dxb_header } from "../utils/global_types.ts"; +import { Endpoint, BROADCAST, LOCAL_ENDPOINT } from "../types/addressing.ts"; +import { CommunicationInterface, CommunicationInterfaceSocket, ConnectedCommunicationInterfaceSocket } from "./communication-interface.ts"; +import { Disjunction } from "../types/logic.ts"; +import "../utils/auto_map.ts"; +import { InterfaceDirection } from "./communication-interface.ts"; +import { ESCAPE_SEQUENCES, Logger } from "../utils/logger.ts"; +import { NetworkError } from "../types/errors.ts"; +import { Compiler } from "../compiler/compiler.ts"; +import { Runtime } from "../runtime/runtime.ts"; +import { ProtocolDataType } from "../compiler/protocol_types.ts"; +import { Crypto } from "../runtime/crypto.ts"; +import { IOHandler } from "../runtime/io_handler.ts"; +import { DATEX_ERROR } from "../types/error_codes.ts"; + +export type DatexInData = { + dxb: ArrayBuffer|ReadableStreamDefaultReader, + socket: CommunicationInterfaceSocket +} + +export type DatexOutData = { + dxb: ArrayBuffer, + receivers: Endpoint|Disjunction, // @@any for broadcasts + socket: CommunicationInterfaceSocket +} + +/** + * Callback for handling incoming datex blocks + */ +export type DatexInHandler = (data: DatexInData) => Promise + +/** + * Public communication access point for managing + * CommunicationInterfaces + */ +export class CommunicationHub { + + // singleton + private constructor() {} + static #instance?: CommunicationHub + static get() { + if (!this.#instance) this.#instance = new CommunicationHub() + return this.#instance; + } + + get defaultSocket() { + return this.handler.defaultSocket; + } + + /** + * Registers a new CommunicationInterface and initializes it + * @param comInterface - CommunicationInterface to add + * @param setAsDefault - set as default interface for sending DATEX messages + * @param timeout - timeout in ms for interface initialization (default: no timeout) + * @returns true if the interface was successfully initialized, false if connection could not be established after timeout if specified + */ + public addInterface(comInterface: CommunicationInterface, setAsDefault = false, timeout?: number) { + return this.handler.addInterface(comInterface, setAsDefault, timeout); + } + + public removeInterface(comInterface: CommunicationInterface) { + return this.handler.removeInterface(comInterface); + } + + /** + * Prints the status of all connected interfaces and attached sockets + */ + public printStatus() { + return this.handler.printStatus(); + } + + /** + * Lists all available sockets for an endpoint, ordered by relevance + */ + public printEndpointSockets(endpoint: Endpoint) { + return this.handler.printEndpointSockets(endpoint); + } + + + /** + * @private + */ + handler = new CommunicationHubHandler() + +} + + +export const COM_HUB_SECRET = Symbol("COM_HUB_SECRET") + +type DynamicProperties = { + knownSince: number, + distance: number +} + +/** + * Internal handler for managing CommunicationInterfaces + */ +export class CommunicationHubHandler { + + /** + * Interval for checking online status for indirect/direct socket endpoints + */ + readonly CLEANUP_INTERVAL = 10 * 60 * 1000; // 10min + + + #logger = new Logger("CommunicationHub") + + #interfaces = new Set() + // CommunicationInterfaceSockets are ordered, most recent last + #endpointSockets = new Map>().setAutoDefault(Map).enableAutoRemove() + #registeredSockets = new Map>().setAutoDefault(Set).enableAutoRemove() + + get registeredSockets() { + return this.#registeredSockets; + } + get endpointSockets() { + return this.#endpointSockets; + } + get interfaces() { + return this.#interfaces; + } + + // maps main endpoints to a list of instance endpoints that are currently connected via sockets + #activeEndpointInstances = new Map>().setAutoDefault(Set).enableAutoRemove() + + #defaultInterface?: CommunicationInterface + #defaultSocket?: ConnectedCommunicationInterfaceSocket + + set defaultSocket(socket: ConnectedCommunicationInterfaceSocket|undefined) { + this.#defaultSocket = socket; + if (this.#defaultSocket) Runtime.setMainNode(this.#defaultSocket.endpoint); + } + get defaultSocket() { + return this.#defaultSocket; + } + + #datexInHandler?: DatexInHandler + + directionSymbols = { + [InterfaceDirection.IN]: "◀──", + [InterfaceDirection.OUT]: "──▶", + [InterfaceDirection.IN_OUT]: "◀─▶" + } + + constructor() { + this.startCleanupInterval() + } + + /** Public facing methods: **/ + + public async addInterface(comInterface: CommunicationInterface, setAsDefault = false, timeout?: number) { + this.#interfaces.add(comInterface) + const connected = await comInterface.init(COM_HUB_SECRET, timeout); + if (connected && setAsDefault) this.setDefaultInterface(comInterface) + if (!connected) this.#interfaces.delete(comInterface); + return connected + } + + public async removeInterface(comInterface: CommunicationInterface) { + this.#interfaces.delete(comInterface) + await comInterface.deinit(COM_HUB_SECRET); + } + + public printStatus() { + console.log(this.getStatus()) + } + + public getStatus() { + let string = ""; + string += ESCAPE_SEQUENCES.BOLD + "DATEX Communication Hub\n\n" + ESCAPE_SEQUENCES.RESET; + string += `Local Endpoint: ${Runtime.endpoint}\n` + string += `Registered Interfaces: ${this.#interfaces.size}\n` + string += `Connected Sockets: ${this.#registeredSockets.size}\n\n` + + const mapping = new Map}>>() + + const endpointPreferredSockets = new Map() + + // interfaces with direct sockets + for (const comInterface of this.#interfaces) { + const sockets = new Set( + [...comInterface.getSockets()] + .map(socket => [socket.endpoint, socket] as [Endpoint, ConnectedCommunicationInterfaceSocket]) + ) + const identifier = comInterface.toString() + if (!mapping.has(identifier)) mapping.set(identifier, new Map()) + sockets.forEach(([endpoint, socket]) => mapping.get(identifier)!.set(socket, {directEndpoint: endpoint, directEndpointDynamicProperties: this.#endpointSockets.get(endpoint)!.get(socket)!, indirectEndpoints: new Map()})) + } + + // indirect connections + for (const [endpoint, sockets] of this.#endpointSockets) { + for (const [socket, dynamicProperties] of sockets) { + // check if endpoint is indirect + if (socket.endpoint !== endpoint) { + if (!socket.interfaceProperties) { + console.warn("Invalid socket, missing interfaceProperties", socket); + continue; + } + const identifier = socket.toString(); + if (!mapping.has(identifier)) mapping.set(identifier, new Map()); + if (!mapping.get(identifier)!.has(socket)) mapping.get(identifier)!.set(socket, {indirectEndpoints: new Map}); + + mapping.get(identifier)!.get(socket)!.indirectEndpoints.set(endpoint, dynamicProperties); + } + } + } + + string += "Default interface: " + (this.#defaultInterface ? this.#defaultInterface.toString() : "none") + "\n"; + + const COLORS = { + DARK_GREEN: [41, 120, 83], + DARK_RED: [120, 41, 53], + DARK_GREY: [110, 110, 110], + } + const DARK_GREEN = `\x1b[38;2;${COLORS.DARK_GREEN.join(';')}m` + const DARK_GREY = `\x1b[38;2;${COLORS.DARK_GREY.join(';')}m` + const DARK_RED = `\x1b[38;2;${COLORS.DARK_RED.join(';')}m` + + const getFormattedSocketString = (socket: CommunicationInterfaceSocket, endpoint: Endpoint, dynamicProperties: DynamicProperties) => { + if (socket.interfaceProperties?.noContinuousConnection && endpoint == BROADCAST) return ""; + + if (!endpointPreferredSockets.has(endpoint)) endpointPreferredSockets.set(endpoint, this.getPreferredSocketForEndpoint(endpoint)); + const isPreferred = endpointPreferredSockets.get(endpoint)! === socket; + + const directionSymbolColor = isPreferred ? ESCAPE_SEQUENCES.BOLD : ESCAPE_SEQUENCES.GREY; + const directionSymbol = directionSymbolColor + this.directionSymbols[socket.interfaceProperties?.direction as InterfaceDirection] ?? "?"; + const isDirect = socket.endpoint === endpoint; + const color = socket.connected ? + ( + socket.endpoint ? + (isDirect ? ESCAPE_SEQUENCES.UNYT_GREEN : DARK_GREEN) : + (isDirect ? ESCAPE_SEQUENCES.UNYT_GREY : DARK_GREY) + ) : + (isDirect ? ESCAPE_SEQUENCES.UNYT_RED : DARK_RED) + const connectedState = `${color}⬤${ESCAPE_SEQUENCES.RESET}` + const knownSince = (Date.now()-dynamicProperties.knownSince)/1000; + const distance = dynamicProperties.distance + return ` ${connectedState} ${directionSymbol}${isDirect?'':' (indirect)'}${isDirect&&this.defaultSocket==socket?' (default)':''} ${endpoint??'unknown endpoint'}${ESCAPE_SEQUENCES.GREY} (distance:${distance < 0 ? 'unknown' : distance}, knownSince:${knownSince < 0 ? 'unknown' : knownSince.toFixed(2)+'s'})${ESCAPE_SEQUENCES.RESET}\n` + } + + // print + for (const [identifier, sockets] of mapping) { + string += `\n${ESCAPE_SEQUENCES.BOLD}${identifier}${ESCAPE_SEQUENCES.RESET}\n` + for (const [socket, {directEndpoint, directEndpointDynamicProperties, indirectEndpoints}] of sockets) { + if (directEndpoint) string += getFormattedSocketString(socket, directEndpoint, directEndpointDynamicProperties!) + for (const [endpoint, dynamicProperties] of indirectEndpoints) { + string += getFormattedSocketString(socket, endpoint, dynamicProperties) + } + } + } + + return string; + } + + public printEndpointSockets(endpoint: Endpoint|string) { + endpoint = endpoint instanceof Endpoint ? endpoint : Endpoint.get(endpoint) as Endpoint; + + let string = ""; + string += `Available sockets for ${endpoint}:\n` + + for (const socket of this.iterateEndpointSockets(endpoint, false, false)) { + string += " - " + socket.toString() + "\n"; + } + + console.log(string) + } + + + /** Internal methods: */ + + public registerSocket(socket: ConnectedCommunicationInterfaceSocket, endpoint: Endpoint|undefined = socket.endpoint, dynamicProperties: DynamicProperties) { + if (this.#endpointSockets.get(endpoint)?.has(socket)) return; + + if (!endpoint) throw new Error("Cannot register socket to communication hub without endpoint.") + if (!socket.connected || !socket.endpoint || !socket.interfaceProperties) throw new Error("Cannot register disconnected or uninitialized socket.") + + const isDirect = socket.endpoint==endpoint; + + // set default socket + if (isDirect && socket.toString()==this.#defaultInterface?.toString() && socket.canSend) { + this.#logger.debug("Setting default socket " + socket.toString() + " (endpoint " + endpoint.toString()+")") + this.defaultSocket = socket; + } + + + this.#logger.debug("Added new" + (isDirect?'':' indirect') + " socket " + socket.toString() + " for endpoint " + endpoint.toString()) + this.#registeredSockets.getAuto(socket).add(endpoint); + this.#endpointSockets.getAuto(endpoint).set(socket, dynamicProperties); + // add to instances map if not main endpoint + if (endpoint.main !== endpoint) this.#activeEndpointInstances.getAuto(endpoint.main).add(endpoint); + this.sortSockets(endpoint) + } + + public unregisterSocket(socket: CommunicationInterfaceSocket, endpoint: Endpoint|undefined = socket.endpoint) { + const connectedSocket = socket as ConnectedCommunicationInterfaceSocket; + if (!endpoint) throw new Error("Cannot unregister socket from communication hub without endpoint.") + if (!this.#endpointSockets.has(endpoint)) throw new Error("Cannot unregister socket, not registered for endpoint.") + if (!this.#registeredSockets.has(connectedSocket)) throw new Error("Cannot unregister socket, not registered.") + + const isDirect = connectedSocket.endpoint==endpoint; + + // remove default socket + if (isDirect && connectedSocket === this.defaultSocket) { + this.defaultSocket = undefined + } + + this.#logger.debug("Removed" + (isDirect?'':' indirect') + " socket " + connectedSocket.toString() + " for endpoint " + endpoint.toString()) + + const endpointSockets = this.#endpointSockets.get(endpoint)!; + const socketEndpoints = this.#registeredSockets.get(connectedSocket)!; + const endpointInstances = this.#activeEndpointInstances.getAuto(endpoint.main) + + // remove own socket endpoint + endpointSockets.delete(connectedSocket) + socketEndpoints.delete(endpoint) + endpointInstances.delete(endpoint) + + // direct socket removed, also remove all indirect sockets + if (isDirect) { + for (const indirectEndpoint of socketEndpoints) { + this.#endpointSockets.get(indirectEndpoint)?.delete(connectedSocket) + this.#activeEndpointInstances.get(indirectEndpoint.main)?.delete(indirectEndpoint) + } + this.#registeredSockets.delete(connectedSocket) + } + } + + + public setDatexInHandler(handler: DatexInHandler) { + this.#datexInHandler = handler + } + + + private startCleanupInterval() { + setInterval(() => { + for (const endpoint of this.#endpointSockets.keys()) { + endpoint.isOnline(); + } + }, this.CLEANUP_INTERVAL) + } + + /** + * @private + */ + async init() { + let lastEndpointGooodbyeMessage = await this.compileGoodbyeMessage(); + Runtime.onEndpointChanged(async (endpoint) => { + this.#logger.debug("Endpoint changed to " + endpoint.toString()) + + // send GOODBYE for previous endpoint + if (lastEndpointGooodbyeMessage) { + this.#logger.debug("Broadcasting GOODBYE for previous endpoint over all sockets"); + + // iterate direct outgoing sockets + for (const socket of this.iterateSockets()) { + socket.sendGoodbye(lastEndpointGooodbyeMessage) + } + } + + await sleep(1000); + + // iterate direct outgoing sockets + const helloMessage = await this.compileHelloMessage(1); + if (helloMessage) { + for (const socket of this.iterateSockets()) { + socket.sendHello(helloMessage) + } + } + + + lastEndpointGooodbyeMessage = await this.compileGoodbyeMessage(); + }) + } + + public compileGoodbyeMessage() { + if (!Runtime.endpoint || Runtime.endpoint == LOCAL_ENDPOINT) return; + return Compiler.compile("", [], {type:ProtocolDataType.GOODBYE, sign:true, flood:true, __routing_ttl:1}) as Promise + } + + public compileHelloMessage(ttl = 6) { + if (!Runtime.endpoint || Runtime.endpoint == LOCAL_ENDPOINT) return; + const keys = Crypto.getOwnPublicKeysExported(); + return Compiler.compile('?', [keys], {type:ProtocolDataType.HELLO, sign:false, flood:true, __routing_ttl:ttl}) as Promise; + } + + + private setDefaultInterface(defaultInterface: CommunicationInterface) { + this.#defaultInterface = defaultInterface + this.defaultSocket = defaultInterface.getSockets().values().next().value + } + + /** + * Returns true when the socket is registered. + * Returns true when the endpoint is registered for the socket (if an endpoint is provided). + */ + public hasSocket(socket: CommunicationInterfaceSocket, endpoint?: Endpoint) { + if (endpoint) return this.#registeredSockets.get(socket as ConnectedCommunicationInterfaceSocket)?.has(endpoint) + else return this.#registeredSockets.has(socket as ConnectedCommunicationInterfaceSocket) + } + + public handleOfflineEndpoint(endpoint: Endpoint) { + for (const socket of this.iterateEndpointSockets(endpoint, false, false)) { + // direct connection endpoint offline, should not happen + if (socket.endpoint == endpoint) { + // this.#logger.error("Direct socket endpoint "+endpoint.toString()+" is not reachable, but socket is still connected."); + // (socket as CommunicationInterfaceSocket).connected = false; + } + // indirect endpoint offline, remove socket registration + else { + this.#logger.debug("Indirect socket endpoint "+endpoint.toString()+" is offline, removing socket registration."); + this.unregisterSocket(socket, endpoint); + } + } + } + + /** + * Returns true when the endpoint or a matching instance endpoint is directly registered for the socket. + */ + public hasDirectSocket(endpoint: Endpoint) { + // check if exact endpoint instance is registered + if (this._hasDirectSocket(endpoint)) return true; + + // find socket that matches instance if main endpoint + if (endpoint.main === endpoint) { + const instances = [...this.#activeEndpointInstances.get(endpoint)??[]]; + for (const instance of instances) { + if (this._hasDirectSocket(instance)) return true; + } + } + + return false + } + + private _hasDirectSocket(endpoint: Endpoint) { + // check if exact endpoint instance is registered as direct socket + return this.iterateEndpointSockets(endpoint, true, false) + .next().done === false; + } + + /** + * Method called by CommunicationInterfaceSockets when they receive a datex block + * @param data + */ + + public datexIn(data: DatexInData) { + if (!this.#datexInHandler) throw new Error("No datexInHandler set") + return this.#datexInHandler(data) + } + + /** + * Sort available sockets for endpoint: + * - direct sockets first + * - then sort by channel channelFactor (latency,bandwidth) + * - then sort by socket connectTimestamp + */ + private sortSockets(endpoint: Endpoint) { + const sockets = this.#endpointSockets.get(endpoint) + if (!sockets) throw new Error("No sockets for endpoint " + endpoint); + const sortedSockets = + new Map( + // sort by direct/indirect, direct first + this.sortGrouped(sockets, ([socket]) => socket.endpoint === endpoint ? 0 : 1, 1) + .map(sockets => + // sort by channelFactor, highest first + this.sortGrouped(sockets, ([socket]) => socket.channelFactor, -1) + .map(sockets => + // sort by distance, smallest first + this.sortGrouped(sockets, ([_, {distance}]) => distance, 1) + .map(sockets => + // sort by knownSince, newest (highest) first + this.sortGrouped(sockets, ([_, {knownSince}]) => knownSince, -1) + ))) + .flat(4) + ) + this.#endpointSockets.set(endpoint, sortedSockets) + } + + private sortGrouped>(iterable: I, groupBy: (item: I extends Iterable ? Item : unknown) => number, sortDirection = 1) { + return Object + // group by channelFactor + .entries(Object.groupBy(iterable as any, groupBy)) + // sort by channelFactor + .toSorted(([a], [b]) => (Number(a) - Number(b)) * sortDirection) + .map(([_, values]) => values!) + } + + + private getPreferredSocketForEndpoint(endpoint: Endpoint, excludeSocket?: CommunicationInterfaceSocket) { + + // find socket that matches endpoint instance exactly + const socket = this.findMatchingEndpointSocket(endpoint, excludeSocket); + if (socket) return socket; + + // find socket that matches instance if main endpoint + if (endpoint.main === endpoint) { + const instances = [...this.#activeEndpointInstances.get(endpoint)??[]]; + for (let i=instances.length-1; i>=0; i--) { + const socket = this.findMatchingEndpointSocket(instances[i], excludeSocket); + if (socket) return socket; + } + } + + if (this.defaultSocket !== excludeSocket) + return this.defaultSocket; + } + + private findMatchingEndpointSocket(endpoint: Endpoint, excludeSocket?: CommunicationInterfaceSocket) { + for (const socket of this.iterateEndpointSockets(endpoint, false, true)) { + if (socket === excludeSocket) continue; + return socket; + } + } + + + private *iterateEndpointSockets(endpoint: Endpoint, onlyDirect = true, onlyOutgoing = true) { + for (const socket of this.#endpointSockets.get(endpoint)?.keys() ?? []) { + if (onlyDirect && socket.endpoint !== endpoint) continue; + if (onlyOutgoing && !socket.canSend) continue; + yield socket; + } + } + + private *iterateSockets(onlyOutgoing = true) { + for (const socket of this.#registeredSockets.keys()) { + if (onlyOutgoing && !socket.canSend) continue; + yield socket; + } + } + + /** + * Method called to send a datex block to a receiver (or as a broadcast) + * @param dxb + */ + public async datexOut(data: DatexOutData):Promise { + + this.updateTTL(data.dxb, -1) // decrement TTL + + // broadcast + if (data.receivers == BROADCAST) return this.datexBroadcastOut(data); + + const receivers = data.receivers instanceof Endpoint ? [data.receivers] : [...data.receivers]; + const outGroups = receivers.length == 1 ? + + // single endpoint shortcut + new Map([[this.getPreferredSocketForEndpoint(receivers[0]), new Disjunction(...receivers)]]) : + + // group for multiple endpoints + new Map( + // group receivers by socket + [...Map.groupBy( + // map receivers to sockets + receivers.map(r => ({endpoint: r, socket: this.getPreferredSocketForEndpoint(r)}), + ), ({socket}) => socket + ).entries() + ] + // map endpoint object arrays to Set + .map(([k, v]) => [k, new Disjunction(...v.map(({endpoint}) => endpoint))] as const) + ); + + + const promises = [] + + for (const [socket, endpoints] of outGroups) { + if (!socket) continue; + promises.push(this.sendAddressedBlockToReceivers(data.dxb, endpoints, socket)); + } + + // throw error if message could not be sent to some receivers + if (outGroups.has(undefined)) { + const endpointsString = [...outGroups.get(undefined)!].map(e => e.toString()).join(", ") + throw new NetworkError("No socket for " + endpointsString); + } + + await Promise.all(promises); + } + + public datexBroadcastOut(data: DatexOutData) { + const reachedEndpoints = new Set() + + for (const socket of this.iterateSockets()) { + if (data.socket === socket) continue; + if (reachedEndpoints.has(socket.endpoint)) continue; + reachedEndpoints.add(socket.endpoint); + + socket.sendBlock(data.dxb).catch(console.error); + } + } + + public async sendAddressedBlockToReceivers(dxb: ArrayBuffer, receivers: Disjunction, destSocket: CommunicationInterfaceSocket) { + const addressdDXB = Compiler.updateHeaderReceiver(dxb, receivers); + if (!addressdDXB) throw new Error("Failed to update header receivers"); + + IOHandler.handleDatexSent(addressdDXB, receivers, destSocket) + + const success = await destSocket.sendBlock(addressdDXB); + if (!success) { + this.updateTTL(dxb, 1) // reset TTL to original + return this.datexOut({ + dxb, + receivers, + socket: destSocket + }) + } + } + + private updateTTL(dxb: ArrayBuffer, delta = -1) { + const uint8 = new Uint8Array(dxb); + const currentTTL = uint8[5]; + // console.log("currentTTL", currentTTL, delta); + + // too many redirects (ttl is 0) + if (currentTTL <= 0) throw new NetworkError(DATEX_ERROR.TOO_MANY_REDIRECTS); + + uint8[5] = currentTTL + delta; + } + +} + + +export const communicationHub = CommunicationHub.get() diff --git a/network/communication-interface.ts b/network/communication-interface.ts index 4ce868b0..69f77001 100644 --- a/network/communication-interface.ts +++ b/network/communication-interface.ts @@ -1,36 +1,471 @@ +import { ProtocolDataType } from "../compiler/protocol_types.ts"; +import { BROADCAST } from "../types/addressing.ts"; import { Endpoint } from "../types/addressing.ts"; +import { Logger } from "../utils/logger.ts"; +import { COM_HUB_SECRET, communicationHub } from "./communication-hub.ts"; +import { IOHandler } from "../runtime/io_handler.ts"; +import { LOCAL_ENDPOINT } from "../types/addressing.ts"; +import { Runtime } from "../runtime/runtime.ts"; +import { dxb_header } from "../utils/global_types.ts"; + +export enum InterfaceDirection { + /** + * Supported communication direction: only receive + */ + IN, + /** + * Supported communication direction: only send + */ + OUT, + /** + * Supported communication directions: send and receive + */ + IN_OUT +} + +export type InterfaceProperties = { + type: string, + name?: string, + + /** + * Supported communication directions + */ + direction: InterfaceDirection, + /** + * Time in milliseconds to wait before reconnecting after a connection error + */ + reconnectInterval?: number, + /** + * Estimated mean latency for this interface type in milliseconds (round trip time). + * Lower latency interfaces are preferred over higher latency channels + */ + latency: number, + /** + * Bandwidth in bytes per second + */ + bandwidth: number, + + /** + * If true, the interface does not support continuous connections. + * All sockets are indirectly connected + */ + noContinuousConnection?: boolean +} + +function getIdentifier(properties?: InterfaceProperties) { + if (!properties) return "unknown"; + return `${properties.type}${properties.name? ` (${properties.name})` : ''}` +} + +class EndpointConnectEvent extends Event { + constructor(public endpoint: Endpoint) { + super('connect'); + } +} +class EndpointBeforeChangeEvent extends Event { + constructor(public endpoint: Endpoint) { + super('beforechange') + } +} +class BrokenChannelEvent extends Event { + constructor() { + super('brokenchannel') + } +} + + +interface CustomEventMap { + "connect": EndpointConnectEvent + "beforechange": EndpointBeforeChangeEvent, + "brokenchannel": BrokenChannelEvent +} + +/** + * A connected communication interface socket that is registered in the communication hub + * and can be used to send and receive messages + */ +export type ConnectedCommunicationInterfaceSocket = CommunicationInterfaceSocket & { + connected: true, + endpoint: Endpoint, + channelFactor: number, + interfaceProperties: InterfaceProperties +} + + +type ReadonlySet = Set & {add: never, delete: never, clear: never} + + + +export abstract class CommunicationInterfaceSocket extends EventTarget { + /** + * Endpoint is only set once. If the endpoint changes, a new socket is created + */ + #endpoint?: Endpoint + #connected = false + #destroyed = false; + #opened = false; + + clone?: CommunicationInterfaceSocket + + static defaultLogger = new Logger("CommunicationInterfaceSocket") + public logger = CommunicationInterfaceSocket.defaultLogger; + + #connectTimestamp = Date.now(); + + get connectTimestamp() { + return this.#connectTimestamp + } + + /** + * Calculated value describing the properties of the interface channel, + * based on bandwidth and latency. + * Interfaces with higher channel factor are preferred. + */ + get channelFactor() { + if (!this.interfaceProperties) return undefined; + return this.interfaceProperties.bandwidth / this.interfaceProperties.latency + } + + interfaceProperties?: InterfaceProperties + + get isRegistered() { + return communicationHub.handler.hasSocket(this) + } + + get connected() { + return this.#connected + } + set connected(connected) { + if (connected === this.#connected) return; // no change + if (this.#destroyed) throw new Error("Cannot change connected state of destroyed socket.") + this.#connected = connected + this.#updateRegistration(); + } + + get endpoint(): Endpoint|undefined { + return this.#endpoint + } + + set endpoint(endpoint: Endpoint) { + if (this.#endpoint) throw new Error("Cannot change endpoint of socket. Create a new socket instead.") + this.#endpoint = endpoint + this.#updateRegistration(); + } + + /** + * Adds or removes the socket from the communication hub based + * on the connection state and endpoint availability + */ + #updateRegistration() { + // handle open/close + if (this.#opened && !this.#connected) { + this.#opened = false; + this.close() + } + else if (!this.#opened && this.#connected) { + this.open() + this.#opened = true; + } + + if (!this.#endpoint || this.#destroyed) return; + + if (this.#connected) { + if (!this.isRegistered) { + this.dispatchEvent(new EndpointConnectEvent(this.#endpoint)) + communicationHub.handler.registerSocket(this as ConnectedCommunicationInterfaceSocket, undefined, {knownSince: this.#connectTimestamp, distance: 0}) + } + } + else { + if (this.isRegistered) { + communicationHub.handler.unregisterSocket(this) + } + } + } + + public get canSend() { + return this.interfaceProperties?.direction !== InterfaceDirection.IN + } + public get canReceive() { + return this.interfaceProperties?.direction !== InterfaceDirection.OUT + } + + public sendHello(dxb: ArrayBuffer) { + if (!Runtime.endpoint || Runtime.endpoint == LOCAL_ENDPOINT) return; + this.sendBlock(dxb).catch(console.error) + } + + public sendGoodbye(dxb: ArrayBuffer) { + if (!Runtime.endpoint || Runtime.endpoint == LOCAL_ENDPOINT) return; + this.sendBlock(dxb).catch(console.error) + } + + public async sendBlock(dxb: ArrayBuffer) { + if (!this.canSend) throw new Error("Cannot send from an IN interface socket"); + if (this.#destroyed) throw new Error("Cannot send from destroyed socket.") + if (!this.connected) throw new Error("Cannot send from disconnected socket"); + + const successful = await this.send(dxb) + if (!successful) { + console.error("Failed to send block via " + this + (this.endpoint ? ` - ${this.endpoint}`: "") + " (channel broken). Disconnecting socket.") + // send was not succesful, meaning the channel is broken. Disconnect socket + this.dispatchEvent(new BrokenChannelEvent()) + this.connected = false + } + return successful; + } + + protected async receive(dxb: ArrayBuffer) { + if (!this.canReceive) throw new Error("Cannot receive on an OUT interface socket"); + if (this.#destroyed) throw new Error("Cannot receive on destroyed socket"); + if (!this.connected) throw new Error("Cannot receive on disconnected socket"); + + let header: dxb_header; + try { + header = await communicationHub.handler.datexIn({ + dxb, + socket: this + }) + IOHandler.handleDatexReceived(header, dxb, this) + } + catch (e) { + console.error(e); + return; + } + // a cloned socket was already created in the meantime, handle header in clone + if (this.clone) { + this.clone.handleReceiveHeader(header) + } + // handle header in this socket + else this.handleReceiveHeader(header) + } + + protected handleReceiveHeader(header: dxb_header) { + if (this.#destroyed) return; + if (!this.connected) return; + + if (this.endpoint) { + // received GOODBYE message, assume endpoint switch. If endpoint just disconnects + // this will be recognized when the socket is disconnected + if (header.type == ProtocolDataType.GOODBYE && header.sender === this.endpoint) { + this.connected = false + this.#destroyed = true + this.dispatchEvent(new EndpointBeforeChangeEvent(this.endpoint)) + } + // message from another endpoint, record as indirect socket connection + else if (header.sender !== this.endpoint && !communicationHub.handler.hasSocket(this as ConnectedCommunicationInterfaceSocket, header.sender)) { + if (header.sender === Runtime.endpoint) { + // loopback connection to own endpoint, this is not a problem, but might help with debugging + this.logger.debug("Indirect connection to own endpoint detected at " + this + " (loopback)"); + } + else { + const ttl = header.routing?.ttl; + const maxTTL = header.routing?.prio; + const hops = (maxTTL??0) - (ttl??0); + this.logger.debug("New indirect connection detected at " + this + " (from " + header.sender + " - ttl:" + ttl + "/" + maxTTL + " hops:" + hops + ")"); + if (!header.timestamp) this.logger.warn("header timestamp missing for indirect connection to " + header.sender); + communicationHub.handler.registerSocket(this as ConnectedCommunicationInterfaceSocket, header.sender, {knownSince: header.timestamp?.getTime() || Date.now(), distance: hops}) + } + } + } + // detect new endpoint + else if (header.sender) { + if (!header.timestamp) this.logger.warn("header timestamp missing for direct connection to " + header.sender) + this.#connectTimestamp = header.timestamp?.getTime() || Date.now(); + this.endpoint = header.sender + } + } + + + /** + * Send a DATEX block via this interface + * @param datex + * @param to + */ + protected abstract send(datex:ArrayBuffer): Promise|boolean + + protected abstract open(): void + protected abstract close(): void + + toString() { + return getIdentifier(this.interfaceProperties) + } + + + declare addEventListener: (type: K, listener: (ev: CustomEventMap[K]) => void) => void; + declare removeEventListener: (type: K, listener: (ev: CustomEventMap[K]) => void) => void; + declare dispatchEvent: (ev: CustomEventMap[K]) => boolean; +} + /** * Base class for all DATEX communication interfaces */ -export abstract class CommunicationInterface { +export abstract class CommunicationInterface extends EventTarget { + + protected logger = new Logger(this.constructor.name) + + #sockets = new Set() + + abstract properties: InterfaceProperties + + abstract connect(): boolean|Promise + abstract disconnect(): void|Promise + + #connecting = false; + + /** + * @private + */ + async init(secret: symbol, timeout?: number) { + + if (secret !== COM_HUB_SECRET) throw new Error("Directly calling CommunicationInterface.init() is not allowed") + // if (Runtime.endpoint == LOCAL_ENDPOINT) throw new Error("Cannot use communication interface with local endpoint") + + // return false if not connected after timeout + if (timeout) { + return Promise.race([ + this.#connectLoop().then(() => true), + new Promise(resolve => { + setTimeout(() => { + resolve(false) + }, timeout) + }) + ]) + } + // no timeout + else { + await this.#connectLoop(); + return true; + } + } + + /** + * @private + */ + async deinit(secret: symbol) { + if (secret !== COM_HUB_SECRET) throw new Error("Directly calling CommunicationInterface.deinit() is not allowed") + this.clearSockets() + await this.disconnect() + } + + getSockets(): ReadonlySet { + return this.#sockets as ReadonlySet + } + + + #connectHandler: ((endpoint: EndpointConnectEvent) => void)|null|undefined + + /** + * Event handler that is called when a new endpoint is connected to a socket on this interface + */ + set onConnect(handler: ((endpoint: EndpointConnectEvent) => void)|null|undefined) { + if (handler) { + this.#connectHandler = handler + this.addEventListener("connect", handler) + } + else { + if (this.#connectHandler) this.removeEventListener("connect", this.#connectHandler) + this.#connectHandler = handler + } + } + + async #connectLoop(reconnecting = false) { + if (this.#connecting) return; + this.#connecting = true; + while (!await this.connect()) { + const interval = this.properties.reconnectInterval || 3000; + reconnecting = true; + this.logger.error("Could not connect to " + this + ", trying again in " + Math.round(interval/1000) + "s"); + await sleep(interval) + } + if (reconnecting) this.logger.success("Reconnected to " + this); + else this.logger.debug("Connected to " + this); + this.#connecting = false; + } + + protected async onConnectionError() { + this.logger.error("Connection error (" + this + ")"); + this.clearSockets(); + await this.#connectLoop(true) + } + + /** + * Create a new socket for this interface + */ + protected async addSocket(socket: Socket) { + if (this.#sockets.has(socket)) throw new Error("Socket already part of interface sockets.") + + // endpoint will change (or socket is disconnected completely), propagate event and clone socket + socket.addEventListener('beforechange', e => { + this.dispatchEvent(new EndpointBeforeChangeEvent(e.endpoint)) + // remove old socket + this.removeSocket(socket) + // clone and add new socket + const newSocket = this.cloneSocket(socket); + socket.clone = newSocket; + this.addSocket(newSocket); + }) + + // endpoint connected, propagate event + socket.addEventListener('connect', e => { + this.dispatchEvent(new EndpointConnectEvent(e.endpoint)) + }) - abstract name: string - abstract description?: string + // channel broken, remove socket + socket.addEventListener('brokenchannel', () => { + this.removeSocket(socket) + }); - /** - * Can send data - */ - abstract canSend: boolean + // no direct endpoint connections supported, set socket endpoint to @@any to force only + // indirect connection registrations + if (this.properties.noContinuousConnection) { + socket.endpoint = BROADCAST + } - /** - * Can receive data - */ - abstract canReceive: boolean + socket.interfaceProperties = this.properties + socket.logger = this.logger; + socket.connected = true; // adds sockets to communication hub + this.#sockets.add(socket); + // send HELLO message + if (socket.canSend) { + const helloMessage = await communicationHub.handler.compileHelloMessage() + if (helloMessage) socket.sendHello(helloMessage) + } + } - /** - * Has a connection to the supranet, use as a default interface if possible - */ - abstract isGlobal: boolean // + /** + * Create a new socket from an existing socket + * which is no longer connected to an endpoint + * @param socket + */ + protected abstract cloneSocket(socket: Socket): Socket + /** + * Remove a socket from this interface + */ + protected removeSocket(socket: Socket) { + if (!this.#sockets.has(socket)) { + return; + // throw new Error("Cannot remove socket, not part of interface sockets.") + } + this.#sockets.delete(socket) + socket.connected = false; // removes socket from communication hub + } - /** - * Send a DATEX block via this interface - * @param datex - * @param to - */ - public send(datex:ArrayBuffer, to?: Endpoint) { + /** + * Remove all sockets from this interface + */ + protected clearSockets() { + for (const socket of this.#sockets) { + this.removeSocket(socket) + } + } - } + toString() { + return getIdentifier(this.properties) + } + declare addEventListener: (type: K, listener: (ev: CustomEventMap[K]) => void) => void; + declare removeEventListener: (type: K, listener: (ev: CustomEventMap[K]) => void) => void; + declare dispatchEvent: (ev: CustomEventMap[K]) => boolean; } \ No newline at end of file diff --git a/network/communication-interfaces/http-server-interface.ts b/network/communication-interfaces/http-server-interface.ts new file mode 100644 index 00000000..3d94a1a5 --- /dev/null +++ b/network/communication-interfaces/http-server-interface.ts @@ -0,0 +1,84 @@ +import { CommunicationInterfaceSocket, InterfaceDirection } from "../communication-interface.ts"; +import { CommunicationInterface } from "../communication-interface.ts"; +import { InterfaceProperties } from "../communication-interface.ts"; + +/** + * Server interface, implemented by UIX Server + */ +export interface WebServer { + addRequestHandler(requestHandler: requestHandler, prioritize?: boolean): void +} +type requestHandler = (req: Deno.RequestEvent, path:string, con:Deno.Conn)=>void|boolean|string|Promise; + + + +export class HTTPServerInterfaceSocket extends CommunicationInterfaceSocket { + + constructor(public server: WebServer) { + super(); + } + + open() { + this.server.addRequestHandler(this.handleRequest.bind(this), true); + } + close() { + // ignore (TODO: remove request handler) + } + + protected async handleRequest(requestEvent: Deno.RequestEvent){ + // POST request to /datex-http + if (requestEvent.request.method == "POST" && new URL(requestEvent.request.url).pathname == "/datex-http") { + const dxb = await requestEvent.request.arrayBuffer() + this.receive(dxb); + requestEvent.respondWith(new Response("Ok")); + } + else return false; + } + + send(_dxb: ArrayBuffer) { + // ignore + return false; + } + + override async sendHello(_dxb:ArrayBuffer) { + // ignore + } + override async sendGoodbye(_dxb:ArrayBuffer) { + // ignore + } +} + +/** + * Creates a DATEX HTTP interface that can be used to receive DATEX messages sent via a HTTP POST request to /datex-http + */ +export class HTTPServerInterface extends CommunicationInterface { + + public properties: InterfaceProperties = { + type: "http-server", + direction: InterfaceDirection.IN, + noContinuousConnection: true, + latency: 0, + bandwidth: 1 + } + + #server: WebServer; + + constructor(server: WebServer) { + super() + this.#server = server; + } + + connect() { + const socket = new HTTPServerInterfaceSocket(this.#server) + this.addSocket(socket) + return true; + } + + disconnect() { + // ignore + } + + cloneSocket(socket: HTTPServerInterfaceSocket) { + return new HTTPServerInterfaceSocket(socket.server); + } +} \ No newline at end of file diff --git a/network/communication-interfaces/local-loopback-interface.ts b/network/communication-interfaces/local-loopback-interface.ts new file mode 100644 index 00000000..956c6490 --- /dev/null +++ b/network/communication-interfaces/local-loopback-interface.ts @@ -0,0 +1,71 @@ +import { Runtime } from "../../runtime/runtime.ts"; +import { LOCAL_ENDPOINT, Endpoint } from "../../types/addressing.ts"; +import { CommunicationInterface, CommunicationInterfaceSocket, InterfaceDirection, InterfaceProperties } from "../communication-interface.ts"; + +export class LocalLoopbackInterfaceSocket extends CommunicationInterfaceSocket { + + open() {} + close() {} + + send(dxb: ArrayBuffer) { + Runtime.datexIn({ + dxb, + socket: this + }) + return true; + } + + override async sendHello(_dxb:ArrayBuffer) { + // ignore + } + override async sendGoodbye(_dxb:ArrayBuffer) { + // ignore + } +} + +/** + * Loopback interface for sending and DATEX messages to the local endpoint + */ +export class LocalLoopbackInterface extends CommunicationInterface { + + public properties: InterfaceProperties = { + type: "local", + direction: InterfaceDirection.OUT, + latency: 0, + bandwidth: 1_000_000 + } + + #currentSocket?: LocalLoopbackInterfaceSocket + + constructor() { + super(); + } + + connect() { + // default @@local socket (never removed) + this.createSocket(LOCAL_ENDPOINT); + + Runtime.onEndpointChanged((endpoint) => { + if (endpoint === LOCAL_ENDPOINT) return; + // remove socket for previous endpoint + if (this.#currentSocket) this.removeSocket(this.#currentSocket) + // add new socket for endpoint + this.#currentSocket = this.createSocket(endpoint); + }) + + return true; + } + disconnect() {} + + private createSocket(endpoint: Endpoint) { + const socket = new LocalLoopbackInterfaceSocket(); + socket.endpoint = endpoint; + this.addSocket(socket) + return socket; + } + + cloneSocket(_socket: LocalLoopbackInterfaceSocket): never { + throw new Error("LocalLoopbackInterface does not support cloning") + } + +} \ No newline at end of file diff --git a/network/communication-interfaces/websocket-client-interface.ts b/network/communication-interfaces/websocket-client-interface.ts new file mode 100644 index 00000000..99602bf2 --- /dev/null +++ b/network/communication-interfaces/websocket-client-interface.ts @@ -0,0 +1,70 @@ +import { Endpoint } from "../../types/addressing.ts"; +import { client_type } from "../../utils/constants.ts"; +import { InterfaceDirection } from "../communication-interface.ts"; +import { InterfaceProperties } from "../communication-interface.ts"; +import { WebSocketInterface, WebSocketInterfaceSocket } from "./websocket-interface.ts"; + +/** + * WebSocket client interface for connecting to a WebSocket server + */ +export class WebSocketClientInterface extends WebSocketInterface { + + public properties: InterfaceProperties = { + type: "websocket-client", + direction: InterfaceDirection.IN_OUT, + latency: 40, + bandwidth: 50_000 + } + + public origin:URL + #initialEndpoint?: Endpoint + + constructor(origin: string|URL, initialEndpoint?: Endpoint) { + super() + + // normalize origin + if (typeof origin === "string") { + if (!origin.match(/^\w+?:\/\//)) { + origin = 'wss://' + origin + } + origin = new URL(origin) + } + if (origin.protocol === "https:") origin.protocol = "wss:" + if (origin.protocol === "http:") origin.protocol = "ws:" + if (origin.protocol !== "wss:" && origin.protocol !== "ws:") { + throw new Error("Invalid protocol for WebSocketClientInterface") + } + + this.origin = origin; + this.#initialEndpoint = initialEndpoint; + this.properties.name = origin.toString(); + } + + + connect() { + if (client_type == "browser" && !navigator.onLine) { + this.logger.error("Cannot connect (offline)") + return false; + } + + const webSocket = new WebSocket(this.origin); + return this.initWebSocket(webSocket) + } + + protected addSocket(socket: WebSocketInterfaceSocket) { + // set initial endpoint if already known + if (this.#initialEndpoint) socket.endpoint = this.#initialEndpoint; + return super.addSocket(socket); + } + + onWebSocketOpened(_webSocket: WebSocket) { + if (this.origin.protocol === 'ws:' && !this.origin.host.match(/(localhost|127\.0\.0\.1)(:\d+)?/)) + this.logger.warn(`unsecure websocket connection to ${this.origin.host}`) + } + + onWebSocketClosed(_socket: WebSocketInterfaceSocket) { + // only one websocket exists, so we handle a interface connection error here and try to reconnect + this.onConnectionError(); + } + +} \ No newline at end of file diff --git a/network/communication-interfaces/websocket-interface.ts b/network/communication-interfaces/websocket-interface.ts new file mode 100644 index 00000000..2abfe49a --- /dev/null +++ b/network/communication-interfaces/websocket-interface.ts @@ -0,0 +1,122 @@ +import { CommunicationInterface, CommunicationInterfaceSocket } from "../communication-interface.ts"; + +/** + * WebSocket interface socket, used by WebSocket client and server interfaces + */ +export class WebSocketInterfaceSocket extends CommunicationInterfaceSocket { + constructor(public readonly webSocket: WebSocket) { + super(); + } + + handleReceive = (event: MessageEvent) => { + this.receive(event.data) + } + + open() { + this.webSocket.addEventListener('message', this.handleReceive); + } + + close() { + this.webSocket.removeEventListener('message', this.handleReceive); + } + + send(datex: ArrayBuffer) { + try { + this.webSocket.send(datex) + return true; + } + catch { + return false; + } + } +} + +/** + * Common base class for WebSocket client and server interfaces + */ +export abstract class WebSocketInterface extends CommunicationInterface { + + #webSockets = new Map void, + openHandler: () => void + }>() + + initWebSocket(webSocket: WebSocket) { + return new Promise(resolve => { + try { + webSocket.binaryType = 'arraybuffer'; + + const socket = new WebSocketInterfaceSocket(webSocket) + + let connectionOpen = false; + const errorHandler = () => { + // don't trigger any further errorHandlers + webSocket.removeEventListener('close', errorHandler); + webSocket.removeEventListener('error', errorHandler); + + this.#webSockets.delete(webSocket); + if (webSocket.readyState !== WebSocket.CLOSED) { + // make sure the socket is closed + try {webSocket.close()} catch {/*ignore*/} + } + if (!connectionOpen) resolve(false); + else { + this.removeSocket(socket); + this.onWebSocketClosed(socket) + } + }; + + const openHandler = async () => { + await this.addSocket(socket); + connectionOpen = true; + this.onWebSocketOpened(webSocket); + resolve(true); + }; + + // webSocket already open, call openHandler immediately + if (webSocket.readyState === WebSocket.OPEN) { + openHandler(); + } + + webSocket.addEventListener('open', openHandler); + webSocket.addEventListener('error', errorHandler); + webSocket.addEventListener('close', errorHandler); + + this.#webSockets.set(webSocket, { + errorHandler, + openHandler + }) + } + catch { + resolve(false) + } + }) + } + + /** + * Called when a new WebSocket connection is opened + */ + abstract onWebSocketOpened(webSocket: WebSocket): void + + /** + * Called when a WebSocket connection is closed + */ + abstract onWebSocketClosed(socket: WebSocketInterfaceSocket): void + + disconnect() { + for (const [webSocket, {errorHandler, openHandler}] of this.#webSockets.entries()) { + try { + webSocket.removeEventListener('open', openHandler); + webSocket.removeEventListener('error', errorHandler); + webSocket.removeEventListener('close', errorHandler); + webSocket.close(); + } + catch {} + } + this.#webSockets.clear(); + } + + cloneSocket(socket: WebSocketInterfaceSocket) { + return new WebSocketInterfaceSocket(socket.webSocket); + } +} \ No newline at end of file diff --git a/network/communication-interfaces/websocket-server-interface.ts b/network/communication-interfaces/websocket-server-interface.ts new file mode 100644 index 00000000..bea04a95 --- /dev/null +++ b/network/communication-interfaces/websocket-server-interface.ts @@ -0,0 +1,75 @@ +import { InterfaceDirection } from "../communication-interface.ts"; +import { InterfaceProperties } from "../communication-interface.ts"; +import type { WebServer } from "./http-server-interface.ts"; +import { WebSocketInterface, WebSocketInterfaceSocket } from "./websocket-interface.ts"; + +/** + * WebSocket server interface for receiving WebSocket connections from clients + */ +export class WebSocketServerInterface extends WebSocketInterface { + + public properties: InterfaceProperties = { + type: "websocket-server", + direction: InterfaceDirection.IN_OUT, + latency: 40, + bandwidth: 50_000 + } + + #server: WebServer; + + constructor(server: WebServer) { + super() + this.#server = server; + } + + connect() { + this.#server.addRequestHandler(this.handleRequest.bind(this), true); + return true; + } + + protected async handleRequest(requestEvent: Deno.RequestEvent){ + // is websocket upgrade? + if (requestEvent.request.headers.get("upgrade") === "websocket") { + try { + const socket = this.upgradeWebSocket(requestEvent); + await this.initWebSocket(socket); + return true; + } + catch { + return false; + } + } + else return false; + } + + protected upgradeWebSocket(requestEvent: Deno.RequestEvent) { + // upgrade to websocket + const req = requestEvent.request; + const { socket, response } = Deno.upgradeWebSocket(req); + + requestEvent + .respondWith(response) + .catch(() => {}); // ignore error + + + // infer interface ws url from request url + if (!this.properties.name) { + let name = requestEvent.request.url + .replace("http://localhost", "ws://localhost") + .replace("http://", "wss://") + .replace("https://", "wss://"); + if (name.endsWith("/")) name = name.slice(0, -1); + this.properties.name = name; + } + + return socket; + } + + onWebSocketOpened(_webSocket: WebSocket) { + // ignore + } + + onWebSocketClosed(_socket: WebSocketInterfaceSocket) { + // ignore; + } +} \ No newline at end of file diff --git a/network/communication-interfaces/window-interface.ts b/network/communication-interfaces/window-interface.ts new file mode 100644 index 00000000..bd313851 --- /dev/null +++ b/network/communication-interfaces/window-interface.ts @@ -0,0 +1,240 @@ +import { Runtime } from "../../runtime/runtime.ts"; +import { Endpoint, Target } from "../../types/addressing.ts"; +import { CommunicationInterface, CommunicationInterfaceSocket, InterfaceDirection, InterfaceProperties } from "../communication-interface.ts"; +import { communicationHub } from "../communication-hub.ts"; + +export class WindowInterfaceSocket extends CommunicationInterfaceSocket { + constructor(public readonly window: Window, public readonly windowOrigin: string) { + super(); + } + + handleReceive = (event: MessageEvent) => { + if (event.origin == this.windowOrigin && event.data instanceof ArrayBuffer) { + this.receive(event.data) + } + } + + open() { + globalThis.addEventListener('message', this.handleReceive); + } + + close() { + globalThis.removeEventListener('message', this.handleReceive); + } + + send(dxb: ArrayBuffer) { + try { + this.window.postMessage(dxb, this.windowOrigin) + return true; + } + catch { + return false; + } + } +} + +/** + * Creates a direct DATEX communication channel between a parent and child window + */ +export class WindowInterface extends CommunicationInterface { + + public properties: InterfaceProperties = { + type: "window", + direction: InterfaceDirection.IN_OUT, + latency: 15, + bandwidth: 1_000_000 + } + + #windowOrIFrame: Window|HTMLIFrameElement + #windowOrigin: string + #isChild: boolean + + get window() { + return this.#windowOrIFrame instanceof HTMLIFrameElement ? this.#windowOrIFrame.contentWindow! : this.#windowOrIFrame; + } + + constructor(window: Window, windowOrigin?: string|URL, type?: "parent"|"child") + constructor(iframe: HTMLIFrameElement, iframeOrigin?: string|URL, type?: "parent"|"child") + constructor(window: Window|HTMLIFrameElement, windowOrigin?: string|URL, type?: "parent"|"child") { + super() + + let windowOriginURL = windowOrigin ? new URL(windowOrigin) : null; + + this.#windowOrIFrame = window; + + // is parent document, has iframe + if (window instanceof HTMLIFrameElement) { + this.#isChild = false; + window.setAttribute("sandbox", "allow-scripts allow-same-origin allow-popups allow-popups-to-escape-sandbox") + this.#windowOrigin = new URL(window.src).origin; + windowOriginURL = new URL(window.src); + this.logger.debug("initializing as parent window, child iframe origin: " + this.#windowOrigin) + } + // is opened child window or inside iframe + else if (type !== "parent" && (type === "child" || window === self.window.opener || globalThis.self !== globalThis.top)) { + this.#isChild = true; + + // explicitly set window origin + if (windowOriginURL) { + this.#windowOrigin = windowOriginURL.origin; + } + else { + // first try window.location.origin + try { + this.#windowOrigin = window.location.origin; + } + // try document.referrer + catch { + if (!document.referrer) throw new Error("The origin of the parent window cannot be determined automatically. Please provide windowOrigin as second argument."); + this.#windowOrigin = new URL(document.referrer).origin; + } + } + this.logger.debug("initializing as child window, parent window origin: " + this.#windowOrigin) + } + // is the parent document + else { + this.#isChild = false; + + // explicitly set window origin + if (windowOriginURL) { + this.#windowOrigin = windowOriginURL.origin; + } + else { + throw new Error("The origin of the child window cannot be determined automatically. Please provide windowOrigin as second argument."); + } + this.logger.debug("initializing as parent window, child window origin: " + this.#windowOrigin) + } + + this.properties.name = windowOriginURL?.toString() || this.#windowOrigin; + + globalThis.addEventListener("message", this.onReceive); + if (this.#isChild) { + // if in sub window: send INIT to parent immediately + this.sendInit(); + } + this.handleClose(); + } + + #connectedPromise = Promise.withResolvers() + + connect() { + return this.#connectedPromise.promise; + } + + disconnect() { + // make sure event listener is removed if INIT not yet completed + globalThis.removeEventListener("message", this.onReceive); + } + + + private sendInit() { + this.window.postMessage({ + type: "INIT", + endpoint: Runtime.endpoint.toString() + }, this.#windowOrigin); + } + + onClose?: ()=>void + + private handleClose() { + // check window.closed every second + const interval = setInterval(() => { + if (this.window?.closed) { + clearInterval(interval); + this.clearSockets() + this.onClose?.() + } + }, 1000); + } + + private onReceive = (event: MessageEvent) => { + if (event.origin == this.#windowOrigin) { + const data = event.data; + if (data?.type == "INIT") { + this.#connectedPromise.resolve(true) + + // only one active socket allowed, remove existing + this.clearSockets(); + + const socket = new WindowInterfaceSocket(this.window, this.#windowOrigin) + this.addSocket(socket) + + // if in parent: send INIT to window after initialized + if (!this.#isChild) this.sendInit(); + } + } + } + + cloneSocket(socket: WindowInterfaceSocket) { + return new WindowInterfaceSocket(socket.window, socket.windowOrigin); + } + + + static createChildWindowInterface(childWindow: Window, windowOrigin: string|URL) { + return new WindowInterface(childWindow, windowOrigin, "parent") + } + + static createChildIFrameInterface(iframe: HTMLIFrameElement) { + return new WindowInterface(iframe, undefined, "parent") + } + + static createParentInterface(parentWindow: Window, windowOrigin?: string|URL) { + return new WindowInterface(parentWindow, windowOrigin, "child") + } + + + /** + * Opens a new window and registers a attached WindowInterface. + * The WindowInterface is automatically removed when the window is closed. + */ + static createWindow(url: string | URL, target?: string, features?: string, connectionTimeout?: number) { + const newWindow = window.open(url, target, features); + if (!newWindow) return Promise.resolve({window: null, endpoint: null}); + const windowInterface = this.createChildWindowInterface(newWindow, url) + + communicationHub.addInterface(windowInterface) + windowInterface.onClose = () => { + communicationHub.removeInterface(windowInterface) + } + + return new Promise<{window:Window|null, endpoint: Endpoint|null}>((resolve) => { + windowInterface.addEventListener("connect", e => { + resolve({ + window: newWindow, + endpoint: e.endpoint + }) + }) + if (connectionTimeout!=null && isFinite(connectionTimeout)) { + setTimeout(() => { + newWindow.close(); + resolve({window: newWindow, endpoint: null}) + }, connectionTimeout); + } + }) + } + + /** + * Binds a Iframe and registers a attached WindowInterface. + * The WindowInterface is automatically removed when the iframe is closed. + */ + static bindIFrame(iframe: HTMLIFrameElement, connectionTimeout?: number) { + const windowInterface = this.createChildIFrameInterface(iframe) + + communicationHub.addInterface(windowInterface) + windowInterface.onClose = () => { + communicationHub.removeInterface(windowInterface) + } + + return new Promise((resolve) => { + windowInterface.addEventListener("connect", e => { + resolve(e.endpoint) + }) + if (connectionTimeout!=null && isFinite(connectionTimeout)) { + setTimeout(() => { + resolve(null) + }, connectionTimeout); + } + }) + } + +} \ No newline at end of file diff --git a/network/communication-interfaces/worker-interface.ts b/network/communication-interfaces/worker-interface.ts new file mode 100644 index 00000000..4d3f5246 --- /dev/null +++ b/network/communication-interfaces/worker-interface.ts @@ -0,0 +1,64 @@ +import { Endpoint } from "../../types/addressing.ts"; +import { CommunicationInterface, CommunicationInterfaceSocket, InterfaceDirection, InterfaceProperties } from "../communication-interface.ts"; + +export class WorkerInterfaceSocket extends CommunicationInterfaceSocket { + constructor(public readonly worker: Worker) { + super(); + } + + handleReceive = (event: MessageEvent) => { + if (event.data instanceof ArrayBuffer) { + this.receive(event.data) + } + } + + open() { + this.worker.addEventListener("message", this.handleReceive); + } + + close() { + this.worker.removeEventListener('message', this.handleReceive); + } + + send(dxb: ArrayBuffer) { + try { + this.worker.postMessage(dxb) + return true; + } + catch { + return false; + } + } +} + +/** + * Creates a direct DATEX communication channel between workers/threads + */ +export class WorkerInterface extends CommunicationInterface { + + public properties: InterfaceProperties = { + type: "worker", + direction: InterfaceDirection.IN_OUT, + latency: 15, + bandwidth: 1_000_000 + } + + constructor(worker: Worker, endpoint?: Endpoint) { + super() + const socket = new WorkerInterfaceSocket(worker); + if (endpoint) socket.endpoint = endpoint; + this.addSocket(socket); + // TODO: currently there is no way to know if the worker is still alive + } + + connect() { + return true; + } + + disconnect() {} + + cloneSocket(socket: WorkerInterfaceSocket) { + return new WorkerInterfaceSocket(socket.worker); + } + +} \ No newline at end of file diff --git a/network/supranet.ts b/network/supranet.ts index 26e5e71e..01eae861 100644 --- a/network/supranet.ts +++ b/network/supranet.ts @@ -1,34 +1,29 @@ /** ╔══════════════════════════════════════════════════════════════════════════════════════╗ - ║ Datex Cloud - Entrypoint ║ + ║ unyt.org Supranet connection handler ║ ╠══════════════════════════════════════════════════════════════════════════════════════╣ - ║ Visit https://docs.unyt.org/datex for more information ║ + ║ Visit https://docs.unyt.org/manual/datex/supranet-networking for more information ║ ╠═════════════════════════════════════════╦════════════════════════════════════════════╣ - ║ © 2021 unyt.org ║ ║ + ║ © 2024 unyt.org ║ ║ ╚═════════════════════════════════════════╩════════════════════════════════════════════╝ */ -import InterfaceManager, { CommonInterface } from "./client.ts" -import { Compiler } from "../compiler/compiler.ts"; import { Runtime } from "../runtime/runtime.ts"; import { Crypto } from "../runtime/crypto.ts"; import {client_type} from "../utils/constants.ts"; -import { Endpoint, filter_target_name_id, Target } from "../types/addressing.ts"; - - +import { Endpoint } from "../types/addressing.ts"; import { Logger } from "../utils/logger.ts"; -import { ProtocolDataType } from "../compiler/protocol_types.ts"; -import { buffer2hex } from "../utils/utils.ts"; import { endpoint_config } from "../runtime/endpoint_config.ts"; import { endpoint_name, UnresolvedEndpointProperty } from "../datex_all.ts"; import { Datex } from "../mod.ts"; import { Storage } from "../runtime/storage.ts"; -import { sendDatexViaHTTPChannel } from "./datex-http-channel.ts"; +import { WebSocketClientInterface } from "./communication-interfaces/websocket-client-interface.ts"; +import { communicationHub } from "./communication-hub.ts"; + const logger = new Logger("DATEX Supranet"); -// entry point to connect to the datex network export class Supranet { static available_channel_types:string[] = []; // all available interface channel types, sorted by preference @@ -39,22 +34,6 @@ export class Supranet { static #initialized = false; static get initialized(){return this.#initialized} - // add listeners for interface changes - private static listeners_set = false; - private static setListeners(){ - if (this.listeners_set) return; - this.listeners_set = true; - // say hello when (re)connected - InterfaceManager.onInterfaceConnected((i)=>{ - logger.debug("interface connected: "+ i.endpoint + " - " + i.type); - if (i.type != "local") this.sayHello(i.endpoint) - }) - InterfaceManager.onInterfaceDisconnected((i)=>{ - logger.debug("interface disconnected: "+ i.endpoint + " - " + i.type); - // TODO: validate this - if (!InterfaceManager.active_interfaces.size) this.#connected = false; - }) - } // connect without cache and random endpoint id public static connectAnonymous(){ @@ -66,7 +45,7 @@ export class Supranet { return this.connect(endpoint, false); } - // connect to cloud, say hello with public key + // connect to Supranet // if local_cache=false, a new endpoint is created and not saved in the cache, even if an endpoint is stored in the cache // TODO problem: using same keys as stored endpoint! public static async connect(endpoint?:Endpoint|UnresolvedEndpointProperty, local_cache?: boolean, sign_keys?:[ArrayBuffer|CryptoKey,ArrayBuffer|CryptoKey], enc_keys?:[ArrayBuffer|CryptoKey,ArrayBuffer|CryptoKey], via_node?:Endpoint) { @@ -76,6 +55,8 @@ export class Supranet { return true; } + const alreadyConnected = this.#connected; + // load runtime, own endpoint, nodes this.#connected = false; endpoint = await this.init(endpoint, local_cache, sign_keys, enc_keys) @@ -87,27 +68,27 @@ export class Supranet { // already connected to endpoint during init if (this.#connected && endpoint === Runtime.endpoint) { - const switched = shouldSwitchInstance ? await this.handleSwitchToInstance() : false; + if (shouldSwitchInstance) await this.handleSwitchToInstance() logger.success("Connected to the supranet as " + endpoint) - if (!switched) this.sayHelloToAllInterfaces(); return true; } - const connected = await this._connect(via_node, !shouldSwitchInstance); - if (shouldSwitchInstance) await this.handleSwitchToInstance() + if (alreadyConnected) { + if (shouldSwitchInstance) await this.handleSwitchToInstance(); + return true; + } + else { + const connected = await this._connect(via_node, !shouldSwitchInstance); + if (shouldSwitchInstance) await this.handleSwitchToInstance() + return connected; + } - return connected; } - private static sayHelloToAllInterfaces() { - for (const i of InterfaceManager.active_interfaces) { - if (i.type != "local") this.sayHello(i.endpoint) - } - } private static shouldSwitchInstance(endpoint: Endpoint) { // return false; - return (endpoint.main === endpoint || Runtime.getActiveLocalStorageEndpoints().includes(endpoint)) && Runtime.Blockchain + return (endpoint.main === endpoint || Runtime.getActiveLocalStorageEndpoints().includes(endpoint)) && (!!Runtime.Blockchain) } /** @@ -150,14 +131,12 @@ export class Supranet { Runtime.init(instance); endpoint_config.endpoint = instance; endpoint_config.save(); - this.sayHelloToAllInterfaces(); logger.success("Switched to endpoint instance " + instance) this.handleConnect(); return true; } catch { logger.error("Could not determine endpoint instance (request error)"); - this.sayHelloToAllInterfaces(); this.handleConnect(); } } @@ -167,12 +146,8 @@ export class Supranet { private static async _connect(via_node?:Endpoint, handleOnConnect = true) { // find node for available channel - const [node, channel_type] = await this.getNode(via_node) - - await InterfaceManager.disconnect() // first disconnect completely - const connected = await InterfaceManager.connect(channel_type, node) - - Runtime.setMainNode(node); + const [node, channel_type] = this.getNode(via_node) + const connected = await this.connectToEndpoint(node, channel_type) if (!connected) logger.error("connection failed") else if (handleOnConnect) await this.handleConnect(); @@ -189,17 +164,54 @@ export class Supranet { static getNode(use_node?:Endpoint) { // channel types? - // @ts-ignore if (globalThis.WebSocketStream || client_type!="browser") this.available_channel_types.push("websocketstream") this.available_channel_types.push("websocket"); // find node for available channel - const [node, channel_type] = endpoint_config.getNodeWithChannelType(this.available_channel_types, use_node); + const [node, channel_type] = endpoint_config.getNodeWithInterfaceType(this.available_channel_types, use_node); if (!node) throw ("Cannot find a node that support any channel type of: " + this.available_channel_types + (use_node ? " via " + use_node : '')); if (!channel_type) throw("No channel type for node: " + node); - return <[Endpoint,string]> [node, channel_type] + return [node, channel_type] as const; } + + + /** + * Connects to a endpoint via an available interface if a known + * interface exists for the endpoint + * @param endpoint endpoint to connect to + * @param interfaceType optional interface type to connect with + * @returns true if a connection could be established + */ + public static async connectToEndpoint(endpoint: Endpoint, interfaceType?: string, setAsDefault = true): Promise { + + // check if interface is available + const info = endpoint.getInterfaceChannelInfo(interfaceType); + if (info) { + // websocket + if (interfaceType == "websocket") { + if (!(info instanceof URL || typeof info === "string")) { + logger.error("Invalid data for websocket interface, expected string or URL"); + return false; + } + const webSocketInterface = new WebSocketClientInterface(info instanceof URL ? info.origin : info, endpoint) + await communicationHub.addInterface(webSocketInterface, setAsDefault); + return true; + } + // TODO: more interfaces + else { + logger.error("Interface type not supported: " + interfaceType); + return false; + } + } + else { + return false; + } + + } + + + private static handleConnect() { for (const listener of this.#connectListeners) listener(); if (this.onConnect) this.onConnect() @@ -207,7 +219,7 @@ export class Supranet { // @override public static onConnect = ()=>{ - logger.success("Connected as **"+Runtime.endpoint+"** to the Supranet via **" + CommonInterface.default_interface.endpoint + "** (" + CommonInterface.default_interface.type + ")" ) + logger.success("Connected as **"+Runtime.endpoint+"** to the Supranet via **" + communicationHub.handler.defaultSocket?.endpoint + "** (" + communicationHub.handler.defaultSocket?.toString() + ")" ) } static #connectListeners = new Set<()=>void>() @@ -263,8 +275,6 @@ export class Supranet { return this._init(endpoint, local_cache, sign_keys, enc_keys, keys); } - static #interfaces_initialized = false - private static async _init(endpoint:Endpoint, local_cache = !endpoint_config.temporary, sign_keys?:[ArrayBuffer|CryptoKey,ArrayBuffer|CryptoKey], enc_keys?:[ArrayBuffer|CryptoKey,ArrayBuffer|CryptoKey], keys?:Crypto.ExportedKeySet) { // load/create keys, even if endpoint was provided? @@ -301,13 +311,6 @@ export class Supranet { // bind keys to initialized endpoint (already done for @@local in Crypto.loadOwnKeys) Crypto.saveOwnPublicKeysInEndpointKeyMap() - // setup interface manager - if (!this.#interfaces_initialized) { - this.#interfaces_initialized = true; - await InterfaceManager.init() - this.setListeners(); - } - this.#initialized = true; return endpoint; @@ -365,40 +368,5 @@ export class Supranet { return keys; } - - // important network methods - - public static sayHello(node:Endpoint = Runtime.main_node){ - // TODO REPLACE, only temporary as placeholder to inform router about own public keys - const keys = Crypto.getOwnPublicKeysExported(); - logger.debug("saying hello as " + Runtime.endpoint) - Runtime.datexOut(['?', [keys], {type:ProtocolDataType.HELLO, sign:false, flood:true, __routing_ttl:10}], undefined, undefined, false, false) - // send with plain endpoint id as sender - // if (Runtime.endpoint.id_endpoint !== Runtime.endpoint) Runtime.datexOut(['?', [keys], {type:ProtocolDataType.HELLO, sign:false, flood:true, force_id:true, __routing_ttl:1}], undefined, undefined, false, false) - } - - // ping all endpoints with same base (@endpoint/*) - public static async findOnlineEndpoints(endpoint:Endpoint){ - // TODO - //await this.pingEndpoint(Target.get(endpoint.toString())) - } - - - // get DATEX roundtime/ping for endpoint - public static async pingEndpoint(endpoint_or_string:string|Endpoint, sign=false, encrypt=false) { - let endpoint = endpoint_or_string instanceof Endpoint ? endpoint_or_string : Endpoint.get(endpoint_or_string); - const start_time = new Date().getTime(); - const half_time = (await Runtime.datexOut(['