Skip to content

Commit

Permalink
Merge pull request #81 from unyt-org/fix-routing
Browse files Browse the repository at this point in the history
Fix routing
  • Loading branch information
jonasstrehle authored Feb 10, 2024
2 parents 0c8dba4 + e86d064 commit 461e6fa
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 32 deletions.
4 changes: 3 additions & 1 deletion compiler/compiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5189,7 +5189,9 @@ export class Compiler {
// insert another value
else SCOPE.buffer = Compiler.compileValue(value, {}, false);

controller.enqueue(await Compiler.createBlockFromScope(SCOPE));
const buffer = await Compiler.createBlockFromScope(SCOPE);
(buffer as any)._is_stream = true; // internal flag, set when the outgoing message is a stream, used to prevent recursive stream logs
controller.enqueue(buffer);
}

// continue after stream, reset SCOPE to previous state
Expand Down
26 changes: 17 additions & 9 deletions network/communication-hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,17 +252,21 @@ export class CommunicationHubHandler {
return string;
}

public printEndpointSockets(endpoint: Endpoint|string) {
public getEndpointSockets(endpoint: Endpoint|string) {
endpoint = endpoint instanceof Endpoint ? endpoint : Endpoint.get(endpoint) as Endpoint;

let string = "";
string += `Available sockets for ${endpoint}:\n`

for (const socket of this.iterateEndpointSockets(endpoint, false, false)) {
for (const socket of this.iterateEndpointSockets(endpoint, false, true)) {
string += " - " + socket.toString() + "\n";
}

console.log(string)
return string;
}

public printEndpointSockets(endpoint: Endpoint|string) {
console.log(this.getEndpointSockets(endpoint))
}


Expand Down Expand Up @@ -542,14 +546,14 @@ export class CommunicationHubHandler {
const outGroups = receivers.length == 1 ?

// single endpoint shortcut
new Map([[this.getPreferredSocketForEndpoint(receivers[0]), new Disjunction(...receivers)]]) :
new Map([[this.getPreferredSocketForEndpoint(receivers[0], data.socket), new Disjunction(...receivers)]]) :

// group for multiple endpoints
new Map(
// group receivers by socket
[...Map.groupBy(
// map receivers to sockets
receivers.map(r => ({endpoint: r, socket: this.getPreferredSocketForEndpoint(r)}),
receivers.map(r => ({endpoint: r, socket: this.getPreferredSocketForEndpoint(r, data.socket)}),
), ({socket}) => socket
).entries()
]
Expand Down Expand Up @@ -587,13 +591,17 @@ export class CommunicationHubHandler {
}

public async sendAddressedBlockToReceivers(dxb: ArrayBuffer, receivers: Disjunction<Endpoint>, destSocket: CommunicationInterfaceSocket) {
const addressdDXB = Compiler.updateHeaderReceiver(dxb, receivers);
if (!addressdDXB) throw new Error("Failed to update header receivers");
const addressedDXB = Compiler.updateHeaderReceiver(dxb, receivers);
if ((dxb as any)._is_stream) {
(addressedDXB as any)._is_stream = true
}
if (!addressedDXB) throw new Error("Failed to update header receivers");

IOHandler.handleDatexSent(addressdDXB, receivers, destSocket)
IOHandler.handleDatexSent(addressedDXB, receivers, destSocket)

const success = await destSocket.sendBlock(addressdDXB);
const success = await destSocket.sendBlock(addressedDXB);
if (!success) {
this.#logger.warn("Failed to send block to " + receivers.toString() + " via " + destSocket.toString() + ", retrying");
this.updateTTL(dxb, 1) // reset TTL to original
return this.datexOut({
dxb,
Expand Down
4 changes: 2 additions & 2 deletions network/communication-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export abstract class CommunicationInterfaceSocket extends EventTarget {
if (this.#connected) {
if (!this.isRegistered) {
this.dispatchEvent(new EndpointConnectEvent(this.#endpoint))
communicationHub.handler.registerSocket(this as ConnectedCommunicationInterfaceSocket, undefined, {knownSince: this.#connectTimestamp, distance: 0})
communicationHub.handler.registerSocket(this as ConnectedCommunicationInterfaceSocket, undefined, {knownSince: this.#connectTimestamp, distance: 1})
}
}
else {
Expand Down Expand Up @@ -207,7 +207,7 @@ export abstract class CommunicationInterfaceSocket extends EventTarget {
const successful = await this.send(dxb)
if (!successful) {
console.error("Failed to send block via " + this + (this.endpoint ? ` - ${this.endpoint}`: "") + " (channel broken). Disconnecting socket.")
// send was not succesful, meaning the channel is broken. Disconnect socket
// send was not successful, meaning the channel is broken. Disconnect socket
this.dispatchEvent(new BrokenChannelEvent())
this.connected = false
}
Expand Down
41 changes: 32 additions & 9 deletions runtime/pointers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3299,7 +3299,7 @@ export class Pointer<T = any> extends Ref<T> {

this.subscribers.add(subscriber);
if (this.subscribers.size == 1) this.updateGarbageCollection() // first subscriber
if (this.streaming.length) setTimeout(()=>this.startStreamOutForEndpoint(subscriber), 1000); // TODO do without timeout?
if (this.streaming.length) this.startStreamOutForEndpoint(subscriber) // setTimeout(()=>, 200); // TODO do without timeout?
// force enable live mode also if primitive (subscriber is not handled a new observer)
if (this.is_js_primitive) this.setForcedLiveTransform(true)

Expand All @@ -3322,6 +3322,9 @@ export class Pointer<T = any> extends Ref<T> {
this.updateGarbageCollection()
}

// stop streaming
this.stopStreamOutForEndpoint(subscriber)

// remove from endpoint subscriptions map
Pointer.#endpoint_subscriptions.get(subscriber)?.delete(this)
if (Pointer.#endpoint_subscriptions.get(subscriber)?.size == 0) {
Expand Down Expand Up @@ -3985,22 +3988,42 @@ export class Pointer<T = any> extends Ref<T> {

this.streaming.push(true); // also stream for all future subscribers

// TODO: stream to multiple endpoints in single DXB block
// if (this.send_updates_to_origin) {
// logger.info("streaming to parent " + this.origin);
// this.handleDatexUpdate(null, '? << ?'/*DatexRuntime.PRECOMPILED_DXB.STREAM*/, [this, obj], this.origin)
// }
// if (this.update_endpoints.size) {
// logger.info("streaming to subscribers " + this.update_endpoints);
// this.handleDatexUpdate(null, '? << ?'/*DatexRuntime.PRECOMPILED_DXB.STREAM*/, [this, obj], this.update_endpoints)
// }

if (this.send_updates_to_origin) {
logger.info("streaming to parent " + this.origin);
this.handleDatexUpdate(null, '? << ?'/*DatexRuntime.PRECOMPILED_DXB.STREAM*/, [this, obj], this.origin)
this.startStreamOutForEndpoint(this.origin);
}
if (this.update_endpoints.size) {
logger.info("streaming to subscribers " + this.update_endpoints);
this.handleDatexUpdate(null, '? << ?'/*DatexRuntime.PRECOMPILED_DXB.STREAM*/, [this, obj], this.update_endpoints)
for (const endpoint of this.update_endpoints) {
this.startStreamOutForEndpoint(endpoint);
}
}

#streamAbortControllers = new Map<Endpoint, AbortController>()

// TODO better way than streaming individually to every new subscriber?
startStreamOutForEndpoint(endpoint:Endpoint) {
const abortController = new AbortController();
this.#streamAbortControllers.set(endpoint, abortController);
logger.info("streaming to new subscriber " + endpoint);
this.handleDatexUpdate(null, '? << ?'/*DatexRuntime.PRECOMPILED_DXB.STREAM*/, [this, this.current_val], endpoint)
this.handleDatexUpdate(null, '? << ?', [this, this.current_val], endpoint, undefined, abortController.signal)
}

stopStreamOutForEndpoint(endpoint: Endpoint) {
if (this.#streamAbortControllers.has(endpoint)) {
logger.info("stopping streaming to subscriber " + endpoint);
this.#streamAbortControllers.get(endpoint)?.abort();
this.#streamAbortControllers.delete(endpoint);
}
}


/** all values are removed */
handleClear() {
Expand Down Expand Up @@ -4201,7 +4224,7 @@ export class Pointer<T = any> extends Ref<T> {

// actual update to subscribers/origin
// if identifier is set, further updates to the same identifier are overwritten
async handleDatexUpdate(identifier:string|null, datex:string|PrecompiledDXB, data:any[], receiver:endpoints, collapse_first_inserted = false){
async handleDatexUpdate(identifier:string|null, datex:string|PrecompiledDXB, data:any[], receiver:endpoints, collapse_first_inserted = false, stream_abort_signal?: AbortSignal){

// let schedulter handle updates (cannot throw errors)
if (this.#scheduler) {
Expand All @@ -4212,7 +4235,7 @@ export class Pointer<T = any> extends Ref<T> {
else {
if (receiver instanceof Disjunction && !receiver.size) return;
try {
await Runtime.datexOut([datex, data, {collapse_first_inserted, type:ProtocolDataType.UPDATE, preemptive_pointer_init: true}], receiver, undefined, false, undefined, undefined, false, this.datex_timeout);
await Runtime.datexOut([datex, data, {collapse_first_inserted, type:ProtocolDataType.UPDATE, preemptive_pointer_init: true, stream_abort_signal}], receiver, undefined, false, undefined, undefined, false, this.datex_timeout);
} catch(e) {
//throw e;
console.error("forwarding failed", e, datex, data)
Expand Down
12 changes: 8 additions & 4 deletions utils/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,11 @@ export class Logger {
if (this.origin) Logger.loggers_by_origin.get(this.origin)?.delete(this);
}

private log(color: COLOR, text: string, data:any[], log_level:LOG_LEVEL = LOG_LEVEL.DEFAULT, only_log_own_stream = false, add_tag = true) {
private log(color: COLOR, text: string, data:any[], log_level:LOG_LEVEL = LOG_LEVEL.DEFAULT, only_log_own_stream = false, add_tag = true, raw = false) {

if (this.production && (log_level < Logger.production_log_level)) return; // don't log for production
if (!this.production && (log_level < Logger.development_log_level)) return; // don't log for development
const log_string = this.generateLogString(color, text, data, add_tag);
const log_string = raw ? text : this.generateLogString(color, text, data, add_tag);
this.logRaw(log_string, log_level, only_log_own_stream);
}

Expand Down Expand Up @@ -743,6 +743,10 @@ export class Logger {
this.log(console_theme == 'dark' ? COLOR.WHITE : COLOR.BLACK, this.normalizeLogText(text), data, LOG_LEVEL.DEFAULT, false, false)
}

public raw(text:string) {
this.log(console_theme == 'dark' ? COLOR.WHITE : COLOR.BLACK, this.normalizeLogText(text), [], LOG_LEVEL.DEFAULT, false, false, true)
}

// does not have an effect in the native browser console or log streams with multiple logger inputs (intentionally)
public clear(silent = false){
this.logRaw(ESCAPE_SEQUENCES.CLEAR, LOG_LEVEL.DEFAULT, true)
Expand Down Expand Up @@ -898,8 +902,8 @@ export class Logger {
}
// stream for specific origins
else {
for (let origin of filter_origins) {
for (let logger of this.loggers_by_origin.get(origin)??[]) {
for (const origin of filter_origins) {
for (const logger of this.loggers_by_origin.get(origin)??[]) {
logger.out_streams.add(stream);
if (!this.loggersForStream.has(stream)) this.loggersForStream.set(stream, new Set());
this.loggersForStream.get(stream)?.add(logger);
Expand Down
18 changes: 11 additions & 7 deletions utils/message_logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,34 @@ export class MessageLogger {
if (!this.logger) this.logger = new Logger("DATEX Message");

IOHandler.onDatexReceived((header, dxb, socket)=>{
const log = (dxb as any)._is_stream ? console.log : this.logger.raw.bind(this.logger);

// ignore incoming requests from own endpoint to own endpoint
const receivers = header.routing?.receivers;
const receiverIsOwnEndpoint = receivers instanceof Logical && receivers?.size == 1 && (receivers.has(Runtime.endpoint) || receivers.has(Runtime.endpoint.main));
if (!showRedirectedMessages && !receiverIsOwnEndpoint) return;
if (header.sender == Runtime.endpoint && receiverIsOwnEndpoint && header.type != ProtocolDataType.RESPONSE && header.type != ProtocolDataType.DEBUGGER) return;


// ignore hello messages
if (header.type == ProtocolDataType.HELLO || header.type == ProtocolDataType.GOODBYE) {
this.logger.plain(`\n#color(blue)◀── ${header.sender||'@*'} ${header.type!=undefined? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()}` : ''}`);
log(`${ESCAPE_SEQUENCES.BLUE}◀── ${header.sender||'@*'} ${header.type!=undefined? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()}` : ''}`);
return;
};

let content = MessageLogger.decompile(dxb);
if (content.trim() == "\x1b[38;2;219;45;129mvoid\x1b[39m;") return; // dont log void; messages

content =
`\n${ESCAPE_SEQUENCES.BLUE}${receiverIsOwnEndpoint?'':Runtime.valueToDatexStringExperimental(receivers, false, false)+ ' '}◀── ${header.sender||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()} ` : ''}`.padEnd(80, '─') + '\n'
`${ESCAPE_SEQUENCES.BLUE}${receiverIsOwnEndpoint?'':Runtime.valueToDatexStringExperimental(receivers, false, false)+ ' '}◀── ${header.sender||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()} ` : ''}`.padEnd(80, '─') + '\n'
+ content
+ `\n${ESCAPE_SEQUENCES.BLUE}──────────────────────────────────────────────────────────────────────────\n`;
console.log(content)
log(content)
});

IOHandler.onDatexSent((header, dxb, socket) => {

const log = (dxb as any)._is_stream ? console.log : this.logger.raw.bind(this.logger);

// ignore outgoing responses from own endpoint to own endpoint
const receivers = header.routing?.receivers;
if (header.sender == Runtime.endpoint && (receivers instanceof Logical && receivers?.size == 1 && receivers.has(Runtime.endpoint)) && header.type != ProtocolDataType.RESPONSE && header.type != ProtocolDataType.DEBUGGER) return;
Expand All @@ -65,19 +69,19 @@ export class MessageLogger {

// ignore hello messages
if (header.type == ProtocolDataType.HELLO || header.type == ProtocolDataType.GOODBYE) {
this.logger.plain(`\n#color(green)${header.sender||'@*'} ──▶ ${receivers||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()}` : ''}`);
log(`${ESCAPE_SEQUENCES.GREEN}${header.sender||'@*'} ──▶ ${receivers||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()}` : ''}`);
return;
};

let content = MessageLogger.decompile(dxb);
if (content.trim() == "\x1b[38;2;219;45;129mvoid\x1b[39m;") return; // dont log void; messages

content =
`\n${ESCAPE_SEQUENCES.GREEN}${senderIsOwnEndpoint?'':Runtime.valueToDatexStringExperimental(header.sender, false, false)+' '}──▶ ${receivers||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()} ` : ''}`.padEnd(80, '─') + '\n'
`${ESCAPE_SEQUENCES.GREEN}${senderIsOwnEndpoint?'':Runtime.valueToDatexStringExperimental(header.sender, false, false)+' '}──▶ ${receivers||'@*'} ${header.type!=undefined ? `(${ProtocolDataType[header.type]}) ` : ''}${socket ? `via ${socket.toString()} ` : ''}`.padEnd(80, '─') + '\n'
+ content
+ `\n${ESCAPE_SEQUENCES.GREEN}──────────────────────────────────────────────────────────────────────────\n`;

console.log(content);
log(content);
});

}
Expand Down
Binary file modified wasm/adapter/pkg/datex_wasm_bg.wasm
Binary file not shown.

0 comments on commit 461e6fa

Please sign in to comment.