diff --git a/src/constants.ts b/src/constants.ts index f6eda4b..2277a74 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -18,6 +18,8 @@ import type { Preset } from './types'; +export const PRE_SSE_VERSION = '2.30.0'; + // copied from https://github.com/crc-org/crc/blob/632676d7c9ba0c030736c3d914984c4e140c1bf5/pkg/crc/constants/constants.go#L198 export function getDefaultCPUs(preset: Preset): number { diff --git a/src/crc-delete.ts b/src/crc-delete.ts index 02b211a..39e44c2 100644 --- a/src/crc-delete.ts +++ b/src/crc-delete.ts @@ -33,7 +33,7 @@ export function registerDeleteCommand(): void { } export async function deleteCrc(suppressNotification = false): Promise { - if (crcStatus.status.CrcStatus === 'No Cluster') { + if (crcStatus.status.CrcStatus === 'NoVM') { if (!suppressNotification) { await extensionApi.window.showNotification({ silent: false, diff --git a/src/crc-status.ts b/src/crc-status.ts index d1a2939..d68120a 100644 --- a/src/crc-status.ts +++ b/src/crc-status.ts @@ -18,17 +18,23 @@ import * as extensionApi from '@podman-desktop/api'; import type { Status, CrcStatus as CrcStatusApi } from './types'; -import { commander } from './daemon-commander'; +import { commander, getCrcApiUrl } from './daemon-commander'; import { isNeedSetup } from './crc-setup'; +import { EventSource } from './events/eventsource'; +import type { CrcVersion } from './crc-cli'; +import { compare } from 'compare-versions'; +import { PRE_SSE_VERSION } from './constants'; const defaultStatus: Status = { CrcStatus: 'Unknown', Preset: 'openshift' }; const setupStatus: Status = { CrcStatus: 'Need Setup', Preset: 'Unknown' }; const errorStatus: Status = { CrcStatus: 'Error', Preset: 'Unknown' }; export class CrcStatus { - private updateTimer: NodeJS.Timer; private _status: Status; private isSetupGoing: boolean; + private statusEventSource: EventSource; + private crcVersion: CrcVersion; + private updateTimer: NodeJS.Timer; private statusChangeEventEmitter = new extensionApi.EventEmitter(); public readonly onStatusChange = this.statusChangeEventEmitter.event; @@ -37,30 +43,61 @@ export class CrcStatus { } startStatusUpdate(): void { - if (this.updateTimer) { - return; // we already set timer - } - this.updateTimer = setInterval(async () => { - try { - // we don't need to update status while setup is going - if (this.isSetupGoing) { - this._status = createStatus('Starting', this._status.Preset); - return; + if (compare(this.crcVersion.version, PRE_SSE_VERSION, '<=')) { + if (this.updateTimer) { + return; // we already set timer + } + this.updateTimer = setInterval(async () => { + try { + // we don't need to update status while setup is going + if (this.isSetupGoing) { + this._status = createStatus('Starting', this._status.Preset); + return; + } + const oldStatus = this._status; + this._status = await commander.status(); + // notify listeners when status changed + if (oldStatus.CrcStatus !== this._status.CrcStatus) { + this.statusChangeEventEmitter.fire(this._status); + } + } catch (e) { + console.error('CRC Status tick: ' + e); + this._status = defaultStatus; } - const oldStatus = this._status; - this._status = await commander.status(); - // notify listeners when status changed - if (oldStatus.CrcStatus !== this._status.CrcStatus) { + }, 2000); + } else { + if (this.statusEventSource) { + return; + } + + this.statusEventSource = new EventSource(getCrcApiUrl() + '/events?stream=status_change'); + this.statusEventSource.on('status_change', (e: MessageEvent) => { + const data = e.data; + try { + if (this.isSetupGoing) { + this._status = createStatus('Starting', this._status.Preset); + return; + } + console.error(`On Status: ${data}`); + this._status = JSON.parse(data).status; this.statusChangeEventEmitter.fire(this._status); + } catch (err) { + console.error(err); + this._status = defaultStatus; } - } catch (e) { - console.error('CRC Status tick: ' + e); + }); + this.statusEventSource.on('error', e => { + console.error(e); this._status = defaultStatus; - } - }, 2000); + }); + } } stopStatusUpdate(): void { + if (this.statusEventSource) { + this.statusEventSource.close(); + this.statusEventSource = undefined; + } if (this.updateTimer) { clearInterval(this.updateTimer); } @@ -74,7 +111,8 @@ export class CrcStatus { this._status = errorStatus; } - async initialize(): Promise { + async initialize(crcVersion: CrcVersion): Promise { + this.crcVersion = crcVersion; if (isNeedSetup) { this._status = setupStatus; return; @@ -107,7 +145,7 @@ export class CrcStatus { case 'Stopping': return 'stopping'; case 'Stopped': - case 'No Cluster': + case 'NoVM': return 'stopped'; default: return 'unknown'; @@ -124,7 +162,7 @@ export class CrcStatus { return 'stopping'; case 'Stopped': return 'configured'; - case 'No Cluster': + case 'NoVM': return 'installed'; case 'Error': return 'error'; diff --git a/src/daemon-commander.ts b/src/daemon-commander.ts index 8ae8fd6..c69a322 100644 --- a/src/daemon-commander.ts +++ b/src/daemon-commander.ts @@ -21,15 +21,18 @@ import got from 'got'; import { isWindows } from './util'; import type { ConfigKeys, Configuration, StartInfo, Status } from './types'; +export function getCrcApiUrl(): string { + if (isWindows()) { + return 'http://unix://?/pipe/crc-http:'; + } + return `http://unix:${process.env.HOME}/.crc/crc-http.sock:`; +} + export class DaemonCommander { private apiPath: string; constructor() { - this.apiPath = `http://unix:${process.env.HOME}/.crc/crc-http.sock:/api`; - - if (isWindows()) { - this.apiPath = 'http://unix://?/pipe/crc-http:/api'; - } + this.apiPath = getCrcApiUrl() + '/api'; } async status(): Promise { @@ -41,7 +44,7 @@ export class DaemonCommander { } catch (error) { // ignore status error, as it may happen when no cluster created return { - CrcStatus: 'No Cluster', + CrcStatus: 'NoVM', }; } } diff --git a/src/events/eventsource.ts b/src/events/eventsource.ts new file mode 100644 index 0000000..197e8e3 --- /dev/null +++ b/src/events/eventsource.ts @@ -0,0 +1,267 @@ +/********************************************************************** + * Copyright (C) 2023 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ***********************************************************************/ + +import EventEmitter from 'node:events'; +import type { Request, Response } from 'got'; +import got from 'got'; + +const maxBufferAheadAllocation = 1024 * 256; +const bom = [239, 187, 191]; +const colon = 58; +const space = 32; +const lineFeed = 10; +const carriageReturn = 13; + +export interface EvenSourceOptions { + /** + * A boolean value, defaulting to `false`, indicating if CORS should be set to include credentials. + */ + withCredentials?: boolean; +} + +enum ReadyState { + CONNECTING = 0, + OPEN = 1, + CLOSED = 2, +} + +export interface Event { + readonly type: string; +} + +export interface ErrorEvent extends Event { + readonly message: string; + readonly status?: number; +} + +export interface MessageEvent extends Event { + data: string; + lastEventId?: string; + origin?: string; +} + +export class EventSource extends EventEmitter { + readonly CONNECTING: ReadyState.CONNECTING; + readonly OPEN: ReadyState.OPEN; + readonly CLOSED: ReadyState.CLOSED; + + readyState: ReadyState; + + private req: Request | undefined; + private data = ''; + private eventName = ''; + private lastEventId = ''; + + constructor(public readonly url: string, private options?: EvenSourceOptions) { + super(); + this.readyState = ReadyState.CONNECTING; + const req = got.stream(this.url, { isStream: true }); + this.req = req; + req.on('response', (res: Response) => { + if (res.statusCode === 500 || res.statusCode === 502 || res.statusCode === 503 || res.statusCode === 504) { + this.emit('error', newErrorEvent(res.statusMessage, res.statusCode)); + this.onConnectionClosed(); + return; + } + + // TODO: should we handle redirects? + + if (res.statusCode !== 200) { + this.emit('error', newErrorEvent(res.statusMessage, res.statusCode)); + this.close(); + return; + } + + this.readyState = ReadyState.OPEN; + + req.on('end', () => { + req.removeAllListeners('close'); + req.removeAllListeners('end'); + this.onConnectionClosed('Stream ended'); + }); + + req.on('close', () => { + req.removeAllListeners('close'); + req.removeAllListeners('end'); + this.onConnectionClosed('Stream closed'); + }); + + this.emit('open', { type: 'open' }); + }); + + let buf: Buffer; + let newBuffer: Buffer; + let startingPos = 0; + let startingFieldLength = -1; + let newBufferSize = 0; + let bytesUsed = 0; + let discardTrailingNewline = false; + + req.on('data', chunk => { + if (!buf) { + buf = chunk; + if (hasBom(buf)) { + buf = buf.subarray(bom.length); + } + bytesUsed = buf.length; + } else { + if (chunk.length > buf.length - bytesUsed) { + newBufferSize = buf.length * 2 + chunk.length; + if (newBufferSize > maxBufferAheadAllocation) { + newBufferSize = buf.length + chunk.length + maxBufferAheadAllocation; + } + newBuffer = Buffer.alloc(newBufferSize); + buf.copy(newBuffer, 0, 0, bytesUsed); + buf = newBuffer; + } + chunk.copy(buf, bytesUsed); + bytesUsed += chunk.length; + } + + let pos = 0; + const length = bytesUsed; + + while (pos < length) { + if (discardTrailingNewline) { + if (buf[pos] === lineFeed) { + ++pos; + } + discardTrailingNewline = false; + } + + let lineLength = -1; + let fieldLength = startingFieldLength; + let c: number; + + for (let i = startingPos; lineLength < 0 && i < length; ++i) { + c = buf[i]; + if (c === colon) { + if (fieldLength < 0) { + fieldLength = i - pos; + } + } else if (c === carriageReturn) { + discardTrailingNewline = true; + lineLength = i - pos; + } else if (c === lineFeed) { + lineLength = i - pos; + } + } + + if (lineLength < 0) { + startingPos = length - pos; + startingFieldLength = fieldLength; + break; + } else { + startingPos = 0; + startingFieldLength = -1; + } + + this.parseEventStreamLine(buf, pos, fieldLength, lineLength); + + pos += lineLength + 1; + } + + if (pos === length) { + buf = void 0; + bytesUsed = 0; + } else if (pos > 0) { + buf = buf.subarray(pos, bytesUsed); + bytesUsed = buf.length; + } + }); + + req.on('error', err => { + this.onConnectionClosed(err.message); + }); + } + + close() { + if (this.readyState === ReadyState.CLOSED) { + return; + } + + this.readyState = ReadyState.CLOSED; + this.req?.destroy(); + } + + private onConnectionClosed(message?: string): void { + if (this.readyState === ReadyState.CLOSED) { + return; + } + + this.emit('error', newErrorEvent(message)); + } + + private parseEventStreamLine(buf: Buffer, pos: number, fieldLength: number, lineLength: number): void { + if (lineLength === 0) { + if (this.data.length > 0) { + const type = this.eventName || 'message'; + this.emit(type, newMessageEvent(type, this.data, this.lastEventId, new URL(this.url).origin)); + this.data = ''; + } + this.eventName = void 0; + } else if (fieldLength > 0) { + const noValue = fieldLength < 0; + let step = 0; + const field = buf.subarray(pos, pos + (noValue ? lineLength : fieldLength)).toString(); + + if (noValue) { + step = lineLength; + } else if (buf[pos + fieldLength + 1] !== space) { + step = fieldLength + 1; + } else { + step = fieldLength + 2; + } + pos += step; + + const valueLength = lineLength - step; + const value = buf.subarray(pos, pos + valueLength).toString(); + + if (field === 'data') { + this.data += value + '\n'; + } else if (field === 'event') { + this.eventName = value; + } else if (field === 'id') { + this.lastEventId = value; + } + } + } +} + +function newErrorEvent(message: string, status?: number): ErrorEvent { + return { + type: 'error', + message, + status, + }; +} + +function newMessageEvent(type: string, data: string, lastEventId, origin: string): MessageEvent { + return { + type, + data, + lastEventId, + origin, + }; +} + +function hasBom(buf: Buffer): boolean { + return bom.every((charCode, index) => { + return buf[index] === charCode; + }); +} diff --git a/src/extension.ts b/src/extension.ts index 798b298..7e4e70f 100644 --- a/src/extension.ts +++ b/src/extension.ts @@ -65,7 +65,7 @@ export async function activate(extensionContext: extensionApi.ExtensionContext): if (!isNeedSetup) { await connectToCrc(); } else { - crcStatus.initialize(); + crcStatus.initialize(crcVersion); } hasDaemonRunning = await isDaemonRunning(); } @@ -119,7 +119,7 @@ export async function activate(extensionContext: extensionApi.ExtensionContext): registerProviderLifecycle(provider, extensionContext, telemetryLogger); } - if (crcStatus.getProviderStatus() === 'installed' || crcStatus.status.CrcStatus === 'No Cluster') { + if (crcStatus.getProviderStatus() === 'installed' || crcStatus.status.CrcStatus === 'NoVM') { registerProviderConnectionFactory(provider, extensionContext, telemetryLogger); } @@ -131,12 +131,12 @@ export async function activate(extensionContext: extensionApi.ExtensionContext): } // if no need to setup we could add commands - if (!isNeedSetup) { + if (!isNeedSetup && crcStatus.status.CrcStatus !== 'NoVM') { addCommands(telemetryLogger); } // no need to setup and crc has cluster - if (!isNeedSetup && crcStatus.status.CrcStatus !== 'No Cluster') { + if (!isNeedSetup && crcStatus.status.CrcStatus !== 'NoVM') { presetChanged(provider, extensionContext, telemetryLogger); } else { // else get preset from cli as setup is not finished and daemon may not running @@ -261,7 +261,7 @@ async function createCrcVm( logger: extensionApi.Logger, ): Promise { // we already have an instance - if (crcStatus.status.CrcStatus !== 'No Cluster' && crcStatus.status.CrcStatus !== 'Need Setup') { + if (crcStatus.status.CrcStatus !== 'NoVM' && crcStatus.status.CrcStatus !== 'Need Setup') { return; } @@ -410,7 +410,7 @@ async function readPreset(): Promise { } async function connectToCrc(): Promise { - await crcStatus.initialize(); + await crcStatus.initialize(crcVersion); crcStatus.startStatusUpdate(); } diff --git a/src/log-provider.ts b/src/log-provider.ts index cccb72d..cfb65ad 100644 --- a/src/log-provider.ts +++ b/src/log-provider.ts @@ -18,28 +18,79 @@ import type { Logger } from '@podman-desktop/api'; import type { DaemonCommander } from './daemon-commander'; -import { commander } from './daemon-commander'; +import { commander, getCrcApiUrl } from './daemon-commander'; +import type { MessageEvent } from './events/eventsource'; +import { EventSource } from './events/eventsource'; +import { compare } from 'compare-versions'; +import { PRE_SSE_VERSION } from './constants'; +import { getCrcVersion } from './crc-cli'; + +interface LogMessage { + level: 'info' | 'error' | 'warning' | 'debug'; + msg: string; + time: string; +} export class LogProvider { private timeout: NodeJS.Timeout; + private logEvents: EventSource; + constructor(private readonly commander: DaemonCommander) {} async startSendingLogs(logger: Logger): Promise { - let lastLogLine = 0; - this.timeout = setInterval(async () => { - try { - const logs = await this.commander.logs(); - const logsDiff: string[] = logs.Messages.slice(lastLogLine, logs.Messages.length - 1); - lastLogLine = logs.Messages.length; - logger.log(logsDiff.join('\n')); - } catch (e) { - console.log('Logs tick: ' + e); - } - }, 3000); + const crcVersion = await getCrcVersion(); + if (!crcVersion) { + console.warn("Can't get CRC CLI version, ignore getting daemons logs..."); + return; + } + if (compare(crcVersion.version, PRE_SSE_VERSION, '<=')) { + let lastLogLine = 0; + this.timeout = setInterval(async () => { + try { + const logs = await this.commander.logs(); + const logsDiff: string[] = logs.Messages.slice(lastLogLine, logs.Messages.length - 1); + lastLogLine = logs.Messages.length; + logger.log(logsDiff.join('\n')); + } catch (e) { + console.log('Logs tick: ' + e); + } + }, 3000); + } else { + this.logEvents = new EventSource(getCrcApiUrl() + '/events?stream=logs'); + this.logEvents.on('logs', (e: MessageEvent) => { + const logMessage = JSON.parse(e.data) as LogMessage; + switch (logMessage.level) { + case 'debug': + logger.warn(logMessage.msg + '\n'); + break; + case 'info': + logger.log(logMessage.msg + '\n'); + break; + case 'error': + logger.error(logMessage.msg + '\n'); + break; + case 'warning': + logger.warn(logMessage.msg + '\n'); + break; + default: + console.error(`Unknown log level: ${logMessage.level}`); + logger.warn(logMessage.msg + '\n'); + break; + } + }); + this.logEvents.on('error', err => { + console.error(err); + }); + } } stopSendingLogs(): void { - clearInterval(this.timeout); + if (this.timeout) { + clearInterval(this.timeout); + } + if (this.logEvents) { + this.logEvents.close(); + } } } diff --git a/src/preferences.ts b/src/preferences.ts index e0951f7..4628463 100644 --- a/src/preferences.ts +++ b/src/preferences.ts @@ -301,7 +301,7 @@ async function handleRecreate( provider: extensionApi.Provider, telemetryLogger: extensionApi.TelemetryLogger, ): Promise { - const needDelete = crcStatus.status.CrcStatus !== 'No Cluster'; + const needDelete = crcStatus.status.CrcStatus !== 'NoVM'; const needStop = crcStatus.getProviderStatus() === 'started' || crcStatus.getProviderStatus() === 'starting'; const buttons = ['Cancel']; diff --git a/src/types.ts b/src/types.ts index f69c9d4..cf1634f 100644 --- a/src/types.ts +++ b/src/types.ts @@ -16,15 +16,7 @@ * SPDX-License-Identifier: Apache-2.0 ***********************************************************************/ -export type CrcStatus = - | 'Running' - | 'Starting' - | 'Stopping' - | 'Stopped' - | 'No Cluster' - | 'Error' - | 'Unknown' - | 'Need Setup'; +export type CrcStatus = 'Running' | 'Starting' | 'Stopping' | 'Stopped' | 'NoVM' | 'Error' | 'Unknown' | 'Need Setup'; export interface Status { readonly CrcStatus: CrcStatus;