From e1b6aff4779ef31b2b0d6326fa5adb0bc65d9026 Mon Sep 17 00:00:00 2001 From: benStre Date: Wed, 24 Jan 2024 10:55:51 +0100 Subject: [PATCH 1/2] prevent rejected promise from datex out --- runtime/runtime.ts | 14 ++++++++++---- threads/threads.ts | 19 +++++++++++++++++++ types/addressing.ts | 2 +- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/runtime/runtime.ts b/runtime/runtime.ts index 906220ce..176efd7a 100644 --- a/runtime/runtime.ts +++ b/runtime/runtime.ts @@ -1056,7 +1056,10 @@ export class Runtime { if (flood) { this.datex_out(dxb, flood_exclude, true, source) .then(finish) - .catch(e=>reject(e)); + .catch(e => { + if (wait_for_result) reject(e); + else console.error("Error sending datex block", e); + }); } // send to receivers else if (to) { @@ -1064,11 +1067,14 @@ export class Runtime { // send and catch errors while sending, like NetworkError for (const to_endpoint of to) { // check offline status (async), immediately reject if offline - this._handleEndpointOffline(to_endpoint, reject) + if (wait_for_result) this._handleEndpointOffline(to_endpoint, reject) // send dxb this.datex_out(dxb, to_endpoint, undefined, source) .then(finish) - .catch(e=>reject(e)); + .catch(e => { + if (wait_for_result) reject(e); + else console.error("Error sending datex block", e); + }); } } @@ -1125,7 +1131,7 @@ export class Runtime { logger.debug("redirect " + (ProtocolDataType[header.type]) + " " + header.sid + " > " + Runtime.valueToDatexString(header.routing.receivers) + ", ttl="+ (header.routing.ttl-1)); - let res = await this.datexOut(datex, header.routing.receivers, header.sid, wait_for_result, undefined, undefined, undefined, undefined, undefined, source); + const res = await this.datexOut(datex, header.routing.receivers, header.sid, wait_for_result, undefined, undefined, undefined, undefined, undefined, source); return res; } diff --git a/threads/threads.ts b/threads/threads.ts index 1ca2705e..43c326aa 100644 --- a/threads/threads.ts +++ b/threads/threads.ts @@ -376,6 +376,20 @@ async function getThread(): Promise { } +function removeThreadEndpoint(thread: ThreadModule) { + const endpoint = threadEndpoints.get(thread); + if (endpoint) { + availableThreads.set(thread, 0); + threadEndpoints.delete(thread); + if (configuration.cluster) { + configuration.cluster.endpoints.delete(endpoint); + } + } + else { + throw new Error("Thread is not a remote thread") + } +} + function freeThread(thread: ThreadModule) { if (!availableThreads.has(thread)) return; availableThreads.set(thread, availableThreads.get(thread)! - 1); @@ -668,6 +682,11 @@ export async function run(task: (() => ReturnType)|JSTransferableFun const variableName = e.message.match(/ReferenceError - (\S*)/)![1]; throw new RuntimeError("Variable '"+variableName+"' from the parent scope must be explicitly declared at the beginning of the function body with 'use ("+variableName+")'.") } + else if (e instanceof Error && e.message.endsWith("is offline")) { + console.log("cluster endpoint " + endpoint + " is offline"); + removeThreadEndpoint(thread); + return run(task, options, _meta); + } else if (e instanceof Error) { throw new Error(e.message); } diff --git a/types/addressing.ts b/types/addressing.ts index eeadabd7..0bc6b922 100644 --- a/types/addressing.ts +++ b/types/addressing.ts @@ -453,7 +453,7 @@ export class Endpoint extends Target { try { // ping await Runtime.datexOut( - ['"ping"', [], {sign:false, encrypt:false}], + ['', [], {sign:false, encrypt:false}], this, undefined, true, From b125b347a73dfeef6ff0a9284858d6f4bd4a546e Mon Sep 17 00:00:00 2001 From: benStre Date: Wed, 24 Jan 2024 10:57:25 +0100 Subject: [PATCH 2/2] fix endpoint online state no header provided --- runtime/runtime.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/runtime/runtime.ts b/runtime/runtime.ts index 176efd7a..6b96faf5 100644 --- a/runtime/runtime.ts +++ b/runtime/runtime.ts @@ -1858,8 +1858,10 @@ export class Runtime { //throw e console.error(e[1]??e) const header = e[0]; - this.handleScopeError(header, e[1]); - this.updateEndpointOnlineState(header); + if (header) { + this.handleScopeError(header, e[1]); + this.updateEndpointOnlineState(header); + } return; }