From 33b92061754565712a364cb201b42fb5416e6796 Mon Sep 17 00:00:00 2001 From: benStre Date: Thu, 25 Jan 2024 19:45:08 +0100 Subject: [PATCH 1/3] datex formatting (unrelated) --- runtime/pointers.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/pointers.ts b/runtime/pointers.ts index c3b2bb58..34844a02 100644 --- a/runtime/pointers.ts +++ b/runtime/pointers.ts @@ -2607,12 +2607,12 @@ export class Pointer extends Ref { // propagate updates via datex if (this.send_updates_to_origin) { - this.handleDatexUpdate(null, '#0=?;? = #0', [this.current_val, this], this.origin, true) + this.handleDatexUpdate(null, '#0 = ?; ? = #0', [this.current_val, this], this.origin, true) } if (this.update_endpoints.size) { logger.debug("forwarding update to subscribers", this.update_endpoints); // console.log(this.#update_endpoints); - this.handleDatexUpdate(null, '#0=?;? = #0', [this.current_val, this], this.update_endpoints, true) + this.handleDatexUpdate(null, '#0 = ?; ? = #0', [this.current_val, this], this.update_endpoints, true) } // pointer value change listeners From 5258e3a33911fc4f93046cea2916e2434287d52a Mon Sep 17 00:00:00 2001 From: benStre Date: Thu, 25 Jan 2024 19:45:40 +0100 Subject: [PATCH 2/3] add functionality to set custom blockchain relay endpoint with blockchain_relay entry in .dx file --- network/blockchain_adapter.ts | 16 +++++++++++++--- runtime/endpoint_config.ts | 19 ++++++++++++++----- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/network/blockchain_adapter.ts b/network/blockchain_adapter.ts index a4185f96..5a194ab7 100644 --- a/network/blockchain_adapter.ts +++ b/network/blockchain_adapter.ts @@ -1,5 +1,5 @@ -import { Datex, instance } from "../mod.ts"; -import { Endpoint } from "../datex_all.ts"; +import { Datex, f, instance } from "../mod.ts"; +import { Disjunction, Endpoint } from "../datex_all.ts"; import {endpoint, property} from "../datex_all.ts"; import { Logger } from "../utils/logger.ts"; @@ -98,8 +98,18 @@ export type BCData = /** * Blockchain interface (using @+unyt2 relay node) */ +const relayNode = new Disjunction(f('@+unyt2')); -@endpoint('@+unyt2') export class Blockchain { +@endpoint(relayNode) export class Blockchain { + + static setRelayNode(node: Endpoint) { + logger.debug("using blockchain relay node: " + node); + relayNode.clear(); + relayNode.add(node); + } + static getRelayNode() { + return [...relayNode][0]; + } /** * Methods that must be implemented on an endpoint that has access to the blockchain:# diff --git a/runtime/endpoint_config.ts b/runtime/endpoint_config.ts index cf520c23..43fb9c02 100644 --- a/runtime/endpoint_config.ts +++ b/runtime/endpoint_config.ts @@ -11,7 +11,6 @@ import { DatexObject } from "../types/object.ts"; import { Ref } from "./pointers.ts"; import { normalizePath } from "../utils/normalize-path.ts"; - type channel_type = 'websocket'|'http' type node_config = { channels: Record, @@ -22,9 +21,10 @@ export interface EndpointConfigData { endpoint?:Endpoint keys?: Crypto.ExportedKeySet connect?:boolean // default true - ws_relay?: boolean // create ws relay on backend server, default true + ws_relay?: boolean // create ws relay on backend server (default: true) temporary?:boolean // default false nodes?: Map, + blockchain_relay?: Endpoint // custom blockchain relay endpoint (default: @+unyt2) } @@ -39,6 +39,7 @@ class EndpointConfig implements EndpointConfigData { public temporary?:boolean public ws_relay?:boolean public nodes?: Map + public blockchain_relay?: Endpoint /*****************/ // not saved in endpoint config, loaded from https://unyt.cc/nodes.dx @@ -128,8 +129,8 @@ class EndpointConfig implements EndpointConfigData { this.connect = DatexObject.get(config, 'connect') this.temporary = DatexObject.get(config, 'temporary') this.ws_relay = DatexObject.get(config, 'ws_relay') - // TODO: enable nodes from cached .dx file, currently disabled for backends to avoid problems during development - this.nodes = client_type == "browser" ? DatexObject.get(config, 'nodes') : undefined // DatexObject.get(config, 'nodes'); + this.blockchain_relay = DatexObject.get(config, 'blockchain_relay') + this.nodes = DatexObject.get(config, 'nodes') } if (this.storage) { @@ -139,6 +140,13 @@ class EndpointConfig implements EndpointConfigData { localStorage.removeItem(this.storageId); } + // set custom blockchain relay + if (this.blockchain_relay) { + if (!Runtime.Blockchain) throw new Error("Runtime.Blockchain not initialized"); + if (this.blockchain_relay instanceof Endpoint) Runtime.Blockchain.setRelayNode(this.blockchain_relay); + else throw new Error("blockchain_relay must be an Endpoint") + } + // load public nodes from unyt.org await this.loadPublicNodes(); await this.initNodes() @@ -152,7 +160,7 @@ class EndpointConfig implements EndpointConfigData { } save() { - const serialized = Runtime.valueToDatexString(new Tuple({endpoint:this.#endpoint, connect:this.connect, ws_relay:this.ws_relay, temporary:this.temporary, keys:this.keys, nodes:this.nodes})); + const serialized = Runtime.valueToDatexString(new Tuple({endpoint:this.#endpoint, connect:this.connect, ws_relay:this.ws_relay, temporary:this.temporary, keys:this.keys, nodes:this.nodes, blockchain_relay:this.blockchain_relay})); if (client_type=="deno") { try { @@ -206,6 +214,7 @@ class EndpointConfig implements EndpointConfigData { this.ws_relay = undefined; this.keys = undefined; this.nodes = undefined; + this.blockchain_relay = undefined; if (client_type=="deno") { const config_file = new URL('./' + this.DX_FILE_NAME, cache_path); From 7871676d4c50a966776c9241049d939a339e1cf7 Mon Sep 17 00:00:00 2001 From: benStre Date: Sat, 27 Jan 2024 00:22:58 +0100 Subject: [PATCH 3/3] add stream abort signal --- compiler/compiler.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/compiler/compiler.ts b/compiler/compiler.ts index f60231fd..1e760da2 100644 --- a/compiler/compiler.ts +++ b/compiler/compiler.ts @@ -248,6 +248,8 @@ export type compiler_options = { plugins?: string[] // list of enabled plugins required_plugins?: string[] // list of enabled plugins that must be used + stream_abort_signal?: AbortSignal // abort signal for stream + // for routing header __routing_ttl?:number, __routing_prio?:number, @@ -5149,7 +5151,15 @@ export class Compiler { let next:ReadableStreamReadResult, value: any; while (true) { + if (SCOPE.options.stream_abort_signal?.aborted) { + console.log(`Stream ${SCOPE.options.sid} was aborted`) + break; + } next = await reader.read() + if (SCOPE.options.stream_abort_signal?.aborted) { + console.log(`Stream ${SCOPE.options.sid} was aborted`) + break; + } if (next.done) break; value = next.value; @@ -5355,16 +5365,16 @@ export class Compiler { return this.export(blob, output_name_or_path, file_type, script_or_url instanceof URL ? script_or_url : undefined) } - static async exportValue(value: any, output_name_or_path?:string|URL, file_type: DATEX_FILE_TYPE = FILE_TYPE.DATEX_BINARY, collapse_pointers = true, collapse_first_inserted = true, keep_external_pointers = true) { + static async exportValue(value: any, output_name_or_path?:string|URL, file_type: DATEX_FILE_TYPE = FILE_TYPE.DATEX_BINARY, collapse_pointers = true, collapse_first_inserted = true, keep_external_pointers = true, resolve_slots = true) { // compile value - const buffer = await this.compile("?", [value], {collapse_pointers, collapse_first_inserted, keep_external_pointers, no_create_pointers: false}) as ArrayBuffer; + const buffer = await this.compile("?", [value], {collapse_pointers, collapse_first_inserted, keep_external_pointers, no_create_pointers: false, flood: true, to: undefined}) as ArrayBuffer; if (file_type[0] == "application/datex") { // export return this.export(buffer, output_name_or_path, file_type) } else { - return this.export(MessageLogger.decompile(buffer, true, false), output_name_or_path, file_type) + return this.export(MessageLogger.decompile(buffer, true, false, resolve_slots), output_name_or_path, file_type) } }