Skip to content

Commit

Permalink
JdsServiceStartupOnReconnect | reworked startup mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
Minyewoo committed Mar 30, 2024
1 parent 58b189b commit a5ba839
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 52 deletions.
1 change: 1 addition & 0 deletions lib/hmi_networking.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export 'src/core/line_socket/line_socket.dart';
export 'src/core/ds_data_stream_extract.dart';
export 'src/core/entities/ds_data_point_extracted.dart';
export 'src/core/ds_send.dart';
export 'src/core/non_repetitive_stream.dart';
export 'src/core/entities/double_container.dart';
export 'src/core/entities/network_operation_state.dart';
export 'src/core/entities/data_object.dart';
Expand Down
53 changes: 24 additions & 29 deletions lib/src/core/jds_service/jds_service_startup.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,43 @@ import 'package:hmi_networking/src/core/jds_service/jds_service.dart';
class JdsServiceStartup {
static const _log = Log('JdsServiceStartup');
final JdsService _service;
final Duration _authRetryDelay;
///
/// [JdsService] startup sequence.
///
/// Config from [service] will be saved to [cache].
///
/// [dsClient] will be subscribed on all points from [service] config.
/// Jds [service] startup sequence.
const JdsServiceStartup({
required JdsService service,
Duration authRetryDelay = const Duration(milliseconds: 1000),
}) :
_service = service,
_authRetryDelay = authRetryDelay;
_service = service;
///
/// Pull points config from JDS service,
/// save it to cache (if provided)
/// and subscribe for all of the points.
Future<ResultF<void>> run() async {
_log.info('Authenting...');
while(await _service.authenticate('12345') is Err) {
_log.warning('Unable to authenticate.');
await Future.delayed(_authRetryDelay);
_log.warning('Retrying...');
}
_log.info('Requesting points config...');
switch(await _service.points()) {
case Ok(value:final configs):
_log.info('Points config successfully pulled!');
_log.info('Subscribing to received points...');
final subscribeResult = await _service.subscribe(
configs.names,
);
switch(subscribeResult) {
case Ok():
_log.info('Succsessfully subscribed!');
return const Ok(null);
_log.info('Authenticating...');
switch(await _service.authenticate('12345')) {
case Ok():
_log.info('Authenticated successfully!');
_log.info('Requesting points config...');
switch(await _service.points()) {
case Ok(value:final configs):
_log.info('Points config successfully pulled!');
_log.info('Subscribing to received points...');
final subscribeResult = await _service.subscribe(
configs.names,
);
switch(subscribeResult) {
case Ok():
_log.info('Succsessfully subscribed!');
return const Ok(null);
case Err(:final error):
_log.warning('Failed to subscribe');
return Err(error);
}
case Err(:final error):
_log.warning('Failed to subscribe');
_log.warning('Failed to pull points');
return Err(error);
}
case Err(:final error):
_log.warning('Failed to pull points');
_log.warning('Failed to authenticate.');
return Err(error);
}
}
Expand Down
48 changes: 29 additions & 19 deletions lib/src/core/jds_service/jds_service_startup_on_reconnect.dart
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import 'dart:async';
import 'package:async/async.dart';
import 'package:hmi_core/hmi_core.dart';
import 'package:hmi_core/hmi_core_result_new.dart';
import 'package:hmi_networking/src/core/jds_service/jds_service.dart';
import 'package:hmi_networking/src/core/jds_service/jds_service_startup.dart';
import 'package:hmi_networking/src/core/non_repetitive_stream.dart';
///
/// [JdsService] cache update sequence.
class JdsServiceStartupOnReconnect {
final Stream<DsDataPoint<int>> _connectionStatuses;
final JdsServiceStartup _startup;
bool _isConnected;
bool _isStartupCompleted = true;
StreamSubscription<bool>? _connectionSubscription;
CancelableOperation? _startupProcess;
///
/// [JdsService] cache update sequence.
JdsServiceStartupOnReconnect({
Expand All @@ -23,28 +24,37 @@ class JdsServiceStartupOnReconnect {
_isConnected = initialConnectionStatus;
///
void run() {
_connectionSubscription ??= _connectionStatuses.map(
(point) => switch(point) {
DsDataPoint<int>(
value: final connectionStatus,
status: DsStatus.ok,
cot: DsCot.inf,
) => connectionStatus == DsStatus.ok.value,
_ => false
},
).listen((isConnected) async {
if(_isConnected != isConnected) {
_isConnected = isConnected;
await _startupProcess?.cancel();
if(_isConnected) {
_startupProcess = CancelableOperation.fromFuture(_startup.run());
_connectionSubscription ??= NonRepetitiveStream(
stream: _connectionStatuses.map(
(point) {
switch(point) {
case DsDataPoint<int>(
value: final connectionStatus,
status: DsStatus.ok,
cot: DsCot.inf,
):
final isConnected = connectionStatus == DsStatus.ok.value;
_isConnected = isConnected;
return isConnected;
case _:
return false;
}
},
),
).stream
.where((event) => _isStartupCompleted)
.listen((_) async {
while(_isConnected) {
_isStartupCompleted = false;
if(await _startup.run() case Ok()) {
_isStartupCompleted = true;
break;
}
}
});
}
///
Future<void> cancel() async {
Future<void> dispose() async {
await _connectionSubscription?.cancel();
await _startupProcess?.cancel();
}
}
29 changes: 29 additions & 0 deletions lib/src/core/non_repetitive_stream.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import 'dart:async';
///
class NonRepetitiveStream<T> {
final StreamController<T> _controller = StreamController<T>();
late final StreamSubscription<T> _subscription;
T? _lastValue;
///
NonRepetitiveStream({
required Stream<T> stream,
}) {
_subscription = stream.listen(
(event) {
if (_lastValue != event) {
_lastValue = event;
_controller.add(event);
}
},
onError: (error, stackTrace) {
_controller.addError(error, stackTrace);
},
);
}
///
Stream<T> get stream => _controller.stream;
///
Future<void> dispose() {
return _subscription.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ import 'package:flutter_test/flutter_test.dart';
import 'package:hmi_core/hmi_core_result_new.dart';
import 'package:hmi_networking/src/core/jds_service/jds_point_config/jds_point_configs.dart';
import 'package:hmi_networking/src/core/jds_service/jds_service_startup.dart';
import '../../ds_client/cache/delayed/fake_ds_client_cache.dart';
import '../../ds_send/common/fake_ds_client.dart';
import '../fake_jds_service.dart';

void main() {
Expand All @@ -28,8 +26,6 @@ void main() {
);
final startup = JdsServiceStartup(
service: service,
cache: FakeDsClientCache(),
dsClient: FakeDsClient(),
);
final result = await startup.run();
expect(result, isA<Ok>());
Expand Down

0 comments on commit a5ba839

Please sign in to comment.