From dbf1f1558962014649b12d435823d26a0f3219cb Mon Sep 17 00:00:00 2001 From: Sanjay Date: Tue, 1 Oct 2024 15:57:21 -0700 Subject: [PATCH] fix: Split sync health job into 10 min chunks to limit memory usage (#2345) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why is this change needed? Hubs might be running into memory pressure because divergingSyncIds is examining too many messages at once. Chunk sync health job to 10 minute intervals to reduce memory usage. ## Merge Checklist _Choose all relevant options below by adding an `x` now or at any time before submitting for review_ - [x] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard - [x] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [x] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [ ] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. --- ## PR-Codex overview This PR focuses on optimizing the `syncHealthJob` by splitting the processing of sync health messages into 10-minute intervals to reduce memory usage. ### Detailed summary - Introduced a loop to process `syncHealthMessageStats` in 10-minute chunks. - Adjusted the handling of `syncHealthMessageStats` and error logging. - Updated the process for pushing diverging sync IDs and logging results. - Improved logging details for computed sync health stats. > ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}` --- .changeset/gentle-cobras-jam.md | 5 + apps/hubble/src/network/sync/syncHealthJob.ts | 95 ++++++++++--------- 2 files changed, 55 insertions(+), 45 deletions(-) create mode 100644 .changeset/gentle-cobras-jam.md diff --git a/.changeset/gentle-cobras-jam.md b/.changeset/gentle-cobras-jam.md new file mode 100644 index 0000000000..6c65405317 --- /dev/null +++ b/.changeset/gentle-cobras-jam.md @@ -0,0 +1,5 @@ +--- +"@farcaster/hubble": patch +--- + +fix: Split sync health job into 10 min chunks to limit memory usage diff --git a/apps/hubble/src/network/sync/syncHealthJob.ts b/apps/hubble/src/network/sync/syncHealthJob.ts index a8950e2f2a..dcde887333 100644 --- a/apps/hubble/src/network/sync/syncHealthJob.ts +++ b/apps/hubble/src/network/sync/syncHealthJob.ts @@ -213,58 +213,63 @@ export class MeasureSyncHealthJobScheduler { const syncHealthProbe = new SyncHealthProbe(this._metadataRetriever, peerMetadataRetriever); - const syncHealthMessageStats = await syncHealthProbe.computeSyncHealthMessageStats( - new Date(startTime), - new Date(stopTime), - ); + // Split the start and stop time into 10 minute intervals, so we don't have to process too many messages at once + const interval = 10 * 60 * 1000; // 10 minutes in milliseconds + for (let chunkStartTime = startTime; chunkStartTime < stopTime; chunkStartTime += interval) { + const chunkStopTime = Math.min(chunkStartTime + interval, stopTime); + const syncHealthMessageStats = await syncHealthProbe.computeSyncHealthMessageStats( + new Date(chunkStartTime), + new Date(chunkStopTime), + ); - if (syncHealthMessageStats.isErr()) { - const contactInfo = this.contactInfoForLogs(peer); - log.info( - { - peerId: peer.identifier, - err: syncHealthMessageStats.error, - contactInfo, - }, - `Error computing SyncHealth: ${syncHealthMessageStats.error}.`, + if (syncHealthMessageStats.isErr()) { + const contactInfo = this.contactInfoForLogs(peer); + log.info( + { + peerId: peer.identifier, + err: syncHealthMessageStats.error, + contactInfo, + }, + `Error computing SyncHealth: ${syncHealthMessageStats.error}.`, + ); + continue; + } + + const resultsPushingToUs = await syncHealthProbe.tryPushingDivergingSyncIds( + new Date(chunkStartTime), + new Date(chunkStopTime), + "FromPeer", ); - continue; - } - const resultsPushingToUs = await syncHealthProbe.tryPushingDivergingSyncIds( - new Date(startTime), - new Date(stopTime), - "FromPeer", - ); + if (resultsPushingToUs.isErr()) { + log.info( + { peerId: peer.identifier, err: resultsPushingToUs.error }, + `Error pushing new messages to ourself ${resultsPushingToUs.error}`, + ); + continue; + } + + const processedResults = await this.processSumbitResults( + resultsPushingToUs.value, + peer.identifier, + chunkStartTime, + chunkStopTime, + ); - if (resultsPushingToUs.isErr()) { log.info( - { peerId: peer.identifier, err: resultsPushingToUs.error }, - `Error pushing new messages to ourself ${resultsPushingToUs.error}`, + { + ourNumMessages: syncHealthMessageStats.value.primaryNumMessages, + theirNumMessages: syncHealthMessageStats.value.peerNumMessages, + syncHealth: syncHealthMessageStats.value.computeDiff(), + syncHealthPercentage: syncHealthMessageStats.value.computeDiffPercentage(), + resultsPushingToUs: processedResults, + peerId: peer.identifier, + startTime: chunkStartTime, + stopTime: chunkStopTime, + }, + "Computed SyncHealth stats for peer", ); - continue; } - - const processedResults = await this.processSumbitResults( - resultsPushingToUs.value, - peer.identifier, - startTime, - stopTime, - ); - - log.info( - { - ourNumMessages: syncHealthMessageStats.value.primaryNumMessages, - theirNumMessages: syncHealthMessageStats.value.peerNumMessages, - syncHealth: syncHealthMessageStats.value.computeDiff(), - syncHealthPercentage: syncHealthMessageStats.value.computeDiffPercentage(), - resultsPushingToUs: processedResults, - peerId: peer.identifier, - startTime, - stopTime, - }, - "Computed SyncHealth stats for peer", - ); } } }