Skip to content

Commit

Permalink
Performance test fixes and optimizations (#159)
Browse files Browse the repository at this point in the history
* Improve signed data performance, improve logs

* Schedule pushing signed data after they are fetched

* Fix package.json

* Only sign and push signed data when the API request is success

* Log short info level for incoming requests
  • Loading branch information
Siegrift authored Dec 18, 2023
1 parent 372a0ac commit 14eec89
Show file tree
Hide file tree
Showing 15 changed files with 91 additions and 178 deletions.
4 changes: 2 additions & 2 deletions packages/airnode-feed/src/api-requests/data-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export const callApi = async (
);
};

export const makeTemplateRequests = async (signedApiUpdate: SignedApiUpdate): Promise<TemplateResponse[]> => {
export const makeTemplateRequests = async (signedApiUpdate: SignedApiUpdate): Promise<TemplateResponse[] | null> => {
const {
config: { endpoints, templates, ois: oises, apiCredentials },
} = getState();
Expand Down Expand Up @@ -72,7 +72,7 @@ export const makeTemplateRequests = async (signedApiUpdate: SignedApiUpdate): Pr
operationTemplateId,
errorMessage: goCallApi.error.message,
});
return [];
return null;
}
const apiCallResponse = goCallApi.data;

Expand Down
10 changes: 5 additions & 5 deletions packages/airnode-feed/src/api-requests/signed-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import { config, signedApiResponse, nodarySignedTemplateResponses } from '../../
import { logger } from '../logger';
import * as stateModule from '../state';

import { postSignedApiData } from './signed-api';
import { pushSignedData } from './signed-api';

describe(postSignedApiData.name, () => {
describe(pushSignedData.name, () => {
it('posts data to central api', async () => {
const state = stateModule.getInitialState(config);
// Assumes the template responses are for unique template IDs (which is true in the test fixtures).
Expand All @@ -21,7 +21,7 @@ describe(postSignedApiData.name, () => {
jest.spyOn(stateModule, 'getState').mockReturnValue(state);
jest.spyOn(axios, 'post').mockResolvedValue(signedApiResponse);

const response = await postSignedApiData(config.triggers.signedApiUpdates[0]!);
const response = await pushSignedData(config.triggers.signedApiUpdates[0]!);

expect(response).toStrictEqual({ count: 3, success: true });
});
Expand All @@ -40,7 +40,7 @@ describe(postSignedApiData.name, () => {
jest.spyOn(logger, 'warn');
jest.spyOn(axios, 'post').mockResolvedValue({ youHaveNotThoughAboutThisDidYou: 'yes-I-did' });

const response = await postSignedApiData(config.triggers.signedApiUpdates[0]!);
const response = await pushSignedData(config.triggers.signedApiUpdates[0]!);

expect(response).toStrictEqual({ success: false });
expect(logger.warn).toHaveBeenCalledWith('Failed to parse response from the signed API.', {
Expand Down Expand Up @@ -70,7 +70,7 @@ describe(postSignedApiData.name, () => {
jest.spyOn(logger, 'warn');
jest.spyOn(axios, 'post').mockRejectedValue(new Error('simulated-network-error'));

const response = await postSignedApiData(config.triggers.signedApiUpdates[0]!);
const response = await pushSignedData(config.triggers.signedApiUpdates[0]!);

expect(response).toStrictEqual({ success: false });
expect(logger.warn).toHaveBeenCalledWith('Failed to make update signed API request.', {
Expand Down
7 changes: 4 additions & 3 deletions packages/airnode-feed/src/api-requests/signed-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import { isEmpty, isNil, pick } from 'lodash';

import { logger } from '../logger';
import { getState } from '../state';
import type { SignedApiNameUpdateDelayGroup } from '../update-signed-api';
import { type SignedApiPayload, signedApiResponseSchema } from '../validation/schema';
import { type SignedApiPayload, signedApiResponseSchema, type SignedApiUpdate } from '../validation/schema';

export const postSignedApiData = async (group: SignedApiNameUpdateDelayGroup) => {
export const pushSignedData = async (group: SignedApiUpdate) => {
const {
config: { signedApis },
templateValues,
Expand All @@ -17,6 +16,8 @@ export const postSignedApiData = async (group: SignedApiNameUpdateDelayGroup) =>
const { signedApiName, templateIds, updateDelay } = group;

return logger.runWithContext({ signedApiName, updateDelay }, async () => {
logger.debug('Pushing signed data to the signed API.');

const airnode = airnodeWallet.address;
const batchPayloadOrNull = templateIds.map((templateId): SignedApiPayload | null => {
// Calculate the reference timestamp based on the current time and update delay.
Expand Down
52 changes: 33 additions & 19 deletions packages/airnode-feed/src/fetch-beacon-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,51 @@ import { makeTemplateRequests } from './api-requests/data-provider';
import { logger } from './logger';
import { signTemplateResponses } from './sign-template-data';
import { getState } from './state';
import { schedulePushingSignedData } from './update-signed-api';
import { sleep } from './utils';
import type { SignedApiUpdate } from './validation/schema';

export const initiateFetchingBeaconData = () => {
logger.debug('Initiating fetching all beacon data.');
export const initiateSignedApiUpdateLoops = () => {
logger.debug('Initiating feed loop.');
const { config } = getState();

return config.triggers.signedApiUpdates.map(async (element) => fetchBeaconDataInLoop(element));
return config.triggers.signedApiUpdates.map(async (signedApiUpdate) => initiateSignedApiUpdateLoop(signedApiUpdate));
};

const fetchBeaconDataInLoop = async (signedApiUpdate: SignedApiUpdate) => {
const initiateSignedApiUpdateLoop = async (signedApiUpdate: SignedApiUpdate) => {
const { templateValues } = getState();

while (true) {
const startTimestamp = Date.now();
logger.debug('Making template requests.');
const templateResponses = await makeTemplateRequests(signedApiUpdate);
const signedResponses = await signTemplateResponses(templateResponses);
// eslint-disable-next-line unicorn/no-array-for-each
signedResponses.forEach(async ([templateId, signedResponse]) => {
const goPut = await go(() => templateValues[templateId]!.put(signedResponse));
if (!goPut.success) {
// Because there can be multiple triggers for the same template ID it is possible that a race condition occurs,
// where the (newer) response from a different trigger is put first. This throws, because the signed data must
// be inserted increasingly by timestamp.
logger.warn(`Could not put signed response.`, {
templateId,
signedResponse,
errorMessage: goPut.error.message,
});
}
});

if (templateResponses) {
logger.debug('Signing template responses.');
const signedResponses = await signTemplateResponses(templateResponses);

logger.debug('Putting signed responses to storage.');
await Promise.all(
signedResponses.map(async ([templateId, signedResponse]) => {
const goPut = await go(() => templateValues[templateId]!.put(signedResponse));
if (!goPut.success) {
// Because there can be multiple triggers for the same template ID it is possible that a race condition
// occurs, where the (newer) response from a different trigger is put first. This throws, because the signed
// data must be inserted increasingly by timestamp.
logger.debug(`Could not put signed response.`, {
templateId,
signedResponse,
errorMessage: goPut.error.message,
});
}
})
);

// We want to send the data to the Signed API "in background" without waiting for the response to avoid blocking the
// fetch interval loop.
logger.debug('Scheduling pushing signed data to the API.');
schedulePushingSignedData(signedApiUpdate);
}

const duration = Date.now() - startTimestamp;
// Take at most 10% of the fetch interval as extra time to avoid all API requests be done at the same time. This
Expand Down
4 changes: 2 additions & 2 deletions packages/airnode-feed/src/heartbeat/heartbeat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import * as configModule from '../validation/config';

import { heartbeatLogger } from './logger';

import { initiateHeartbeat, logHeartbeat, type HeartbeatPayload, stringifyUnsignedHeartbeatPayload } from '.';
import { initiateHeartbeatLoop, logHeartbeat, type HeartbeatPayload, stringifyUnsignedHeartbeatPayload } from '.';

// eslint-disable-next-line jest/no-hooks
beforeEach(() => {
Expand Down Expand Up @@ -96,7 +96,7 @@ test('sends heartbeat payload every minute', async () => {
// Instead we spyOn the "go" which is a third party module that wraps the logHeartbeat call.
jest.spyOn(promiseUtilsModule, 'go');

initiateHeartbeat();
initiateHeartbeatLoop();

await jest.advanceTimersByTimeAsync(1000 * 60 * 8);
expect(promiseUtilsModule.go).toHaveBeenCalledTimes(8);
Expand Down
2 changes: 1 addition & 1 deletion packages/airnode-feed/src/heartbeat/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { heartbeatLogger } from './logger';
// other teams, such as monitoring) will listen for this exact message to parse the heartbeat.
const HEARTBEAT_LOG_MESSAGE = 'Sending heartbeat log.';

export const initiateHeartbeat = () => {
export const initiateHeartbeatLoop = () => {
logger.debug('Initiating heartbeat loop.');
setInterval(async () => {
const goLogHeartbeat = await go(logHeartbeat);
Expand Down
10 changes: 4 additions & 6 deletions packages/airnode-feed/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import { initiateFetchingBeaconData } from './fetch-beacon-data';
import { initiateHeartbeat } from './heartbeat';
import { initiateSignedApiUpdateLoops } from './fetch-beacon-data';
import { initiateHeartbeatLoop } from './heartbeat';
import { initializeState } from './state';
import { initiateUpdatingSignedApi } from './update-signed-api';
import { loadConfig } from './validation/config';

const main = async () => {
const config = await loadConfig();
initializeState(config);

initiateFetchingBeaconData();
initiateUpdatingSignedApi();
initiateHeartbeat();
initiateSignedApiUpdateLoops();
initiateHeartbeatLoop();
};

void main();
59 changes: 4 additions & 55 deletions packages/airnode-feed/src/update-signed-api.ts
Original file line number Diff line number Diff line change
@@ -1,57 +1,6 @@
import { get, isEmpty, uniq } from 'lodash';
import { pushSignedData } from './api-requests/signed-api';
import type { SignedApiUpdate } from './validation/schema';

import { postSignedApiData } from './api-requests/signed-api';
import { SIGNED_DATA_PUSH_POLLING_INTERVAL } from './constants';
import { logger } from './logger';
import { getState } from './state';
import { sleep } from './utils';
import type { TemplateId } from './validation/schema';

// <Signed API Provider, <Update Delay, List of template IDs>>
type SignedApiUpdateDelayTemplateIdsMap = Record<string, Record<number, TemplateId[]>>;

export interface SignedApiNameUpdateDelayGroup {
signedApiName: string;
templateIds: TemplateId[];
updateDelay: number;
}

export const initiateUpdatingSignedApi = () => {
logger.debug('Initiating updating signed API.');
const { config } = getState();

const signedApiUpdateDelayTemplateIdsMap =
config.triggers.signedApiUpdates.reduce<SignedApiUpdateDelayTemplateIdsMap>((acc, signedApiUpdate) => {
if (isEmpty(signedApiUpdate.templateIds)) return acc;

return {
...acc,
[signedApiUpdate.signedApiName]: {
...acc[signedApiUpdate.signedApiName],
[signedApiUpdate.updateDelay]: uniq([
...get(acc, [signedApiUpdate.signedApiName, signedApiUpdate.updateDelay], []),
...signedApiUpdate.templateIds,
]),
},
};
}, {});

const signedApiUpdateDelayGroups: SignedApiNameUpdateDelayGroup[] = Object.entries(
signedApiUpdateDelayTemplateIdsMap
).flatMap(([signedApiName, updateDelayTemplateIds]) =>
Object.entries(updateDelayTemplateIds).map(([updateDelay, templateIds]) => ({
signedApiName,
updateDelay: Number.parseInt(updateDelay, 10),
templateIds,
}))
);

signedApiUpdateDelayGroups.map(async (element) => updateSignedApiInLoop(element));
};

export const updateSignedApiInLoop = async (signedApiNameUpdateDelayGroup: SignedApiNameUpdateDelayGroup) => {
while (true) {
await postSignedApiData(signedApiNameUpdateDelayGroup);
await sleep(SIGNED_DATA_PUSH_POLLING_INTERVAL);
}
export const schedulePushingSignedData = (signedApiUpdate: SignedApiUpdate) => {
setTimeout(async () => pushSignedData(signedApiUpdate), signedApiUpdate.updateDelay);
};
3 changes: 1 addition & 2 deletions packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"node": "^18.19.0",
"pnpm": "^8.12.0"
},
"main": "index.js",
"scripts": {
"build": "tsc --project tsconfig.build.json",
"clean": "rm -rf coverage dist",
Expand All @@ -16,7 +15,7 @@
"eslint:fix": "eslint . --ext .js,.ts --fix",
"prettier:check": "prettier --check \"./**/*.{js,ts,md,json}\"",
"prettier:fix": "prettier --write \"./**/*.{js,ts,md,json}\"",
"start-prod": "node dist/dev-server.js",
"start-prod": "node dist/src/dev-server.js",
"test": "jest",
"tsc": "tsc --project ."
},
Expand Down
20 changes: 0 additions & 20 deletions packages/api/src/evm.ts

This file was deleted.

37 changes: 21 additions & 16 deletions packages/api/src/handlers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ethers } from 'ethers';
import { omit } from 'lodash';

import { getMockedConfig } from '../test/fixtures';
import { createSignedData, generateRandomWallet } from '../test/utils';
import { createSignedData, deriveBeaconId, generateRandomBytes, generateRandomWallet } from '../test/utils';

import * as cacheModule from './cache';
import * as configModule from './config';
Expand All @@ -19,29 +19,38 @@ afterEach(() => {
});

describe(batchInsertData.name, () => {
it('drops the batch if it is invalid', async () => {
it('does not validate signature (for performance reasons)', async () => {
const invalidData = await createSignedData({ signature: '0xInvalid' });
const batchData = [await createSignedData(), invalidData];

const result = await batchInsertData(batchData);

expect(result).toStrictEqual({
body: JSON.stringify({
message: 'Unable to recover signer address',
context: {
detail:
'signature missing v and recoveryParam (argument="signature", value="0xInvalid", code=INVALID_ARGUMENT, version=bytes/5.7.0)',
signedData: invalidData,
},
}),
body: JSON.stringify({ count: 2, skipped: 0 }),
headers: {
'access-control-allow-methods': '*',
'access-control-allow-origin': '*',
'content-type': 'application/json',
},
statusCode: 400,
statusCode: 201,
});
});

it('does not validate beacon ID (for performance reasons)', async () => {
const data = await createSignedData();
const invalidData = { ...data, beaconId: deriveBeaconId(data.airnode, generateRandomBytes(32)) };

const result = await batchInsertData([invalidData]);

expect(result).toStrictEqual({
body: JSON.stringify({ count: 1, skipped: 0 }),
headers: {
'access-control-allow-methods': '*',
'access-control-allow-origin': '*',
'content-type': 'application/json',
},
statusCode: 201,
});
expect(cacheModule.getCache()).toStrictEqual({});
});

it('drops the batch if the airnode address is not allowed', async () => {
Expand Down Expand Up @@ -99,10 +108,6 @@ describe(batchInsertData.name, () => {
},
statusCode: 201,
});
expect(logger.debug).toHaveBeenCalledWith(
'Not storing signed data because signed data with the same timestamp already exists.',
expect.any(Object)
);
expect(cacheModule.getCache()[storedSignedData.airnode]![storedSignedData.templateId]!).toHaveLength(1);
});

Expand Down
Loading

0 comments on commit 14eec89

Please sign in to comment.