From 8b5b690f47c6ca5d6f36fad4ea72e9406fef737b Mon Sep 17 00:00:00 2001 From: Gabriel Gava Date: Mon, 24 Jun 2024 10:35:31 +0200 Subject: [PATCH] Fixes subscription to _deviceConnector.deviceConnectionStateUpdateStream leaking - Previous code would create a broadcastStream from a normal stream (created by Repeater). When this is done, the source stream subscription needs to be explicitly closed. This is usually done by implementing the onCancel callback from the asBroadcastStream method. This was not happening, so every call to connectedDeviceStream was creating a new subscription that was never closed. - To fix the problem, this commit uses the Repeater.broadcast constructor which already returns a broadcast stream and also closes the source stream subscription when needed --- .../flutter_reactive_ble/lib/src/reactive_ble.dart | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/packages/flutter_reactive_ble/lib/src/reactive_ble.dart b/packages/flutter_reactive_ble/lib/src/reactive_ble.dart index 94f1188c..cc1fd9bf 100644 --- a/packages/flutter_reactive_ble/lib/src/reactive_ble.dart +++ b/packages/flutter_reactive_ble/lib/src/reactive_ble.dart @@ -60,11 +60,10 @@ class FlutterReactiveBle { BleStatus get status => _status; /// A stream providing connection updates for all the connected BLE devices. - Stream get connectedDeviceStream => Repeater(onListenEmitFrom: () async* { + Stream get connectedDeviceStream => Repeater.broadcast(onListenEmitFrom: () async* { await initialize(); yield* _deviceConnector.deviceConnectionStateUpdateStream; - }).stream.asBroadcastStream() - ..listen((_) {}); + }).stream; /// A stream providing value updates for all the connected BLE devices. /// @@ -105,8 +104,7 @@ class FlutterReactiveBle { ); if (Platform.isAndroid || Platform.isIOS) { - ReactiveBlePlatform.instance = - const ReactiveBleMobilePlatformFactory().create( + ReactiveBlePlatform.instance = const ReactiveBleMobilePlatformFactory().create( logger: _debugLogger, ); } @@ -398,11 +396,10 @@ class FlutterReactiveBle { Future clearGattCache(String deviceId) => _blePlatform.clearGattCache(deviceId).then((info) => info.dematerialize()); - /// Reads the RSSI of the of the peripheral with the given device ID. + /// Reads the RSSI of the of the peripheral with the given device ID. /// The peripheral must be connected, otherwise a [PlatformException] will be /// thrown - Future readRssi(String deviceId) async => - _blePlatform.readRssi(deviceId); + Future readRssi(String deviceId) async => _blePlatform.readRssi(deviceId); /// Subscribes to updates from the characteristic specified. ///