Skip to content

Commit

Permalink
Merge pull request #1152 from planetary-social/stop-retrying-publish-…
Browse files Browse the repository at this point in the history
…eventually

Add a cutoff of 5 days for republishing events that failed to publish
  • Loading branch information
mplorentz authored May 20, 2024
2 parents 18c67a2 + 7e4812f commit 400a8c4
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions Nos/Service/Relay/RelayService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ final class RelayService: ObservableObject {

// TODO: fire this after all relays have connected, not right on init
self.backgroundProcessTimer = AsyncTimer(timeInterval: 60, firesImmediately: true, onFire: { [weak self] in
await self?.publishFailedEvents()
await self?.retryFailedPublishes()
await self?.deleteExpiredEvents()
})

Expand Down Expand Up @@ -380,7 +380,7 @@ extension RelayService {
Log.error(error.localizedDescription)
}
} else {
// This will be picked up later in publishFailedEvents
// This will be picked up later in retryFailedPublishes()
if responseArray.count > 2, let message = responseArray[3] as? String {
// Mark duplicates or replaces as done on our end
if message.contains("replaced:") || message.contains("duplicate:") {
Expand Down Expand Up @@ -584,7 +584,12 @@ extension RelayService {
return jsonEvent
}

@MainActor func publishFailedEvents() async {
/// This function is mean to be run periodically to try to publish events that failed to publish in the past. It
/// uses the `shouldBePublishedTo` and `publishedTo` relationships on `Event` to determine what failed to publish.
/// These events could have failed to publish becuase the relay was offline, or because the user was offline. Often
/// the user has relay in their list that they don't have write access to so eventually this function will stop
/// trying to republish the same event.
@MainActor func retryFailedPublishes() async {
guard let userKey = currentUser.author?.hexadecimalPublicKey else {
return
}
Expand All @@ -595,18 +600,18 @@ extension RelayService {
return
}
let objectContext = self.backgroundContext
let userSentEvents = Event.unpublishedEvents(for: user, context: objectContext)
let eventsToRetry = Event.unpublishedEvents(for: user, context: objectContext)

for event in userSentEvents {
// Try to publish each of these again to each relay that failed.
for event in eventsToRetry {
let missedRelays = event.shouldBePublishedTo.subtracting(event.publishedTo)

print("\(missedRelays.count) relays missing a published event.")
for missedRelay in missedRelays {
guard let missedAddress = missedRelay.address, let jsonEvent = event.codable else { continue }
Task {
if let socket = await self.subscriptions.socket(for: missedAddress) {
// Publish again to this socket
print("Republishing \(jsonEvent.id) on \(missedAddress)")
Log.info("Retrying publish of event \(jsonEvent.id) to \(missedAddress)")
do {
try await self.publish(from: socket, jsonEvent: jsonEvent)
} catch {
Expand All @@ -617,6 +622,17 @@ extension RelayService {
}
}

// Don't try again if the event is more than five days old
let fiveDays: TimeInterval = 60 * 60 * 24 * 5
let now = Date.now.timeIntervalSince1970
for event in eventsToRetry {
let publishDate = event.createdAt?.timeIntervalSince1970 ?? 0
if now - publishDate > fiveDays {
Log.info("Done retrying publish for event \(String(describing: event.identifier))")
event.shouldBePublishedTo = Set()
}
}

try? self.backgroundContext.saveIfNeeded()
}
}
Expand Down

0 comments on commit 400a8c4

Please sign in to comment.