Skip to content

Commit

Permalink
feat: use start and stop time range for reconciliation bulk queries (#…
Browse files Browse the repository at this point in the history
…2269)

## Why is this change needed?

Reconciliation takes a really long time and puts a lot of pressure on
hubs from read rpcs. Specifying a start and stop time range will reduce
the overhead of running reconciliation.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [ ] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR introduces a time range option for reconciliation in the
`shuttle` package.

### Detailed summary
- Added `startTimestamp` and `stopTimestamp` parameters to
`MessageReconciliation` class
- Implemented time range filtering for message reconciliation
- Updated message retrieval methods to support time range filtering

> The following files were skipped due to too many changes:
`packages/shuttle/src/shuttle/messageReconciliation.ts`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
aditiharini authored Aug 20, 2024
1 parent cc0d0a3 commit 70d1f83
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 25 deletions.
5 changes: 5 additions & 0 deletions .changeset/twelve-lies-type.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/shuttle": patch
---

feat: add time range option for reconciliation
149 changes: 149 additions & 0 deletions packages/shuttle/src/shuttle.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
MessageType,
MessagesResponse,
Metadata,
getFarcasterTime,
} from "@farcaster/hub-nodejs";
import {
RedisClient,
Expand Down Expand Up @@ -463,6 +464,154 @@ describe("shuttle", () => {
expect(missingFromDb).toMatchObject([false]);
});

test("reconciler takes start and stop time into account", async () => {
const startTimestamp = getFarcasterTime()._unsafeUnwrap();

const linkAddMessage = await Factories.LinkAddMessage.create(
{ data: { timestamp: startTimestamp } },
{ transient: { signer } },
);

const castAddMessage = await Factories.CastAddMessage.create({
data: { timestamp: startTimestamp - 1, fid: linkAddMessage.data.fid },
});

const verificationAddMessage = await Factories.CastAddMessage.create({
data: { timestamp: startTimestamp - 2, fid: linkAddMessage.data.fid },
});

await subscriber.processHubEvent(
HubEvent.create({
id: 1,
type: HubEventType.MERGE_MESSAGE,
mergeMessageBody: { message: verificationAddMessage },
}),
);
await subscriber.processHubEvent(
HubEvent.create({
id: 2,
type: HubEventType.MERGE_MESSAGE,
mergeMessageBody: { message: castAddMessage },
}),
);
await subscriber.processHubEvent(
HubEvent.create({
id: 3,
type: HubEventType.MERGE_MESSAGE,
mergeMessageBody: { message: linkAddMessage },
}),
);

// It's a hack, but mockito is not handling this well:
const mockRPCClient = {
getAllLinkMessagesByFid: async (_request: FidRequest, _metadata: Metadata, _options: Partial<CallOptions>) => {
return ok(
MessagesResponse.create({
messages: [linkAddMessage],
nextPageToken: undefined,
}),
);
},
getAllCastMessagesByFid: async (_request: FidRequest, _metadata: Metadata, _options: Partial<CallOptions>) => {
return ok(
MessagesResponse.create({
messages: [
/* Pretend this message is missing from the hub */
],
nextPageToken: undefined,
}),
);
},
getAllVerficationMessagesByFid: async (
_request: FidRequest,
_metadata: Metadata,
_options: Partial<CallOptions>,
) => {
return ok(
MessagesResponse.create({
messages: [verificationAddMessage],
nextPageToken: undefined,
}),
);
},
getAllReactionMessagesByFid: async (
_request: FidRequest,
_metadata: Metadata,
_options: Partial<CallOptions>,
) => {
return ok(
MessagesResponse.create({
messages: [],
nextPageToken: undefined,
}),
);
},
getLinkCompactStateMessageByFid: async (
_request: FidRequest,
_metadata: Metadata,
_options: Partial<CallOptions>,
) => {
return ok(
MessagesResponse.create({
messages: [],
nextPageToken: undefined,
}),
);
},
getAllVerificationMessagesByFid: async (
_request: FidRequest,
_metadata: Metadata,
_options: Partial<CallOptions>,
) => {
return ok(
MessagesResponse.create({
messages: [],
nextPageToken: undefined,
}),
);
},
getAllUserDataMessagesByFid: async (
_request: FidRequest,
_metadata: Metadata,
_options: Partial<CallOptions>,
) => {
return ok(
MessagesResponse.create({
messages: [],
nextPageToken: undefined,
}),
);
},
};

// Only include 2 of the 3 messages in the time window
const reconciler = new MessageReconciliation(mockRPCClient as unknown as HubRpcClient, db, log);
const messagesOnHub: Message[] = [];
const messagesInDb: {
hash: Uint8Array;
prunedAt: Date | null;
revokedAt: Date | null;
fid: number;
type: MessageType;
raw: Uint8Array;
signer: Uint8Array;
}[] = [];
await reconciler.reconcileMessagesForFid(
linkAddMessage.data.fid,
async (msg, _missing, _pruned, _revoked) => {
messagesOnHub.push(msg);
},
async (dbMsg, _missing) => {
messagesInDb.push(dbMsg);
},
startTimestamp - 1,
startTimestamp,
);

expect(messagesOnHub.length).toBe(1);
expect(messagesInDb.length).toBe(2);
});

test("marks messages as pruned", async () => {
const addMessage = await Factories.ReactionAddMessage.create({}, { transient: { signer } });
subscriber.addMessageCallback((msg, operation, state, isNew, wasMissed) => {
Expand Down
Loading

0 comments on commit 70d1f83

Please sign in to comment.