diff --git a/compiler/compiler.ts b/compiler/compiler.ts index 4508db1e..c233b072 100644 --- a/compiler/compiler.ts +++ b/compiler/compiler.ts @@ -5189,7 +5189,9 @@ export class Compiler { // insert another value else SCOPE.buffer = Compiler.compileValue(value, {}, false); - controller.enqueue(await Compiler.createBlockFromScope(SCOPE)); + const buffer = await Compiler.createBlockFromScope(SCOPE); + (buffer as any)._is_stream = true; // internal flag, set when the outgoing message is a stream, used to prevent recursive stream logs + controller.enqueue(buffer); } // continue after stream, reset SCOPE to previous state diff --git a/network/communication-hub.ts b/network/communication-hub.ts index 8f3afe35..bd3d3e78 100644 --- a/network/communication-hub.ts +++ b/network/communication-hub.ts @@ -252,17 +252,21 @@ export class CommunicationHubHandler { return string; } - public printEndpointSockets(endpoint: Endpoint|string) { + public getEndpointSockets(endpoint: Endpoint|string) { endpoint = endpoint instanceof Endpoint ? endpoint : Endpoint.get(endpoint) as Endpoint; let string = ""; string += `Available sockets for ${endpoint}:\n` - for (const socket of this.iterateEndpointSockets(endpoint, false, false)) { + for (const socket of this.iterateEndpointSockets(endpoint, false, true)) { string += " - " + socket.toString() + "\n"; } - console.log(string) + return string; + } + + public printEndpointSockets(endpoint: Endpoint|string) { + console.log(this.getEndpointSockets(endpoint)) } @@ -542,14 +546,14 @@ export class CommunicationHubHandler { const outGroups = receivers.length == 1 ? // single endpoint shortcut - new Map([[this.getPreferredSocketForEndpoint(receivers[0]), new Disjunction(...receivers)]]) : + new Map([[this.getPreferredSocketForEndpoint(receivers[0], data.socket), new Disjunction(...receivers)]]) : // group for multiple endpoints new Map( // group receivers by socket [...Map.groupBy( // map receivers to sockets - receivers.map(r => ({endpoint: r, socket: this.getPreferredSocketForEndpoint(r)}), + receivers.map(r => ({endpoint: r, socket: this.getPreferredSocketForEndpoint(r, data.socket)}), ), ({socket}) => socket ).entries() ] @@ -587,13 +591,17 @@ export class CommunicationHubHandler { } public async sendAddressedBlockToReceivers(dxb: ArrayBuffer, receivers: Disjunction, destSocket: CommunicationInterfaceSocket) { - const addressdDXB = Compiler.updateHeaderReceiver(dxb, receivers); - if (!addressdDXB) throw new Error("Failed to update header receivers"); + const addressedDXB = Compiler.updateHeaderReceiver(dxb, receivers); + if ((dxb as any)._is_stream) { + (addressedDXB as any)._is_stream = true + } + if (!addressedDXB) throw new Error("Failed to update header receivers"); - IOHandler.handleDatexSent(addressdDXB, receivers, destSocket) + IOHandler.handleDatexSent(addressedDXB, receivers, destSocket) - const success = await destSocket.sendBlock(addressdDXB); + const success = await destSocket.sendBlock(addressedDXB); if (!success) { + this.#logger.warn("Failed to send block to " + receivers.toString() + " via " + destSocket.toString() + ", retrying"); this.updateTTL(dxb, 1) // reset TTL to original return this.datexOut({ dxb, diff --git a/network/communication-interface.ts b/network/communication-interface.ts index 69f77001..266175b9 100644 --- a/network/communication-interface.ts +++ b/network/communication-interface.ts @@ -172,7 +172,7 @@ export abstract class CommunicationInterfaceSocket extends EventTarget { if (this.#connected) { if (!this.isRegistered) { this.dispatchEvent(new EndpointConnectEvent(this.#endpoint)) - communicationHub.handler.registerSocket(this as ConnectedCommunicationInterfaceSocket, undefined, {knownSince: this.#connectTimestamp, distance: 0}) + communicationHub.handler.registerSocket(this as ConnectedCommunicationInterfaceSocket, undefined, {knownSince: this.#connectTimestamp, distance: 1}) } } else { @@ -207,7 +207,7 @@ export abstract class CommunicationInterfaceSocket extends EventTarget { const successful = await this.send(dxb) if (!successful) { console.error("Failed to send block via " + this + (this.endpoint ? ` - ${this.endpoint}`: "") + " (channel broken). Disconnecting socket.") - // send was not succesful, meaning the channel is broken. Disconnect socket + // send was not successful, meaning the channel is broken. Disconnect socket this.dispatchEvent(new BrokenChannelEvent()) this.connected = false } diff --git a/runtime/pointers.ts b/runtime/pointers.ts index 52d901bd..d6c70336 100644 --- a/runtime/pointers.ts +++ b/runtime/pointers.ts @@ -3299,7 +3299,7 @@ export class Pointer extends Ref { this.subscribers.add(subscriber); if (this.subscribers.size == 1) this.updateGarbageCollection() // first subscriber - if (this.streaming.length) setTimeout(()=>this.startStreamOutForEndpoint(subscriber), 1000); // TODO do without timeout? + if (this.streaming.length) this.startStreamOutForEndpoint(subscriber) // setTimeout(()=>, 200); // TODO do without timeout? // force enable live mode also if primitive (subscriber is not handled a new observer) if (this.is_js_primitive) this.setForcedLiveTransform(true) @@ -3322,6 +3322,9 @@ export class Pointer extends Ref { this.updateGarbageCollection() } + // stop streaming + this.stopStreamOutForEndpoint(subscriber) + // remove from endpoint subscriptions map Pointer.#endpoint_subscriptions.get(subscriber)?.delete(this) if (Pointer.#endpoint_subscriptions.get(subscriber)?.size == 0) { @@ -3985,22 +3988,42 @@ export class Pointer extends Ref { this.streaming.push(true); // also stream for all future subscribers + // TODO: stream to multiple endpoints in single DXB block + // if (this.send_updates_to_origin) { + // logger.info("streaming to parent " + this.origin); + // this.handleDatexUpdate(null, '? << ?'/*DatexRuntime.PRECOMPILED_DXB.STREAM*/, [this, obj], this.origin) + // } + // if (this.update_endpoints.size) { + // logger.info("streaming to subscribers " + this.update_endpoints); + // this.handleDatexUpdate(null, '? << ?'/*DatexRuntime.PRECOMPILED_DXB.STREAM*/, [this, obj], this.update_endpoints) + // } + if (this.send_updates_to_origin) { - logger.info("streaming to parent " + this.origin); - this.handleDatexUpdate(null, '? << ?'/*DatexRuntime.PRECOMPILED_DXB.STREAM*/, [this, obj], this.origin) + this.startStreamOutForEndpoint(this.origin); } - if (this.update_endpoints.size) { - logger.info("streaming to subscribers " + this.update_endpoints); - this.handleDatexUpdate(null, '? << ?'/*DatexRuntime.PRECOMPILED_DXB.STREAM*/, [this, obj], this.update_endpoints) + for (const endpoint of this.update_endpoints) { + this.startStreamOutForEndpoint(endpoint); } } + #streamAbortControllers = new Map() + // TODO better way than streaming individually to every new subscriber? startStreamOutForEndpoint(endpoint:Endpoint) { + const abortController = new AbortController(); + this.#streamAbortControllers.set(endpoint, abortController); logger.info("streaming to new subscriber " + endpoint); - this.handleDatexUpdate(null, '? << ?'/*DatexRuntime.PRECOMPILED_DXB.STREAM*/, [this, this.current_val], endpoint) + this.handleDatexUpdate(null, '? << ?', [this, this.current_val], endpoint, undefined, abortController.signal) } + stopStreamOutForEndpoint(endpoint: Endpoint) { + if (this.#streamAbortControllers.has(endpoint)) { + logger.info("stopping streaming to subscriber " + endpoint); + this.#streamAbortControllers.get(endpoint)?.abort(); + this.#streamAbortControllers.delete(endpoint); + } + } + /** all values are removed */ handleClear() { @@ -4201,7 +4224,7 @@ export class Pointer extends Ref { // actual update to subscribers/origin // if identifier is set, further updates to the same identifier are overwritten - async handleDatexUpdate(identifier:string|null, datex:string|PrecompiledDXB, data:any[], receiver:endpoints, collapse_first_inserted = false){ + async handleDatexUpdate(identifier:string|null, datex:string|PrecompiledDXB, data:any[], receiver:endpoints, collapse_first_inserted = false, stream_abort_signal?: AbortSignal){ // let schedulter handle updates (cannot throw errors) if (this.#scheduler) { @@ -4212,7 +4235,7 @@ export class Pointer extends Ref { else { if (receiver instanceof Disjunction && !receiver.size) return; try { - await Runtime.datexOut([datex, data, {collapse_first_inserted, type:ProtocolDataType.UPDATE, preemptive_pointer_init: true}], receiver, undefined, false, undefined, undefined, false, this.datex_timeout); + await Runtime.datexOut([datex, data, {collapse_first_inserted, type:ProtocolDataType.UPDATE, preemptive_pointer_init: true, stream_abort_signal}], receiver, undefined, false, undefined, undefined, false, this.datex_timeout); } catch(e) { //throw e; console.error("forwarding failed", e, datex, data) diff --git a/utils/logger.ts b/utils/logger.ts index ebb2aec7..49832279 100644 --- a/utils/logger.ts +++ b/utils/logger.ts @@ -401,11 +401,11 @@ export class Logger { if (this.origin) Logger.loggers_by_origin.get(this.origin)?.delete(this); } - private log(color: COLOR, text: string, data:any[], log_level:LOG_LEVEL = LOG_LEVEL.DEFAULT, only_log_own_stream = false, add_tag = true) { + private log(color: COLOR, text: string, data:any[], log_level:LOG_LEVEL = LOG_LEVEL.DEFAULT, only_log_own_stream = false, add_tag = true, raw = false) { if (this.production && (log_level < Logger.production_log_level)) return; // don't log for production if (!this.production && (log_level < Logger.development_log_level)) return; // don't log for development - const log_string = this.generateLogString(color, text, data, add_tag); + const log_string = raw ? text : this.generateLogString(color, text, data, add_tag); this.logRaw(log_string, log_level, only_log_own_stream); } @@ -743,6 +743,10 @@ export class Logger { this.log(console_theme == 'dark' ? COLOR.WHITE : COLOR.BLACK, this.normalizeLogText(text), data, LOG_LEVEL.DEFAULT, false, false) } + public raw(text:string) { + this.log(console_theme == 'dark' ? COLOR.WHITE : COLOR.BLACK, this.normalizeLogText(text), [], LOG_LEVEL.DEFAULT, false, false, true) + } + // does not have an effect in the native browser console or log streams with multiple logger inputs (intentionally) public clear(silent = false){ this.logRaw(ESCAPE_SEQUENCES.CLEAR, LOG_LEVEL.DEFAULT, true) @@ -898,8 +902,8 @@ export class Logger { } // stream for specific origins else { - for (let origin of filter_origins) { - for (let logger of this.loggers_by_origin.get(origin)??[]) { + for (const origin of filter_origins) { + for (const logger of this.loggers_by_origin.get(origin)??[]) { logger.out_streams.add(stream); if (!this.loggersForStream.has(stream)) this.loggersForStream.set(stream, new Set()); this.loggersForStream.get(stream)?.add(logger); diff --git a/utils/message_logger.ts b/utils/message_logger.ts index fd396292..dbb0650e 100644 --- a/utils/message_logger.ts +++ b/utils/message_logger.ts @@ -33,16 +33,17 @@ export class MessageLogger { if (!this.logger) this.logger = new Logger("DATEX Message"); IOHandler.onDatexReceived((header, dxb, socket)=>{ + const log = (dxb as any)._is_stream ? console.log : this.logger.raw.bind(this.logger); + // ignore incoming requests from own endpoint to own endpoint const receivers = header.routing?.receivers; const receiverIsOwnEndpoint = receivers instanceof Logical && receivers?.size == 1 && (receivers.has(Runtime.endpoint) || receivers.has(Runtime.endpoint.main)); if (!showRedirectedMessages && !receiverIsOwnEndpoint) return; if (header.sender == Runtime.endpoint && receiverIsOwnEndpoint && header.type != ProtocolDataType.RESPONSE && header.type != ProtocolDataType.DEBUGGER) return; - // ignore hello messages if (header.type == ProtocolDataType.HELLO || header.type == ProtocolDataType.GOODBYE) { - this.logger.plain(`\n#color(blue)◀── ${header.sender||'@*'} ${header.type!=undefined? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()}` : ''}`); + log(`${ESCAPE_SEQUENCES.BLUE}◀── ${header.sender||'@*'} ${header.type!=undefined? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()}` : ''}`); return; }; @@ -50,13 +51,16 @@ export class MessageLogger { if (content.trim() == "\x1b[38;2;219;45;129mvoid\x1b[39m;") return; // dont log void; messages content = - `\n${ESCAPE_SEQUENCES.BLUE}${receiverIsOwnEndpoint?'':Runtime.valueToDatexStringExperimental(receivers, false, false)+ ' '}◀── ${header.sender||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()} ` : ''}`.padEnd(80, '─') + '\n' + `${ESCAPE_SEQUENCES.BLUE}${receiverIsOwnEndpoint?'':Runtime.valueToDatexStringExperimental(receivers, false, false)+ ' '}◀── ${header.sender||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()} ` : ''}`.padEnd(80, '─') + '\n' + content + `\n${ESCAPE_SEQUENCES.BLUE}──────────────────────────────────────────────────────────────────────────\n`; - console.log(content) + log(content) }); IOHandler.onDatexSent((header, dxb, socket) => { + + const log = (dxb as any)._is_stream ? console.log : this.logger.raw.bind(this.logger); + // ignore outgoing responses from own endpoint to own endpoint const receivers = header.routing?.receivers; if (header.sender == Runtime.endpoint && (receivers instanceof Logical && receivers?.size == 1 && receivers.has(Runtime.endpoint)) && header.type != ProtocolDataType.RESPONSE && header.type != ProtocolDataType.DEBUGGER) return; @@ -65,7 +69,7 @@ export class MessageLogger { // ignore hello messages if (header.type == ProtocolDataType.HELLO || header.type == ProtocolDataType.GOODBYE) { - this.logger.plain(`\n#color(green)${header.sender||'@*'} ──▶ ${receivers||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()}` : ''}`); + log(`${ESCAPE_SEQUENCES.GREEN}${header.sender||'@*'} ──▶ ${receivers||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()}` : ''}`); return; }; @@ -73,11 +77,11 @@ export class MessageLogger { if (content.trim() == "\x1b[38;2;219;45;129mvoid\x1b[39m;") return; // dont log void; messages content = - `\n${ESCAPE_SEQUENCES.GREEN}${senderIsOwnEndpoint?'':Runtime.valueToDatexStringExperimental(header.sender, false, false)+' '}──▶ ${receivers||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()} ` : ''}`.padEnd(80, '─') + '\n' + `${ESCAPE_SEQUENCES.GREEN}${senderIsOwnEndpoint?'':Runtime.valueToDatexStringExperimental(header.sender, false, false)+' '}──▶ ${receivers||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()} ` : ''}`.padEnd(80, '─') + '\n' + content + `\n${ESCAPE_SEQUENCES.GREEN}──────────────────────────────────────────────────────────────────────────\n`; - console.log(content); + log(content); }); } diff --git a/wasm/adapter/pkg/datex_wasm_bg.wasm b/wasm/adapter/pkg/datex_wasm_bg.wasm index c6f42979..78c62bee 100644 Binary files a/wasm/adapter/pkg/datex_wasm_bg.wasm and b/wasm/adapter/pkg/datex_wasm_bg.wasm differ