diff --git a/lib/hmi_networking.dart b/lib/hmi_networking.dart index 71b0877..24e60f1 100644 --- a/lib/hmi_networking.dart +++ b/lib/hmi_networking.dart @@ -27,6 +27,7 @@ export 'src/core/line_socket/line_socket.dart'; export 'src/core/ds_data_stream_extract.dart'; export 'src/core/entities/ds_data_point_extracted.dart'; export 'src/core/ds_send.dart'; +export 'src/core/non_repetitive_stream.dart'; export 'src/core/entities/double_container.dart'; export 'src/core/entities/network_operation_state.dart'; export 'src/core/entities/data_object.dart'; diff --git a/lib/src/core/jds_service/jds_service_startup.dart b/lib/src/core/jds_service/jds_service_startup.dart index 894be10..c1b4ac4 100644 --- a/lib/src/core/jds_service/jds_service_startup.dart +++ b/lib/src/core/jds_service/jds_service_startup.dart @@ -7,48 +7,43 @@ import 'package:hmi_networking/src/core/jds_service/jds_service.dart'; class JdsServiceStartup { static const _log = Log('JdsServiceStartup'); final JdsService _service; - final Duration _authRetryDelay; /// - /// [JdsService] startup sequence. - /// - /// Config from [service] will be saved to [cache]. - /// - /// [dsClient] will be subscribed on all points from [service] config. + /// Jds [service] startup sequence. const JdsServiceStartup({ required JdsService service, - Duration authRetryDelay = const Duration(milliseconds: 1000), }) : - _service = service, - _authRetryDelay = authRetryDelay; + _service = service; /// /// Pull points config from JDS service, /// save it to cache (if provided) /// and subscribe for all of the points. Future> run() async { - _log.info('Authenting...'); - while(await _service.authenticate('12345') is Err) { - _log.warning('Unable to authenticate.'); - await Future.delayed(_authRetryDelay); - _log.warning('Retrying...'); - } - _log.info('Requesting points config...'); - switch(await _service.points()) { - case Ok(value:final configs): - _log.info('Points config successfully pulled!'); - _log.info('Subscribing to received points...'); - final subscribeResult = await _service.subscribe( - configs.names, - ); - switch(subscribeResult) { - case Ok(): - _log.info('Succsessfully subscribed!'); - return const Ok(null); + _log.info('Authenticating...'); + switch(await _service.authenticate('12345')) { + case Ok(): + _log.info('Authenticated successfully!'); + _log.info('Requesting points config...'); + switch(await _service.points()) { + case Ok(value:final configs): + _log.info('Points config successfully pulled!'); + _log.info('Subscribing to received points...'); + final subscribeResult = await _service.subscribe( + configs.names, + ); + switch(subscribeResult) { + case Ok(): + _log.info('Succsessfully subscribed!'); + return const Ok(null); + case Err(:final error): + _log.warning('Failed to subscribe'); + return Err(error); + } case Err(:final error): - _log.warning('Failed to subscribe'); + _log.warning('Failed to pull points'); return Err(error); } case Err(:final error): - _log.warning('Failed to pull points'); + _log.warning('Failed to authenticate.'); return Err(error); } } diff --git a/lib/src/core/jds_service/jds_service_startup_on_reconnect.dart b/lib/src/core/jds_service/jds_service_startup_on_reconnect.dart index 813e0ae..6196905 100644 --- a/lib/src/core/jds_service/jds_service_startup_on_reconnect.dart +++ b/lib/src/core/jds_service/jds_service_startup_on_reconnect.dart @@ -1,16 +1,17 @@ import 'dart:async'; -import 'package:async/async.dart'; import 'package:hmi_core/hmi_core.dart'; +import 'package:hmi_core/hmi_core_result_new.dart'; import 'package:hmi_networking/src/core/jds_service/jds_service.dart'; import 'package:hmi_networking/src/core/jds_service/jds_service_startup.dart'; +import 'package:hmi_networking/src/core/non_repetitive_stream.dart'; /// /// [JdsService] cache update sequence. class JdsServiceStartupOnReconnect { final Stream> _connectionStatuses; final JdsServiceStartup _startup; bool _isConnected; + bool _isStartupCompleted = true; StreamSubscription? _connectionSubscription; - CancelableOperation? _startupProcess; /// /// [JdsService] cache update sequence. JdsServiceStartupOnReconnect({ @@ -23,28 +24,37 @@ class JdsServiceStartupOnReconnect { _isConnected = initialConnectionStatus; /// void run() { - _connectionSubscription ??= _connectionStatuses.map( - (point) => switch(point) { - DsDataPoint( - value: final connectionStatus, - status: DsStatus.ok, - cot: DsCot.inf, - ) => connectionStatus == DsStatus.ok.value, - _ => false - }, - ).listen((isConnected) async { - if(_isConnected != isConnected) { - _isConnected = isConnected; - await _startupProcess?.cancel(); - if(_isConnected) { - _startupProcess = CancelableOperation.fromFuture(_startup.run()); + _connectionSubscription ??= NonRepetitiveStream( + stream: _connectionStatuses.map( + (point) { + switch(point) { + case DsDataPoint( + value: final connectionStatus, + status: DsStatus.ok, + cot: DsCot.inf, + ): + final isConnected = connectionStatus == DsStatus.ok.value; + _isConnected = isConnected; + return isConnected; + case _: + return false; + } + }, + ), + ).stream + .where((event) => _isStartupCompleted) + .listen((_) async { + while(_isConnected) { + _isStartupCompleted = false; + if(await _startup.run() case Ok()) { + _isStartupCompleted = true; + break; } } }); } /// - Future cancel() async { + Future dispose() async { await _connectionSubscription?.cancel(); - await _startupProcess?.cancel(); } } \ No newline at end of file diff --git a/lib/src/core/non_repetitive_stream.dart b/lib/src/core/non_repetitive_stream.dart new file mode 100644 index 0000000..45bf221 --- /dev/null +++ b/lib/src/core/non_repetitive_stream.dart @@ -0,0 +1,29 @@ +import 'dart:async'; +/// +class NonRepetitiveStream { + final StreamController _controller = StreamController(); + late final StreamSubscription _subscription; + T? _lastValue; + /// + NonRepetitiveStream({ + required Stream stream, + }) { + _subscription = stream.listen( + (event) { + if (_lastValue != event) { + _lastValue = event; + _controller.add(event); + } + }, + onError: (error, stackTrace) { + _controller.addError(error, stackTrace); + }, + ); + } + /// + Stream get stream => _controller.stream; + /// + Future dispose() { + return _subscription.cancel(); + } +} \ No newline at end of file diff --git a/test/unit/core/jds_service/jds_service_startup/jds_service_startup_run_test.dart b/test/unit/core/jds_service/jds_service_startup/jds_service_startup_run_test.dart index 807d804..9744f0f 100644 --- a/test/unit/core/jds_service/jds_service_startup/jds_service_startup_run_test.dart +++ b/test/unit/core/jds_service/jds_service_startup/jds_service_startup_run_test.dart @@ -2,8 +2,6 @@ import 'package:flutter_test/flutter_test.dart'; import 'package:hmi_core/hmi_core_result_new.dart'; import 'package:hmi_networking/src/core/jds_service/jds_point_config/jds_point_configs.dart'; import 'package:hmi_networking/src/core/jds_service/jds_service_startup.dart'; -import '../../ds_client/cache/delayed/fake_ds_client_cache.dart'; -import '../../ds_send/common/fake_ds_client.dart'; import '../fake_jds_service.dart'; void main() { @@ -28,8 +26,6 @@ void main() { ); final startup = JdsServiceStartup( service: service, - cache: FakeDsClientCache(), - dsClient: FakeDsClient(), ); final result = await startup.run(); expect(result, isA());