From 5c9342e93b6524a72b06de2a195b825f5422ee16 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Fri, 3 Jan 2025 10:30:45 -0800 Subject: [PATCH] Add Reticulate support to the kernel supervisor (#5854) This change makes it possible to use Reticulate sessions with the kernel supervisor. image The supervisor has a new `adopt` endpoint that allows it to connect to a session that is already running somewhere else. Now, when you start a Reticulate session, the following things happen: - An R session is started, if necessary. - A Reticulate session is started inside the R session. - The Python kernel is started inside the Reticulate session. - The supervisor "adopts" the Python kernel, connecting to its ZeroMQ sockets and proxying messages as it does for other kernels. In addition to adding the orchestration to use the supervisor, there are a couple of other small changes in this PR: - There is a new _Restart the Kernel Supervisor_ command, which shuts down the supervisor process and all sessions, regardless of what state they're in. This is mostly intended as a debugging tool, and it was added in order to make it possible to pick up e.g. debug log level changes without restarting Positron, but could also be helpful as a last resort if things get stuck. - The progress shown when starting a Reticulate session is now much more chatty; it appears as soon as you try to start the session and shows more information along the way. Progress towards #4579; this enables the use of Reticulate on Posit Workbench, and is the last major piece of functionality that required the Jupyter Adapter. (Note that the main body of this change lives in the supervisor itself; this is just the front end / UI bits.) ### QA Notes There's a _lot_ of orchestration involved already in starting Reticulate and this makes it even more complicated. The highest risk areas are around lifecycle management: shutting down, restarting, reconnecting, etc. There is a known issue in which if you have an exited R session in your Console and try to start a Reticulate session, it never happens. I didn't try to fix that in this PR. --- .../src/client/positron/session.ts | 2 +- extensions/positron-reticulate/src/async.ts | 74 +++++++ .../positron-reticulate/src/extension.ts | 181 ++++++++++++------ extensions/positron-supervisor/package.json | 8 +- .../positron-supervisor/package.nls.json | 3 +- .../positron-supervisor/src/AdoptedSession.ts | 76 ++++++++ .../src/KallichoreAdapterApi.ts | 65 ++++++- .../src/KallichoreSession.ts | 132 ++++++++++++- .../src/kcclient/.openapi-generator/FILES | 1 - .../src/kcclient/api/defaultApi.ts | 96 +++++++++- .../src/kcclient/model/models.ts | 3 - extensions/positron-supervisor/src/util.ts | 22 +++ 12 files changed, 585 insertions(+), 78 deletions(-) create mode 100644 extensions/positron-reticulate/src/async.ts create mode 100644 extensions/positron-supervisor/src/AdoptedSession.ts diff --git a/extensions/positron-python/src/client/positron/session.ts b/extensions/positron-python/src/client/positron/session.ts index 3970fb22124..1d5fa5411c9 100644 --- a/extensions/positron-python/src/client/positron/session.ts +++ b/extensions/positron-python/src/client/positron/session.ts @@ -468,7 +468,7 @@ export class PythonRuntimeSession implements positron.LanguageRuntimeSession, vs private async createKernel(): Promise { const config = vscode.workspace.getConfiguration('kernelSupervisor'); - if (config.get('enable', true) && this.runtimeMetadata.runtimeId !== 'reticulate') { + if (config.get('enable', true)) { // Use the Positron kernel supervisor if enabled const ext = vscode.extensions.getExtension('positron.positron-supervisor'); if (!ext) { diff --git a/extensions/positron-reticulate/src/async.ts b/extensions/positron-reticulate/src/async.ts new file mode 100644 index 00000000000..009cb635882 --- /dev/null +++ b/extensions/positron-reticulate/src/async.ts @@ -0,0 +1,74 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (C) 2024 Posit Software, PBC. All rights reserved. + * Licensed under the Elastic License 2.0. See LICENSE.txt for license information. + *--------------------------------------------------------------------------------------------*/ + +/** + * PromiseHandles is a class that represents a promise that can be resolved or + * rejected externally. + */ +export class PromiseHandles { + resolve!: (value: T | Promise) => void; + + reject!: (error: unknown) => void; + + promise: Promise; + + constructor() { + this.promise = new Promise((resolve, reject) => { + this.resolve = resolve; + this.reject = reject; + }); + } +} + +/** + * A barrier that is initially closed and then becomes opened permanently. + * Ported from VS Code's async.ts. + */ + +export class Barrier { + private _isOpen: boolean; + private _promise: Promise; + private _completePromise!: (v: boolean) => void; + + constructor() { + this._isOpen = false; + this._promise = new Promise((c, _e) => { + this._completePromise = c; + }); + } + + isOpen(): boolean { + return this._isOpen; + } + + open(): void { + this._isOpen = true; + this._completePromise(true); + } + + wait(): Promise { + return this._promise; + } +} + +/** + * Wraps a promise in a timeout that rejects the promise if it does not resolve + * within the given time. + * + * @param promise The promise to wrap + * @param timeout The timeout interval in milliseconds + * @param message The error message to use if the promise times out + * + * @returns The wrapped promise + */ +export function withTimeout(promise: Promise, + timeout: number, + message: string): Promise { + return Promise.race([ + promise, + new Promise((_, reject) => setTimeout(() => reject(new Error(message)), timeout)) + ]); +} + diff --git a/extensions/positron-reticulate/src/extension.ts b/extensions/positron-reticulate/src/extension.ts index 747a6f3cfca..6d1a4898d3d 100644 --- a/extensions/positron-reticulate/src/extension.ts +++ b/extensions/positron-reticulate/src/extension.ts @@ -8,6 +8,7 @@ import * as positron from 'positron'; import path = require('path'); import fs = require('fs'); import { JupyterKernelSpec, JupyterSession, JupyterKernel } from './jupyter-adapter.d'; +import { Barrier, PromiseHandles } from './async'; export class ReticulateRuntimeManager implements positron.LanguageRuntimeManager { @@ -166,53 +167,121 @@ class InitializationError extends Error { class ReticulateRuntimeSession implements positron.LanguageRuntimeSession { private kernel: JupyterKernel | undefined; + public started = new Barrier(); private pythonSession: positron.LanguageRuntimeSession; // To create a reticulate runtime session we need to first create a python // runtime session using the exported interface from the positron-python // extension. - // The PythonRuntimeSession object in the positron-python extensions, is created - // by passing 'runtimeMetadata', 'sessionMetadata' and something called 'kernelSpec' - // that's further passed to the JupyterAdapter extension in order to actually initialize - // the session. - - // ReticulateRuntimeSession are only different from Python runtime sessions in the - // way the kernel spec is provided. In general, the kernel spec contains a runtime - // path and some arguments that are used start the kernel process. (The kernel is started - // by the Jupyter Adapter in a vscode terminal). In the reticulate case, the kernel isn't - // started that way. Instead, we need to call into the R console to start the python jupyter - // kernel (that's actually running in the same process as R), and only then, ask JupyterAdapter - // to connect to that kernel. + // The PythonRuntimeSession object in the positron-python extensions, is + // created by passing 'runtimeMetadata', 'sessionMetadata' and something + // called 'kernelSpec' that's further passed to the JupyterAdapter + // extension in order to actually initialize the session. + + // ReticulateRuntimeSession are only different from Python runtime sessions + // in the way the kernel spec is provided. In general, the kernel spec + // contains a runtime path and some arguments that are used start the + // kernel process. (The kernel is started by the Jupyter Adapter in a + // vscode terminal). In the reticulate case, the kernel isn't started that + // way. Instead, we need to call into the R console to start the python + // jupyter kernel (that's actually running in the same process as R), and + // only then, ask JupyterAdapter to connect to that kernel. static async create( runtimeMetadata: positron.LanguageRuntimeMetadata, sessionMetadata: positron.RuntimeSessionMetadata, ): Promise { - const rSession = await getRSession(); - const config = await ReticulateRuntimeSession.checkRSession(rSession); - const metadata = await ReticulateRuntimeSession.fixInterpreterPath(runtimeMetadata, config.python); - return new ReticulateRuntimeSession( - rSession, - metadata, - sessionMetadata, - ReticulateRuntimeSessionType.Create - ); + // A deferred promise that will resolve when the session is created. + const sessionPromise = new PromiseHandles(); + + // Show a progress notification while we create the session. + vscode.window.withProgress({ + location: vscode.ProgressLocation.Notification, + title: 'Creating the Reticulate Python session', + cancellable: false + }, async (progress, _token) => { + let session: ReticulateRuntimeSession | undefined; + try { + // Get the R session that we'll use to start the reticulate session. + progress.report({ increment: 10, message: 'Initializing the host R session' }); + const rSession = await getRSession(progress); + + // Make sure the R session has the necessary packages installed. + progress.report({ increment: 10, message: 'Checking prerequisites' }); + const config = await ReticulateRuntimeSession.checkRSession(rSession); + const metadata = await ReticulateRuntimeSession.fixInterpreterPath(runtimeMetadata, config.python); + + // Create the session itself. + session = new ReticulateRuntimeSession( + rSession, + metadata, + sessionMetadata, + ReticulateRuntimeSessionType.Create, + progress + ); + sessionPromise.resolve(session); + } catch (err) { + sessionPromise.reject(err); + } + + // Wait for the session to start (or fail to start) before + // returning from this callback, so that the progress bar stays up + // while we wait. + if (session) { + progress.report({ increment: 10, message: 'Waiting to connect' }); + await session.started.wait(); + } + }); + + return sessionPromise.promise; } static async restore( runtimeMetadata: positron.LanguageRuntimeMetadata, sessionMetadata: positron.RuntimeSessionMetadata, ): Promise { - const rSession = await getRSession(); - const config = await ReticulateRuntimeSession.checkRSession(rSession); - const metadata = await ReticulateRuntimeSession.fixInterpreterPath(runtimeMetadata, config.python); - return new ReticulateRuntimeSession( - rSession, - metadata, - sessionMetadata, - ReticulateRuntimeSessionType.Restore - ); + + const sessionPromise = new PromiseHandles(); + + vscode.window.withProgress({ + location: vscode.ProgressLocation.Notification, + title: 'Restoring the Reticulate Python session', + cancellable: false + }, async (progress, _token) => { + let session: ReticulateRuntimeSession | undefined; + try { + // Find the R session that we'll use to restore the reticulate session. + progress.report({ increment: 10, message: 'Initializing the host R session' }); + const rSession = await getRSession(progress); + + // Make sure the R session has the necessary packages installed. + progress.report({ increment: 10, message: 'Checking prerequisites' }); + const config = await ReticulateRuntimeSession.checkRSession(rSession); + const metadata = await ReticulateRuntimeSession.fixInterpreterPath(runtimeMetadata, config.python); + + // Create the session itself. + session = new ReticulateRuntimeSession( + rSession, + metadata, + sessionMetadata, + ReticulateRuntimeSessionType.Restore, + progress + ); + sessionPromise.resolve(session); + } catch (err) { + sessionPromise.reject(err); + } + + // Wait for the session to resume (or fail to resume) before + // returning + if (session) { + progress.report({ increment: 10, message: 'Waiting to reconnect' }); + await session.started.wait(); + } + }); + + return sessionPromise.promise; } static async checkRSession(rSession: positron.LanguageRuntimeSession): Promise<{ python: string }> { @@ -330,6 +399,7 @@ class ReticulateRuntimeSession implements positron.LanguageRuntimeSession { runtimeMetadata: positron.LanguageRuntimeMetadata, sessionMetadata: positron.RuntimeSessionMetadata, sessionType: ReticulateRuntimeSessionType, + readonly progress: vscode.Progress<{ message?: string; increment?: number }> ) { // When the kernelSpec is undefined, the PythonRuntimeSession // will perform a restore session. @@ -360,12 +430,20 @@ class ReticulateRuntimeSession implements positron.LanguageRuntimeSession { if (!api) { throw new Error('Failed to find the Positron Python extension API.'); } + this.progress.report({ increment: 10, message: 'Creating the Python session' }); this.pythonSession = api.exports.positron.createPythonRuntimeSession( runtimeMetadata, sessionMetadata, kernelSpec ); + // Open the start barrier once the session is ready. + this.pythonSession.onDidChangeRuntimeState((state) => { + if (state === positron.RuntimeState.Ready || state === positron.RuntimeState.Idle) { + this.started.open(); + } + }); + this.onDidReceiveRuntimeMessage = this.pythonSession.onDidReceiveRuntimeMessage; this.onDidChangeRuntimeState = this.pythonSession.onDidChangeRuntimeState; this.onDidEndSession = this.pythonSession.onDidEndSession; @@ -373,7 +451,8 @@ class ReticulateRuntimeSession implements positron.LanguageRuntimeSession { // A function that starts a kernel and then connects to it. async startKernel(session: JupyterSession, kernel: JupyterKernel) { - kernel.log('Starting the reticulate session!'); + kernel.log('Starting the Reticulate session!'); + this.progress.report({ increment: 10, message: 'Starting the Reticulate session in R' }); // Store a reference to the kernel, so the session can log, reconnect, etc. this.kernel = kernel; @@ -407,11 +486,15 @@ class ReticulateRuntimeSession implements positron.LanguageRuntimeSession { throw new Error(`Reticulate initialization failed: ${init_err}`); } + this.progress.report({ increment: 10, message: 'Connecting to the Reticulate session' }); + try { await kernel.connectToSession(session); } catch (err: any) { kernel.log('Failed connecting to the Reticulate Python session'); throw err; + } finally { + this.started.open(); } } @@ -533,7 +616,7 @@ class ReticulateRuntimeSession implements positron.LanguageRuntimeSession { } } -async function getRSession(): Promise { +async function getRSession(progress: vscode.Progress<{ message?: string; increment?: number }>): Promise { // Retry logic to start an R session. const maxRetries = 5; @@ -541,7 +624,7 @@ async function getRSession(): Promise { let error; for (let i = 0; i < maxRetries; i++) { try { - session = await getRSession_(); + session = await getRSession_(progress); } catch (err: any) { error = err; // Keep the last error so we can display it @@ -566,12 +649,12 @@ class RSessionError extends Error { } } -async function getRSession_(): Promise { +async function getRSession_(progress: vscode.Progress<{ message?: string; increment?: number }>): Promise { let session = await positron.runtime.getForegroundSession(); if (session) { // Get foreground session will return a runtime session even if it has - // already exitted. We check that it's still there before proceeding. + // already exited. We check that it's still there before proceeding. // TODO: it would be nice to have an API to check for the session state. try { await session.callMethod?.('is_installed', 'reticulate', '1.39'); @@ -581,25 +664,15 @@ async function getRSession_(): Promise { } if (!session || session.runtimeMetadata.languageId !== 'r') { - session = await vscode.window.withProgress({ - location: vscode.ProgressLocation.Notification, - title: 'Starting R session for reticulate', - cancellable: true - }, async (progress, token) => { - token.onCancellationRequested(() => { - throw new RSessionError('User requested cancellation', true); - }); - progress.report({ increment: 0, message: 'Looking for prefered runtime...' }); - const runtime = await positron.runtime.getPreferredRuntime('r'); - - progress.report({ increment: 20, message: 'Starting R runtime...' }); - await positron.runtime.selectLanguageRuntime(runtime.runtimeId); - - progress.report({ increment: 70, message: 'Getting R session...' }); - session = await positron.runtime.getForegroundSession(); - - return session; - }); + progress.report({ increment: 10, message: 'Looking for prefered runtime...' }); + + const runtime = await positron.runtime.getPreferredRuntime('r'); + + progress.report({ increment: 10, message: 'Starting R runtime...' }); + await positron.runtime.selectLanguageRuntime(runtime.runtimeId); + + progress.report({ increment: 10, message: 'Getting R session...' }); + session = await positron.runtime.getForegroundSession(); } if (!session) { diff --git a/extensions/positron-supervisor/package.json b/extensions/positron-supervisor/package.json index 36a1d3cd2b9..50ec585b0ac 100644 --- a/extensions/positron-supervisor/package.json +++ b/extensions/positron-supervisor/package.json @@ -90,6 +90,12 @@ "title": "%command.reconnectSession.title%", "shortTitle": "%command.reconnectSession.title%", "enablement": "isDevelopment" + }, + { + "command": "positron.supervisor.restartSupervisor", + "category": "%command.positron.supervisor.category%", + "title": "%command.restartSupervisor.title%", + "shortTitle": "%command.restartSupervisor.title%" } ] }, @@ -109,7 +115,7 @@ }, "positron": { "binaryDependencies": { - "kallichore": "0.1.22" + "kallichore": "0.1.26" } }, "dependencies": { diff --git a/extensions/positron-supervisor/package.nls.json b/extensions/positron-supervisor/package.nls.json index b37e03e37a8..6e757e777dc 100644 --- a/extensions/positron-supervisor/package.nls.json +++ b/extensions/positron-supervisor/package.nls.json @@ -14,5 +14,6 @@ "configuration.sleepOnStartup.description": "Sleep for n seconds before starting up Jupyter kernel (when supported)", "command.positron.supervisor.category": "Kernel Supervisor", "command.showKernelSupervisorLog.title": "Show the Kernel Supervisor Log", - "command.reconnectSession.title": "Reconnect the current session" + "command.reconnectSession.title": "Reconnect the Current Session", + "command.restartSupervisor.title": "Restart the Kernel Supervisor" } diff --git a/extensions/positron-supervisor/src/AdoptedSession.ts b/extensions/positron-supervisor/src/AdoptedSession.ts new file mode 100644 index 00000000000..510031f2378 --- /dev/null +++ b/extensions/positron-supervisor/src/AdoptedSession.ts @@ -0,0 +1,76 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (C) 2024 Posit Software, PBC. All rights reserved. + * Licensed under the Elastic License 2.0. See LICENSE.txt for license information. + *--------------------------------------------------------------------------------------------*/ + +import { JupyterKernel, JupyterSession } from './jupyter-adapter'; +import { KallichoreSession } from './KallichoreSession'; +import { KernelInfoReply } from './jupyter/KernelInfoRequest'; +import { ConnectionInfo, DefaultApi, HttpError } from './kcclient/api'; +import { summarizeHttpError } from './util'; +import { Barrier } from './async'; + +/** + * Represents a Jupyter kernel that has been adopted by a supervisor. These + * sessions are typically started outside the control of the supervisor, and + * then adopted by the supervisor once started. + * + * Currently, only Reticulate kernels use this mechanism. + */ +export class AdoptedSession implements JupyterKernel { + private _runtimeInfo: KernelInfoReply | undefined; + + /// Whether the session is connected (or the connection has failed) + public connected = new Barrier(); + + /** + * Create a new adopted session. + * + * @param _session The Kallichore session + * @param _connectionInfo The connection information for the adopted session + * @param _api The Kallichore API instance + */ + constructor( + private readonly _session: KallichoreSession, + private readonly _connectionInfo: ConnectionInfo, + private readonly _api: DefaultApi + ) { + + } + + /** + * Connect to (adopt) the given session. + * + * @param session The session to connect to + */ + async connectToSession(session: JupyterSession): Promise { + try { + // Adopt the session via the API, using the connection information + this._runtimeInfo = (await this._api.adoptSession(session.state.sessionId, this._connectionInfo)).body; + } catch (err) { + const message = err instanceof HttpError ? summarizeHttpError(err) : err.message; + throw new Error(`Failed to adopt session: ${message}`); + } finally { + // Open the connected barrier to indicate we've finished connecting + // (or failed to) + this.connected.open(); + } + } + + /** + * Get the runtime information for the kernel. We know this information + * only if the session is connected. + */ + get runtimeInfo(): KernelInfoReply | undefined { + return this._runtimeInfo; + } + + /** + * Log a message to the session's output log. + * + * @param msg The message to log + */ + log(msg: string): void { + this._session.log(msg); + } +} diff --git a/extensions/positron-supervisor/src/KallichoreAdapterApi.ts b/extensions/positron-supervisor/src/KallichoreAdapterApi.ts index 34259ab9c27..4c1a428bbbe 100644 --- a/extensions/positron-supervisor/src/KallichoreAdapterApi.ts +++ b/extensions/positron-supervisor/src/KallichoreAdapterApi.ts @@ -15,7 +15,7 @@ import { JupyterKernelExtra, JupyterKernelSpec, JupyterLanguageRuntimeSession } import { KallichoreSession } from './KallichoreSession'; import { Barrier, PromiseHandles, withTimeout } from './async'; import { LogStreamer } from './LogStreamer'; -import { createUniqueId, summarizeHttpError } from './util'; +import { createUniqueId, summarizeError, summarizeHttpError } from './util'; const KALLICHORE_STATE_KEY = 'positron-supervisor.v1'; @@ -74,6 +74,13 @@ export class KCApi implements KallichoreAdapterApi { */ private _disposables: vscode.Disposable[] = []; + /** + * The terminal hosting the server, if we know it. We only know the + * terminal if it has been started in this session; reconnecting to an + * existing server doesn't give us the terminal. + */ + private _terminal: vscode.Terminal | undefined; + /** * Create a new Kallichore API object. * @@ -97,6 +104,10 @@ export class KCApi implements KallichoreAdapterApi { _context.subscriptions.push(vscode.commands.registerCommand('positron.supervisor.reconnectSession', () => { this.reconnectActiveSession(); })); + + _context.subscriptions.push(vscode.commands.registerCommand('positron.supervisor.restartSupervisor', () => { + this.restartSupervisor(); + })); } /** @@ -771,4 +782,56 @@ export class KCApi implements KallichoreAdapterApi { kallichoreSession.log('Disconnecting by user request', vscode.LogLevel.Info); kallichoreSession.disconnect(); } + + /** + * Restarts the supervisor, ending all sessions. + */ + private async restartSupervisor(): Promise { + + // If we never started the supervisor, just start it + if (!this._started.isOpen()) { + return this.ensureStarted(); + } + + this._log.appendLine('Restarting Kallichore server'); + + // Clean up all the sessions and mark them as exited + this._sessions.forEach(session => { + session.markExited(0, positron.RuntimeExitReason.Shutdown); + session.dispose(); + }); + this._sessions.length = 0; + + // Clear the workspace state so we don't try to reconnect to the old + // server + this._context.workspaceState.update(KALLICHORE_STATE_KEY, undefined); + + // Shut down the server itself + try { + await this._api.shutdownServer(); + } catch (err) { + // We can start a new server even if we failed to shut down the old + // one, so just log this error + const message = summarizeError(err); + this._log.appendLine(`Failed to shut down Kallichore server: ${message}`); + } + + // If we know the terminal, kill it + if (this._terminal) { + this._terminal.dispose(); + this._terminal = undefined; + } + + // Reset the start barrier + this._started = new Barrier(); + + // Start the new server + try { + await this.ensureStarted(); + vscode.window.showInformationMessage(vscode.l10n.t('Kernel supervisor successfully restarted')); + } catch (err) { + const message = err instanceof HttpError ? summarizeHttpError(err) : err; + vscode.window.showErrorMessage(vscode.l10n.t('Failed to restart kernel supervisor: {0}', err)); + } + } } diff --git a/extensions/positron-supervisor/src/KallichoreSession.ts b/extensions/positron-supervisor/src/KallichoreSession.ts index ef43ccab1ae..7a3dd379868 100644 --- a/extensions/positron-supervisor/src/KallichoreSession.ts +++ b/extensions/positron-supervisor/src/KallichoreSession.ts @@ -8,8 +8,8 @@ import * as positron from 'positron'; import * as os from 'os'; import * as path from 'path'; import * as fs from 'fs'; -import { JupyterKernelExtra, JupyterKernelSpec, JupyterLanguageRuntimeSession } from './jupyter-adapter'; -import { ActiveSession, DefaultApi, HttpError, InterruptMode, NewSession, Status } from './kcclient/api'; +import { JupyterKernelExtra, JupyterKernelSpec, JupyterLanguageRuntimeSession, JupyterSession } from './jupyter-adapter'; +import { ActiveSession, ConnectionInfo, DefaultApi, HttpError, InterruptMode, NewSession, Status } from './kcclient/api'; import { JupyterMessage } from './jupyter/JupyterMessage'; import { JupyterRequest } from './jupyter/JupyterRequest'; import { KernelInfoReply, KernelInfoRequest } from './jupyter/KernelInfoRequest'; @@ -37,7 +37,8 @@ import { DapClient } from './DapClient'; import { SocketSession } from './ws/SocketSession'; import { KernelOutputMessage } from './ws/KernelMessage'; import { UICommRequest } from './UICommRequest'; -import { createUniqueId, summarizeHttpError } from './util'; +import { createUniqueId, summarizeError, summarizeHttpError } from './util'; +import { AdoptedSession } from './AdoptedSession'; export class KallichoreSession implements JupyterLanguageRuntimeSession { /** @@ -94,6 +95,9 @@ export class KallichoreSession implements JupyterLanguageRuntimeSession { /** A map of pending comm startups */ private _startingComms: Map> = new Map(); + /** The original kernelspec */ + private _kernelSpec: JupyterKernelSpec | undefined; + /** * The channel to which output for this specific kernel is logged, if any */ @@ -184,6 +188,9 @@ export class KallichoreSession implements JupyterLanguageRuntimeSession { throw new Error(`Session ${this.metadata.sessionId} already exists`); } + // Save the kernel spec for later use + this._kernelSpec = kernelSpec; + // Forward the environment variables from the kernel spec const env = {}; if (kernelSpec.env) { @@ -716,6 +723,115 @@ export class KallichoreSession implements JupyterLanguageRuntimeSession { this._established.open(); } + /** + * Starts and then adopts a kernel owned by an external provider. + * + * @param kernelSpec The kernel spec to use for the session + * @returns The runtime info for the kernel + */ + async startAndAdoptKernel( + kernelSpec: JupyterKernelSpec): + Promise { + + // Mark the session as starting + this.onStateChange(positron.RuntimeState.Starting, 'starting kernel via external provider'); + + try { + const result = await this.tryStartAndAdoptKernel(kernelSpec); + return result; + } catch (err) { + // If we never made it to the "ready" state, mark the session as + // exited since we didn't ever start it fully. + if (this._runtimeState === positron.RuntimeState.Starting) { + const event: positron.LanguageRuntimeExit = { + runtime_name: this.runtimeMetadata.runtimeName, + exit_code: 0, + reason: positron.RuntimeExitReason.StartupFailed, + message: summarizeError(err) + }; + this._exit.fire(event); + this.onStateChange(positron.RuntimeState.Exited, 'kernel adoption failed'); + } + throw err; + } + } + + /** + * Tries to start and then adopt a kernel owned by an external provider. + * + * @param kernelSpec The kernel spec to use for the session + */ + async tryStartAndAdoptKernel(kernelSpec: JupyterKernelSpec): Promise { + + // Get the connection info for the session + const connectionFileContents = {}; + let connectionInfo: ConnectionInfo; + try { + // Read the connection info from the API. This arrives to us in the + // form of a `ConnectionInfo` object. + const result = await this._api.connectionInfo(this.metadata.sessionId); + connectionInfo = result.body; + + // The serialized form of the connection info is a JSON object with + // snake_case names, but ConnectionInfo uses camelCase. Use the map + // in ConnectionInfo to convert the names to snake_case for + // serialization. + for (const [inKey, val] of Object.entries(connectionInfo)) { + for (const outKey of ConnectionInfo.attributeTypeMap) { + if (inKey === outKey.name) { + connectionFileContents[outKey.baseName] = val; + } + } + } + } catch (err) { + throw new Error(`Failed to aquire connection info for session ${this.metadata.sessionId}: ${summarizeError(err)}`); + } + + // Ensure we have a log file + if (!this._kernelLogFile) { + const logFile = path.join(os.tmpdir(), `kernel-${this.metadata.sessionId}.log`); + this._kernelLogFile = logFile; + fs.writeFile(logFile, '', async () => { + await this.streamLogFile(logFile); + }); + } + + // Write the connection file to disk + const connectionFile = path.join(os.tmpdir(), `connection-${this.metadata.sessionId}.json`); + fs.writeFileSync(connectionFile, JSON.stringify(connectionFileContents)); + const session: JupyterSession = { + state: { + sessionId: this.metadata.sessionId, + connectionFile: connectionFile, + logFile: this._kernelLogFile, + processId: 0, + } + }; + + // Create the "kernel" + const kernel = new AdoptedSession(this, connectionInfo, this._api); + + // Start the kernel and wait for it to be ready + await kernelSpec.startKernel!(session, kernel); + + // Wait for session adoption to finish + await kernel.connected.wait(); + + // Connect to the session's websocket + await withTimeout(this.connect(), 2000, `Start failed: timed out connecting to adopted session ${this.metadata.sessionId}`); + + // Mark the session as ready + this.markReady('kernel adoption complete'); + + // Return the runtime info from the adopted session + const info = kernel.runtimeInfo; + if (info) { + return this.runtimeInfoFromKernelInfo(info); + } else { + return this.getKernelInfo(); + } + } + /** * Starts a previously established session. * @@ -725,6 +841,12 @@ export class KallichoreSession implements JupyterLanguageRuntimeSession { * @returns The kernel info for the session. */ async start(): Promise { + // If this session needs to be started by an external provider, do that + // instead of asking the supervisor to start it. + if (this._kernelSpec?.startKernel) { + return this.startAndAdoptKernel(this._kernelSpec); + } + try { // Attempt to start the session const info = await this.tryStart(); @@ -753,13 +875,11 @@ export class KallichoreSession implements JupyterLanguageRuntimeSession { // Attempt to extract a message from the error, or just // stringify it if it's not an Error - const message = - err instanceof HttpError ? summarizeHttpError(err) : err instanceof Error ? err.message : JSON.stringify(err); const event: positron.LanguageRuntimeExit = { runtime_name: this.runtimeMetadata.runtimeName, exit_code: 0, reason: positron.RuntimeExitReason.StartupFailed, - message + message: summarizeError(err) }; this._exit.fire(event); } diff --git a/extensions/positron-supervisor/src/kcclient/.openapi-generator/FILES b/extensions/positron-supervisor/src/kcclient/.openapi-generator/FILES index 29853d7ec5e..cc35ae39bd7 100644 --- a/extensions/positron-supervisor/src/kcclient/.openapi-generator/FILES +++ b/extensions/positron-supervisor/src/kcclient/.openapi-generator/FILES @@ -4,7 +4,6 @@ api/apis.ts api/defaultApi.ts git_push.sh model/activeSession.ts -model/adoptedSession.ts model/connectionInfo.ts model/executionQueue.ts model/interruptMode.ts diff --git a/extensions/positron-supervisor/src/kcclient/api/defaultApi.ts b/extensions/positron-supervisor/src/kcclient/api/defaultApi.ts index 3cd6bed4a83..49addd14271 100644 --- a/extensions/positron-supervisor/src/kcclient/api/defaultApi.ts +++ b/extensions/positron-supervisor/src/kcclient/api/defaultApi.ts @@ -16,7 +16,7 @@ import http from 'http'; /* tslint:disable:no-unused-locals */ import { ActiveSession } from '../model/activeSession'; -import { AdoptedSession } from '../model/adoptedSession'; +import { ConnectionInfo } from '../model/connectionInfo'; import { ModelError } from '../model/modelError'; import { NewSession } from '../model/newSession'; import { NewSession200Response } from '../model/newSession200Response'; @@ -102,10 +102,12 @@ export class DefaultApi { /** * * @summary Adopt an existing session - * @param adoptedSession + * @param sessionId + * @param connectionInfo */ - public async adoptSession (adoptedSession: AdoptedSession, options: {headers: {[name: string]: string}} = {headers: {}}) : Promise<{ response: http.IncomingMessage; body: NewSession200Response; }> { - const localVarPath = this.basePath + '/sessions/adopt'; + public async adoptSession (sessionId: string, connectionInfo: ConnectionInfo, options: {headers: {[name: string]: string}} = {headers: {}}) : Promise<{ response: http.IncomingMessage; body: any; }> { + const localVarPath = this.basePath + '/sessions/{session_id}/adopt' + .replace('{' + 'session_id' + '}', encodeURIComponent(String(sessionId))); let localVarQueryParameters: any = {}; let localVarHeaderParams: any = (Object).assign({}, this._defaultHeaders); const produces = ['application/json']; @@ -117,9 +119,14 @@ export class DefaultApi { } let localVarFormParams: any = {}; - // verify required parameter 'adoptedSession' is not null or undefined - if (adoptedSession === null || adoptedSession === undefined) { - throw new Error('Required parameter adoptedSession was null or undefined when calling adoptSession.'); + // verify required parameter 'sessionId' is not null or undefined + if (sessionId === null || sessionId === undefined) { + throw new Error('Required parameter sessionId was null or undefined when calling adoptSession.'); + } + + // verify required parameter 'connectionInfo' is not null or undefined + if (connectionInfo === null || connectionInfo === undefined) { + throw new Error('Required parameter connectionInfo was null or undefined when calling adoptSession.'); } (Object).assign(localVarHeaderParams, options.headers); @@ -133,7 +140,7 @@ export class DefaultApi { uri: localVarPath, useQuerystring: this._useQuerystring, json: true, - body: ObjectSerializer.serialize(adoptedSession, "AdoptedSession") + body: ObjectSerializer.serialize(connectionInfo, "ConnectionInfo") }; let authenticationPromise = Promise.resolve(); @@ -152,13 +159,13 @@ export class DefaultApi { localVarRequestOptions.form = localVarFormParams; } } - return new Promise<{ response: http.IncomingMessage; body: NewSession200Response; }>((resolve, reject) => { + return new Promise<{ response: http.IncomingMessage; body: any; }>((resolve, reject) => { localVarRequest(localVarRequestOptions, (error, response, body) => { if (error) { reject(error); } else { if (response.statusCode && response.statusCode >= 200 && response.statusCode <= 299) { - body = ObjectSerializer.deserialize(body, "NewSession200Response"); + body = ObjectSerializer.deserialize(body, "any"); resolve({ response: response, body: body }); } else { reject(new HttpError(response, body, response.statusCode)); @@ -236,6 +243,75 @@ export class DefaultApi { }); }); } + /** + * + * @summary Get Jupyter connection information for the session + * @param sessionId + */ + public async connectionInfo (sessionId: string, options: {headers: {[name: string]: string}} = {headers: {}}) : Promise<{ response: http.IncomingMessage; body: ConnectionInfo; }> { + const localVarPath = this.basePath + '/sessions/{session_id}/connection_info' + .replace('{' + 'session_id' + '}', encodeURIComponent(String(sessionId))); + let localVarQueryParameters: any = {}; + let localVarHeaderParams: any = (Object).assign({}, this._defaultHeaders); + const produces = ['application/json']; + // give precedence to 'application/json' + if (produces.indexOf('application/json') >= 0) { + localVarHeaderParams.Accept = 'application/json'; + } else { + localVarHeaderParams.Accept = produces.join(','); + } + let localVarFormParams: any = {}; + + // verify required parameter 'sessionId' is not null or undefined + if (sessionId === null || sessionId === undefined) { + throw new Error('Required parameter sessionId was null or undefined when calling connectionInfo.'); + } + + (Object).assign(localVarHeaderParams, options.headers); + + let localVarUseFormData = false; + + let localVarRequestOptions: localVarRequest.Options = { + method: 'GET', + qs: localVarQueryParameters, + headers: localVarHeaderParams, + uri: localVarPath, + useQuerystring: this._useQuerystring, + json: true, + }; + + let authenticationPromise = Promise.resolve(); + authenticationPromise = authenticationPromise.then(() => this.authentications.default.applyToRequest(localVarRequestOptions)); + + let interceptorPromise = authenticationPromise; + for (const interceptor of this.interceptors) { + interceptorPromise = interceptorPromise.then(() => interceptor(localVarRequestOptions)); + } + + return interceptorPromise.then(() => { + if (Object.keys(localVarFormParams).length) { + if (localVarUseFormData) { + (localVarRequestOptions).formData = localVarFormParams; + } else { + localVarRequestOptions.form = localVarFormParams; + } + } + return new Promise<{ response: http.IncomingMessage; body: ConnectionInfo; }>((resolve, reject) => { + localVarRequest(localVarRequestOptions, (error, response, body) => { + if (error) { + reject(error); + } else { + if (response.statusCode && response.statusCode >= 200 && response.statusCode <= 299) { + body = ObjectSerializer.deserialize(body, "ConnectionInfo"); + resolve({ response: response, body: body }); + } else { + reject(new HttpError(response, body, response.statusCode)); + } + } + }); + }); + }); + } /** * * @summary Delete session diff --git a/extensions/positron-supervisor/src/kcclient/model/models.ts b/extensions/positron-supervisor/src/kcclient/model/models.ts index dd9ed9fd296..b51c48b42e4 100644 --- a/extensions/positron-supervisor/src/kcclient/model/models.ts +++ b/extensions/positron-supervisor/src/kcclient/model/models.ts @@ -1,7 +1,6 @@ import localVarRequest from 'request'; export * from './activeSession'; -export * from './adoptedSession'; export * from './connectionInfo'; export * from './executionQueue'; export * from './interruptMode'; @@ -27,7 +26,6 @@ export type RequestFile = string | Buffer | fs.ReadStream | RequestDetailedFile; import { ActiveSession } from './activeSession'; -import { AdoptedSession } from './adoptedSession'; import { ConnectionInfo } from './connectionInfo'; import { ExecutionQueue } from './executionQueue'; import { InterruptMode } from './interruptMode'; @@ -58,7 +56,6 @@ let enumsMap: {[index: string]: any} = { let typeMap: {[index: string]: any} = { "ActiveSession": ActiveSession, - "AdoptedSession": AdoptedSession, "ConnectionInfo": ConnectionInfo, "ExecutionQueue": ExecutionQueue, "ModelError": ModelError, diff --git a/extensions/positron-supervisor/src/util.ts b/extensions/positron-supervisor/src/util.ts index 6b1e1a664ee..553c4e00b73 100644 --- a/extensions/positron-supervisor/src/util.ts +++ b/extensions/positron-supervisor/src/util.ts @@ -15,6 +15,28 @@ export function createUniqueId(): string { return Math.floor(Math.random() * 0x100000000).toString(16); } +/** + * Summarizes an error into a human-readable string. Used for serializing + * errors reported across the Positron API boundary. + * + * @param err An error to summarize. + * @returns A human-readable string summarizing the error. + */ +export function summarizeError(err: any): string { + if (err instanceof HttpError) { + // HTTP errors are common and should be summarized + return summarizeHttpError(err); + } else if (err instanceof Error) { + // Other errors should be summarized as their message + return err.message; + } else if (typeof err === 'string') { + // Strings are returned as-is + return err; + } + // For anything else, return the JSON representation + return JSON.stringify(err); +} + /** * Summarizes an HTTP error into a human-readable string. Used for serializing * structured errors reported up to Positron where only a string can be