Skip to content

Commit

Permalink
Interrupt long preprocess delays on update (#3195)
Browse files Browse the repository at this point in the history
* Interrupt long preprocess delays on update

* Fix
  • Loading branch information
rlepinski authored Sep 4, 2024
1 parent b41a963 commit 20ef71c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ protocol AutomationDelayProcessorProtocol: Sendable {
func process(delay: AutomationDelay?, triggerDate: Date) async

// Waits for any delay - 30s and to be a display window if set
func preprocess(delay: AutomationDelay?, triggerDate: Date) async
func preprocess(delay: AutomationDelay?, triggerDate: Date) async throws

// Checks if conditions are met
@MainActor
Expand Down Expand Up @@ -106,17 +106,19 @@ final class AutomationDelayProcessor: AutomationDelayProcessorProtocol {
}
}

func preprocess(delay: AutomationDelay?, triggerDate: Date) async {
func preprocess(delay: AutomationDelay?, triggerDate: Date) async throws {
guard let delay = delay else { return }

// Handle delay - preprocessSecondsDelayAllowance
let seconds = remainingSeconds(delay: delay, triggerDate: triggerDate) - Self.preprocessSecondsDelayAllowance
if seconds > 0 {
try? await self.taskSleeper.sleep(timeInterval: seconds)
try await self.taskSleeper.sleep(timeInterval: seconds)
}

try Task.checkCancellation()

if let window = delay.executionWindow {
try? await executionWindowProcessor.process(window: window)
try await executionWindowProcessor.process(window: window)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ actor AutomationEngine : AutomationEngineProtocol {

private var processPendingExecutionTask: Task<Void, Never>?
private var pendingExecution: [String: PreparedData] = [:]
private var preprocessDelayTasks: Set<Task<Bool, Error>> = Set()


init(
Expand Down Expand Up @@ -143,6 +144,7 @@ actor AutomationEngine : AutomationEngineProtocol {
}

await self.triggersProcessor.updateSchedules(updated)
self.cancelPreprocessDelayTasks()
}

func cancelSchedules(identifiers: [String]) async throws {
Expand Down Expand Up @@ -356,6 +358,32 @@ fileprivate extension AutomationEngine {
}
}

private func preprocessDelay(data: AutomationScheduleData) async -> Bool {
guard let delay = data.schedule.delay else { return true }
let scheduleID = data.schedule.identifier
let triggerDate = data.triggerInfo?.date ?? data.scheduleStateChangeDate

let task = Task {
AirshipLogger.trace("Preprocessing delay \(scheduleID)")
try await self.delayProcessor.preprocess(
delay: delay,
triggerDate: triggerDate
)
AirshipLogger.trace("Finished preprocessing delay \(scheduleID)")
return true
}

preprocessDelayTasks.insert(task)
let result = try? await task.value
preprocessDelayTasks.remove(task)
return result ?? false
}

private func cancelPreprocessDelayTasks() {
preprocessDelayTasks.forEach { $0.cancel() }
preprocessDelayTasks.removeAll()
}

private func processTriggeredSchedule(scheduleID: String) async throws {
guard
let data = try await self.store.getSchedule(scheduleID: scheduleID)
Expand All @@ -371,12 +399,14 @@ fileprivate extension AutomationEngine {
return
}

AirshipLogger.trace("Preprocessing delay \(scheduleID)")
await self.delayProcessor.preprocess(
delay: data.schedule.delay,
triggerDate: data.triggerInfo?.date ?? data.scheduleStateChangeDate
)
AirshipLogger.trace("Finished preprocessing delay \(scheduleID)")
guard
await preprocessDelay(data: data)
else {
AirshipLogger.trace("Preprocess delay was interrupted, retrying \(scheduleID)")
try await processTriggeredSchedule(scheduleID: scheduleID)
return
}


guard
try await self.store.getSchedule(scheduleID: scheduleID) == data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ final class AutomationDelayProcessorTest: XCTestCase {

let now = date.now
Task { @MainActor [processor] in
await processor!.preprocess(delay: delay, triggerDate: now)
try! await processor!.preprocess(delay: delay, triggerDate: now)
finished.set(true)
ended.fulfill()
}
Expand Down

0 comments on commit 20ef71c

Please sign in to comment.