From a9299feaaf64199eb331887051e0068dbe6cd413 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emanuel=20Tesa=C5=99?= Date: Thu, 19 Oct 2023 12:47:08 +0200 Subject: [PATCH] Skip persisting signed data for data feed with existing timestamp --- packages/api/src/cache.ts | 2 +- packages/api/src/handlers.test.ts | 39 ++++++++++++++++++++++++++++- packages/api/src/handlers.ts | 29 +++++++++++++++++---- packages/api/src/in-memory-cache.ts | 16 ++++++------ 4 files changed, 71 insertions(+), 15 deletions(-) diff --git a/packages/api/src/cache.ts b/packages/api/src/cache.ts index bf3c45bf..3860cf11 100644 --- a/packages/api/src/cache.ts +++ b/packages/api/src/cache.ts @@ -1,7 +1,7 @@ import type { SignedData } from './schema'; type SignedDataCache = Record< - string, // Airnode ID. + string, // Airnode address. Record< string, // Template ID. SignedData[] // Signed data is ordered by timestamp (oldest first). diff --git a/packages/api/src/handlers.test.ts b/packages/api/src/handlers.test.ts index d3b36a31..04f17d6e 100644 --- a/packages/api/src/handlers.test.ts +++ b/packages/api/src/handlers.test.ts @@ -8,6 +8,7 @@ import { createSignedData, generateRandomWallet } from '../test/utils'; import * as cacheModule from './cache'; import * as configModule from './config'; import { batchInsertData, getData, listAirnodeAddresses } from './handlers'; +import { logger } from './logger'; // eslint-disable-next-line jest/no-hooks beforeEach(() => { @@ -44,13 +45,49 @@ describe(batchInsertData.name, () => { expect(cacheModule.getCache()).toStrictEqual({}); }); + it('skips signed data if there exists one with the same timestamp', async () => { + const airnodeWallet = generateRandomWallet(); + const storedSignedData = await createSignedData({ airnodeWallet }); + cacheModule.setCache({ + [storedSignedData.airnode]: { + [storedSignedData.templateId]: [storedSignedData], + }, + }); + const batchData = [ + await createSignedData({ + airnodeWallet, + templateId: storedSignedData.templateId, + timestamp: storedSignedData.timestamp, + }), + await createSignedData(), + ]; + jest.spyOn(logger, 'debug'); + + const result = await batchInsertData(batchData); + + expect(result).toStrictEqual({ + body: JSON.stringify({ count: 1, skipped: 1 }), + headers: { + 'access-control-allow-methods': '*', + 'access-control-allow-origin': '*', + 'content-type': 'application/json', + }, + statusCode: 201, + }); + expect(logger.debug).toHaveBeenCalledWith( + 'Skipping signed data because signed data with the same timestamp already exists', + expect.any(Object) + ); + expect(cacheModule.getCache()[storedSignedData.airnode]![storedSignedData.templateId]!).toHaveLength(1); + }); + it('inserts the batch if data is valid', async () => { const batchData = [await createSignedData(), await createSignedData()]; const result = await batchInsertData(batchData); expect(result).toStrictEqual({ - body: JSON.stringify({ count: 2 }), + body: JSON.stringify({ count: 2, skipped: 0 }), headers: { 'access-control-allow-methods': '*', 'access-control-allow-origin': '*', diff --git a/packages/api/src/handlers.ts b/packages/api/src/handlers.ts index 1125247c..28a696fe 100644 --- a/packages/api/src/handlers.ts +++ b/packages/api/src/handlers.ts @@ -4,8 +4,9 @@ import { isEmpty, isNil, omit, size } from 'lodash'; import { getConfig } from './config'; import { CACHE_HEADERS, COMMON_HEADERS } from './constants'; import { deriveBeaconId, recoverSignerAddress } from './evm'; -import { getAll, getAllAirnodeAddresses, prune, putAll } from './in-memory-cache'; -import { batchSignedDataSchema, evmAddressSchema } from './schema'; +import { get, getAll, getAllAirnodeAddresses, prune, putAll } from './in-memory-cache'; +import { logger } from './logger'; +import { type SignedData, batchSignedDataSchema, evmAddressSchema } from './schema'; import type { ApiResponse } from './types'; import { generateErrorResponse, isBatchUnique } from './utils'; @@ -67,8 +68,22 @@ export const batchInsertData = async (requestBody: unknown): Promise get(signedData.airnode, signedData.templateId, requestTimestamp)); + if (goReadDb.data && requestTimestamp === Number.parseInt(goReadDb.data.timestamp, 10)) { + logger.debug('Skipping signed data because signed data with the same timestamp already exists', { signedData }); + continue; + } + + newSignedData.push(signedData); + } + // Write batch of validated data to the database - const goBatchWriteDb = await go(async () => putAll(batchSignedData)); + const goBatchWriteDb = await go(async () => putAll(newSignedData)); if (!goBatchWriteDb.success) { return generateErrorResponse(500, 'Unable to send batch of signed data to database', goBatchWriteDb.error.message); } @@ -76,12 +91,16 @@ export const batchInsertData = async (requestBody: unknown): Promise Math.max(acc, endpoint.delaySeconds), 0); const maxIgnoreAfterTimestamp = Math.floor(Date.now() / 1000 - maxDelay); - const goPruneCache = await go(async () => prune(batchSignedData, maxIgnoreAfterTimestamp)); + const goPruneCache = await go(async () => prune(newSignedData, maxIgnoreAfterTimestamp)); if (!goPruneCache.success) { return generateErrorResponse(500, 'Unable to remove outdated cache data', goPruneCache.error.message); } - return { statusCode: 201, headers: COMMON_HEADERS, body: JSON.stringify({ count: batchSignedData.length }) }; + return { + statusCode: 201, + headers: COMMON_HEADERS, + body: JSON.stringify({ count: newSignedData.length, skipped: batchSignedData.length - newSignedData.length }), + }; }; // Returns the most fresh signed data for each templateId for the given airnode address. The API can be delayed, which diff --git a/packages/api/src/in-memory-cache.ts b/packages/api/src/in-memory-cache.ts index 3bc3e70f..1c97d6f3 100644 --- a/packages/api/src/in-memory-cache.ts +++ b/packages/api/src/in-memory-cache.ts @@ -10,26 +10,26 @@ export const ignoreTooFreshData = (signedDatas: SignedData[], ignoreAfterTimesta // The API is deliberately asynchronous to mimic a database call. // eslint-disable-next-line @typescript-eslint/require-await -export const get = async (airnodeId: string, templateId: string, ignoreAfterTimestamp: number) => { - logger.debug('Getting signed data', { airnodeId, templateId, ignoreAfterTimestamp }); +export const get = async (airnodeAddress: string, templateId: string, ignoreAfterTimestamp: number) => { + logger.debug('Getting signed data', { airnodeAddress, templateId, ignoreAfterTimestamp }); const signedDataCache = getCache(); - if (!signedDataCache[airnodeId]) return null; - const signedDatas = signedDataCache[airnodeId]![templateId]; + if (!signedDataCache[airnodeAddress]) return null; + const signedDatas = signedDataCache[airnodeAddress]![templateId]; if (!signedDatas) return null; return last(ignoreTooFreshData(signedDatas, ignoreAfterTimestamp)) ?? null; }; // The API is deliberately asynchronous to mimic a database call. -export const getAll = async (airnodeId: string, ignoreAfterTimestamp: number) => { - logger.debug('Getting all signed data', { airnodeId, ignoreAfterTimestamp }); +export const getAll = async (airnodeAddress: string, ignoreAfterTimestamp: number) => { + logger.debug('Getting all signed data', { airnodeAddress, ignoreAfterTimestamp }); const signedDataCache = getCache(); - const signedDataByTemplateId = signedDataCache[airnodeId] ?? {}; + const signedDataByTemplateId = signedDataCache[airnodeAddress] ?? {}; const freshestSignedData: SignedData[] = []; for (const templateId of Object.keys(signedDataByTemplateId)) { - const freshest = await get(airnodeId, templateId, ignoreAfterTimestamp); + const freshest = await get(airnodeAddress, templateId, ignoreAfterTimestamp); if (freshest) freshestSignedData.push(freshest); }