Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logs): log polling backs off when it doesn't receive results #307

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 69 additions & 25 deletions packages/serverless-api/src/api/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,39 @@ import { LogApiResource, LogList, Sid, LogFilters } from '../types';
import { TwilioServerlessApiClient } from '../client';
import { getPaginatedResource } from './utils/pagination';
import { ClientApiError } from '../utils/error';
import { OptionsOfJSONResponseBody } from 'got';

const log = debug('twilio-serverless-api:logs');

function urlWithFilters(
environmentSid: Sid,
serviceSid: Sid,
filters: LogFilters = {}
): string {
const pageSize = filters.pageSize || 50;
const { functionSid, startDate, endDate, pageToken } = filters;
let url = `Services/${serviceSid}/Environments/${environmentSid}/Logs?PageSize=${pageSize}`;
if (typeof functionSid !== 'undefined') {
url += `&FunctionSid=${functionSid}`;
}
if (typeof startDate !== 'undefined') {
url += `&StartDate=${
startDate instanceof Date ? startDate.toISOString() : startDate
}`;
}
if (typeof endDate !== 'undefined') {
url += `&EndDate=${
endDate instanceof Date ? endDate.toISOString() : endDate
}`;
}
if (typeof pageToken !== 'undefined') {
url += `&PageToken=${pageToken}`;
}
return url;
}

/**
* Calls the API to retrieve a list of all assets
* Calls the API to retrieve a list of all logs
*
* @param {Sid} environmentSid environment in which to get logs
* @param {Sid} serviceSid service to look for logs
Expand All @@ -24,7 +52,7 @@ export async function listLogResources(
try {
return getPaginatedResource<LogList, LogApiResource>(
client,
`Services/${serviceSid}/Environments/${environmentSid}/Logs`
urlWithFilters(environmentSid, serviceSid)
);
} catch (err) {
log('%O', new ClientApiError(err));
Expand All @@ -33,7 +61,7 @@ export async function listLogResources(
}

/**
* Calls the API to retrieve a list of all assets
* Calls the API to retrieve one page of a list of logs
*
* @param {Sid} environmentSid environment in which to get logs
* @param {Sid} serviceSid service to look for logs
Expand All @@ -46,28 +74,10 @@ export async function listOnePageLogResources(
client: TwilioServerlessApiClient,
filters: LogFilters
): Promise<LogApiResource[]> {
const pageSize = filters.pageSize || 50;
const { functionSid, startDate, endDate, pageToken } = filters;
const url = urlWithFilters(environmentSid, serviceSid, filters);
try {
let url = `Services/${serviceSid}/Environments/${environmentSid}/Logs?PageSize=${pageSize}`;
if (typeof functionSid !== 'undefined') {
url += `&FunctionSid=${functionSid}`;
}
if (typeof startDate !== 'undefined') {
url += `&StartDate=${
startDate instanceof Date ? startDate.toISOString() : startDate
}`;
}
if (typeof endDate !== 'undefined') {
url += `&EndDate=${
endDate instanceof Date ? endDate.toISOString() : endDate
}`;
}
if (typeof pageToken !== 'undefined') {
url += `&PageToken=${pageToken}`;
}
const resp = await client.request('get', url);
const content = (resp.body as unknown) as LogList;
const content = resp.body as unknown as LogList;
return content.logs as LogApiResource[];
} catch (err) {
log('%O', new ClientApiError(err));
Expand All @@ -76,7 +86,41 @@ export async function listOnePageLogResources(
}

/**
* Calls the API to retrieve a list of all assets
* Calls the API to retrieve a paginated list of logs
*
* @param {Sid} environmentSid environment in which to get logs
* @param {Sid} serviceSid service to look for logs
* @param {TwilioServerlessApiClient} client API client
* @param {LogFilters} filters filters to apply to the request
* @param {string} nextPageUrl if you have a next page url, use it
* @returns {Promise<LogList>}
*/
export async function listPaginatedLogs(
environmentSid: Sid,
serviceSid: Sid,
client: TwilioServerlessApiClient,
filters: LogFilters = {},
nextPageUrl?: string
): Promise<LogList> {
try {
const opts: OptionsOfJSONResponseBody = { responseType: 'json' };
let url = nextPageUrl;
if (typeof url === 'undefined') {
url = urlWithFilters(environmentSid, serviceSid, filters);
}
if (url.startsWith('http')) {
opts.prefixUrl = '';
}
const resp = await client.request('get', url, opts);
return resp.body as unknown as LogList;
} catch (err) {
log('%O', new ClientApiError(err));
throw err;
}
}

/**
* Calls the API to retrieve a single log resource
*
* @param {Sid} logSid SID of log to retrieve
* @param {Sid} environmentSid environment in which to get logs
Expand All @@ -95,7 +139,7 @@ export async function getLog(
'get',
`Services/${serviceSid}/Environments/${environmentSid}/Logs/${logSid}`
);
return (resp.body as unknown) as LogApiResource;
return resp.body as unknown as LogApiResource;
} catch (err) {
log('%O', new ClientApiError(err));
throw err;
Expand Down
107 changes: 95 additions & 12 deletions packages/serverless-api/src/streams/logs.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
import { Readable } from 'stream';
import { listOnePageLogResources } from '../api/logs';
import { listPaginatedLogs } from '../api/logs';
import { TwilioServerlessApiClient } from '../client';
import { Sid } from '../types';
import { LogsConfig } from '../types/logs';
import debug from 'debug';

const log = debug('twilio-serverless-api:client:logs');

const pollsBeforeBackOff = 10;
const defaultPollingFrequency = 1000;
// This default max allows the command to get to polling once every 32 seconds
const defaultMaxPollingFrequency = 30000;
const defaultLogCacheSize = 1000;

export class LogsStream extends Readable {
private _initialPollingFrequency: number;
private _pollingFrequency: number;
private _maxPollingFrequency: number;
private _pollsWithoutResults: number;
private _pollingCacheSize: number;
private _interval: NodeJS.Timeout | undefined;
private _viewedSids: Set<Sid>;
private _viewedLogs: Array<{ sid: Sid; dateCreated: Date }>;
private _paginating: boolean;

constructor(
private environmentSid: Sid,
Expand All @@ -21,8 +34,13 @@ export class LogsStream extends Readable {
this._interval = undefined;
this._viewedSids = new Set();
this._viewedLogs = [];
this._pollingFrequency = config.pollingFrequency || 1000;
this._pollingCacheSize = config.logCacheSize || 1000;
this._pollingFrequency = this._initialPollingFrequency =
config.pollingFrequency || defaultPollingFrequency;
this._maxPollingFrequency =
config.maxPollingFrequency || defaultMaxPollingFrequency;
this._pollsWithoutResults = 0;
this._pollingCacheSize = config.logCacheSize || defaultLogCacheSize;
this._paginating = false;
}

set pollingFrequency(frequency: number) {
Expand All @@ -37,7 +55,12 @@ export class LogsStream extends Readable {

async _poll() {
try {
const logs = await listOnePageLogResources(
if (this._paginating) {
// We are going back through older logs that have been missed between
// polls, so don't start a new poll of the latest logs yet.
return;
}
let logPage = await listPaginatedLogs(
this.environmentSid,
this.serviceSid,
this.client,
Expand All @@ -46,12 +69,40 @@ export class LogsStream extends Readable {
pageSize: this.config.limit,
}
);
logs
.filter(log => !this._viewedSids.has(log.sid))
.reverse()
.forEach(log => {
let logs = logPage.logs;
let unviewedLogs = logs.filter((log) => !this._viewedSids.has(log.sid));
if (this._viewedSids.size > 0) {
// if we have seen some logs, we need to check if more than one page of
// logs are new.
while (
unviewedLogs.length === logs.length &&
logPage.meta.next_page_url
) {
// all of the logs are new, so we should get the next page
this._paginating = true;
logPage = await listPaginatedLogs(
this.environmentSid,
this.serviceSid,
this.client,
{},
logPage.meta.next_page_url
);
unviewedLogs = unviewedLogs.concat(
logPage.logs.filter((log) => !this._viewedSids.has(log.sid))
);
logs = logs.concat(logPage.logs);
}
}
if (unviewedLogs.length > 0) {
// We got new logs, make sure we are polling at the base frequency
this._resetPollingFrequency();
unviewedLogs.reverse().forEach((log) => {
this.push(log);
});
} else {
// No new logs this time, so maybe back off polling
this._backOffPolling();
}

// The logs endpoint is not reliably returning logs in the same order
// Therefore we need to keep a set of all previously seen log entries
Expand All @@ -68,26 +119,30 @@ export class LogsStream extends Readable {
// and new logs by stringifying the sid and the date together.
const viewedLogsSet = new Set([
...this._viewedLogs.map(
log => `${log.sid}-${log.dateCreated.toISOString()}`
(log) => `${log.sid}-${log.dateCreated.toISOString()}`
),
...logs.map(log => `${log.sid}-${log.date_created}`),
...logs.map((log) => `${log.sid}-${log.date_created}`),
]);
// Then we take that set, map over the logs and split them up into sid and
// date again, sort them most to least recent and chop off the oldest if
// they are beyond the polling cache size.
this._viewedLogs = [...viewedLogsSet]
.map(logString => {
.map((logString) => {
const [sid, dateCreated] = logString.split('-');
return { sid, dateCreated: new Date(dateCreated) };
})
.sort((a, b) => b.dateCreated.valueOf() - a.dateCreated.valueOf())
.slice(0, this._pollingCacheSize);
// Finally we create a set of just SIDs to compare against.
this._viewedSids = new Set(this._viewedLogs.map(log => log.sid));
this._viewedSids = new Set(this._viewedLogs.map((log) => log.sid));

// If this is not tailing the logs, stop the stream.
if (!this.config.tail) {
this.push(null);
}
// If we were paginating through older resources, we can now allow the
// next poll when it is triggered.
this._paginating = false;
} catch (err) {
this.destroy(err);
}
Expand All @@ -111,4 +166,32 @@ export class LogsStream extends Readable {
this._interval = undefined;
}
}

private _resetPollingFrequency() {
this._pollsWithoutResults = 0;
if (this.pollingFrequency !== this._initialPollingFrequency) {
this.pollingFrequency = this._initialPollingFrequency;
log(
`New log received. Now polling once every ${this._pollingFrequency} milliseconds.`
);
}
}

private _backOffPolling() {
if (this._pollsWithoutResults < pollsBeforeBackOff) {
this._pollsWithoutResults++;
} else {
if (this._pollingFrequency < this._maxPollingFrequency) {
log(
`No new logs for ${
this._pollsWithoutResults * this._pollingFrequency
} milliseconds. Now polling once every ${
this._pollingFrequency * 2
} milliseconds.`
);
this.pollingFrequency = this._pollingFrequency * 2;
this._pollsWithoutResults = 0;
}
}
}
}
1 change: 1 addition & 0 deletions packages/serverless-api/src/types/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export type LogsConfig = {
limit?: number;
filterByFunction?: string | Sid;
pollingFrequency?: number;
maxPollingFrequency?: number;
logCacheSize?: number;
};

Expand Down