From 406953d225e3e755cb42c2d6f37800223c72e77f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emanuel=20Tesa=C5=99?= Date: Tue, 31 Oct 2023 12:56:54 +0100 Subject: [PATCH] Add heartbeat log (#100) * Add heartbeat log * Use different logger to make sure heartbeat is enabled * Fix tests --- .../pusher/src/heartbeat/heartbeat.test.ts | 79 +++++++++++++++++++ packages/pusher/src/heartbeat/heartbeat.ts | 55 +++++++++++++ packages/pusher/src/heartbeat/index.ts | 1 + packages/pusher/src/heartbeat/logger.ts | 15 ++++ packages/pusher/src/index.ts | 2 + packages/pusher/src/state.ts | 3 + packages/pusher/src/validation/config.ts | 18 +++-- packages/pusher/test/fixtures.ts | 24 ++++++ 8 files changed, 189 insertions(+), 8 deletions(-) create mode 100644 packages/pusher/src/heartbeat/heartbeat.test.ts create mode 100644 packages/pusher/src/heartbeat/heartbeat.ts create mode 100644 packages/pusher/src/heartbeat/index.ts create mode 100644 packages/pusher/src/heartbeat/logger.ts diff --git a/packages/pusher/src/heartbeat/heartbeat.test.ts b/packages/pusher/src/heartbeat/heartbeat.test.ts new file mode 100644 index 00000000..8e159d3d --- /dev/null +++ b/packages/pusher/src/heartbeat/heartbeat.test.ts @@ -0,0 +1,79 @@ +import { readFileSync } from 'node:fs'; +import { join } from 'node:path'; + +import * as promiseUtilsModule from '@api3/promise-utils'; + +import { config, parseHeartbeatLog } from '../../test/fixtures'; +import * as stateModule from '../state'; +import * as configModule from '../validation/config'; + +import { heartbeatLogger } from './logger'; + +import { initiateHeartbeat, logHeartbeat, createHash } from '.'; + +// eslint-disable-next-line jest/no-hooks +beforeEach(() => { + jest.useFakeTimers().setSystemTime(new Date('2023-01-20')); +}); + +afterEach(() => { + jest.useRealTimers(); +}); + +describe(logHeartbeat.name, () => { + const expectedLogMessage = [ + '0xbF3137b0a7574563a23a8fC8badC6537F98197CC', + 'test', + '0.1.0', + '1674172803', + '1674172800', + '0x126e768ba244efdb790d63a76821047e163dfc502ace09b2546a93075594c286', + '0x14f123ec1006bace8f8971cd8c94eb022b9bb0e1364e88ae4e8562a5f02de43e35dd4ecdefc976595eba5fec3d04222a0249e876453599b27847e85e14ff77601b', + ].join(' - '); + + it('sends the correct heartbeat log', async () => { + const rawConfig = JSON.parse(readFileSync(join(__dirname, '../../config/pusher.example.json'), 'utf8')); + jest.spyOn(configModule, 'loadRawConfig').mockReturnValue(rawConfig); + const state = stateModule.getInitialState(config); + jest.spyOn(stateModule, 'getState').mockReturnValue(state); + jest.spyOn(heartbeatLogger, 'info').mockImplementation(); + jest.advanceTimersByTime(1000 * 3); // Advance time by 3 seconds to ensure the timestamp of the log is different from deployment timestamp. + + await logHeartbeat(); + + expect(heartbeatLogger.info).toHaveBeenCalledWith(expectedLogMessage); + }); + + it('the heartbeat log can be parsed', () => { + const rawConfig = JSON.parse(readFileSync(join(__dirname, '../../config/pusher.example.json'), 'utf8')); + jest.spyOn(configModule, 'loadRawConfig').mockReturnValue(rawConfig); + const expectedHeartbeatPayload = { + airnodeAddress: '0xbF3137b0a7574563a23a8fC8badC6537F98197CC', + stage: 'test', + nodeVersion: '0.1.0', + heartbeatTimestamp: '1674172803', + deploymentTimestamp: '1674172800', + configHash: '0x126e768ba244efdb790d63a76821047e163dfc502ace09b2546a93075594c286', + signature: + '0x14f123ec1006bace8f8971cd8c94eb022b9bb0e1364e88ae4e8562a5f02de43e35dd4ecdefc976595eba5fec3d04222a0249e876453599b27847e85e14ff77601b', + }; + + const heartbeatPayload = parseHeartbeatLog(expectedLogMessage); + + expect(heartbeatPayload).toStrictEqual(expectedHeartbeatPayload); + expect(heartbeatPayload.configHash).toBe(createHash(JSON.stringify(rawConfig))); + }); +}); + +test('sends heartbeat payload every minute', async () => { + // We would ideally want to assert that the logHeartbeat function is called, but spying on functions that are called + // from the same module is annoying. See: https://jestjs.io/docs/mock-functions#mocking-partials. + // + // Instead we spyOn the "go" which is a third party module that wraps the logHeartbeat call. + jest.spyOn(promiseUtilsModule, 'go'); + + initiateHeartbeat(); + + await jest.advanceTimersByTimeAsync(1000 * 60 * 8); + expect(promiseUtilsModule.go).toHaveBeenCalledTimes(8); +}); diff --git a/packages/pusher/src/heartbeat/heartbeat.ts b/packages/pusher/src/heartbeat/heartbeat.ts new file mode 100644 index 00000000..974e1894 --- /dev/null +++ b/packages/pusher/src/heartbeat/heartbeat.ts @@ -0,0 +1,55 @@ +import { go } from '@api3/promise-utils'; +import { ethers } from 'ethers'; + +import { logger } from '../logger'; +import { getState } from '../state'; +import { loadRawConfig } from '../validation/config'; + +import { heartbeatLogger } from './logger'; + +export const initiateHeartbeat = () => { + logger.debug('Initiating heartbeat loop'); + setInterval(async () => { + const goLogHeartbeat = await go(logHeartbeat); + if (!goLogHeartbeat.success) logger.error('Failed to log heartbeat', goLogHeartbeat.error); + }, 1000 * 60); // Frequency is hardcoded to 1 minute. +}; + +export const signHeartbeat = async (airnodeWallet: ethers.Wallet, heartbeatPayload: unknown[]) => { + logger.debug('Signing heartbeat payload'); + const signaturePayload = ethers.utils.arrayify(createHash(JSON.stringify(heartbeatPayload))); + return airnodeWallet.signMessage(signaturePayload); +}; + +export const createHash = (value: string) => ethers.utils.keccak256(ethers.utils.toUtf8Bytes(value)); + +export const logHeartbeat = async () => { + logger.debug('Creating heartbeat log'); + + const rawConfig = loadRawConfig(); // We want to log the raw config, not the one with interpolated secrets. + const rawConfigHash = createHash(JSON.stringify(rawConfig)); + const { + airnodeWallet, + deploymentTimestamp, + config: { + nodeSettings: { stage, nodeVersion }, + }, + } = getState(); + + logger.debug('Creating heartbeat payload'); + const currentTimestamp = Math.floor(Date.now() / 1000); + const heartbeatPayload = [ + airnodeWallet.address, + stage, + nodeVersion, + currentTimestamp.toString(), + deploymentTimestamp.toString(), + rawConfigHash, + ]; + const heartbeatSignature = await signHeartbeat(airnodeWallet, heartbeatPayload); + const heartbeatLog = [...heartbeatPayload, heartbeatSignature].join(' - '); + + // The logs are sent to API3 for validation (that the data provider deployed deployed the correct configuration) and + // monitoring purposes (whether the instance is running). + heartbeatLogger.info(heartbeatLog); +}; diff --git a/packages/pusher/src/heartbeat/index.ts b/packages/pusher/src/heartbeat/index.ts new file mode 100644 index 00000000..77b305b1 --- /dev/null +++ b/packages/pusher/src/heartbeat/index.ts @@ -0,0 +1 @@ +export * from './heartbeat'; diff --git a/packages/pusher/src/heartbeat/logger.ts b/packages/pusher/src/heartbeat/logger.ts new file mode 100644 index 00000000..0486cdda --- /dev/null +++ b/packages/pusher/src/heartbeat/logger.ts @@ -0,0 +1,15 @@ +import { createLogger } from '@api3/commons'; + +import { loadEnv } from '../validation/env'; + +// We need to load the environment variables before we can use the logger. Because we want the logger to always be +// available, we load the environment variables as a side effect during the module import. +const env = loadEnv(); + +export const heartbeatLogger = createLogger({ + colorize: env.LOG_COLORIZE, + format: env.LOG_FORMAT, + // We make sure the heartbeat logger is always enabled and logs all levels. + enabled: true, + minLevel: 'debug', +}); diff --git a/packages/pusher/src/index.ts b/packages/pusher/src/index.ts index bfcd36cf..d604dbac 100644 --- a/packages/pusher/src/index.ts +++ b/packages/pusher/src/index.ts @@ -1,4 +1,5 @@ import { initiateFetchingBeaconData } from './fetch-beacon-data'; +import { initiateHeartbeat } from './heartbeat'; import { initializeState } from './state'; import { initiateUpdatingSignedApi } from './update-signed-api'; import { loadConfig } from './validation/config'; @@ -9,6 +10,7 @@ const main = async () => { initiateFetchingBeaconData(); initiateUpdatingSignedApi(); + initiateHeartbeat(); }; void main(); diff --git a/packages/pusher/src/state.ts b/packages/pusher/src/state.ts index 65c7ad0c..e524c1c9 100644 --- a/packages/pusher/src/state.ts +++ b/packages/pusher/src/state.ts @@ -14,6 +14,8 @@ export interface State { apiLimiters: Record; // We persist the derived Airnode wallet in memory as a performance optimization. airnodeWallet: ethers.Wallet; + // The timestamp of when the service was initialized. This can be treated as a "deployment" timestamp. + deploymentTimestamp: number; } let state: State; @@ -86,6 +88,7 @@ export const getInitialState = (config: Config): State => { templateValues: buildTemplateStorages(config), apiLimiters: buildApiLimiters(config), airnodeWallet: ethers.Wallet.fromMnemonic(config.nodeSettings.airnodeWalletMnemonic), + deploymentTimestamp: Math.floor(Date.now() / 1000), }; }; diff --git a/packages/pusher/src/validation/config.ts b/packages/pusher/src/validation/config.ts index 67cf7f8e..d8243e73 100644 --- a/packages/pusher/src/validation/config.ts +++ b/packages/pusher/src/validation/config.ts @@ -8,16 +8,18 @@ import dotenv from 'dotenv'; import { configSchema } from './schema'; import { interpolateSecrets, parseSecrets } from './utils'; -export const loadConfig = async () => { - // When pusher is built the "/dist" file contains "src" folder and "package.json" and the config is expected to be - // located next to the "/dist" folder. When run in development, the config is expected to be located next to the "src" - // folder (one less import level). We resolve the config by CWD as a workaround. Since the pusher is dockerized, this - // is hidden from the user. - const configPath = join(cwd(), './config'); - const rawSecrets = dotenv.parse(readFileSync(join(configPath, 'secrets.env'), 'utf8')); +// When pusher is built the "/dist" file contains "src" folder and "package.json" and the config is expected to be +// located next to the "/dist" folder. When run in development, the config is expected to be located next to the "src" +// folder (one less import level). We resolve the config by CWD as a workaround. Since the pusher is dockerized, this +// is hidden from the user. +const getConfigPath = () => join(cwd(), './config'); + +export const loadRawConfig = () => JSON.parse(fs.readFileSync(join(getConfigPath(), 'pusher.json'), 'utf8')); +export const loadConfig = async () => { const goLoadConfig = await go(async () => { - const rawConfig = JSON.parse(fs.readFileSync(join(configPath, 'pusher.json'), 'utf8')); + const rawSecrets = dotenv.parse(readFileSync(join(getConfigPath(), 'secrets.env'), 'utf8')); + const rawConfig = loadRawConfig(); const secrets = parseSecrets(rawSecrets); return configSchema.parseAsync(interpolateSecrets(rawConfig, secrets)); }); diff --git a/packages/pusher/test/fixtures.ts b/packages/pusher/test/fixtures.ts index 5a9eebd1..b13526f3 100644 --- a/packages/pusher/test/fixtures.ts +++ b/packages/pusher/test/fixtures.ts @@ -1,4 +1,5 @@ import type { AxiosResponse } from 'axios'; +import { ethers } from 'ethers'; import packageJson from '../package.json'; import type { SignedResponse, TemplateResponse } from '../src/sign-template-data'; @@ -184,3 +185,26 @@ export const signedApiResponse: Partial = { }, data: { count: 3 }, }; + +export const parseHeartbeatLog = (logMessage: string) => { + const [airnodeAddress, stage, nodeVersion, heartbeatTimestamp, deploymentTimestamp, configHash, signature] = + logMessage.split(' - '); + + // Verify that the signature is valid. + const heartbeatPayload = [airnodeAddress, stage, nodeVersion, heartbeatTimestamp, deploymentTimestamp, configHash]; + const signaturePayload = ethers.utils.arrayify( + ethers.utils.keccak256(ethers.utils.toUtf8Bytes(JSON.stringify(heartbeatPayload))) + ); + const recoveredAddress = ethers.utils.verifyMessage(signaturePayload, signature!); + if (recoveredAddress !== airnodeAddress) throw new Error('Invalid signature'); + + return { + airnodeAddress, + stage, + nodeVersion, + deploymentTimestamp, + heartbeatTimestamp, + configHash, + signature, + }; +};