Skip to content

Commit

Permalink
Merge pull request #44 from unyt-org/fix-unintended-unsubscribe
Browse files Browse the repository at this point in the history
Fix unintended pointer unsubscribing after first HELLO message
  • Loading branch information
jonasstrehle authored Jan 21, 2024
2 parents 0d2c2dd + f8c5028 commit bd67d88
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 8 deletions.
12 changes: 11 additions & 1 deletion network/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export interface ComInterface {
endpoint?: Endpoint // connected directly to a single endpoint
endpoints?: Set<Endpoint> // multiple endpoints
is_bidirectional_hub?: boolean, // allow the same block to go in and out eg a -> this interface -> this runtime -> this interface again -> b
immediate?: boolean // can send immediately (eg. for local interfaces, workers)
isEqualSource?:(source: Partial<ComInterface>, to: Endpoint) => boolean
in: boolean // can receive data
out: boolean // can send data
Expand Down Expand Up @@ -162,6 +163,7 @@ export abstract class CommonInterface<Args extends unknown[] = []> implements Co
public in = true
public out = true
public global = true
public immediate = false

public get endpoint() {
return this._endpoint
Expand Down Expand Up @@ -197,7 +199,15 @@ export abstract class CommonInterface<Args extends unknown[] = []> implements Co
this.initial_arguments = args;

this.connected = await this.connect();
if (this.connected) this.updateEndpoint();
if (this.connected) {
this.updateEndpoint();
// immediately consider endpoint as online
if (this.endpoint && this.immediate) {
this.endpoint.setOnline(true)
// don't trigger subscription cleanup once first HELLO message is received
this.endpoint.ignoreHello = true;
}
}
return this.connected;
}

Expand Down
4 changes: 2 additions & 2 deletions runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1824,9 +1824,9 @@ export class Runtime {
}
// other message, assume sender endpoint is online now
else {
// HELLO message received, regard as new login to network, reset previous subscriptions
if (header.type == ProtocolDataType.HELLO && !header.sender.ignoreHello) Pointer.clearEndpointSubscriptions(header.sender)
header.sender.setOnline(true)
// new login to network, reset previous subscriptions
if (header.type == ProtocolDataType.HELLO) Pointer.clearEndpointSubscriptions(header.sender)
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion threads/thread-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Datex as DatexType } from "../mod.ts";

const isServiceWorker = 'registration' in globalThis && (globalThis as any).registration instanceof ServiceWorkerRegistration;

console.log("initialized thread worker", {isServiceWorker})
console.log("spawned new thread worker")

if (isServiceWorker) {
// https://developer.mozilla.org/en-US/docs/Web/API/Clients/claim
Expand Down Expand Up @@ -58,6 +58,9 @@ addEventListener("message", async function (event) {
// await import("https://ga.jspm.io/npm:es-module-shims@1.8.0/dist/es-module-shims.wasm.js");
// if (data.importMap) importShim.addImportMap(data.importMap);

// inherit theme from parent
(globalThis as any)._override_console_theme = data.theme;

await initDatex(data.datexURL);
await initWorkerComInterface(data.comInterfaceURL);
await initTsInterfaceGenerator(data.tsInterfaceGeneratorURL);
Expand Down
7 changes: 4 additions & 3 deletions threads/threads.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Logger } from "../utils/logger.ts";
import { Logger, console_theme } from "../utils/logger.ts";
import "./worker-com-interface.ts";
import { Equals } from "../utils/global_types.ts";

Expand All @@ -23,7 +23,7 @@ export type ThreadPool<imports extends Record<string, unknown> = Record<string,
& {readonly __tag: unique symbol} & {[Symbol.dispose]: ()=>void}

export type MessageToWorker =
{type: "INIT", datexURL: string, comInterfaceURL: string, moduleURL: string, tsInterfaceGeneratorURL:string, endpoint: string, importMap:Record<string,any>} |
{type: "INIT", datexURL: string, comInterfaceURL: string, moduleURL: string, tsInterfaceGeneratorURL:string, endpoint: string, importMap:Record<string,any>, theme:"dark"|"light"} |
{type: "INIT_PORT"}

export type MessageFromWorker =
Expand Down Expand Up @@ -503,7 +503,8 @@ export async function _initWorker(worker: Worker|ServiceWorkerRegistration, modu
comInterfaceURL: import.meta.resolve("./worker-com-interface.ts"),
tsInterfaceGeneratorURL: import.meta.resolve("../utils/interface-generator.ts"),
moduleURL: modulePath ? import.meta.resolve(modulePath.toString()): null,
endpoint: Datex.Runtime.endpoint.toString()
endpoint: Datex.Runtime.endpoint.toString(),
theme: console_theme
});

let resolve: Function;
Expand Down
1 change: 1 addition & 0 deletions threads/worker-com-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export class WorkerCommunicationInterface extends CommonInterface<[Worker]> {
override global = false;
override authorization_required = false; // don't connect with public keys
override type = "worker";
override immediate = true;

protected connect() {

Expand Down
4 changes: 4 additions & 0 deletions types/addressing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ export class Endpoint extends Target {
setTimeout(() => this.#online=undefined, this.#current_online ? Endpoint.cache_life_online : Endpoint.cache_life_offline);
}

/**
* Ignore HELLO messages from this endpoint (don't clean up subscriptions)
*/
public ignoreHello = false;

// get endpoint from string
public static fromString(string:string) {
Expand Down
2 changes: 1 addition & 1 deletion utils/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ const COLOR = {
POINTER: [ESCAPE_SEQUENCES.BLUE, ESCAPE_SEQUENCES.UNYT_POINTER] as COLOR,
} as const;

export let console_theme:"dark"|"light" = (client_type=="deno" || (<any>globalThis).matchMedia && (<any>globalThis).matchMedia('(prefers-color-scheme: dark)')?.matches) ? "dark" : "light";
export let console_theme:"dark"|"light" = (globalThis as any)._override_console_theme ?? ((client_type=="deno" || (<any>globalThis).matchMedia && (<any>globalThis).matchMedia('(prefers-color-scheme: dark)')?.matches) ? "dark" : "light");

try {
(<any>globalThis).matchMedia && (<any>globalThis).matchMedia('(prefers-color-scheme: dark)')?.addEventListener("change", (e:any)=>{
Expand Down

0 comments on commit bd67d88

Please sign in to comment.