Skip to content

Commit

Permalink
Merge pull request #68 from unyt-org/feat-custom-blockchain-relay
Browse files Browse the repository at this point in the history
Add support for custom blockchain relay endpoints
  • Loading branch information
jonasstrehle authored Jan 30, 2024
2 parents 911d75d + 7871676 commit f8e1cb7
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 13 deletions.
16 changes: 13 additions & 3 deletions compiler/compiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ export type compiler_options = {
plugins?: string[] // list of enabled plugins
required_plugins?: string[] // list of enabled plugins that must be used

stream_abort_signal?: AbortSignal // abort signal for stream

// for routing header
__routing_ttl?:number,
__routing_prio?:number,
Expand Down Expand Up @@ -5149,7 +5151,15 @@ export class Compiler {
let next:ReadableStreamReadResult<any>,
value: any;
while (true) {
if (SCOPE.options.stream_abort_signal?.aborted) {
console.log(`Stream ${SCOPE.options.sid} was aborted`)
break;
}
next = await reader.read()
if (SCOPE.options.stream_abort_signal?.aborted) {
console.log(`Stream ${SCOPE.options.sid} was aborted`)
break;
}
if (next.done) break;
value = next.value;

Expand Down Expand Up @@ -5355,16 +5365,16 @@ export class Compiler {
return this.export(blob, output_name_or_path, file_type, script_or_url instanceof URL ? script_or_url : undefined)
}

static async exportValue(value: any, output_name_or_path?:string|URL, file_type: DATEX_FILE_TYPE = FILE_TYPE.DATEX_BINARY, collapse_pointers = true, collapse_first_inserted = true, keep_external_pointers = true) {
static async exportValue(value: any, output_name_or_path?:string|URL, file_type: DATEX_FILE_TYPE = FILE_TYPE.DATEX_BINARY, collapse_pointers = true, collapse_first_inserted = true, keep_external_pointers = true, resolve_slots = true) {
// compile value
const buffer = await this.compile("?", [value], {collapse_pointers, collapse_first_inserted, keep_external_pointers, no_create_pointers: false}) as ArrayBuffer;
const buffer = await this.compile("?", [value], {collapse_pointers, collapse_first_inserted, keep_external_pointers, no_create_pointers: false, flood: true, to: undefined}) as ArrayBuffer;

if (file_type[0] == "application/datex") {
// export
return this.export(buffer, output_name_or_path, file_type)
}
else {
return this.export(MessageLogger.decompile(buffer, true, false), output_name_or_path, file_type)
return this.export(MessageLogger.decompile(buffer, true, false, resolve_slots), output_name_or_path, file_type)
}

}
Expand Down
16 changes: 13 additions & 3 deletions network/blockchain_adapter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Datex, instance } from "../mod.ts";
import { Endpoint } from "../datex_all.ts";
import { Datex, f, instance } from "../mod.ts";
import { Disjunction, Endpoint } from "../datex_all.ts";
import {endpoint, property} from "../datex_all.ts";
import { Logger } from "../utils/logger.ts";

Expand Down Expand Up @@ -98,8 +98,18 @@ export type BCData<T extends BCEntryType> =
/**
* Blockchain interface (using @+unyt2 relay node)
*/
const relayNode = new Disjunction(f('@+unyt2'));

@endpoint('@+unyt2') export class Blockchain {
@endpoint(relayNode) export class Blockchain {

static setRelayNode(node: Endpoint) {
logger.debug("using blockchain relay node: " + node);
relayNode.clear();
relayNode.add(node);
}
static getRelayNode() {
return [...relayNode][0];
}

/**
* Methods that must be implemented on an endpoint that has access to the blockchain:#
Expand Down
19 changes: 14 additions & 5 deletions runtime/endpoint_config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { DatexObject } from "../types/object.ts";
import { Ref } from "./pointers.ts";
import { normalizePath } from "../utils/normalize-path.ts";


type channel_type = 'websocket'|'http'
type node_config = {
channels: Record<channel_type, unknown>,
Expand All @@ -22,9 +21,10 @@ export interface EndpointConfigData {
endpoint?:Endpoint
keys?: Crypto.ExportedKeySet
connect?:boolean // default true
ws_relay?: boolean // create ws relay on backend server, default true
ws_relay?: boolean // create ws relay on backend server (default: true)
temporary?:boolean // default false
nodes?: Map<Endpoint, node_config>,
blockchain_relay?: Endpoint // custom blockchain relay endpoint (default: @+unyt2)
}


Expand All @@ -39,6 +39,7 @@ class EndpointConfig implements EndpointConfigData {
public temporary?:boolean
public ws_relay?:boolean
public nodes?: Map<Endpoint, node_config>
public blockchain_relay?: Endpoint
/*****************/

// not saved in endpoint config, loaded from https://unyt.cc/nodes.dx
Expand Down Expand Up @@ -128,8 +129,8 @@ class EndpointConfig implements EndpointConfigData {
this.connect = DatexObject.get(<any>config, 'connect')
this.temporary = DatexObject.get(<any>config, 'temporary')
this.ws_relay = DatexObject.get(<any>config, 'ws_relay')
// TODO: enable nodes from cached .dx file, currently disabled for backends to avoid problems during development
this.nodes = client_type == "browser" ? DatexObject.get(<any>config, 'nodes') : undefined // DatexObject.get(<any>config, 'nodes');
this.blockchain_relay = DatexObject.get(<any>config, 'blockchain_relay')
this.nodes = DatexObject.get(<any>config, 'nodes')
}

if (this.storage) {
Expand All @@ -139,6 +140,13 @@ class EndpointConfig implements EndpointConfigData {
localStorage.removeItem(this.storageId);
}

// set custom blockchain relay
if (this.blockchain_relay) {
if (!Runtime.Blockchain) throw new Error("Runtime.Blockchain not initialized");
if (this.blockchain_relay instanceof Endpoint) Runtime.Blockchain.setRelayNode(this.blockchain_relay);
else throw new Error("blockchain_relay must be an Endpoint")
}

// load public nodes from unyt.org
await this.loadPublicNodes();
await this.initNodes()
Expand All @@ -152,7 +160,7 @@ class EndpointConfig implements EndpointConfigData {
}

save() {
const serialized = Runtime.valueToDatexString(new Tuple({endpoint:this.#endpoint, connect:this.connect, ws_relay:this.ws_relay, temporary:this.temporary, keys:this.keys, nodes:this.nodes}));
const serialized = Runtime.valueToDatexString(new Tuple({endpoint:this.#endpoint, connect:this.connect, ws_relay:this.ws_relay, temporary:this.temporary, keys:this.keys, nodes:this.nodes, blockchain_relay:this.blockchain_relay}));

if (client_type=="deno") {
try {
Expand Down Expand Up @@ -206,6 +214,7 @@ class EndpointConfig implements EndpointConfigData {
this.ws_relay = undefined;
this.keys = undefined;
this.nodes = undefined;
this.blockchain_relay = undefined;

if (client_type=="deno") {
const config_file = new URL('./' + this.DX_FILE_NAME, cache_path);
Expand Down
4 changes: 2 additions & 2 deletions runtime/pointers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2607,12 +2607,12 @@ export class Pointer<T = any> extends Ref<T> {

// propagate updates via datex
if (this.send_updates_to_origin) {
this.handleDatexUpdate(null, '#0=?;? = #0', [this.current_val, this], this.origin, true)
this.handleDatexUpdate(null, '#0 = ?; ? = #0', [this.current_val, this], this.origin, true)
}
if (this.update_endpoints.size) {
logger.debug("forwarding update to subscribers", this.update_endpoints);
// console.log(this.#update_endpoints);
this.handleDatexUpdate(null, '#0=?;? = #0', [this.current_val, this], this.update_endpoints, true)
this.handleDatexUpdate(null, '#0 = ?; ? = #0', [this.current_val, this], this.update_endpoints, true)
}

// pointer value change listeners
Expand Down

0 comments on commit f8e1cb7

Please sign in to comment.