diff --git a/.changeset/gold-geckos-remain.md b/.changeset/gold-geckos-remain.md new file mode 100644 index 0000000000..324f9fcb02 --- /dev/null +++ b/.changeset/gold-geckos-remain.md @@ -0,0 +1,6 @@ +--- +"@farcaster/core": patch +"@farcaster/hubble": patch +--- + +feat: request missing on chain events on related submit message errors diff --git a/apps/hubble/src/eth/l2EventsProvider.test.ts b/apps/hubble/src/eth/l2EventsProvider.test.ts index 5515a4d2e5..12994bff45 100644 --- a/apps/hubble/src/eth/l2EventsProvider.test.ts +++ b/apps/hubble/src/eth/l2EventsProvider.test.ts @@ -199,4 +199,52 @@ describe("process events", () => { }, TEST_TIMEOUT_LONG, ); + + test( + "retry by fid", + async () => { + const rentSim = await simulateContract(publicClient, { + address: storageRegistryAddress, + abi: StorageRegistry.abi, + functionName: "credit", + account: accounts[0].address, + args: [BigInt(1), BigInt(1)], + }); + + // Set up the storage rent event + const rentHash = await writeContract(walletClientWithAccount, rentSim.request); + const rentTrx = await waitForTransactionReceipt(publicClient, { hash: rentHash }); + await sleep(1000); // allow time for the rent event to be polled for + await mine(testClient, { blocks: L2EventsProvider.numConfirmations }); + await waitForBlock(Number(rentTrx.blockNumber) + L2EventsProvider.numConfirmations); + + const events1 = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + expect(events1.length).toEqual(1); + expect(events1[0]?.fid).toEqual(1); + expect(events1[0]?.storageRentEventBody?.units).toEqual(1); + + const clearAndRetryForFid = async () => { + // Clear on chain events and show that they get re-ingested when retrying by fid + await OnChainEventStore.clearEvents(db); + const events = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + + expect(events.length).toEqual(0); + + await l2EventsProvider.retryEventsForFid(1); + await sleep(1000); // allow time for the rent event to be polled for + return await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + }; + + const events2 = await clearAndRetryForFid(); + + expect(events2.length).toEqual(1); + expect(events2[0]?.fid).toEqual(1); + expect(events2[0]?.storageRentEventBody?.units).toEqual(1); + + // After retrying once, we don't retry again + const events3 = await clearAndRetryForFid(); + expect(events3.length).toEqual(0); + }, + TEST_TIMEOUT_LONG, + ); }); diff --git a/apps/hubble/src/eth/l2EventsProvider.ts b/apps/hubble/src/eth/l2EventsProvider.ts index 68600ad70f..5927123674 100644 --- a/apps/hubble/src/eth/l2EventsProvider.ts +++ b/apps/hubble/src/eth/l2EventsProvider.ts @@ -70,6 +70,12 @@ export class OptimismConstants { } const RENT_EXPIRY_IN_SECONDS = 365 * 24 * 60 * 60; // One year +const FID_RETRY_DEDUP_LOOKBACK_MS = 60 * 60 * 1000; // One hour + +type EventSpecificArgs = { + eventName: string; + fid: number; +}; /** * Class that follows the Optimism chain to handle on-chain events from the Storage Registry contract. @@ -87,6 +93,7 @@ export class L2EventsProvider>; private _retryDedupMap: Map; + private _fidRetryDedupMap: Map; private _blockTimestampsCache: Map; private _lastBlockNumber: number; @@ -160,6 +167,12 @@ export class L2EventsProvider { + this._fidRetryDedupMap.clear(); + }, FID_RETRY_DEDUP_LOOKBACK_MS); + this._blockTimestampsCache = new Map(); this.setAddresses(storageRegistryAddress, keyRegistryV2Address, idRegistryV2Address); @@ -300,6 +313,7 @@ export class L2EventsProvider, + ); + } + + async getIdRegistryEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) { + const idV2LogsPromise = this.getContractEvents({ + ...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs), + address: this.idRegistryV2Address, + abi: IdRegistry.abi, + args: { + id: this.getEventSpecificFid(eventSpecificArgs) /* The fid parameter is named "id" in the IdRegistry events */, + }, + }); + + await this.processIdRegistryV2Events( + (await idV2LogsPromise) as WatchContractEventOnLogsParameter, + ); + } + + async getKeyRegistryEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) { + const keyV2LogsPromise = this.getContractEvents({ + ...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs), + address: this.keyRegistryV2Address, + abi: KeyRegistry.abi, + args: { fid: this.getEventSpecificFid(eventSpecificArgs) }, + }); + + await this.processKeyRegistryEventsV2( + (await keyV2LogsPromise) as WatchContractEventOnLogsParameter, + ); + } + /** * Sync old Storage events that may have happened before hub was started. We'll put them all * in the sync queue to be processed later, to make sure we don't process any unconfirmed events. */ - private async syncHistoricalEvents(fromBlock: number, toBlock: number, batchSize: number) { + private async syncHistoricalEvents( + fromBlock: number, + toBlock: number, + batchSize: number, // This batch size is enforced by us internally, not by the RPC provider + byEventKind?: { + StorageRegistry: EventSpecificArgs[]; + IdRegistry: EventSpecificArgs[]; + KeyRegistry: EventSpecificArgs[]; + }, + ) { if (!this.idRegistryV2Address || !this.keyRegistryV2Address || !this.storageRegistryAddress) { return; } @@ -789,41 +936,26 @@ export class L2EventsProvider, - ); - await this.processIdRegistryV2Events( - (await idV2LogsPromise) as WatchContractEventOnLogsParameter, - ); - await this.processKeyRegistryEventsV2( - (await keyV2LogsPromise) as WatchContractEventOnLogsParameter, - ); + for (const keyRegistryEventSpecific of byEventKind.KeyRegistry) { + await this.getKeyRegistryEvents(nextFromBlock, nextToBlock, keyRegistryEventSpecific); + } + } else { + statsd().increment("l2events.blocks", Math.min(toBlock, nextToBlock - nextFromBlock)); + await this.getStorageEvents(nextFromBlock, nextToBlock); + await this.getIdRegistryEvents(nextFromBlock, nextToBlock); + await this.getKeyRegistryEvents(nextFromBlock, nextToBlock); + } // Write out all the cached blocks first await this.writeCachedBlocks(toBlock); diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index c4fbf4fc2e..3eaf68e352 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -471,6 +471,7 @@ export class Hub implements HubInterface { mainnetClient, opClient as PublicClient, this.fNameRegistryEventsProvider, + this.l2RegistryProvider, ); const profileSync = options.profileSync ?? false; diff --git a/apps/hubble/src/rpc/test/httpServer.test.ts b/apps/hubble/src/rpc/test/httpServer.test.ts index 09bc6591af..91e859b838 100644 --- a/apps/hubble/src/rpc/test/httpServer.test.ts +++ b/apps/hubble/src/rpc/test/httpServer.test.ts @@ -273,7 +273,7 @@ describe("httpServer", () => { const response = (e as any).response; expect(response.status).toBe(400); - expect(response.data.errCode).toEqual("bad_request.validation_failure"); + expect(response.data.errCode).toEqual("bad_request.unknown_fid"); expect(response.data.details).toMatch("unknown fid"); } expect(errored).toBeTruthy(); diff --git a/apps/hubble/src/rpc/test/messageService.test.ts b/apps/hubble/src/rpc/test/messageService.test.ts index 0e4240cd67..b32e69566c 100644 --- a/apps/hubble/src/rpc/test/messageService.test.ts +++ b/apps/hubble/src/rpc/test/messageService.test.ts @@ -105,7 +105,7 @@ describe("submitMessage", () => { test("fails without signer", async () => { const result = await client.submitMessage(castAdd); const err = result._unsafeUnwrapErr(); - expect(err.errCode).toEqual("bad_request.validation_failure"); + expect(err.errCode).toEqual("bad_request.unknown_fid"); expect(err.message).toMatch("unknown fid"); }); }); @@ -134,12 +134,12 @@ describe("validateMessage", () => { test("fails without signer", async () => { const castResult = await client.submitMessage(castAdd); const castErr = castResult._unsafeUnwrapErr(); - expect(castErr.errCode).toEqual("bad_request.validation_failure"); + expect(castErr.errCode).toEqual("bad_request.unknown_fid"); expect(castErr.message).toMatch("unknown fid"); const frameResult = await client.submitMessage(frameAction); const frameErr = frameResult._unsafeUnwrapErr(); - expect(frameErr.errCode).toEqual("bad_request.validation_failure"); + expect(frameErr.errCode).toEqual("bad_request.unknown_fid"); expect(frameErr.message).toMatch("unknown fid"); }); }); diff --git a/apps/hubble/src/storage/engine/index.test.ts b/apps/hubble/src/storage/engine/index.test.ts index cd652757dc..831fe75c0e 100644 --- a/apps/hubble/src/storage/engine/index.test.ts +++ b/apps/hubble/src/storage/engine/index.test.ts @@ -67,7 +67,13 @@ const fNameProvider = new FNameRegistryEventsProvider( } as any, false, ); -engine = new Engine(db, network, undefined, publicClient, undefined, fNameProvider); + +// biome-ignore lint/suspicious/noExplicitAny: mock used only in tests +const l2EventsProvider = jest.fn() as any; +l2EventsProvider.retryEventsForFid = jest.fn(); +const retryEventsForFidMock = l2EventsProvider.retryEventsForFid; + +engine = new Engine(db, network, undefined, publicClient, undefined, fNameProvider, l2EventsProvider); const fid = Factories.Fid.build(); const fname = Factories.Fname.build(); @@ -656,7 +662,7 @@ describe("mergeMessage", () => { await engine.mergeOnChainEvent(Factories.SignerOnChainEvent.build({ fid })); const result = await engine.mergeMessage(reactionAdd); - expect(result).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(result).toMatchObject(err({ errCode: "bad_request.unknown_signer" })); expect(result._unsafeUnwrapErr().message).toMatch("invalid signer"); }); @@ -666,7 +672,7 @@ describe("mergeMessage", () => { await engine.mergeOnChainEvent(Factories.SignerOnChainEvent.build({ fid })); const result = await engine.mergeMessage(linkAdd); - expect(result).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(result).toMatchObject(err({ errCode: "bad_request.unknown_signer" })); expect(result._unsafeUnwrapErr().message).toMatch("invalid signer"); }); }); @@ -677,7 +683,7 @@ describe("mergeMessage", () => { afterEach(async () => { const result = await engine.mergeMessage(message); const err = result._unsafeUnwrapErr(); - expect(err.errCode).toEqual("bad_request.validation_failure"); + expect(err.errCode).toEqual("bad_request.unknown_fid"); expect(err.message).toMatch("unknown fid"); }); @@ -1098,7 +1104,8 @@ describe("mergeMessages", () => { expect(results.size).toBe(3); expect(results.get(0)).toBeInstanceOf(Ok); - expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.unknown_fid" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(0); expect(results.get(2)).toBeInstanceOf(Ok); const fid2 = Factories.Fid.build(); @@ -1123,14 +1130,16 @@ describe("mergeMessages", () => { expect(results.size).toBe(2); expect(results.get(0)).toBeInstanceOf(Ok); - expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.unknown_fid" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2); // Add custody address, but adding without signer is invalid await engine.mergeOnChainEvent(custodyEvent2); results = await engine.mergeMessages([castAdd2, linkAdd]); expect(results.size).toBe(2); - expect(results.get(0)).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(results.get(0)).toMatchObject(err({ errCode: "bad_request.unknown_signer" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2); expect(results.get(1)).toBeInstanceOf(Ok); // Add signer address, but adding without storage is invalid @@ -1139,7 +1148,8 @@ describe("mergeMessages", () => { expect(results.size).toBe(2); expect(results.get(0)).toBeInstanceOf(Ok); - expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.prunable" })); + expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.no_storage" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2); // Add the storage event, and now it should merge await engine.mergeOnChainEvent(storageEvent2); diff --git a/apps/hubble/src/storage/engine/index.ts b/apps/hubble/src/storage/engine/index.ts index a6b524e604..8b5effd7da 100644 --- a/apps/hubble/src/storage/engine/index.ts +++ b/apps/hubble/src/storage/engine/index.ts @@ -77,6 +77,7 @@ import { RateLimiterAbstract, RateLimiterMemory } from "rate-limiter-flexible"; import { TypedEmitter } from "tiny-typed-emitter"; import { FNameRegistryEventsProvider } from "../../eth/fnameRegistryEventsProvider.js"; import { statsd } from "../../utils/statsd.js"; +import { L2EventsProvider } from "eth/l2EventsProvider.js"; export const NUM_VALIDATION_WORKERS = 2; @@ -126,6 +127,7 @@ class Engine extends TypedEmitter { private _publicClient: PublicClient | undefined; private _l2PublicClient: PublicClient | undefined; private _fNameRegistryEventsProvider: FNameRegistryEventsProvider | undefined; + private _l2EventsProvider: L2EventsProvider | undefined; private _linkStore: LinkStore; private _reactionStore: ReactionStore; @@ -157,6 +159,7 @@ class Engine extends TypedEmitter { publicClient?: PublicClient, l2PublicClient?: PublicClient, fNameRegistryEventsProvider?: FNameRegistryEventsProvider, + l2EventsProvider?: L2EventsProvider, ) { super(); this._db = db; @@ -164,6 +167,7 @@ class Engine extends TypedEmitter { this._publicClient = publicClient; this._l2PublicClient = l2PublicClient; this._fNameRegistryEventsProvider = fNameRegistryEventsProvider; + this._l2EventsProvider = l2EventsProvider; this.eventHandler = eventHandler ?? new StoreEventHandler(db); @@ -279,6 +283,33 @@ class Engine extends TypedEmitter { } } + async computeMergeResult(message: Message, i: number) { + const fid = message.data?.fid ?? 0; + const validatedMessage = await this.validateMessage(message); + if (validatedMessage.isErr()) { + return err(validatedMessage.error); + } + + const storageSlot = await this.eventHandler.getCurrentStorageSlotForFid(fid); + if (storageSlot.isErr()) { + return err(storageSlot.error); + } + + const totalUnits = storageSlot.value.legacy_units + storageSlot.value.units; + if (totalUnits === 0) { + return err(new HubError("bad_request.no_storage", "no storage")); + } + + const limiter = getRateLimiterForTotalMessages(totalUnits * this._totalPruneSize); + const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter); + if (isRateLimited) { + log.warn({ fid }, "rate limit exceeded for FID"); + return err(new HubError("unavailable", `rate limit exceeded for FID ${fid}`)); + } + + return ok({ i, fid, limiter, message }); + } + async mergeMessages(messages: Message[]): Promise>> { const mergeResults: Map> = new Map(); const validatedMessages: IndexedMessage[] = []; @@ -286,38 +317,24 @@ class Engine extends TypedEmitter { // Validate all messages first await Promise.all( messages.map(async (message, i) => { - const validatedMessage = await this.validateMessage(message); - if (validatedMessage.isErr()) { - mergeResults.set(i, err(validatedMessage.error)); - return; - } - // Extract the FID that this message was signed by - const fid = message.data?.fid ?? 0; - const storageSlot = await this.eventHandler.getCurrentStorageSlotForFid(fid); - - if (storageSlot.isErr()) { - mergeResults.set(i, err(storageSlot.error)); - return; - } - - const totalUnits = storageSlot.value.legacy_units + storageSlot.value.units; - - if (totalUnits === 0) { - mergeResults.set(i, err(new HubError("bad_request.prunable", "no storage"))); - return; - } - // We rate limit the number of messages that can be merged per FID - const limiter = getRateLimiterForTotalMessages(totalUnits * this._totalPruneSize); - const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter); - if (isRateLimited) { - log.warn({ fid }, "rate limit exceeded for FID"); - mergeResults.set(i, err(new HubError("unavailable", `rate limit exceeded for FID ${fid}`))); - return; + const result = await this.computeMergeResult(message, i); + if (result.isErr()) { + mergeResults.set(i, result); + // Try to request on chain event if it's missing + if ( + result.error.errCode === "bad_request.no_storage" || + "bad_request.unknown_signer" || + "bad_request.missing_fid" + ) { + const fid = message.data?.fid ?? 0; + // Don't await because we don't want to block hubs from processing new messages. + this._l2EventsProvider?.retryEventsForFid(fid); + } + } else { + validatedMessages.push(result.value); } - - validatedMessages.push({ i, fid, limiter, message }); }), ); @@ -1217,7 +1234,7 @@ class Engine extends TypedEmitter { } if (!custodyAddress) { - return err(new HubError("bad_request.validation_failure", `unknown fid: ${message.data.fid}`)); + return err(new HubError("bad_request.unknown_fid", `unknown fid: ${message.data.fid}`)); } // 4. Check that the signer is valid @@ -1231,7 +1248,7 @@ class Engine extends TypedEmitter { return hex.andThen((signerHex) => { return err( new HubError( - "bad_request.validation_failure", + "bad_request.unknown_signer", `invalid signer: signer ${signerHex} not found for fid ${message.data?.fid}`, ), ); diff --git a/apps/hubble/src/storage/stores/onChainEventStore.ts b/apps/hubble/src/storage/stores/onChainEventStore.ts index 42a801de05..aaeb7deeb6 100644 --- a/apps/hubble/src/storage/stores/onChainEventStore.ts +++ b/apps/hubble/src/storage/stores/onChainEventStore.ts @@ -185,6 +185,7 @@ class OnChainEventStore { if (result.isErr()) { throw result.error; } + return result.value; } diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index bfca347f3b..8a05e64dd3 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -69,9 +69,12 @@ export type HubErrorCode = | "bad_request.parse_failure" | "bad_request.invalid_param" | "bad_request.validation_failure" + | "bad_request.unknown_signer" | "bad_request.duplicate" | "bad_request.conflict" | "bad_request.prunable" + | "bad_request.no_storage" + | "bad_request.unknown_fid" /* The requested resource could not be found */ | "not_found" /* The request could not be completed because the operation is not executable */