Skip to content

Commit

Permalink
Go back to manually managed fx lifetimes (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
gordonbrander authored Dec 19, 2023
1 parent 371a014 commit 41f3356
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 22 deletions.
65 changes: 43 additions & 22 deletions Sources/ObservableStore/ObservableStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ public protocol StoreProtocol {
public final class Store<Model>: ObservableObject, StoreProtocol
where Model: ModelProtocol
{
/// Cancellable for fx subscription.
private var cancelFx: AnyCancellable?
/// Stores cancellables by ID
private(set) var cancellables: [UUID: AnyCancellable] = [:]

/// Private for all actions sent to the store.
private var _actions = PassthroughSubject<Model.Action, Never>()
Expand All @@ -215,17 +215,6 @@ where Model: ModelProtocol
_actions.eraseToAnyPublisher()
}

/// Source publisher for batches of fx modeled as publishers.
private var _fxBatches = PassthroughSubject<Fx<Model.Action>, Never>()

/// `fx` represents a flat stream of actions from all fx publishers.
private var fx: AnyPublisher<Model.Action, Never> {
_fxBatches
.flatMap({ publisher in publisher })
.receive(on: DispatchQueue.main)
.eraseToAnyPublisher()
}

/// Publisher for updates performed on state
private var _updates = PassthroughSubject<Model.UpdateType, Never>()

Expand Down Expand Up @@ -273,11 +262,6 @@ where Model: ModelProtocol
subsystem: "ObservableStore",
category: "Store"
)

self.cancelFx = self.fx
.sink(receiveValue: { [weak self] action in
self?.send(action)
})
}

/// Initialize with a closure that receives environment.
Expand Down Expand Up @@ -318,12 +302,49 @@ where Model: ModelProtocol
self.send(action)
}

/// Subscribe to a publisher of actions, send the actions it publishes
/// to the store.
/// Subscribe to a publisher of actions, piping them through to
/// the store.
///
/// Holds on to the cancellable until publisher completes.
/// When publisher completes, removes cancellable.
public func subscribe(to fx: Fx<Model.Action>) {
self._fxBatches.send(fx)
// Create a UUID for the cancellable.
// Store cancellable in dictionary by UUID.
// Remove cancellable from dictionary upon effect completion.
// This retains the effect pipeline for as long as it takes to complete
// the effect, and then removes it, so we don't have a cancellables
// memory leak.
let id = UUID()

// Receive Fx on main thread. This does two important things:
//
// First, SwiftUI requires that any state mutations that would change
// views happen on the main thread. Receiving on main ensures that
// all fx-driven state transitions happen on main, even if the
// publisher is off-main-thread.
//
// Second, if we didn't schedule receive on main, it would be possible
// for publishers to complete immediately, causing receiveCompletion
// to attempt to remove the publisher from `cancellables` before
// it is added. By scheduling to receive publisher on main,
// we force publisher to complete on next tick, ensuring that it
// is always first added, then removed from `cancellables`.
let cancellable = fx
.receive(
on: DispatchQueue.main,
options: .init(qos: .default)
)
.sink(
receiveCompletion: { [weak self] _ in
self?.cancellables.removeValue(forKey: id)
},
receiveValue: { [weak self] action in
self?.send(action)
}
)
self.cancellables[id] = cancellable
}

/// Send an action to the store to update state and generate effects.
/// Any effects generated are fed back into the store.
///
Expand Down
80 changes: 80 additions & 0 deletions Tests/ObservableStoreTests/ObservableStoreTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,86 @@ final class ObservableStoreTests: XCTestCase {
XCTAssertEqual(store.state.count, 1, "state is advanced")
}

/// Tests that the immediately-completing empty Fx used as the default for
/// updates get removed from the cancellables array.
///
/// Failure to remove immediately-completing fx would cause a memory leak.
func testEmptyFxRemovedOnComplete() {
let store = Store(
state: AppModel(),
environment: AppModel.Environment()
)
store.send(.increment)
store.send(.increment)
store.send(.increment)
let expectation = XCTestExpectation(
description: "cancellable removed when publisher completes"
)
DispatchQueue.main.async {
XCTAssertEqual(
store.cancellables.count,
0,
"cancellables removed when publisher completes"
)
expectation.fulfill()
}
wait(for: [expectation], timeout: 0.1)
}

/// Tests that immediately-completing Fx get removed from the cancellables.
///
/// array. Failure to remove immediately-completing fx would cause a
/// memory leak.
///
/// When you don't specify fx for an update, we default to
/// an immediately-completing `Empty` publisher, so this test is
/// technically the same as the one above. The difference is that it
/// does not rely on an implementation detail of `Update` but instead
/// tests this behavior directly, in case the implementation were to
/// change somehow.
func testEmptyFxThatCompleteImmiedatelyRemovedOnComplete() {
let store = Store(
state: AppModel(),
environment: AppModel.Environment()
)
store.send(.createEmptyFxThatCompletesImmediately)
store.send(.createEmptyFxThatCompletesImmediately)
store.send(.createEmptyFxThatCompletesImmediately)
let expectation = XCTestExpectation(
description: "cancellable removed when publisher completes"
)
DispatchQueue.main.async {
XCTAssertEqual(
store.cancellables.count,
0,
"cancellables removed when publisher completes"
)
expectation.fulfill()
}
wait(for: [expectation], timeout: 0.1)
}

func testAsyncFxRemovedOnComplete() {
let store = Store(
state: AppModel(),
environment: AppModel.Environment()
)
store.send(.delayIncrement(0.1))
store.send(.delayIncrement(0.2))
let expectation = XCTestExpectation(
description: "cancellable removed when publisher completes"
)
DispatchQueue.main.asyncAfter(deadline: .now() + 0.3) {
XCTAssertEqual(
store.cancellables.count,
0,
"cancellables removed when publisher completes"
)
expectation.fulfill()
}
wait(for: [expectation], timeout: 0.5)
}

func testPublishedPropertyFires() throws {
let store = Store(
state: AppModel(),
Expand Down

0 comments on commit 41f3356

Please sign in to comment.