diff --git a/packages/serverless-api/src/api/logs.ts b/packages/serverless-api/src/api/logs.ts index 647474ea..61f83d97 100644 --- a/packages/serverless-api/src/api/logs.ts +++ b/packages/serverless-api/src/api/logs.ts @@ -5,11 +5,39 @@ import { LogApiResource, LogList, Sid, LogFilters } from '../types'; import { TwilioServerlessApiClient } from '../client'; import { getPaginatedResource } from './utils/pagination'; import { ClientApiError } from '../utils/error'; +import { OptionsOfJSONResponseBody } from 'got'; const log = debug('twilio-serverless-api:logs'); +function urlWithFilters( + environmentSid: Sid, + serviceSid: Sid, + filters: LogFilters = {} +): string { + const pageSize = filters.pageSize || 50; + const { functionSid, startDate, endDate, pageToken } = filters; + let url = `Services/${serviceSid}/Environments/${environmentSid}/Logs?PageSize=${pageSize}`; + if (typeof functionSid !== 'undefined') { + url += `&FunctionSid=${functionSid}`; + } + if (typeof startDate !== 'undefined') { + url += `&StartDate=${ + startDate instanceof Date ? startDate.toISOString() : startDate + }`; + } + if (typeof endDate !== 'undefined') { + url += `&EndDate=${ + endDate instanceof Date ? endDate.toISOString() : endDate + }`; + } + if (typeof pageToken !== 'undefined') { + url += `&PageToken=${pageToken}`; + } + return url; +} + /** - * Calls the API to retrieve a list of all assets + * Calls the API to retrieve a list of all logs * * @param {Sid} environmentSid environment in which to get logs * @param {Sid} serviceSid service to look for logs @@ -24,7 +52,7 @@ export async function listLogResources( try { return getPaginatedResource( client, - `Services/${serviceSid}/Environments/${environmentSid}/Logs` + urlWithFilters(environmentSid, serviceSid) ); } catch (err) { log('%O', new ClientApiError(err)); @@ -33,7 +61,7 @@ export async function listLogResources( } /** - * Calls the API to retrieve a list of all assets + * Calls the API to retrieve one page of a list of logs * * @param {Sid} environmentSid environment in which to get logs * @param {Sid} serviceSid service to look for logs @@ -46,28 +74,10 @@ export async function listOnePageLogResources( client: TwilioServerlessApiClient, filters: LogFilters ): Promise { - const pageSize = filters.pageSize || 50; - const { functionSid, startDate, endDate, pageToken } = filters; + const url = urlWithFilters(environmentSid, serviceSid, filters); try { - let url = `Services/${serviceSid}/Environments/${environmentSid}/Logs?PageSize=${pageSize}`; - if (typeof functionSid !== 'undefined') { - url += `&FunctionSid=${functionSid}`; - } - if (typeof startDate !== 'undefined') { - url += `&StartDate=${ - startDate instanceof Date ? startDate.toISOString() : startDate - }`; - } - if (typeof endDate !== 'undefined') { - url += `&EndDate=${ - endDate instanceof Date ? endDate.toISOString() : endDate - }`; - } - if (typeof pageToken !== 'undefined') { - url += `&PageToken=${pageToken}`; - } const resp = await client.request('get', url); - const content = (resp.body as unknown) as LogList; + const content = resp.body as unknown as LogList; return content.logs as LogApiResource[]; } catch (err) { log('%O', new ClientApiError(err)); @@ -76,7 +86,41 @@ export async function listOnePageLogResources( } /** - * Calls the API to retrieve a list of all assets + * Calls the API to retrieve a paginated list of logs + * + * @param {Sid} environmentSid environment in which to get logs + * @param {Sid} serviceSid service to look for logs + * @param {TwilioServerlessApiClient} client API client + * @param {LogFilters} filters filters to apply to the request + * @param {string} nextPageUrl if you have a next page url, use it + * @returns {Promise} + */ +export async function listPaginatedLogs( + environmentSid: Sid, + serviceSid: Sid, + client: TwilioServerlessApiClient, + filters: LogFilters = {}, + nextPageUrl?: string +): Promise { + try { + const opts: OptionsOfJSONResponseBody = { responseType: 'json' }; + let url = nextPageUrl; + if (typeof url === 'undefined') { + url = urlWithFilters(environmentSid, serviceSid, filters); + } + if (url.startsWith('http')) { + opts.prefixUrl = ''; + } + const resp = await client.request('get', url, opts); + return resp.body as unknown as LogList; + } catch (err) { + log('%O', new ClientApiError(err)); + throw err; + } +} + +/** + * Calls the API to retrieve a single log resource * * @param {Sid} logSid SID of log to retrieve * @param {Sid} environmentSid environment in which to get logs @@ -95,7 +139,7 @@ export async function getLog( 'get', `Services/${serviceSid}/Environments/${environmentSid}/Logs/${logSid}` ); - return (resp.body as unknown) as LogApiResource; + return resp.body as unknown as LogApiResource; } catch (err) { log('%O', new ClientApiError(err)); throw err; diff --git a/packages/serverless-api/src/streams/logs.ts b/packages/serverless-api/src/streams/logs.ts index b1cfb651..d54cbd40 100644 --- a/packages/serverless-api/src/streams/logs.ts +++ b/packages/serverless-api/src/streams/logs.ts @@ -1,15 +1,28 @@ import { Readable } from 'stream'; -import { listOnePageLogResources } from '../api/logs'; +import { listPaginatedLogs } from '../api/logs'; import { TwilioServerlessApiClient } from '../client'; import { Sid } from '../types'; import { LogsConfig } from '../types/logs'; +import debug from 'debug'; + +const log = debug('twilio-serverless-api:client:logs'); + +const pollsBeforeBackOff = 10; +const defaultPollingFrequency = 1000; +// This default max allows the command to get to polling once every 32 seconds +const defaultMaxPollingFrequency = 30000; +const defaultLogCacheSize = 1000; export class LogsStream extends Readable { + private _initialPollingFrequency: number; private _pollingFrequency: number; + private _maxPollingFrequency: number; + private _pollsWithoutResults: number; private _pollingCacheSize: number; private _interval: NodeJS.Timeout | undefined; private _viewedSids: Set; private _viewedLogs: Array<{ sid: Sid; dateCreated: Date }>; + private _paginating: boolean; constructor( private environmentSid: Sid, @@ -21,8 +34,13 @@ export class LogsStream extends Readable { this._interval = undefined; this._viewedSids = new Set(); this._viewedLogs = []; - this._pollingFrequency = config.pollingFrequency || 1000; - this._pollingCacheSize = config.logCacheSize || 1000; + this._pollingFrequency = this._initialPollingFrequency = + config.pollingFrequency || defaultPollingFrequency; + this._maxPollingFrequency = + config.maxPollingFrequency || defaultMaxPollingFrequency; + this._pollsWithoutResults = 0; + this._pollingCacheSize = config.logCacheSize || defaultLogCacheSize; + this._paginating = false; } set pollingFrequency(frequency: number) { @@ -37,7 +55,12 @@ export class LogsStream extends Readable { async _poll() { try { - const logs = await listOnePageLogResources( + if (this._paginating) { + // We are going back through older logs that have been missed between + // polls, so don't start a new poll of the latest logs yet. + return; + } + let logPage = await listPaginatedLogs( this.environmentSid, this.serviceSid, this.client, @@ -46,12 +69,40 @@ export class LogsStream extends Readable { pageSize: this.config.limit, } ); - logs - .filter(log => !this._viewedSids.has(log.sid)) - .reverse() - .forEach(log => { + let logs = logPage.logs; + let unviewedLogs = logs.filter((log) => !this._viewedSids.has(log.sid)); + if (this._viewedSids.size > 0) { + // if we have seen some logs, we need to check if more than one page of + // logs are new. + while ( + unviewedLogs.length === logs.length && + logPage.meta.next_page_url + ) { + // all of the logs are new, so we should get the next page + this._paginating = true; + logPage = await listPaginatedLogs( + this.environmentSid, + this.serviceSid, + this.client, + {}, + logPage.meta.next_page_url + ); + unviewedLogs = unviewedLogs.concat( + logPage.logs.filter((log) => !this._viewedSids.has(log.sid)) + ); + logs = logs.concat(logPage.logs); + } + } + if (unviewedLogs.length > 0) { + // We got new logs, make sure we are polling at the base frequency + this._resetPollingFrequency(); + unviewedLogs.reverse().forEach((log) => { this.push(log); }); + } else { + // No new logs this time, so maybe back off polling + this._backOffPolling(); + } // The logs endpoint is not reliably returning logs in the same order // Therefore we need to keep a set of all previously seen log entries @@ -68,26 +119,30 @@ export class LogsStream extends Readable { // and new logs by stringifying the sid and the date together. const viewedLogsSet = new Set([ ...this._viewedLogs.map( - log => `${log.sid}-${log.dateCreated.toISOString()}` + (log) => `${log.sid}-${log.dateCreated.toISOString()}` ), - ...logs.map(log => `${log.sid}-${log.date_created}`), + ...logs.map((log) => `${log.sid}-${log.date_created}`), ]); // Then we take that set, map over the logs and split them up into sid and // date again, sort them most to least recent and chop off the oldest if // they are beyond the polling cache size. this._viewedLogs = [...viewedLogsSet] - .map(logString => { + .map((logString) => { const [sid, dateCreated] = logString.split('-'); return { sid, dateCreated: new Date(dateCreated) }; }) .sort((a, b) => b.dateCreated.valueOf() - a.dateCreated.valueOf()) .slice(0, this._pollingCacheSize); // Finally we create a set of just SIDs to compare against. - this._viewedSids = new Set(this._viewedLogs.map(log => log.sid)); + this._viewedSids = new Set(this._viewedLogs.map((log) => log.sid)); + // If this is not tailing the logs, stop the stream. if (!this.config.tail) { this.push(null); } + // If we were paginating through older resources, we can now allow the + // next poll when it is triggered. + this._paginating = false; } catch (err) { this.destroy(err); } @@ -111,4 +166,32 @@ export class LogsStream extends Readable { this._interval = undefined; } } + + private _resetPollingFrequency() { + this._pollsWithoutResults = 0; + if (this.pollingFrequency !== this._initialPollingFrequency) { + this.pollingFrequency = this._initialPollingFrequency; + log( + `New log received. Now polling once every ${this._pollingFrequency} milliseconds.` + ); + } + } + + private _backOffPolling() { + if (this._pollsWithoutResults < pollsBeforeBackOff) { + this._pollsWithoutResults++; + } else { + if (this._pollingFrequency < this._maxPollingFrequency) { + log( + `No new logs for ${ + this._pollsWithoutResults * this._pollingFrequency + } milliseconds. Now polling once every ${ + this._pollingFrequency * 2 + } milliseconds.` + ); + this.pollingFrequency = this._pollingFrequency * 2; + this._pollsWithoutResults = 0; + } + } + } } diff --git a/packages/serverless-api/src/types/logs.ts b/packages/serverless-api/src/types/logs.ts index 8dca1c02..56f89d40 100644 --- a/packages/serverless-api/src/types/logs.ts +++ b/packages/serverless-api/src/types/logs.ts @@ -9,6 +9,7 @@ export type LogsConfig = { limit?: number; filterByFunction?: string | Sid; pollingFrequency?: number; + maxPollingFrequency?: number; logCacheSize?: number; };