Skip to content

Commit

Permalink
add volatile map, fix weakaction, add isolated scopes
Browse files Browse the repository at this point in the history
  • Loading branch information
benStre committed Dec 11, 2023
1 parent ea8a236 commit 8cd4667
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 51 deletions.
8 changes: 6 additions & 2 deletions functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export function map<T, U, O extends 'array'|'map' = 'array'>(iterable: Iterable<
if (options?.outType == "map") {
mapped = $$(new Map())

new IterableHandler(iterable, {
const iterableHandler = new IterableHandler(iterable, {
map: (v,k)=>{
return mapFn(v,k,iterable)
},
Expand All @@ -143,6 +143,8 @@ export function map<T, U, O extends 'array'|'map' = 'array'>(iterable: Iterable<
onNewEntry: (v,k) => (mapped as Map<number,U>).set(k,v),
onEmpty: () => (mapped as Map<number,U>).clear()
})
// reverse transform binding
Datex.Pointer.bindDisposable(mapped, iterableHandler)
}

// return array
Expand All @@ -152,7 +154,7 @@ export function map<T, U, O extends 'array'|'map' = 'array'>(iterable: Iterable<
// no gaps in a set -> array splice required
const spliceArray = iterable instanceof Set;

new IterableHandler(iterable, {
const iterableHandler = new IterableHandler(iterable, {
map: (v,k)=>{
return mapFn(v,k,iterable)
},
Expand All @@ -167,6 +169,8 @@ export function map<T, U, O extends 'array'|'map' = 'array'>(iterable: Iterable<
(mapped as U[]).length = 0
}
})
// reverse transform binding
Datex.Pointer.bindDisposable(mapped, iterableHandler)
}

}
Expand Down
10 changes: 10 additions & 0 deletions network/unyt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ export class Unyt {
return info.app?.dynamicData?.domains ? info.app.dynamicData.domains.map(d=>'https://'+d) : [];
}

/**
* Don't delete: required in docker-host
*/
public formatEndpointURL(endpoint: Endpoint) {
const endpointName = endpoint.toString();
if (endpointName.startsWith("@+")) return `${endpointName.replace("@+","")}.unyt.app`
else if (endpointName.startsWith("@@")) return `${endpointName.replace("@@","")}.unyt.app`
else if (endpointName.startsWith("@")) return `${endpointName.replace("@","")}.unyt.me`
}

// TODO add colored logo dark - light mode
public static async logEndpointInfo(){
const info = this.endpoint_info;
Expand Down
28 changes: 28 additions & 0 deletions runtime/pointers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1770,10 +1770,38 @@ export class Pointer<T = any> extends Ref<T> {
Pointer.pointers.delete(this.#id);
Pointer.primitive_pointers.delete(this.#id)

// remove disposables
for (const disposable of this.#boundDisposables) {
console.log("disposing", disposable)
disposable[Symbol.dispose]?.()
}

// call remove listeners (todo: also for primitive pointers)
if (!this.is_anonymous) for (const l of Pointer.pointer_remove_listeners) l(this);
}

#boundDisposables = new Set<{[Symbol.dispose]: ()=>any}>()

// binds a disposable object to this pointer that gets disposed as soon as the pointer is garbage collected
public bindDisposable(disposable: {[Symbol.dispose]: ()=>any}) {
this.#boundDisposables.add(disposable);
if (!this.is_js_primitive) {
if (!this.val[Pointer.DISPOSABLES]) this.val[Pointer.DISPOSABLES] = []
this.val[Pointer.DISPOSABLES].push(disposable);
}
}

static DISPOSABLES = Symbol("DISPOSABLES")

public static bindDisposable(value: any, disposable: {[Symbol.dispose]: ()=>any}) {
const ptr = value instanceof Pointer ? value : this.getByValue(value);
if (ptr) {
ptr.bindDisposable(disposable);
}
else throw new Error("Cannot bind a disposable value to a non-pointer value")
}


[Symbol.dispose]() {
this.delete()
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ export class Runtime {
// possible js module import: fetch headers first and check content type:
if (!raw && (url_string.endsWith("js") || url_string.endsWith("ts") || url_string.endsWith("tsx") || url_string.endsWith("jsx") || url_string.endsWith("dx") || url_string.endsWith("dxb"))) {
try {
response = await fetch(url, {method: 'HEAD'});
response = await fetch(url, {method: 'HEAD', cache: 'no-store'});
const type = response.headers.get('content-type');
if (type?.startsWith("text/javascript") || type?.startsWith("application/javascript")) {
doFetch = false; // no body fetch required, can directly import() module
Expand Down
12 changes: 6 additions & 6 deletions types/function-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export function getDeclaredExternalVariables(fn: (...args:unknown[])=>unknown) {

// call the function with EXTRACT_USED_VARS metadata
try {
callWithMetadata({[EXTRACT_USED_VARS]: true}, fn as any)
callWithMetadata({[EXTRACT_USED_VARS]: true}, fn as any, [{}]) // TODO: provide call arguments that don't lead to a {}/[] destructuring error
}
catch (e) {
// capture returned variables from use()
Expand Down Expand Up @@ -157,13 +157,13 @@ function assertLazyDependenciesResolved(deps:Record<string,unknown>) {
* @param dependencies
* @returns
*/
export function createFunctionWithDependencyInjectionsResolveLazyPointers(source: string, dependencies: Record<string, unknown>): ((...args:unknown[]) => unknown) {
export function createFunctionWithDependencyInjectionsResolveLazyPointers(source: string, dependencies: Record<string, unknown>, allowValueMutations = true): ((...args:unknown[]) => unknown) {
let fn: Function|undefined;

const intermediateFn = (...args:any[]) => {
if (!fn) {
assertLazyDependenciesResolved(dependencies);
fn = createFunctionWithDependencyInjections(source, dependencies)
fn = createFunctionWithDependencyInjections(source, dependencies, allowValueMutations)
}
return fn(...args)
}
Expand All @@ -178,10 +178,10 @@ export function createFunctionWithDependencyInjectionsResolveLazyPointers(source
* @returns
* @deprecated use createFunctionWithDependencyInjectionsResolveLazyPointers
*/
export function createFunctionWithDependencyInjections(source: string, dependencies: Record<string, unknown>): ((...args:unknown[]) => unknown) {
export function createFunctionWithDependencyInjections(source: string, dependencies: Record<string, unknown>, allowValueMutations = true): ((...args:unknown[]) => unknown) {
const hasThis = Object.keys(dependencies).includes('this');
const renamedVars = Object.keys(dependencies).filter(d => d!=='this').map(k=>'_'+k);
const varMapping = renamedVars.map(k=>`const ${k.slice(1)} = createStaticObject(${k});`).join("\n");
const varMapping = renamedVars.map(k=>`const ${k.slice(1)} = ${allowValueMutations ? 'createStaticObject' : ''}(${k});`).join("\n");

const createStaticFn = `function createStaticObject(val) {
if (val && typeof val == "object" && !globalThis.Datex?.Ref.isRef(val)) {
Expand All @@ -192,7 +192,7 @@ export function createFunctionWithDependencyInjections(source: string, dependenc
};`

try {
let creatorFn = new Function(...renamedVars, `"use strict";${varMapping?createStaticFn:''}${varMapping}; return (${source})`)
let creatorFn = new Function(...renamedVars, `"use strict";${(varMapping&&allowValueMutations)?createStaticFn:''}${varMapping}; return (${source})`)
if (hasThis) creatorFn = creatorFn.bind(dependencies['this'])
return creatorFn(...Object.entries(dependencies).filter(([d]) => d!=='this').map(([_,v]) => v));
}
Expand Down
24 changes: 14 additions & 10 deletions types/js-function.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
* Represents a JS function with source code that can be transferred between endpoints
*/

import { LazyPointer } from "../runtime/lazy-pointer.ts";
import { Pointer } from "../runtime/pointers.ts";
import { Runtime } from "../runtime/runtime.ts";
import { ExtensibleFunction, getDeclaredExternalVariables, getDeclaredExternalVariablesAsync, getSourceWithoutUsingDeclaration, Callable, createFunctionWithDependencyInjectionsResolveLazyPointers } from "./function-utils.ts";


export type JSTransferableFunctionOptions = {
errorOnOriginContext?: Error
errorOnOriginContext?: Error,
isLocal?: boolean
}

export class JSTransferableFunction extends ExtensibleFunction {
Expand All @@ -26,11 +26,13 @@ export class JSTransferableFunction extends ExtensibleFunction {
else {
let ptr: Pointer|undefined;
const fn = (...args:any[]) => {
if (!ptr) ptr = Pointer.getByValue(this);
if (!ptr) throw new Error("Cannot execute js:Function, must be bound to a pointer");
const origin = ptr.origin.main;
if (origin !== Runtime.endpoint && !Runtime.trustedEndpoints.get(origin)?.includes("remote-js-execution")) {
throw new Error("Cannot execute js:Function, origin "+origin+" has no permission to execute js source code on this endpoint");
if (!options?.isLocal) {
if (!ptr) ptr = Pointer.getByValue(this);
if (!ptr) throw new Error("Cannot execute js:Function, must be bound to a pointer");
const origin = ptr.origin.main;
if (origin !== Runtime.endpoint.main && !Runtime.trustedEndpoints.get(origin)?.includes("remote-js-execution")) {
throw new Error("Cannot execute js:Function, origin "+origin+" has no permission to execute js source code on this endpoint");
}
}
return intermediateFn(...args)
}
Expand Down Expand Up @@ -68,8 +70,9 @@ export class JSTransferableFunction extends ExtensibleFunction {
* Important: use createAsync for async functions instead
* @param fn
*/
static create<T extends (...args:unknown[])=>unknown>(fn: T, options?:JSTransferableFunctionOptions): JSTransferableFunction & Callable<Parameters<T>, ReturnType<T>> {
static create<T extends (...args:unknown[])=>unknown>(fn: T, options:JSTransferableFunctionOptions = {}): JSTransferableFunction & Callable<Parameters<T>, ReturnType<T>> {
const {vars, flags} = getDeclaredExternalVariables(fn);
options.isLocal ??= true;
return this.#createTransferableFunction(getSourceWithoutUsingDeclaration(fn), vars, flags, options) as any;
}

Expand All @@ -78,8 +81,9 @@ export class JSTransferableFunction extends ExtensibleFunction {
* Automatically determines dependency variables declared with use()
* @param fn
*/
static async createAsync<T extends (...args:unknown[])=>Promise<unknown>>(fn: T, options?:JSTransferableFunctionOptions): Promise<JSTransferableFunction & Callable<Parameters<T>, ReturnType<T>>> {
static async createAsync<T extends (...args:unknown[])=>Promise<unknown>>(fn: T, options:JSTransferableFunctionOptions = {}): Promise<JSTransferableFunction & Callable<Parameters<T>, ReturnType<T>>> {
const {vars, flags} = await getDeclaredExternalVariablesAsync(fn)
options.isLocal ??= true;
return this.#createTransferableFunction(getSourceWithoutUsingDeclaration(fn), vars, flags, options) as any;
}

Expand All @@ -93,7 +97,7 @@ export class JSTransferableFunction extends ExtensibleFunction {
}

static #createTransferableFunction(source: string, dependencies: Record<string, unknown>, flags?: string[], options?:JSTransferableFunctionOptions) {
const intermediateFn = createFunctionWithDependencyInjectionsResolveLazyPointers(source, dependencies);
const intermediateFn = createFunctionWithDependencyInjectionsResolveLazyPointers(source, dependencies, !options?.isLocal);
return new JSTransferableFunction(intermediateFn, dependencies, source, flags, options);
}

Expand Down
5 changes: 5 additions & 0 deletions utils/isolated-scope.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { JSTransferableFunction } from "../types/js-function.ts";

export function isolatedScope(handler:(...args: any[]) => any) {
return JSTransferableFunction.create(handler);
}
51 changes: 39 additions & 12 deletions utils/iterable-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@ import { Datex } from "../mod.ts";
import { ValueError } from "../datex_all.ts";
import { weakAction } from "./weak-action.ts";


function workaroundGetHandler(iterableHandler: WeakRef<IterableHandler<any>>) {
return (v:any, k:any, t:any) => {
const deref = iterableHandler.deref();
if (!deref) {
console.warn("Undetected garbage collection (datex-w0001)");
return;
}
deref.onValueChanged(v, k, t)
}
}

export class IterableHandler<T, U = T> {

private map: ((value: T, index: number, array: Iterable<T>) => U) | undefined
Expand Down Expand Up @@ -33,27 +45,42 @@ export class IterableHandler<T, U = T> {
observe() {
// deno-lint-ignore no-this-alias
const self = this;
const iterable = this.iterable;
const iterableRef = new WeakRef(this.iterable);

// const handler = this.workaroundGetHandler(self)
// Datex.Ref.observeAndInit(iterable, handler);

weakAction(
{self},
({self}) => {
const handler = this.workaroundGetHandler(self)
Datex.Ref.observeAndInit(iterable, handler);
use (iterableRef, Datex);

const iterable = iterableRef.deref()! // only here to fix closure scope bug, should always exist at this point
const callback = (v:any, k:any, t:any) => {
const deref = self.deref();
if (!deref) {
console.warn("Undetected garbage collection (datex-w0001)");
return;
}
deref.onValueChanged(v, k, t)
}
Datex.Ref.observeAndInit(iterable, callback);
return callback;
},
(callback) => {
use (iterableRef, Datex);

const deref = iterableRef.deref()
if (deref) Datex.Ref.unobserve(deref, callback);
}
);
}

workaroundGetHandler(handler: WeakRef<IterableHandler<any>>) {
return (v:any, k:any, t:any) => {
const deref = handler.deref();
if (!deref) {
console.warn("Undetected garbage collection (datex-w0001)");
return;
}
deref.onValueChanged(v, k, t)
}
[Symbol.dispose]() {
// TODO: unobserve
}


#entries?: Map<number, U>;
public get entries() {
if (!this.#entries) this.#entries = new Map<number, U>();
Expand Down
37 changes: 28 additions & 9 deletions utils/volatile-map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ export class VolatileMap<K,V> extends Map<K,V> {

static readonly MAX_MAP_ENTRIES = 2**24
static readonly MAP_ENTRIES_CLEANUP_CHUNK = 1000
static readonly DEFAULT_ENTRYPOINT_LIFETIME = 30*60 // 30min

#options: VolatileMapOptions
#timeouts = new Map<K, number>()
#liftimeStartTimes = new Map<K, number>()
#customLifetimes = new Map<K, number>()

constructor(iterable?: Iterable<readonly [K, V]> | null, options: Partial<VolatileMapOptions> = {}) {
super(iterable)
this.#options = options as VolatileMapOptions;
this.#options.entryLifetime ??= 30*60 // 30min
this.#options.entryLifetime ??= VolatileMap.DEFAULT_ENTRYPOINT_LIFETIME
this.#options.preventMapOverflow ??= true

this.#startInterval()
}

/**
Expand All @@ -24,6 +28,10 @@ export class VolatileMap<K,V> extends Map<K,V> {
* @returns the current value for the key
*/
keepalive(key: K, overrideLifetime?: number) {
if (!this.has(key)) {
console.warn("key does not exist in VolatileMap")
return;
}
this.#setTimeout(key, overrideLifetime);
return this.get(key)
}
Expand Down Expand Up @@ -54,22 +62,33 @@ export class VolatileMap<K,V> extends Map<K,V> {
return super.clear();
}

#startInterval() {
setInterval(
() => {
const currentTime = new Date().getTime();
for (const [key, time] of this.#liftimeStartTimes) {
const lifetime = 1000 * (this.#customLifetimes.get(key) ?? this.#options.entryLifetime);
if (currentTime-time > lifetime) {
this.delete(key);
}
}
},
Math.min(this.#options.entryLifetime, (VolatileMap.DEFAULT_ENTRYPOINT_LIFETIME)) * 1000 / 5
)
}

#clearTimeout(key: K) {
if (this.#timeouts.has(key)) {
clearTimeout(this.#timeouts.get(key));
this.#timeouts.delete(key);
}
this.#liftimeStartTimes.delete(key);
}

#setTimeout(key: K, overrideLifetime?: number) {
// reset previous timeout
this.#clearTimeout(key);
// store custom lifetime
if (overrideLifetime != undefined) this.#customLifetimes.set(key, overrideLifetime)
const lifetime = overrideLifetime ?? this.#options.entryLifetime;
if (Number.isFinite(lifetime)) {
this.#timeouts.set(key, setTimeout(() => {
this.delete(key)
}, lifetime * 1000))
this.#liftimeStartTimes.set(key, new Date().getTime())
}
}
}
Expand Down
Loading

0 comments on commit 8cd4667

Please sign in to comment.