Skip to content

Commit

Permalink
Implement MQTTNet extension ManagedClient
Browse files Browse the repository at this point in the history
Refactor `Connect` method in
`AstarteMqttTransport`.
Refactor fallout strategy to implement
ManagedClient storage.

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Apr 24, 2024
1 parent d0b51c4 commit 000c987
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 470 deletions.
1 change: 1 addition & 0 deletions AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ SPDX-License-Identifier: Apache-2.0 -->
</PackageReference>
<PackageReference Include="JWT" Version="9.0.3" />
<PackageReference Include="MQTTnet" Version="4.1.4.563" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.1.4.563" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
<PackageReference Include="Newtonsoft.Json.Bson" Version="1.0.2" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="6.25.1" />
Expand Down
319 changes: 230 additions & 89 deletions AstarteDeviceSDKCSharp/Data/AstarteFailedMessageStorage.cs

Large diffs are not rendered by default.

30 changes: 11 additions & 19 deletions AstarteDeviceSDKCSharp/Data/IAstarteFailedMessageStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,26 @@
* SPDX-License-Identifier: Apache-2.0
*/

using MQTTnet.Extensions.ManagedClient;

namespace AstarteDeviceSDKCSharp.Data
{
public interface IAstarteFailedMessageStorage
public interface IAstarteFailedMessageStorage : IManagedMqttClientStorage
{
void InsertVolatile(String topic, byte[] payload, int qos);

void InsertVolatile(String topic, byte[] payload, int qos, int relativeExpiry);

void InsertStored(String topic, byte[] payload, int qos);

void InsertStored(String topic, byte[] payload, int qos, int relativeExpiry);
void InsertVolatile(String topic, byte[] payload, int qos, Guid guid);

bool IsEmpty();
void InsertVolatile(String topic, byte[] payload, int qos, Guid guid, int relativeExpiry);

bool IsCacheEmpty();
Task InsertStored(String topic, byte[] payload, int qos, Guid guid);

AstarteFailedMessageEntry? PeekFirst();
Task InsertStored(String topic, byte[] payload, int qos, Guid guid, int relativeExpiry);

AstarteFailedMessageEntry? PeekFirstCache();
Task Reject(AstarteFailedMessageEntry astarteFailedMessages);

void Ack(AstarteFailedMessageEntry failedMessages);

void AckFirstCache();

void Reject(AstarteFailedMessageEntry astarteFailedMessages);

void RejectFirstCache();
void RejectCache(AstarteFailedMessageEntry astarteFailedMessages);

bool IsExpired(long expire);

Task DeleteByGuidAsync(ManagedMqttApplicationMessage applicationMessage);
}
}
129 changes: 17 additions & 112 deletions AstarteDeviceSDKCSharp/Device/AstarteDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ public class AstarteDevice : IAstarteTransportEventListener
private bool _initialized = false;
private const string _cryptoSubDir = "crypto";
private bool _alwaysReconnect = false;
private bool _explicitDisconnectionRequest;
private static int MIN_INCREMENT_INTERVAL = 5000;
private static int MAX_INCREMENT_INTERVAL = 60000;

/// <summary>
/// Basic class defining an Astarte device.
Expand Down Expand Up @@ -163,47 +160,6 @@ private void ConfigureTransport(AstarteTransport astarteTransport)

}

private bool EventualyReconnect()
{
if (_astarteTransport is null)
{
return false;
}
lock (this)
{
int x = 1;
int interval = 0;
while (_alwaysReconnect && !IsConnected())
{

if (interval < MAX_INCREMENT_INTERVAL)
{
interval = MIN_INCREMENT_INTERVAL * x;
x++;
}
else
{
interval = MAX_INCREMENT_INTERVAL;
}

try
{
Task.Run(async () => await Connect()).Wait(interval);
}
catch (AggregateException ex)
{
foreach (var innerException in ex.InnerExceptions)
{
Trace.WriteLine($"Inner Exception: {innerException.GetType().Name}: {innerException.Message}");
}
}
}
}

_explicitDisconnectionRequest = false;
return false;
}

/// <summary>
/// Method for getting a list of interfaces for the device
/// </summary>
Expand All @@ -222,6 +178,11 @@ public void SetAlwaysReconnect(bool alwaysReconnect)
_alwaysReconnect = alwaysReconnect;
}

public bool GetAlwaysReconnect()
{
return _alwaysReconnect;
}

/// <summary>
/// Establishes a connection to the Astarte asynchronously.
/// </summary>
Expand Down Expand Up @@ -291,17 +252,16 @@ public bool IsConnected()
/// <summary>
/// Disconnect device from Astarte
/// </summary>
public void Disconnect()
public async Task Disconnect()
{
lock (this)

if (!IsConnected() || _astarteTransport is null)
{
if (!IsConnected())
{
return;
}
_explicitDisconnectionRequest = true;
_astarteTransport?.Disconnect();
return;
}

await _astarteTransport.Disconnect();

}

/// <summary>
Expand Down Expand Up @@ -428,33 +388,6 @@ public void OnTransportConnected()
}
}

public void OnTransportConnectionInitializationError(Exception ex)
{
lock (this)
{

_astarteMessagelistener?.OnFailure(new AstarteMessageException(ex.Message, ex));

new Thread(delegate ()
{
try
{
Disconnect();

_astarteMessagelistener?
.OnDisconnected(new AstarteMessageException(ex.Message, ex));

}
catch (AstarteTransportException e)
{
Trace.WriteLine(e.Message);
}
EventualyReconnect();
}).Start();

}
}

public void OnTransportConnectionError(Exception ex)
{
lock (this)
Expand All @@ -468,38 +401,18 @@ public void OnTransportConnectionError(Exception ex)
}
catch (AstartePairingException e)
{

if (!EventualyReconnect())
{

_astarteMessagelistener?
.OnFailure(new AstarteMessageException(e.Message, e));
Trace.WriteLine(e);
}
return;
}

try
{
_astarteTransport?.Connect();
}
catch (AstarteTransportException e)
{

_astarteMessagelistener?
.OnFailure(new AstarteMessageException(e.Message, e));
Trace.WriteLine(e);

return;
}

}
else
{
if (!EventualyReconnect())
{

_astarteMessagelistener?
.OnFailure(new AstarteMessageException(ex.Message, ex));

}
_astarteMessagelistener?
.OnFailure(new AstarteMessageException(ex.Message, ex));
}
}
}
Expand All @@ -508,16 +421,8 @@ public void OnTransportDisconnected()
{
lock (this)
{

_astarteMessagelistener?
.OnDisconnected(new AstarteMessageException("Connection lost"));

if (_alwaysReconnect && !_explicitDisconnectionRequest)
{
EventualyReconnect();
}
_explicitDisconnectionRequest = false;

}
}

Expand Down
4 changes: 0 additions & 4 deletions AstarteDeviceSDKCSharp/IAstarteTransportEventListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ namespace AstarteDeviceSDKCSharp.Transport
public interface IAstarteTransportEventListener
{
public void OnTransportConnected();

public void OnTransportConnectionInitializationError(Exception ex);

public void OnTransportConnectionError(Exception ex);

public void OnTransportDisconnected();
}
}
3 changes: 1 addition & 2 deletions AstarteDeviceSDKCSharp/Transport/AstarteTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,8 @@ public void SetAstarteTransportEventListener(
}

public abstract Task Connect();
public abstract void Disconnect();
public abstract Task Disconnect();
public abstract bool IsConnected();
public abstract void RetryFailedMessages();

public void SetPropertyStorage(IAstartePropertyStorage propertyStorage)
{
Expand Down
Loading

0 comments on commit 000c987

Please sign in to comment.