diff --git a/.changeset/gentle-cobras-jam.md b/.changeset/gentle-cobras-jam.md new file mode 100644 index 0000000000..6c65405317 --- /dev/null +++ b/.changeset/gentle-cobras-jam.md @@ -0,0 +1,5 @@ +--- +"@farcaster/hubble": patch +--- + +fix: Split sync health job into 10 min chunks to limit memory usage diff --git a/apps/hubble/src/network/sync/syncHealthJob.ts b/apps/hubble/src/network/sync/syncHealthJob.ts index a8950e2f2a..dcde887333 100644 --- a/apps/hubble/src/network/sync/syncHealthJob.ts +++ b/apps/hubble/src/network/sync/syncHealthJob.ts @@ -213,58 +213,63 @@ export class MeasureSyncHealthJobScheduler { const syncHealthProbe = new SyncHealthProbe(this._metadataRetriever, peerMetadataRetriever); - const syncHealthMessageStats = await syncHealthProbe.computeSyncHealthMessageStats( - new Date(startTime), - new Date(stopTime), - ); + // Split the start and stop time into 10 minute intervals, so we don't have to process too many messages at once + const interval = 10 * 60 * 1000; // 10 minutes in milliseconds + for (let chunkStartTime = startTime; chunkStartTime < stopTime; chunkStartTime += interval) { + const chunkStopTime = Math.min(chunkStartTime + interval, stopTime); + const syncHealthMessageStats = await syncHealthProbe.computeSyncHealthMessageStats( + new Date(chunkStartTime), + new Date(chunkStopTime), + ); - if (syncHealthMessageStats.isErr()) { - const contactInfo = this.contactInfoForLogs(peer); - log.info( - { - peerId: peer.identifier, - err: syncHealthMessageStats.error, - contactInfo, - }, - `Error computing SyncHealth: ${syncHealthMessageStats.error}.`, + if (syncHealthMessageStats.isErr()) { + const contactInfo = this.contactInfoForLogs(peer); + log.info( + { + peerId: peer.identifier, + err: syncHealthMessageStats.error, + contactInfo, + }, + `Error computing SyncHealth: ${syncHealthMessageStats.error}.`, + ); + continue; + } + + const resultsPushingToUs = await syncHealthProbe.tryPushingDivergingSyncIds( + new Date(chunkStartTime), + new Date(chunkStopTime), + "FromPeer", ); - continue; - } - const resultsPushingToUs = await syncHealthProbe.tryPushingDivergingSyncIds( - new Date(startTime), - new Date(stopTime), - "FromPeer", - ); + if (resultsPushingToUs.isErr()) { + log.info( + { peerId: peer.identifier, err: resultsPushingToUs.error }, + `Error pushing new messages to ourself ${resultsPushingToUs.error}`, + ); + continue; + } + + const processedResults = await this.processSumbitResults( + resultsPushingToUs.value, + peer.identifier, + chunkStartTime, + chunkStopTime, + ); - if (resultsPushingToUs.isErr()) { log.info( - { peerId: peer.identifier, err: resultsPushingToUs.error }, - `Error pushing new messages to ourself ${resultsPushingToUs.error}`, + { + ourNumMessages: syncHealthMessageStats.value.primaryNumMessages, + theirNumMessages: syncHealthMessageStats.value.peerNumMessages, + syncHealth: syncHealthMessageStats.value.computeDiff(), + syncHealthPercentage: syncHealthMessageStats.value.computeDiffPercentage(), + resultsPushingToUs: processedResults, + peerId: peer.identifier, + startTime: chunkStartTime, + stopTime: chunkStopTime, + }, + "Computed SyncHealth stats for peer", ); - continue; } - - const processedResults = await this.processSumbitResults( - resultsPushingToUs.value, - peer.identifier, - startTime, - stopTime, - ); - - log.info( - { - ourNumMessages: syncHealthMessageStats.value.primaryNumMessages, - theirNumMessages: syncHealthMessageStats.value.peerNumMessages, - syncHealth: syncHealthMessageStats.value.computeDiff(), - syncHealthPercentage: syncHealthMessageStats.value.computeDiffPercentage(), - resultsPushingToUs: processedResults, - peerId: peer.identifier, - startTime, - stopTime, - }, - "Computed SyncHealth stats for peer", - ); } } }