diff --git a/Sources/ObservableStore/ObservableStore.swift b/Sources/ObservableStore/ObservableStore.swift index 8f561df..be6b197 100644 --- a/Sources/ObservableStore/ObservableStore.swift +++ b/Sources/ObservableStore/ObservableStore.swift @@ -204,8 +204,8 @@ public protocol StoreProtocol { public final class Store: ObservableObject, StoreProtocol where Model: ModelProtocol { - /// Cancellable for fx subscription. - private var cancelFx: AnyCancellable? + /// Stores cancellables by ID + private(set) var cancellables: [UUID: AnyCancellable] = [:] /// Private for all actions sent to the store. private var _actions = PassthroughSubject() @@ -215,17 +215,6 @@ where Model: ModelProtocol _actions.eraseToAnyPublisher() } - /// Source publisher for batches of fx modeled as publishers. - private var _fxBatches = PassthroughSubject, Never>() - - /// `fx` represents a flat stream of actions from all fx publishers. - private var fx: AnyPublisher { - _fxBatches - .flatMap({ publisher in publisher }) - .receive(on: DispatchQueue.main) - .eraseToAnyPublisher() - } - /// Publisher for updates performed on state private var _updates = PassthroughSubject() @@ -273,11 +262,6 @@ where Model: ModelProtocol subsystem: "ObservableStore", category: "Store" ) - - self.cancelFx = self.fx - .sink(receiveValue: { [weak self] action in - self?.send(action) - }) } /// Initialize with a closure that receives environment. @@ -318,12 +302,49 @@ where Model: ModelProtocol self.send(action) } - /// Subscribe to a publisher of actions, send the actions it publishes - /// to the store. + /// Subscribe to a publisher of actions, piping them through to + /// the store. + /// + /// Holds on to the cancellable until publisher completes. + /// When publisher completes, removes cancellable. public func subscribe(to fx: Fx) { - self._fxBatches.send(fx) + // Create a UUID for the cancellable. + // Store cancellable in dictionary by UUID. + // Remove cancellable from dictionary upon effect completion. + // This retains the effect pipeline for as long as it takes to complete + // the effect, and then removes it, so we don't have a cancellables + // memory leak. + let id = UUID() + + // Receive Fx on main thread. This does two important things: + // + // First, SwiftUI requires that any state mutations that would change + // views happen on the main thread. Receiving on main ensures that + // all fx-driven state transitions happen on main, even if the + // publisher is off-main-thread. + // + // Second, if we didn't schedule receive on main, it would be possible + // for publishers to complete immediately, causing receiveCompletion + // to attempt to remove the publisher from `cancellables` before + // it is added. By scheduling to receive publisher on main, + // we force publisher to complete on next tick, ensuring that it + // is always first added, then removed from `cancellables`. + let cancellable = fx + .receive( + on: DispatchQueue.main, + options: .init(qos: .default) + ) + .sink( + receiveCompletion: { [weak self] _ in + self?.cancellables.removeValue(forKey: id) + }, + receiveValue: { [weak self] action in + self?.send(action) + } + ) + self.cancellables[id] = cancellable } - + /// Send an action to the store to update state and generate effects. /// Any effects generated are fed back into the store. /// diff --git a/Tests/ObservableStoreTests/ObservableStoreTests.swift b/Tests/ObservableStoreTests/ObservableStoreTests.swift index 1b7c492..fd164d6 100644 --- a/Tests/ObservableStoreTests/ObservableStoreTests.swift +++ b/Tests/ObservableStoreTests/ObservableStoreTests.swift @@ -97,6 +97,86 @@ final class ObservableStoreTests: XCTestCase { XCTAssertEqual(store.state.count, 1, "state is advanced") } + /// Tests that the immediately-completing empty Fx used as the default for + /// updates get removed from the cancellables array. + /// + /// Failure to remove immediately-completing fx would cause a memory leak. + func testEmptyFxRemovedOnComplete() { + let store = Store( + state: AppModel(), + environment: AppModel.Environment() + ) + store.send(.increment) + store.send(.increment) + store.send(.increment) + let expectation = XCTestExpectation( + description: "cancellable removed when publisher completes" + ) + DispatchQueue.main.async { + XCTAssertEqual( + store.cancellables.count, + 0, + "cancellables removed when publisher completes" + ) + expectation.fulfill() + } + wait(for: [expectation], timeout: 0.1) + } + + /// Tests that immediately-completing Fx get removed from the cancellables. + /// + /// array. Failure to remove immediately-completing fx would cause a + /// memory leak. + /// + /// When you don't specify fx for an update, we default to + /// an immediately-completing `Empty` publisher, so this test is + /// technically the same as the one above. The difference is that it + /// does not rely on an implementation detail of `Update` but instead + /// tests this behavior directly, in case the implementation were to + /// change somehow. + func testEmptyFxThatCompleteImmiedatelyRemovedOnComplete() { + let store = Store( + state: AppModel(), + environment: AppModel.Environment() + ) + store.send(.createEmptyFxThatCompletesImmediately) + store.send(.createEmptyFxThatCompletesImmediately) + store.send(.createEmptyFxThatCompletesImmediately) + let expectation = XCTestExpectation( + description: "cancellable removed when publisher completes" + ) + DispatchQueue.main.async { + XCTAssertEqual( + store.cancellables.count, + 0, + "cancellables removed when publisher completes" + ) + expectation.fulfill() + } + wait(for: [expectation], timeout: 0.1) + } + + func testAsyncFxRemovedOnComplete() { + let store = Store( + state: AppModel(), + environment: AppModel.Environment() + ) + store.send(.delayIncrement(0.1)) + store.send(.delayIncrement(0.2)) + let expectation = XCTestExpectation( + description: "cancellable removed when publisher completes" + ) + DispatchQueue.main.asyncAfter(deadline: .now() + 0.3) { + XCTAssertEqual( + store.cancellables.count, + 0, + "cancellables removed when publisher completes" + ) + expectation.fulfill() + } + wait(for: [expectation], timeout: 0.5) + } + func testPublishedPropertyFires() throws { let store = Store( state: AppModel(),