From f8cb690fb442129b9846ba24de255d08e0ad6f9c Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Fri, 6 Sep 2024 11:38:28 -0400 Subject: [PATCH 1/6] first pass --- apps/hubble/src/eth/l2EventsProvider.ts | 95 ++++++++++++++++--------- apps/hubble/src/storage/engine/index.ts | 25 ++++++- packages/core/src/errors.ts | 3 + 3 files changed, 87 insertions(+), 36 deletions(-) diff --git a/apps/hubble/src/eth/l2EventsProvider.ts b/apps/hubble/src/eth/l2EventsProvider.ts index 68600ad70f..4d03f068c6 100644 --- a/apps/hubble/src/eth/l2EventsProvider.ts +++ b/apps/hubble/src/eth/l2EventsProvider.ts @@ -737,6 +737,65 @@ export class L2EventsProvider, + ); + } + + async getIdRegistryEvents(fromBlock?: number, toBlock?: number, fid?: number) { + const idV2LogsPromise = this.getContractEvents({ + ...this.getCommonFilterArgs(fromBlock, toBlock, fid), + address: this.idRegistryV2Address, + abi: IdRegistry.abi, + }); + + await this.processIdRegistryV2Events( + (await idV2LogsPromise) as WatchContractEventOnLogsParameter, + ); + } + + async getKeyRegistryEvents(fromBlock?: number, toBlock?: number, fid?: number) { + const keyV2LogsPromise = this.getContractEvents({ + ...this.getCommonFilterArgs(), + address: this.keyRegistryV2Address, + abi: KeyRegistry.abi, + }); + + await this.processKeyRegistryEventsV2( + (await keyV2LogsPromise) as WatchContractEventOnLogsParameter, + ); + } + /** * Sync old Storage events that may have happened before hub was started. We'll put them all * in the sync queue to be processed later, to make sure we don't process any unconfirmed events. @@ -791,39 +850,9 @@ export class L2EventsProvider, - ); - await this.processIdRegistryV2Events( - (await idV2LogsPromise) as WatchContractEventOnLogsParameter, - ); - await this.processKeyRegistryEventsV2( - (await keyV2LogsPromise) as WatchContractEventOnLogsParameter, - ); + await this.getStorageEvents(nextFromBlock, nextToBlock); + await this.getIdRegistryEvents(nextFromBlock, nextToBlock); + await this.getKeyRegistryEvents(nextFromBlock, nextToBlock); // Write out all the cached blocks first await this.writeCachedBlocks(toBlock); diff --git a/apps/hubble/src/storage/engine/index.ts b/apps/hubble/src/storage/engine/index.ts index a6b524e604..4aea831f1c 100644 --- a/apps/hubble/src/storage/engine/index.ts +++ b/apps/hubble/src/storage/engine/index.ts @@ -77,6 +77,7 @@ import { RateLimiterAbstract, RateLimiterMemory } from "rate-limiter-flexible"; import { TypedEmitter } from "tiny-typed-emitter"; import { FNameRegistryEventsProvider } from "../../eth/fnameRegistryEventsProvider.js"; import { statsd } from "../../utils/statsd.js"; +import { L2EventsProvider } from "eth/l2EventsProvider.js"; export const NUM_VALIDATION_WORKERS = 2; @@ -126,6 +127,7 @@ class Engine extends TypedEmitter { private _publicClient: PublicClient | undefined; private _l2PublicClient: PublicClient | undefined; private _fNameRegistryEventsProvider: FNameRegistryEventsProvider | undefined; + private _l2EventsProvider: L2EventsProvider | undefined; private _linkStore: LinkStore; private _reactionStore: ReactionStore; @@ -157,6 +159,7 @@ class Engine extends TypedEmitter { publicClient?: PublicClient, l2PublicClient?: PublicClient, fNameRegistryEventsProvider?: FNameRegistryEventsProvider, + l2EventsProvider?: L2EventsProvider, ) { super(); this._db = db; @@ -164,6 +167,7 @@ class Engine extends TypedEmitter { this._publicClient = publicClient; this._l2PublicClient = l2PublicClient; this._fNameRegistryEventsProvider = fNameRegistryEventsProvider; + this._l2EventsProvider = l2EventsProvider; this.eventHandler = eventHandler ?? new StoreEventHandler(db); @@ -304,7 +308,7 @@ class Engine extends TypedEmitter { const totalUnits = storageSlot.value.legacy_units + storageSlot.value.units; if (totalUnits === 0) { - mergeResults.set(i, err(new HubError("bad_request.prunable", "no storage"))); + mergeResults.set(i, err(new HubError("bad_request.no_storage", "no storage"))); return; } @@ -332,6 +336,21 @@ class Engine extends TypedEmitter { if (result.isOk() && limiter) { consumeRateLimitByKey(`${fid}`, limiter); } + if (result.isErr()) { + // Try to request on chain event if it's missing + // TODO(aditi): Do we just want to request all? If missing one likely to be missing all? + if (result.error.errCode === "bad_request.no_storage") { + // TODO(aditi): Add timeout + // TODO(aditi): Do we want a start and stop? + await this._l2EventsProvider?.getStorageEvents(undefined, undefined, fid); + } + if (result.error.errCode === "bad_request.missing_signer") { + await this._l2EventsProvider?.getKeyRegistryEvents(undefined, undefined, fid); + } + if (result.error.errCode === "bad_request.missing_fid") { + await this._l2EventsProvider?.getIdRegistryEvents(undefined, undefined, fid); + } + } mergeResults.set(validatedMessages[j]?.i as number, result); } @@ -1217,7 +1236,7 @@ class Engine extends TypedEmitter { } if (!custodyAddress) { - return err(new HubError("bad_request.validation_failure", `unknown fid: ${message.data.fid}`)); + return err(new HubError("bad_request.missing_fid", `unknown fid: ${message.data.fid}`)); } // 4. Check that the signer is valid @@ -1231,7 +1250,7 @@ class Engine extends TypedEmitter { return hex.andThen((signerHex) => { return err( new HubError( - "bad_request.validation_failure", + "bad_request.missing_signer", `invalid signer: signer ${signerHex} not found for fid ${message.data?.fid}`, ), ); diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index bfca347f3b..8f8dd921f9 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -69,9 +69,12 @@ export type HubErrorCode = | "bad_request.parse_failure" | "bad_request.invalid_param" | "bad_request.validation_failure" + | "bad_request.missing_signer" | "bad_request.duplicate" | "bad_request.conflict" | "bad_request.prunable" + | "bad_request.no_storage" + | "bad_request.missing_fid" /* The requested resource could not be found */ | "not_found" /* The request could not be completed because the operation is not executable */ From cf8d4444c6c800b375d64de9abc6f05eca01d5f2 Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Sat, 7 Sep 2024 17:23:19 -0400 Subject: [PATCH 2/6] working for storage --- apps/hubble/src/eth/l2EventsProvider.test.ts | 51 +++++++++++- apps/hubble/src/eth/l2EventsProvider.ts | 79 +++++++++++++++---- apps/hubble/src/storage/engine/index.ts | 14 ++-- .../src/storage/stores/onChainEventStore.ts | 1 + 4 files changed, 118 insertions(+), 27 deletions(-) diff --git a/apps/hubble/src/eth/l2EventsProvider.test.ts b/apps/hubble/src/eth/l2EventsProvider.test.ts index 5515a4d2e5..4ebbf95fc9 100644 --- a/apps/hubble/src/eth/l2EventsProvider.test.ts +++ b/apps/hubble/src/eth/l2EventsProvider.test.ts @@ -7,8 +7,15 @@ import { deployStorageRegistry, publicClient, testClient, walletClientWithAccoun import { accounts } from "../test/constants.js"; import { sleep } from "../utils/crypto.js"; import { L2EventsProvider, OptimismConstants } from "./l2EventsProvider.js"; -import { Transport } from "viem"; -import { getBlockNumber, mine, simulateContract, waitForTransactionReceipt, writeContract } from "viem/actions"; +import { CreateContractEventFilterParameters, Transport } from "viem"; +import { + createContractEventFilter, + getBlockNumber, + mine, + simulateContract, + waitForTransactionReceipt, + writeContract, +} from "viem/actions"; import OnChainEventStore from "../storage/stores/onChainEventStore.js"; import StoreEventHandler from "../storage/stores/storeEventHandler.js"; @@ -199,4 +206,44 @@ describe("process events", () => { }, TEST_TIMEOUT_LONG, ); + + // TODO(aditi): Impl and add tests for key registry and id registry + test( + "retry by fid", + async () => { + const rentSim = await simulateContract(publicClient, { + address: storageRegistryAddress, + abi: StorageRegistry.abi, + functionName: "credit", + account: accounts[0].address, + args: [BigInt(1), BigInt(1)], + }); + + const rentHash = await writeContract(walletClientWithAccount, rentSim.request); + const rentTrx = await waitForTransactionReceipt(publicClient, { hash: rentHash }); + await sleep(1000); // allow time for the rent event to be polled for + await mine(testClient, { blocks: L2EventsProvider.numConfirmations }); + await waitForBlock(Number(rentTrx.blockNumber) + L2EventsProvider.numConfirmations); + + const events1 = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + expect(events1.length).toEqual(1); + expect(events1[0]?.fid).toEqual(1); + expect(events1[0]?.storageRentEventBody?.units).toEqual(1); + + await OnChainEventStore.clearEvents(db); + const events2 = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + + expect(events2.length).toEqual(0); + + await l2EventsProvider.retryEventsForFid(1); + await sleep(1000); // allow time for the rent event to be polled for + + const events3 = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + + expect(events3.length).toEqual(1); + expect(events3[0]?.fid).toEqual(1); + expect(events3[0]?.storageRentEventBody?.units).toEqual(1); + }, + TEST_TIMEOUT_LONG, + ); }); diff --git a/apps/hubble/src/eth/l2EventsProvider.ts b/apps/hubble/src/eth/l2EventsProvider.ts index 4d03f068c6..7b6bcc8db2 100644 --- a/apps/hubble/src/eth/l2EventsProvider.ts +++ b/apps/hubble/src/eth/l2EventsProvider.ts @@ -71,6 +71,12 @@ export class OptimismConstants { const RENT_EXPIRY_IN_SECONDS = 365 * 24 * 60 * 60; // One year +type EventSpecificArgs = { + eventKind: "Storage" | "KeyRegistry" | "IdRegistry"; + eventName: string; + fid: number; +}; + /** * Class that follows the Optimism chain to handle on-chain events from the Storage Registry contract. */ @@ -87,6 +93,7 @@ export class L2EventsProvider>; private _retryDedupMap: Map; + private _fidRetryDedupMap: Map; private _blockTimestampsCache: Map; private _lastBlockNumber: number; @@ -160,6 +167,7 @@ export class L2EventsProvider, ); } - async getIdRegistryEvents(fromBlock?: number, toBlock?: number, fid?: number) { + async getIdRegistryEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) { const idV2LogsPromise = this.getContractEvents({ - ...this.getCommonFilterArgs(fromBlock, toBlock, fid), + ...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs), address: this.idRegistryV2Address, abi: IdRegistry.abi, }); @@ -784,9 +812,9 @@ export class L2EventsProvider { if (result.isErr()) { // Try to request on chain event if it's missing // TODO(aditi): Do we just want to request all? If missing one likely to be missing all? - if (result.error.errCode === "bad_request.no_storage") { + if ( + result.error.errCode === "bad_request.no_storage" || + "bad_request.missing_signer" || + "bad_request.missing_fid" + ) { // TODO(aditi): Add timeout // TODO(aditi): Do we want a start and stop? - await this._l2EventsProvider?.getStorageEvents(undefined, undefined, fid); - } - if (result.error.errCode === "bad_request.missing_signer") { - await this._l2EventsProvider?.getKeyRegistryEvents(undefined, undefined, fid); - } - if (result.error.errCode === "bad_request.missing_fid") { - await this._l2EventsProvider?.getIdRegistryEvents(undefined, undefined, fid); + await this._l2EventsProvider?.retryEventsForFid(fid); } } mergeResults.set(validatedMessages[j]?.i as number, result); diff --git a/apps/hubble/src/storage/stores/onChainEventStore.ts b/apps/hubble/src/storage/stores/onChainEventStore.ts index 42a801de05..aaeb7deeb6 100644 --- a/apps/hubble/src/storage/stores/onChainEventStore.ts +++ b/apps/hubble/src/storage/stores/onChainEventStore.ts @@ -185,6 +185,7 @@ class OnChainEventStore { if (result.isErr()) { throw result.error; } + return result.value; } From 692cfbc13474886329b09944bf2a387518d8a6be Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Sat, 7 Sep 2024 20:21:15 -0400 Subject: [PATCH 3/6] implement support for id registry and key registry --- apps/hubble/src/eth/l2EventsProvider.test.ts | 40 ++++++++++---------- apps/hubble/src/eth/l2EventsProvider.ts | 25 ++++++++++-- apps/hubble/src/storage/engine/index.ts | 3 +- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/apps/hubble/src/eth/l2EventsProvider.test.ts b/apps/hubble/src/eth/l2EventsProvider.test.ts index 4ebbf95fc9..c93fb969a4 100644 --- a/apps/hubble/src/eth/l2EventsProvider.test.ts +++ b/apps/hubble/src/eth/l2EventsProvider.test.ts @@ -7,15 +7,8 @@ import { deployStorageRegistry, publicClient, testClient, walletClientWithAccoun import { accounts } from "../test/constants.js"; import { sleep } from "../utils/crypto.js"; import { L2EventsProvider, OptimismConstants } from "./l2EventsProvider.js"; -import { CreateContractEventFilterParameters, Transport } from "viem"; -import { - createContractEventFilter, - getBlockNumber, - mine, - simulateContract, - waitForTransactionReceipt, - writeContract, -} from "viem/actions"; +import { Transport } from "viem"; +import { getBlockNumber, mine, simulateContract, waitForTransactionReceipt, writeContract } from "viem/actions"; import OnChainEventStore from "../storage/stores/onChainEventStore.js"; import StoreEventHandler from "../storage/stores/storeEventHandler.js"; @@ -207,7 +200,7 @@ describe("process events", () => { TEST_TIMEOUT_LONG, ); - // TODO(aditi): Impl and add tests for key registry and id registry + // TODO(aditi): It's pretty high overhead to set up tests with the id registry and key registry -- need to deploy contracts (need bytecode). Testing via the storage contract tests most of the meaningful logic. test( "retry by fid", async () => { @@ -219,6 +212,7 @@ describe("process events", () => { args: [BigInt(1), BigInt(1)], }); + // Set up the storage rent event const rentHash = await writeContract(walletClientWithAccount, rentSim.request); const rentTrx = await waitForTransactionReceipt(publicClient, { hash: rentHash }); await sleep(1000); // allow time for the rent event to be polled for @@ -230,19 +224,27 @@ describe("process events", () => { expect(events1[0]?.fid).toEqual(1); expect(events1[0]?.storageRentEventBody?.units).toEqual(1); - await OnChainEventStore.clearEvents(db); - const events2 = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + const clearAndRetryForFid = async () => { + // Clear on chain events and show that they get re-ingested when retrying by fid + await OnChainEventStore.clearEvents(db); + const events = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); - expect(events2.length).toEqual(0); + expect(events.length).toEqual(0); - await l2EventsProvider.retryEventsForFid(1); - await sleep(1000); // allow time for the rent event to be polled for + await l2EventsProvider.retryEventsForFid(1); + await sleep(1000); // allow time for the rent event to be polled for + return await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + }; + + const events2 = await clearAndRetryForFid(); - const events3 = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + expect(events2.length).toEqual(1); + expect(events2[0]?.fid).toEqual(1); + expect(events2[0]?.storageRentEventBody?.units).toEqual(1); - expect(events3.length).toEqual(1); - expect(events3[0]?.fid).toEqual(1); - expect(events3[0]?.storageRentEventBody?.units).toEqual(1); + // After retrying once, we don't retry again + const events3 = await clearAndRetryForFid(); + expect(events3.length).toEqual(0); }, TEST_TIMEOUT_LONG, ); diff --git a/apps/hubble/src/eth/l2EventsProvider.ts b/apps/hubble/src/eth/l2EventsProvider.ts index 7b6bcc8db2..31d390283a 100644 --- a/apps/hubble/src/eth/l2EventsProvider.ts +++ b/apps/hubble/src/eth/l2EventsProvider.ts @@ -674,13 +674,32 @@ export class L2EventsProvider { } if (result.isErr()) { // Try to request on chain event if it's missing - // TODO(aditi): Do we just want to request all? If missing one likely to be missing all? if ( result.error.errCode === "bad_request.no_storage" || "bad_request.missing_signer" || "bad_request.missing_fid" ) { // TODO(aditi): Add timeout - // TODO(aditi): Do we want a start and stop? + // TODO(aditi): Do we just want to request all or only the appropriate event for the error message. Seems worth just requesting all. Simplifies the dedup logic too. await this._l2EventsProvider?.retryEventsForFid(fid); } } From 1508b0b8126905ce308caaa3964badb401d14ad6 Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Tue, 10 Sep 2024 10:26:37 -0400 Subject: [PATCH 4/6] cleanups and bugfixes --- apps/hubble/src/eth/l2EventsProvider.test.ts | 1 - apps/hubble/src/eth/l2EventsProvider.ts | 110 +++++++++++------- apps/hubble/src/hubble.ts | 1 + apps/hubble/src/rpc/test/httpServer.test.ts | 2 +- .../src/rpc/test/messageService.test.ts | 6 +- apps/hubble/src/storage/engine/index.test.ts | 26 +++-- apps/hubble/src/storage/engine/index.ts | 87 +++++++------- packages/core/src/errors.ts | 4 +- 8 files changed, 139 insertions(+), 98 deletions(-) diff --git a/apps/hubble/src/eth/l2EventsProvider.test.ts b/apps/hubble/src/eth/l2EventsProvider.test.ts index c93fb969a4..12994bff45 100644 --- a/apps/hubble/src/eth/l2EventsProvider.test.ts +++ b/apps/hubble/src/eth/l2EventsProvider.test.ts @@ -200,7 +200,6 @@ describe("process events", () => { TEST_TIMEOUT_LONG, ); - // TODO(aditi): It's pretty high overhead to set up tests with the id registry and key registry -- need to deploy contracts (need bytecode). Testing via the storage contract tests most of the meaningful logic. test( "retry by fid", async () => { diff --git a/apps/hubble/src/eth/l2EventsProvider.ts b/apps/hubble/src/eth/l2EventsProvider.ts index 31d390283a..3fb2c4f928 100644 --- a/apps/hubble/src/eth/l2EventsProvider.ts +++ b/apps/hubble/src/eth/l2EventsProvider.ts @@ -70,9 +70,9 @@ export class OptimismConstants { } const RENT_EXPIRY_IN_SECONDS = 365 * 24 * 60 * 60; // One year +const FID_RETRY_DEDUP_LOOKBACK_MS = 60 * 60 * 1000; // One hour type EventSpecificArgs = { - eventKind: "Storage" | "KeyRegistry" | "IdRegistry"; eventName: string; fid: number; }; @@ -168,6 +168,11 @@ export class L2EventsProvider { + this._fidRetryDedupMap.clear(); + }, FID_RETRY_DEDUP_LOOKBACK_MS); + this._blockTimestampsCache = new Map(); this.setAddresses(storageRegistryAddress, keyRegistryV2Address, idRegistryV2Address); @@ -669,36 +674,51 @@ export class L2EventsProvider, @@ -824,6 +844,9 @@ export class L2EventsProvider { const response = (e as any).response; expect(response.status).toBe(400); - expect(response.data.errCode).toEqual("bad_request.validation_failure"); + expect(response.data.errCode).toEqual("bad_request.unknown_fid"); expect(response.data.details).toMatch("unknown fid"); } expect(errored).toBeTruthy(); diff --git a/apps/hubble/src/rpc/test/messageService.test.ts b/apps/hubble/src/rpc/test/messageService.test.ts index 0e4240cd67..b32e69566c 100644 --- a/apps/hubble/src/rpc/test/messageService.test.ts +++ b/apps/hubble/src/rpc/test/messageService.test.ts @@ -105,7 +105,7 @@ describe("submitMessage", () => { test("fails without signer", async () => { const result = await client.submitMessage(castAdd); const err = result._unsafeUnwrapErr(); - expect(err.errCode).toEqual("bad_request.validation_failure"); + expect(err.errCode).toEqual("bad_request.unknown_fid"); expect(err.message).toMatch("unknown fid"); }); }); @@ -134,12 +134,12 @@ describe("validateMessage", () => { test("fails without signer", async () => { const castResult = await client.submitMessage(castAdd); const castErr = castResult._unsafeUnwrapErr(); - expect(castErr.errCode).toEqual("bad_request.validation_failure"); + expect(castErr.errCode).toEqual("bad_request.unknown_fid"); expect(castErr.message).toMatch("unknown fid"); const frameResult = await client.submitMessage(frameAction); const frameErr = frameResult._unsafeUnwrapErr(); - expect(frameErr.errCode).toEqual("bad_request.validation_failure"); + expect(frameErr.errCode).toEqual("bad_request.unknown_fid"); expect(frameErr.message).toMatch("unknown fid"); }); }); diff --git a/apps/hubble/src/storage/engine/index.test.ts b/apps/hubble/src/storage/engine/index.test.ts index cd652757dc..831fe75c0e 100644 --- a/apps/hubble/src/storage/engine/index.test.ts +++ b/apps/hubble/src/storage/engine/index.test.ts @@ -67,7 +67,13 @@ const fNameProvider = new FNameRegistryEventsProvider( } as any, false, ); -engine = new Engine(db, network, undefined, publicClient, undefined, fNameProvider); + +// biome-ignore lint/suspicious/noExplicitAny: mock used only in tests +const l2EventsProvider = jest.fn() as any; +l2EventsProvider.retryEventsForFid = jest.fn(); +const retryEventsForFidMock = l2EventsProvider.retryEventsForFid; + +engine = new Engine(db, network, undefined, publicClient, undefined, fNameProvider, l2EventsProvider); const fid = Factories.Fid.build(); const fname = Factories.Fname.build(); @@ -656,7 +662,7 @@ describe("mergeMessage", () => { await engine.mergeOnChainEvent(Factories.SignerOnChainEvent.build({ fid })); const result = await engine.mergeMessage(reactionAdd); - expect(result).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(result).toMatchObject(err({ errCode: "bad_request.unknown_signer" })); expect(result._unsafeUnwrapErr().message).toMatch("invalid signer"); }); @@ -666,7 +672,7 @@ describe("mergeMessage", () => { await engine.mergeOnChainEvent(Factories.SignerOnChainEvent.build({ fid })); const result = await engine.mergeMessage(linkAdd); - expect(result).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(result).toMatchObject(err({ errCode: "bad_request.unknown_signer" })); expect(result._unsafeUnwrapErr().message).toMatch("invalid signer"); }); }); @@ -677,7 +683,7 @@ describe("mergeMessage", () => { afterEach(async () => { const result = await engine.mergeMessage(message); const err = result._unsafeUnwrapErr(); - expect(err.errCode).toEqual("bad_request.validation_failure"); + expect(err.errCode).toEqual("bad_request.unknown_fid"); expect(err.message).toMatch("unknown fid"); }); @@ -1098,7 +1104,8 @@ describe("mergeMessages", () => { expect(results.size).toBe(3); expect(results.get(0)).toBeInstanceOf(Ok); - expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.unknown_fid" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(0); expect(results.get(2)).toBeInstanceOf(Ok); const fid2 = Factories.Fid.build(); @@ -1123,14 +1130,16 @@ describe("mergeMessages", () => { expect(results.size).toBe(2); expect(results.get(0)).toBeInstanceOf(Ok); - expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.unknown_fid" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2); // Add custody address, but adding without signer is invalid await engine.mergeOnChainEvent(custodyEvent2); results = await engine.mergeMessages([castAdd2, linkAdd]); expect(results.size).toBe(2); - expect(results.get(0)).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(results.get(0)).toMatchObject(err({ errCode: "bad_request.unknown_signer" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2); expect(results.get(1)).toBeInstanceOf(Ok); // Add signer address, but adding without storage is invalid @@ -1139,7 +1148,8 @@ describe("mergeMessages", () => { expect(results.size).toBe(2); expect(results.get(0)).toBeInstanceOf(Ok); - expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.prunable" })); + expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.no_storage" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2); // Add the storage event, and now it should merge await engine.mergeOnChainEvent(storageEvent2); diff --git a/apps/hubble/src/storage/engine/index.ts b/apps/hubble/src/storage/engine/index.ts index 5da346ac92..8b5effd7da 100644 --- a/apps/hubble/src/storage/engine/index.ts +++ b/apps/hubble/src/storage/engine/index.ts @@ -283,6 +283,33 @@ class Engine extends TypedEmitter { } } + async computeMergeResult(message: Message, i: number) { + const fid = message.data?.fid ?? 0; + const validatedMessage = await this.validateMessage(message); + if (validatedMessage.isErr()) { + return err(validatedMessage.error); + } + + const storageSlot = await this.eventHandler.getCurrentStorageSlotForFid(fid); + if (storageSlot.isErr()) { + return err(storageSlot.error); + } + + const totalUnits = storageSlot.value.legacy_units + storageSlot.value.units; + if (totalUnits === 0) { + return err(new HubError("bad_request.no_storage", "no storage")); + } + + const limiter = getRateLimiterForTotalMessages(totalUnits * this._totalPruneSize); + const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter); + if (isRateLimited) { + log.warn({ fid }, "rate limit exceeded for FID"); + return err(new HubError("unavailable", `rate limit exceeded for FID ${fid}`)); + } + + return ok({ i, fid, limiter, message }); + } + async mergeMessages(messages: Message[]): Promise>> { const mergeResults: Map> = new Map(); const validatedMessages: IndexedMessage[] = []; @@ -290,38 +317,24 @@ class Engine extends TypedEmitter { // Validate all messages first await Promise.all( messages.map(async (message, i) => { - const validatedMessage = await this.validateMessage(message); - if (validatedMessage.isErr()) { - mergeResults.set(i, err(validatedMessage.error)); - return; - } - // Extract the FID that this message was signed by - const fid = message.data?.fid ?? 0; - const storageSlot = await this.eventHandler.getCurrentStorageSlotForFid(fid); - - if (storageSlot.isErr()) { - mergeResults.set(i, err(storageSlot.error)); - return; - } - - const totalUnits = storageSlot.value.legacy_units + storageSlot.value.units; - - if (totalUnits === 0) { - mergeResults.set(i, err(new HubError("bad_request.no_storage", "no storage"))); - return; - } - // We rate limit the number of messages that can be merged per FID - const limiter = getRateLimiterForTotalMessages(totalUnits * this._totalPruneSize); - const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter); - if (isRateLimited) { - log.warn({ fid }, "rate limit exceeded for FID"); - mergeResults.set(i, err(new HubError("unavailable", `rate limit exceeded for FID ${fid}`))); - return; + const result = await this.computeMergeResult(message, i); + if (result.isErr()) { + mergeResults.set(i, result); + // Try to request on chain event if it's missing + if ( + result.error.errCode === "bad_request.no_storage" || + "bad_request.unknown_signer" || + "bad_request.missing_fid" + ) { + const fid = message.data?.fid ?? 0; + // Don't await because we don't want to block hubs from processing new messages. + this._l2EventsProvider?.retryEventsForFid(fid); + } + } else { + validatedMessages.push(result.value); } - - validatedMessages.push({ i, fid, limiter, message }); }), ); @@ -336,18 +349,6 @@ class Engine extends TypedEmitter { if (result.isOk() && limiter) { consumeRateLimitByKey(`${fid}`, limiter); } - if (result.isErr()) { - // Try to request on chain event if it's missing - if ( - result.error.errCode === "bad_request.no_storage" || - "bad_request.missing_signer" || - "bad_request.missing_fid" - ) { - // TODO(aditi): Add timeout - // TODO(aditi): Do we just want to request all or only the appropriate event for the error message. Seems worth just requesting all. Simplifies the dedup logic too. - await this._l2EventsProvider?.retryEventsForFid(fid); - } - } mergeResults.set(validatedMessages[j]?.i as number, result); } @@ -1233,7 +1234,7 @@ class Engine extends TypedEmitter { } if (!custodyAddress) { - return err(new HubError("bad_request.missing_fid", `unknown fid: ${message.data.fid}`)); + return err(new HubError("bad_request.unknown_fid", `unknown fid: ${message.data.fid}`)); } // 4. Check that the signer is valid @@ -1247,7 +1248,7 @@ class Engine extends TypedEmitter { return hex.andThen((signerHex) => { return err( new HubError( - "bad_request.missing_signer", + "bad_request.unknown_signer", `invalid signer: signer ${signerHex} not found for fid ${message.data?.fid}`, ), ); diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index 8f8dd921f9..8a05e64dd3 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -69,12 +69,12 @@ export type HubErrorCode = | "bad_request.parse_failure" | "bad_request.invalid_param" | "bad_request.validation_failure" - | "bad_request.missing_signer" + | "bad_request.unknown_signer" | "bad_request.duplicate" | "bad_request.conflict" | "bad_request.prunable" | "bad_request.no_storage" - | "bad_request.missing_fid" + | "bad_request.unknown_fid" /* The requested resource could not be found */ | "not_found" /* The request could not be completed because the operation is not executable */ From 9f0cdb8694955d69e6734e5de4fb88ae06a6cbfe Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Sun, 15 Sep 2024 20:03:03 -0400 Subject: [PATCH 5/6] changeset --- .changeset/gold-geckos-remain.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/gold-geckos-remain.md diff --git a/.changeset/gold-geckos-remain.md b/.changeset/gold-geckos-remain.md new file mode 100644 index 0000000000..324f9fcb02 --- /dev/null +++ b/.changeset/gold-geckos-remain.md @@ -0,0 +1,6 @@ +--- +"@farcaster/core": patch +"@farcaster/hubble": patch +--- + +feat: request missing on chain events on related submit message errors From cebffb703d29e8c64edb733942cbf92aa9ba0d9a Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Sun, 15 Sep 2024 20:23:24 -0400 Subject: [PATCH 6/6] fix metrics --- apps/hubble/src/eth/l2EventsProvider.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/apps/hubble/src/eth/l2EventsProvider.ts b/apps/hubble/src/eth/l2EventsProvider.ts index 3fb2c4f928..5927123674 100644 --- a/apps/hubble/src/eth/l2EventsProvider.ts +++ b/apps/hubble/src/eth/l2EventsProvider.ts @@ -313,6 +313,7 @@ export class L2EventsProvider