Skip to content

Commit

Permalink
Merge pull request #125 from unyt-org/fix/parallel-pointer-loading
Browse files Browse the repository at this point in the history
Fix parallel pointer loading
  • Loading branch information
benStre authored Jul 17, 2024
2 parents 31e43cb + e491e87 commit c11c4e5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
7 changes: 4 additions & 3 deletions runtime/pointers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1446,7 +1446,7 @@ export class Pointer<T = any> extends Ref<T> {
private static loading_pointers:Map<string, {promise: Promise<Pointer>, scopeList: WeakSet<datex_scope>}> = new Map();

// load from storage or request from remote endpoint if pointer not yet loaded
static load(id:string|Uint8Array, SCOPE?:datex_scope, only_load_local = false, sender_knows_pointer = true, allow_failure = false): Promise<Pointer>|Pointer|LazyPointer<unknown> {
static load(id:string|Uint8Array, SCOPE?:datex_scope, only_load_local = false, sender_knows_pointer = true, allow_failure = false, lockedPointerPromise?: Promise<Pointer>): Promise<Pointer>|Pointer|LazyPointer<unknown> {

// pointer already exists
const existing_pointer = Pointer.get(id);
Expand Down Expand Up @@ -1481,7 +1481,7 @@ export class Pointer<T = any> extends Ref<T> {
const loadPromise = this.handleLoad(id_string, id, SCOPE, only_load_local, sender_knows_pointer, allow_failure);
// only add load data if load not already finished
if (this.loading_pointers.has(id_string)) {
this.addLoadingPointerPromise(id_string, loadPromise, SCOPE);
this.addLoadingPointerPromise(id_string, lockedPointerPromise??loadPromise, SCOPE);
}

return loadPromise;
Expand Down Expand Up @@ -2601,7 +2601,8 @@ export class Pointer<T = any> extends Ref<T> {
let alreadyProxy = false;
if (val && typeof val == "object" && DX_PTR in val) {
alreadyProxy = true;
console.warn("The value assigned to pointer "+this.idString()+" is already bound to " + (val[DX_PTR] as unknown as Pointer).idString() + ":", val)
// TODO: handle this correctly
// console.warn("The value assigned to pointer "+this.idString()+" is already bound to " + (val[DX_PTR] as unknown as Pointer).idString() + ":", val)
}

// TODO: is this required somewhere?
Expand Down
11 changes: 6 additions & 5 deletions runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7123,16 +7123,19 @@ export class Runtime {
const init_block_size = SCOPE.buffer_views.data_view.getUint32(SCOPE.current_index, true);
SCOPE.current_index += Uint32Array.BYTES_PER_ELEMENT;

const {promise: lockedPointerPromise, resolve: lockedPointerResolve, reject: lockedPointerReject} = Promise.withResolvers<Pointer>();

// pointer exists
try {
let pointer = Pointer.create(id);
// when remote pointer is initialized by owner endpoint, it does not have to be loaded here from the remote endpoint, because the content is loaded in the initialization
// TODO: currently the initialization block is always used, even if it was not sent by the pointer owner, because SCOPE.sender?.equals(pointer.origin) does not work with ids and aliases of remote endpoints
const only_load_local = true; //pointer.is_origin || SCOPE.sender?.equals(pointer.origin);
pointer = await Pointer.load(id, SCOPE, only_load_local, knows_pointer?true:false);
pointer = await Pointer.load(id, SCOPE, only_load_local, knows_pointer?true:false, false, lockedPointerPromise);
// console.log("has $" + Pointer.normalizePointerId(id), jmp_index, buffer2hex(SCOPE.buffer_views.uint8.slice(jmp_index), " "));
// pointer.is_persistent = true;
SCOPE.current_index += init_block_size; // jump to end of init block
lockedPointerResolve(pointer);
}
// does not exist: init, or no permission
catch (e) {
Expand All @@ -7146,11 +7149,9 @@ export class Runtime {
else {
if (!SCOPE.inner_scope.waiting_ptrs) SCOPE.inner_scope.waiting_ptrs = new Set();
const tmp_ptr = Pointer.create(id);
// add pointer init promise for recursive init
const {promise, resolve, reject} = Promise.withResolvers<Pointer>();
Pointer.addLoadingPointerPromise(id, promise, SCOPE);
Pointer.addLoadingPointerPromise(id, lockedPointerPromise, SCOPE);
// TODO: make sure resolve or reject is called at some point or the promise is removed
SCOPE.inner_scope.waiting_ptrs.add([tmp_ptr, {resolve, reject}]); // assign next value to pointer;
SCOPE.inner_scope.waiting_ptrs.add([tmp_ptr, {resolve: lockedPointerResolve, reject: lockedPointerReject}]); // assign next value to pointer;
}

}
Expand Down

0 comments on commit c11c4e5

Please sign in to comment.