Skip to content

Commit

Permalink
Merge pull request #64 from unyt-org/fix-compute-clusters
Browse files Browse the repository at this point in the history
Handle offline endpoints in compute clusters
  • Loading branch information
benStre authored Jan 24, 2024
2 parents 32a8777 + c758268 commit 76394c2
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
1 change: 1 addition & 0 deletions threads/compute-clusters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export class ComputeCluster {
cluster = await datex(cluster);
if (!(cluster instanceof ComputeCluster)) throw new Error(`"${cluster}" is not a ComputeCluster`)
}
else if (!(cluster instanceof ComputeCluster)) throw new Error(`cluster must be a ComputeCluster or a DATEX identifier (e.g. "@myEndpoint.myComputCluster")`)

const ptr = Datex.Pointer.getByValue(cluster);
if (!ptr) throw new Error(`ComputeCluster "${cluster.name}" is not a bound to a pointer`)
Expand Down
19 changes: 19 additions & 0 deletions threads/threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,20 @@ async function getThread(): Promise<ThreadModule> {

}

function removeThreadEndpoint(thread: ThreadModule) {
const endpoint = threadEndpoints.get(thread);
if (endpoint) {
availableThreads.set(thread, 0);
threadEndpoints.delete(thread);
if (configuration.cluster) {
configuration.cluster.endpoints.delete(endpoint);
}
}
else {
throw new Error("Thread is not a remote thread")
}
}

function freeThread(thread: ThreadModule) {
if (!availableThreads.has(thread)) return;
availableThreads.set(thread, availableThreads.get(thread)! - 1);
Expand Down Expand Up @@ -668,6 +682,11 @@ export async function run<ReturnType>(task: (() => ReturnType)|JSTransferableFun
const variableName = e.message.match(/ReferenceError - (\S*)/)![1];
throw new RuntimeError("Variable '"+variableName+"' from the parent scope must be explicitly declared at the beginning of the function body with 'use ("+variableName+")'.")
}
else if (e instanceof Error && e.message.endsWith("is offline")) {
console.log("cluster endpoint " + endpoint + " is offline");
removeThreadEndpoint(thread);
return run(task, options, _meta);
}
else if (e instanceof Error) {
throw new Error(e.message);
}
Expand Down

0 comments on commit 76394c2

Please sign in to comment.