Skip to content

Commit

Permalink
Simplify and factorize for easier unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
liamappelbe committed Oct 2, 2024
1 parent 1b7384c commit 6925b50
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 84 deletions.
172 changes: 89 additions & 83 deletions pkgs/coverage/lib/src/isolate_paused_listener.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,54 +27,7 @@ class IsolatePausedListener {
/// Starts listening and returns a future that completes when all isolates
/// have exited.
Future<void> waitUntilAllExited() async {
// NOTE: Why is this class so complicated?
// - We only receive start/pause events that arrive after we've subscribed,
// using _service.streamListen below.
// - So after we subscribe, we have to backfill any isolates that are
// already started/paused by looking at the current isolates.
// - But since that backfill is an async process, we may get isolate events
// arriving during that process. Eg, a lone pause event received before
// the backfill would complete the _allExitedCompleter before any other
// isolate groups have been seen.
// - The simplest and most robust way of solving this issue is to buffer
// all the received events until the backfill is complete.
// - That means we can receive duplicate add/pause events: one from the
// backfill, and one from a real event that arrived during the backfill.
// - So the _onStart/_onPause methods need to be robust to duplicate events
// (and out-of-order events to some extent, as the backfill's events and
// the real] events can be interleaved).
// - Finally, we resume each isolate after the its pause callback is done.
// But we need to delay resuming the main isolate until everything else
// is finished, because the VM shuts down once the main isolate exits.
final eventBuffer = IsolateEventBuffer((Event event) async {
switch (event.kind) {
case EventKind.kIsolateStart:
return _onStart(event.isolate!);
case EventKind.kPauseExit:
return _onPause(event.isolate!);
}
});

// Listen for isolate open/close events.
_service.onIsolateEvent.listen(eventBuffer.add);
await _service.streamListen(EventStreams.kIsolate);

// Listen for isolate paused events.
_service.onDebugEvent.listen(eventBuffer.add);
await _service.streamListen(EventStreams.kDebug);

// Backfill. Add/pause isolates that existed before we subscribed.
for (final isolateRef in await getAllIsolates(_service)) {
_onStart(isolateRef);
final isolate = await _service.getIsolate(isolateRef.id!);
if (isolate.pauseEvent!.kind == EventKind.kPauseExit) {
await _onPause(isolateRef);
}
}

// Flush the buffered stream events, and the start processing them as they
// arrive.
await eventBuffer.flush();
await listenToIsolateLifecycleEvents(_service, _onStart, _onPause, _onExit);

await _allExitedCompleter.future;

Expand All @@ -95,14 +48,29 @@ class IsolatePausedListener {
Future<void> _onPause(IsolateRef isolateRef) async {
if (_allExitedCompleter.isCompleted) return;
final group = _getGroup(isolateRef);
if (group.pause(isolateRef.id!)) {
try {
await _onIsolatePaused(isolateRef, group.noRunningIsolates);
} finally {
await _maybeResumeIsolate(isolateRef);
group.exit(isolateRef.id!);
_maybeFinish();
}
group.pause(isolateRef.id!);
try {
await _onIsolatePaused(isolateRef, group.noRunningIsolates);
} finally {
await _maybeResumeIsolate(isolateRef);
}
}

Future<void> _maybeResumeIsolate(IsolateRef isolateRef) async {
if (_mainIsolate == null && _isMainIsolate(isolateRef)) {
_mainIsolate = isolateRef;
// Pretend this isolate has exited so _allExitedCompleter can complete.
_onExit(isolateRef);
} else {
await _service.resume(isolateRef.id!);
}
}

void _onExit(IsolateRef isolateRef) {
if (_allExitedCompleter.isCompleted) return;
_getGroup(isolateRef).exit(isolateRef.id!);
if (isolateGroups.values.every((group) => group.noLiveIsolates)) {
_allExitedCompleter.complete();
}
}

Expand All @@ -117,28 +85,74 @@ class IsolatePausedListener {
// reliable test when it's available.
return isolateRef.name == 'main';
}
}

Future<void> _maybeResumeIsolate(IsolateRef isolateRef) async {
if (_mainIsolate == null && _isMainIsolate(isolateRef)) {
_mainIsolate = isolateRef;
} else {
await _service.resume(isolateRef.id!);
}
/// Listens to isolate start and pause events, and backfills events for isolates
/// that existed before listening started.
///
/// Ensures that:
/// - Every [onIsolatePaused] and [onIsolateExited] call will be preceeded by
/// an [onIsolateStarted] call for the same isolate.
/// - Not every [onIsolateExited] call will be preceeded by a [onIsolatePaused]
/// call, but a [onIsolatePaused] will never follow a [onIsolateExited].
/// - Each callback will only be called once per isolate.
Future<void> listenToIsolateLifecycleEvents(
VmService service,
void Function(IsolateRef isolate) onIsolateStarted,
Future<void> Function(IsolateRef isolate) onIsolatePaused,
void Function(IsolateRef isolate) onIsolateExited) async {
final started = <String>{};
void onStart(IsolateRef isolateRef) {
if (started.add(isolateRef.id!)) onIsolateStarted(isolateRef);
}

void _maybeFinish() {
if (_allExitedCompleter.isCompleted) return;
if (isolateGroups.values.every((group) => group.noIsolates)) {
_allExitedCompleter.complete();
final paused = <String>{};
Future<void> onPause(IsolateRef isolateRef) async {
onStart(isolateRef);
if (paused.add(isolateRef.id!)) await onIsolatePaused(isolateRef);
}

final exited = <String>{};
void onExit(IsolateRef isolateRef) {
onStart(isolateRef);
paused.add(isolateRef.id!);
if (exited.add(isolateRef.id!)) onIsolateExited(isolateRef);
}

final eventBuffer = IsolateEventBuffer((Event event) async {
switch (event.kind) {
case EventKind.kIsolateStart:
return onStart(event.isolate!);
case EventKind.kPauseExit:
return await onPause(event.isolate!);
case EventKind.kIsolateExit:
return onExit(event.isolate!);
}
});

// Listen for isolate start/exit events.
service.onIsolateEvent.listen(eventBuffer.add);
await service.streamListen(EventStreams.kIsolate);

// Listen for isolate paused events.
service.onDebugEvent.listen(eventBuffer.add);
await service.streamListen(EventStreams.kDebug);

// Backfill. Add/pause isolates that existed before we subscribed.
for (final isolateRef in await getAllIsolates(service)) {
onStart(isolateRef);
final isolate = await service.getIsolate(isolateRef.id!);
if (isolate.pauseEvent!.kind == EventKind.kPauseExit) {
await onPause(isolateRef);
}
}

// Flush the buffered stream events, and the start processing them as they
// arrive.
await eventBuffer.flush();
}

/// Keeps track of isolates in an isolate group.
///
/// Isolates are expected to go through either [start] -> [pause] -> [exit] or
/// simply [start] -> [exit]. [start] and [pause] return false if that sequence
/// is violated.
class IsolateGroupState {
// IDs of the isolates running in this group.
@visibleForTesting
Expand All @@ -148,30 +162,22 @@ class IsolateGroupState {
@visibleForTesting
final paused = <String>{};

// IDs of the isolates that have exited in this group.
@visibleForTesting
final exited = <String>{};

bool get noRunningIsolates => running.isEmpty;
bool get noIsolates => running.isEmpty && paused.isEmpty;
bool get noLiveIsolates => running.isEmpty && paused.isEmpty;

bool start(String id) {
if (paused.contains(id) || exited.contains(id)) return false;
void start(String id) {
paused.remove(id);
running.add(id);
return true;
}

bool pause(String id) {
if (exited.contains(id)) return false;
void pause(String id) {
running.remove(id);
paused.add(id);
return true;
}

void exit(String id) {
paused.remove(id);
running.remove(id);
exited.add(id);
paused.remove(id);
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkgs/coverage/test/run_and_collect_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void checkHitmap(Map<String, HitMap> hitMap) {
38: 1,
39: 1,
41: 1,
42: 3,
42: 4,
43: 1,
44: 3,
45: 1,
Expand Down

0 comments on commit 6925b50

Please sign in to comment.