From ab2e4b679465707eb8c87363cd82b5e9224036c6 Mon Sep 17 00:00:00 2001 From: Phil Nash Date: Fri, 16 Jul 2021 14:25:02 +1000 Subject: [PATCH] feat(logs): paginate through logs after backoff --- packages/serverless-api/src/api/logs.ts | 94 +++++++++++++++------ packages/serverless-api/src/streams/logs.ts | 92 +++++++++++++++----- 2 files changed, 138 insertions(+), 48 deletions(-) diff --git a/packages/serverless-api/src/api/logs.ts b/packages/serverless-api/src/api/logs.ts index 647474ea..61f83d97 100644 --- a/packages/serverless-api/src/api/logs.ts +++ b/packages/serverless-api/src/api/logs.ts @@ -5,11 +5,39 @@ import { LogApiResource, LogList, Sid, LogFilters } from '../types'; import { TwilioServerlessApiClient } from '../client'; import { getPaginatedResource } from './utils/pagination'; import { ClientApiError } from '../utils/error'; +import { OptionsOfJSONResponseBody } from 'got'; const log = debug('twilio-serverless-api:logs'); +function urlWithFilters( + environmentSid: Sid, + serviceSid: Sid, + filters: LogFilters = {} +): string { + const pageSize = filters.pageSize || 50; + const { functionSid, startDate, endDate, pageToken } = filters; + let url = `Services/${serviceSid}/Environments/${environmentSid}/Logs?PageSize=${pageSize}`; + if (typeof functionSid !== 'undefined') { + url += `&FunctionSid=${functionSid}`; + } + if (typeof startDate !== 'undefined') { + url += `&StartDate=${ + startDate instanceof Date ? startDate.toISOString() : startDate + }`; + } + if (typeof endDate !== 'undefined') { + url += `&EndDate=${ + endDate instanceof Date ? endDate.toISOString() : endDate + }`; + } + if (typeof pageToken !== 'undefined') { + url += `&PageToken=${pageToken}`; + } + return url; +} + /** - * Calls the API to retrieve a list of all assets + * Calls the API to retrieve a list of all logs * * @param {Sid} environmentSid environment in which to get logs * @param {Sid} serviceSid service to look for logs @@ -24,7 +52,7 @@ export async function listLogResources( try { return getPaginatedResource( client, - `Services/${serviceSid}/Environments/${environmentSid}/Logs` + urlWithFilters(environmentSid, serviceSid) ); } catch (err) { log('%O', new ClientApiError(err)); @@ -33,7 +61,7 @@ export async function listLogResources( } /** - * Calls the API to retrieve a list of all assets + * Calls the API to retrieve one page of a list of logs * * @param {Sid} environmentSid environment in which to get logs * @param {Sid} serviceSid service to look for logs @@ -46,28 +74,10 @@ export async function listOnePageLogResources( client: TwilioServerlessApiClient, filters: LogFilters ): Promise { - const pageSize = filters.pageSize || 50; - const { functionSid, startDate, endDate, pageToken } = filters; + const url = urlWithFilters(environmentSid, serviceSid, filters); try { - let url = `Services/${serviceSid}/Environments/${environmentSid}/Logs?PageSize=${pageSize}`; - if (typeof functionSid !== 'undefined') { - url += `&FunctionSid=${functionSid}`; - } - if (typeof startDate !== 'undefined') { - url += `&StartDate=${ - startDate instanceof Date ? startDate.toISOString() : startDate - }`; - } - if (typeof endDate !== 'undefined') { - url += `&EndDate=${ - endDate instanceof Date ? endDate.toISOString() : endDate - }`; - } - if (typeof pageToken !== 'undefined') { - url += `&PageToken=${pageToken}`; - } const resp = await client.request('get', url); - const content = (resp.body as unknown) as LogList; + const content = resp.body as unknown as LogList; return content.logs as LogApiResource[]; } catch (err) { log('%O', new ClientApiError(err)); @@ -76,7 +86,41 @@ export async function listOnePageLogResources( } /** - * Calls the API to retrieve a list of all assets + * Calls the API to retrieve a paginated list of logs + * + * @param {Sid} environmentSid environment in which to get logs + * @param {Sid} serviceSid service to look for logs + * @param {TwilioServerlessApiClient} client API client + * @param {LogFilters} filters filters to apply to the request + * @param {string} nextPageUrl if you have a next page url, use it + * @returns {Promise} + */ +export async function listPaginatedLogs( + environmentSid: Sid, + serviceSid: Sid, + client: TwilioServerlessApiClient, + filters: LogFilters = {}, + nextPageUrl?: string +): Promise { + try { + const opts: OptionsOfJSONResponseBody = { responseType: 'json' }; + let url = nextPageUrl; + if (typeof url === 'undefined') { + url = urlWithFilters(environmentSid, serviceSid, filters); + } + if (url.startsWith('http')) { + opts.prefixUrl = ''; + } + const resp = await client.request('get', url, opts); + return resp.body as unknown as LogList; + } catch (err) { + log('%O', new ClientApiError(err)); + throw err; + } +} + +/** + * Calls the API to retrieve a single log resource * * @param {Sid} logSid SID of log to retrieve * @param {Sid} environmentSid environment in which to get logs @@ -95,7 +139,7 @@ export async function getLog( 'get', `Services/${serviceSid}/Environments/${environmentSid}/Logs/${logSid}` ); - return (resp.body as unknown) as LogApiResource; + return resp.body as unknown as LogApiResource; } catch (err) { log('%O', new ClientApiError(err)); throw err; diff --git a/packages/serverless-api/src/streams/logs.ts b/packages/serverless-api/src/streams/logs.ts index 4c48268b..d54cbd40 100644 --- a/packages/serverless-api/src/streams/logs.ts +++ b/packages/serverless-api/src/streams/logs.ts @@ -1,5 +1,5 @@ import { Readable } from 'stream'; -import { listOnePageLogResources } from '../api/logs'; +import { listPaginatedLogs } from '../api/logs'; import { TwilioServerlessApiClient } from '../client'; import { Sid } from '../types'; import { LogsConfig } from '../types/logs'; @@ -22,6 +22,7 @@ export class LogsStream extends Readable { private _interval: NodeJS.Timeout | undefined; private _viewedSids: Set; private _viewedLogs: Array<{ sid: Sid; dateCreated: Date }>; + private _paginating: boolean; constructor( private environmentSid: Sid, @@ -39,6 +40,7 @@ export class LogsStream extends Readable { config.maxPollingFrequency || defaultMaxPollingFrequency; this._pollsWithoutResults = 0; this._pollingCacheSize = config.logCacheSize || defaultLogCacheSize; + this._paginating = false; } set pollingFrequency(frequency: number) { @@ -53,7 +55,12 @@ export class LogsStream extends Readable { async _poll() { try { - const logs = await listOnePageLogResources( + if (this._paginating) { + // We are going back through older logs that have been missed between + // polls, so don't start a new poll of the latest logs yet. + return; + } + let logPage = await listPaginatedLogs( this.environmentSid, this.serviceSid, this.client, @@ -62,32 +69,39 @@ export class LogsStream extends Readable { pageSize: this.config.limit, } ); - const unviewedLogs = logs.filter((log) => !this._viewedSids.has(log.sid)); + let logs = logPage.logs; + let unviewedLogs = logs.filter((log) => !this._viewedSids.has(log.sid)); + if (this._viewedSids.size > 0) { + // if we have seen some logs, we need to check if more than one page of + // logs are new. + while ( + unviewedLogs.length === logs.length && + logPage.meta.next_page_url + ) { + // all of the logs are new, so we should get the next page + this._paginating = true; + logPage = await listPaginatedLogs( + this.environmentSid, + this.serviceSid, + this.client, + {}, + logPage.meta.next_page_url + ); + unviewedLogs = unviewedLogs.concat( + logPage.logs.filter((log) => !this._viewedSids.has(log.sid)) + ); + logs = logs.concat(logPage.logs); + } + } if (unviewedLogs.length > 0) { - this._pollsWithoutResults = 0; - this.pollingFrequency = this._initialPollingFrequency; - log( - `New log received. Now polling once every ${this._pollingFrequency} milliseconds.` - ); + // We got new logs, make sure we are polling at the base frequency + this._resetPollingFrequency(); unviewedLogs.reverse().forEach((log) => { this.push(log); }); } else { - if (this._pollsWithoutResults < pollsBeforeBackOff) { - this._pollsWithoutResults++; - } else { - if (this._pollingFrequency < this._maxPollingFrequency) { - log( - `No new logs for ${ - this._pollsWithoutResults * this._pollingFrequency - } milliseconds. Now polling once every ${ - this._pollingFrequency * 2 - } milliseconds.` - ); - this.pollingFrequency = this._pollingFrequency * 2; - this._pollsWithoutResults = 0; - } - } + // No new logs this time, so maybe back off polling + this._backOffPolling(); } // The logs endpoint is not reliably returning logs in the same order @@ -122,9 +136,13 @@ export class LogsStream extends Readable { // Finally we create a set of just SIDs to compare against. this._viewedSids = new Set(this._viewedLogs.map((log) => log.sid)); + // If this is not tailing the logs, stop the stream. if (!this.config.tail) { this.push(null); } + // If we were paginating through older resources, we can now allow the + // next poll when it is triggered. + this._paginating = false; } catch (err) { this.destroy(err); } @@ -148,4 +166,32 @@ export class LogsStream extends Readable { this._interval = undefined; } } + + private _resetPollingFrequency() { + this._pollsWithoutResults = 0; + if (this.pollingFrequency !== this._initialPollingFrequency) { + this.pollingFrequency = this._initialPollingFrequency; + log( + `New log received. Now polling once every ${this._pollingFrequency} milliseconds.` + ); + } + } + + private _backOffPolling() { + if (this._pollsWithoutResults < pollsBeforeBackOff) { + this._pollsWithoutResults++; + } else { + if (this._pollingFrequency < this._maxPollingFrequency) { + log( + `No new logs for ${ + this._pollsWithoutResults * this._pollingFrequency + } milliseconds. Now polling once every ${ + this._pollingFrequency * 2 + } milliseconds.` + ); + this.pollingFrequency = this._pollingFrequency * 2; + this._pollsWithoutResults = 0; + } + } + } }