Skip to content

Commit

Permalink
Implement Offline mode
Browse files Browse the repository at this point in the history
Add `AstarteTransportOffline` class and implement
methods for send individual and aggregate value.
Values save to database and wait for available
connections.
Add method `PingAstartePairing` to
`AstartePairingHandler`.

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Nov 26, 2024
1 parent 83bd064 commit 8ee6b89
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 33 deletions.
38 changes: 34 additions & 4 deletions AstarteDeviceSDKCSharp/AstartePairingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,27 @@ public class AstartePairingHandler
private List<AstarteTransport>? _transports;
private X509Certificate2? _certificate;
private TimeSpan _timeOut;
private string _baseUrl;

public AstartePairingHandler(string pairingUrl, string astarteRealm, string deviceId,
string credentialSecret, AstarteCryptoStore astarteCryptoStore, TimeSpan timeout)
{
_baseUrl = pairingUrl;
_astarteRealm = astarteRealm;
_deviceId = deviceId;
_credentialSecret = credentialSecret;
_cryptoStore = astarteCryptoStore;
_timeOut = timeout;
_AstartePairingService = new AstartePairingService(pairingUrl, astarteRealm, timeout);

_certificate = _cryptoStore.GetCertificate();
if (_certificate == null)
if (PingAstartePairing().Result)
{
_ = _AstartePairingService.RequestNewCertificate(credentialSecret,
_cryptoStore, deviceId).Result;
_certificate = _cryptoStore.GetCertificate();
if (_certificate == null)
{
_ = _AstartePairingService.RequestNewCertificate(credentialSecret,
_cryptoStore, deviceId).Result;
}
}

}
Expand Down Expand Up @@ -95,5 +100,30 @@ public async Task RequestNewCertificate()

}

public async Task<bool> PingAstartePairing()
{
if (!_baseUrl.EndsWith("/"))
{
_baseUrl += "/";
}

string url = $"{_baseUrl}health";
using var httpClient = new HttpClient();

try
{
// Send a GET request to the server
var response = await httpClient.GetAsync(url);

return response.IsSuccessStatusCode;

}
catch (Exception)
{
return false;
}

}

}
}
82 changes: 54 additions & 28 deletions AstarteDeviceSDKCSharp/Device/AstarteDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,30 @@ public AstarteDevice(
}

_astarteFailedMessageStorage = new(fullCryptoDirPath);

if (!_pairingHandler.PingAstartePairing().Result)
{
_astarteTransport = AstarteTransportFactory.CreateAstarteTransportOfflineAstarteTransport
(AstarteProtocolType.OFFLINE,
astarteRealm,
deviceId,
"Offline",
astarteCryptoStore,
(TimeSpan)(timeOut is null ? TimeSpan.FromSeconds(5) : timeOut),
_astarteFailedMessageStorage);

if (_astarteTransport is null)
{
throw new AstarteTransportException("No supported transports for the device !");
}

foreach (AstarteInterface astarteInterface in GetAllInterfaces())
{
astarteInterface.SetAstarteTransport(_astarteTransport);
}
_astarteTransport.SetDevice(this);
_astarteTransport.SendIntrospection();
}
}

private async Task Init()
Expand Down Expand Up @@ -197,45 +221,47 @@ public bool GetAlwaysReconnect()
/// <exception cref="AstartePairingException"></exception>
public async Task Connect()
{

if (!_pairingHandler.IsCertificateAvailable())
{
await _pairingHandler.RequestNewCertificate();
_initialized = false;
}

if (!_initialized)
if (_pairingHandler.PingAstartePairing().Result)
{
if (!_pairingHandler.IsCertificateAvailable())
{
await _pairingHandler.RequestNewCertificate();
_initialized = false;
}

await Init();
_initialized = true;
}
if (!_initialized)
{

if (IsConnected())
{
return;
}
await Init();
_initialized = true;
}

try
{
if (_astarteTransport is null)
if (IsConnected())
{
return;
}
await _astarteTransport.Connect();
}
catch (AstarteCryptoException)
{
AstarteLogger.Debug("Regenerating the cert", this.GetType().Name);

try
{
await _pairingHandler.RequestNewCertificate();
_initialized = false;
if (_astarteTransport is null)
{
return;
}
await _astarteTransport.Connect();
}
catch (AstartePairingException ex)
catch (AstarteCryptoException)
{
OnTransportConnectionError(ex);
return;
Trace.WriteLine("Regenerating the cert");
try
{
await _pairingHandler.RequestNewCertificate();
_initialized = false;
}
catch (AstartePairingException ex)
{
OnTransportConnectionError(ex);
return;
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion AstarteDeviceSDKCSharp/Protocol/AstarteProtocolType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public enum AstarteProtocolType
[Description("")]
UNKNOWN_PROTOCOL,
[Description("astarte_mqtt_v1")]
ASTARTE_MQTT_V1
ASTARTE_MQTT_V1,
OFFLINE
}
}
15 changes: 15 additions & 0 deletions AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

using AstarteDeviceSDK.Protocol;
using AstarteDeviceSDKCSharp.Crypto;
using AstarteDeviceSDKCSharp.Data;
using AstarteDeviceSDKCSharp.Transport.MQTT;
using AstarteDeviceSDKCSharp.Transport.Offline;

namespace AstarteDeviceSDKCSharp.Transport
{
Expand All @@ -47,5 +49,18 @@ public static AstarteTransport? CreateAstarteTransportFromPairing
return null;
}
}

public static AstarteTransport? CreateAstarteTransportOfflineAstarteTransport
(AstarteProtocolType protocolType, string astarteRealm,
string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore,
TimeSpan timeOut, AstarteFailedMessageStorage astarteFailedMessageStorage)
{
return new AstarteTransportOffline(new MutualSSLAuthenticationMqttConnectionInfo(
new Uri("about:blank"),
astarteRealm,
deviceId,
astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters(),
timeOut), astarteFailedMessageStorage);
}
}
}
175 changes: 175 additions & 0 deletions AstarteDeviceSDKCSharp/Transport/Offline/AstarteTransportOffline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* This file is part of Astarte.
*
* Copyright 2024 SECO Mind Srl
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

using System.Text;
using AstarteDeviceSDK.Protocol;
using AstarteDeviceSDKCSharp.Data;
using AstarteDeviceSDKCSharp.Device;
using AstarteDeviceSDKCSharp.Protocol;
using AstarteDeviceSDKCSharp.Protocol.AstarteException;
using AstarteDeviceSDKCSharp.Transport.MQTT;
using AstarteDeviceSDKCSharp.Utilities;

namespace AstarteDeviceSDKCSharp.Transport.Offline
{
public class AstarteTransportOffline : AstarteMqttV1Transport
{
private AstarteFailedMessageStorage _astarteFailedMessageStorage;
private readonly string _baseTopic;

public AstarteTransportOffline(MutualSSLAuthenticationMqttConnectionInfo connectionInfo,
AstarteFailedMessageStorage astarteFailedMessageStorage)
: base(connectionInfo)
{
_astarteFailedMessageStorage = astarteFailedMessageStorage;
_baseTopic = connectionInfo.GetClientId();
}

public override async Task SendAggregate(AstarteAggregateDatastreamInterface astarteInterface,
string path, Dictionary<string, object> value, DateTime? timeStamp)
{
AstarteInterfaceDatastreamMapping mapping =
(AstarteInterfaceDatastreamMapping)astarteInterface.GetMappings().Values.ToArray()[0];

if (mapping is null)
{
throw new AstarteTransportException("Mapping not found");
}

int qos = QosFromReliability(mapping);

string topic = _baseTopic + "/" + astarteInterface.InterfaceName + path;
byte[] payload = AstartePayload.Serialize(value, timeStamp);

AstarteFailedMessageEntry astarteFailedMessageEntry = new(
qos,
payload,
topic,
Guid.NewGuid()
);

await SaveMessageToDatabase(astarteFailedMessageEntry);
}

public override async Task SendIndividualValue(AstarteInterface astarteInterface, string path, object? value)
{
await SendIndividualValue(astarteInterface, path, value, null);
}

public override async Task SendIndividualValue(AstarteInterface astarteInterface, string path, object? value, DateTime? timestamp)
{
AstarteInterfaceDatastreamMapping mapping = new();
int qos = 2;

if (astarteInterface.GetType() == (typeof(AstarteDeviceDatastreamInterface)))
{
try
{
// Find a matching mapping
mapping = (AstarteInterfaceDatastreamMapping)astarteInterface
.FindMappingInInterface(path);
}
catch (AstarteInterfaceMappingNotFoundException e)
{
throw new AstarteTransportException("Mapping not found", e);
}
qos = QosFromReliability(mapping);
}

string topic = _baseTopic + "/" + astarteInterface.InterfaceName + path;
byte[] payload = AstartePayload.Serialize(value, timestamp);

AstarteFailedMessageEntry astarteFailedMessageEntry = new(
qos,
payload,
topic,
Guid.NewGuid()
);
await SaveMessageToDatabase(astarteFailedMessageEntry);
}

private async Task SaveMessageToDatabase(AstarteFailedMessageEntry astarteFailedMessageEntry)
{

_astarteFailedMessageStorage?.InsertStored(astarteFailedMessageEntry.Topic,
astarteFailedMessageEntry.Payload,
astarteFailedMessageEntry.Qos,
astarteFailedMessageEntry.Guid);

if (Device is not null)
{
await Device.Connect();
}

}

public override async Task SendIntrospection()
{
StringBuilder introspectionStringBuilder = new();
AstarteDevice? astarteDevice = GetDevice();

if (Device == null)
{
throw new AstarteTransportException("Error sending introspection." +
" Astarte device is null");
}

foreach (AstarteInterface astarteInterface in
Device.GetAllInterfaces())
{
introspectionStringBuilder.Append(astarteInterface.InterfaceName);
introspectionStringBuilder.Append(':');
introspectionStringBuilder.Append(astarteInterface.MajorVersion);
introspectionStringBuilder.Append(':');
introspectionStringBuilder.Append(astarteInterface.MinorVersion);
introspectionStringBuilder.Append(';');
}

// Remove last ;
introspectionStringBuilder = introspectionStringBuilder
.Remove(introspectionStringBuilder.Length - 1, 1);
string introspection = introspectionStringBuilder.ToString();

AstarteFailedMessageEntry astarteFailedMessageEntry = new(
0,
Encoding.ASCII.GetBytes(introspection),
_baseTopic,
Guid.NewGuid()
);

await SaveMessageToDatabase(astarteFailedMessageEntry);
}

private int QosFromReliability(AstarteInterfaceDatastreamMapping mapping)
{
switch (mapping.GetReliability())
{
case AstarteInterfaceDatastreamMapping.MappingReliability.UNIQUE:
return 2;
case AstarteInterfaceDatastreamMapping.MappingReliability.GUARANTEED:
return 1;
case AstarteInterfaceDatastreamMapping.MappingReliability.UNRELIABLE:
return 0;
default:
return 0;
}
}
}
}

0 comments on commit 8ee6b89

Please sign in to comment.