Skip to content

Commit

Permalink
Implement async StreamData method
Browse files Browse the repository at this point in the history
Refactor method for `StreamData` for individual
and aggregate data to `Astarte`

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Apr 24, 2024
1 parent 1ac497d commit 2ae33dc
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ namespace AstarteDeviceSDKCSharp.Protocol
{
public class AstarteDeviceAggregateDatastreamInterface : AstarteAggregateDatastreamInterface, IAstarteAggregateDataStreamer
{
public void StreamData(string path, Dictionary<string, object> payload)
public async Task StreamData(string path, Dictionary<string, object> payload)
{
StreamData(path, payload, null);
await StreamData(path, payload, null);
}

public void StreamData(string path, Dictionary<string, object> payload, DateTime? timestamp)
public async Task StreamData(string path, Dictionary<string, object> payload, DateTime? timestamp)
{
ValidatePayload(path, payload, timestamp);

Expand All @@ -42,7 +42,7 @@ public void StreamData(string path, Dictionary<string, object> payload, DateTime
throw new AstarteTransportException("No available transport");
}

transport.SendAggregate(this, path, payload, timestamp);
await transport.SendAggregate(this, path, payload, timestamp);
}

public void ValidatePayload(string path, Dictionary<string, object> payload, DateTime? timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ namespace AstarteDeviceSDKCSharp.Protocol
{
public class AstarteDeviceDatastreamInterface : AstarteDatastreamInterface, IAstarteDataStreamer
{
public void StreamData(string path, object payload)
public async Task StreamData(string path, object payload)
{
StreamData(path, payload, null);
await StreamData(path, payload, null);
}

public void StreamData(string path, object payload, DateTime? timestamp)
public async Task StreamData(string path, object payload, DateTime? timestamp)
{
ValidatePayload(path, payload, timestamp);
AstarteTransport? transport = GetAstarteTransport();
Expand All @@ -38,7 +38,7 @@ public void StreamData(string path, object payload, DateTime? timestamp)
throw new AstarteTransportException("No available transport");
}

transport.SendIndividualValue(this, path, payload, timestamp);
await transport.SendIndividualValue(this, path, payload, timestamp);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ namespace AstarteDeviceSDKCSharp.Protocol
{
public interface IAstarteAggregateDataStreamer
{
void StreamData(String path, Dictionary<String, Object> payload);
Task StreamData(String path, Dictionary<String, Object> payload);

void StreamData(String path, Dictionary<String, Object> payload, DateTime? timestamp);
Task StreamData(String path, Dictionary<String, Object> payload, DateTime? timestamp);
}
}
4 changes: 2 additions & 2 deletions AstarteDeviceSDKCSharp/Protocol/IAstarteDataStreamer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ public interface IAstarteDataStreamer
/// </summary>
/// <param name="path">Endpoint</param>
/// <param name="payload">Message for MQTT broker</param>
void StreamData(String path, Object payload);
Task StreamData(String path, Object payload);

/// <summary>
/// Method for sending individual values to Astarte with timestamp
/// </summary>
/// <param name="path">Endpoint</param>
/// <param name="payload">Message for MQTT broker</param>
/// <param name="timestamp">UTC</param>
void StreamData(String path, Object payload, DateTime? timestamp);
Task StreamData(String path, Object payload, DateTime? timestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public async Task AggregateFromDeviceToServer()
(AstarteDeviceAggregateDatastreamInterface)astarteDevice
.GetInterface(interfaceName);

astarteDeviceAggregateDatastream
await astarteDeviceAggregateDatastream
.StreamData("/%{sensor_id}", astarteMockDevice.MockDataDictionary, DateTime.Now);
Thread.Sleep(500);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public async Task DatastreamFromDeviceToServer()

foreach (var item in astarteMockDevice.MockDataDictionary)
{
astarteDatastreamInterface.StreamData($"/{item.Key}", item.Value, DateTime.Now);
await astarteDatastreamInterface
.StreamData($"/{item.Key}", item.Value, DateTime.Now);
Thread.Sleep(500);
}

Expand Down

0 comments on commit 2ae33dc

Please sign in to comment.