-
Notifications
You must be signed in to change notification settings - Fork 0
/
UdpServer.cs
85 lines (74 loc) · 3.28 KB
/
UdpServer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
using System.Net;
using System.Net.Sockets;
using TeltonikaDataServer.Config;
using TeltonikaDataServer.Models;
using Microsoft.Extensions.Options;
namespace TeltonikaDataServer;
public class UdpServer : BackgroundService
{
private readonly ILogger<UdpServer> _logger;
private readonly ServerOptions _config;
private readonly TeltonikaDataHandler _dataHandler;
private readonly List<Task> _handlerTasks = new();
public UdpServer(ILogger<UdpServer> logger, IOptions<ServerOptions> options, TeltonikaDataHandler dataHandler)
{
_logger = logger;
_config = options.Value;
_dataHandler = dataHandler;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var packetIdCache = new PacketIdCache(_config.PacketIdCacheSize);
_logger.LogInformation($"Listening on UDP {_config.Ip}:{_config.Port}");
UdpClient udpClient = new UdpClient(new IPEndPoint(IPAddress.Parse(_config.Ip), _config.Port));
try
{
while (!stoppingToken.IsCancellationRequested)
{
var result = await udpClient.ReceiveAsync(stoppingToken);
_logger.LogDebug($"Got {result.Buffer.Length} bytes from {result.RemoteEndPoint}");
var packet = Codec8Parser.Parse(result.Buffer);
byte? latestAvlPacketId = packetIdCache.GetLatestPacketId(packet.AvlHeader.Imei);
_logger.LogDebug($"PacketId {packet.Header.PacketId} AvlPacketId {packet.AvlHeader.AvlPacketId}");
// skip processing if duplicate packet (UDP acks can get lost)
if (latestAvlPacketId.HasValue && latestAvlPacketId == packet.AvlHeader.AvlPacketId)
{
_logger.LogWarning($"Skipping duplicate AVL packet id {packet.AvlHeader.AvlPacketId}");
}
else
{
HandlePacketInBackground(packet);
}
// always ack quickly to lower gsm power consumption on battery powered trackers
// in case of handling exception, data will be lost atm ¯\_(ツ)_/¯
var ack = Codec8Parser.BuildAck(packet);
var ackPayload = Codec8Parser.SerializeAck(ack);
await udpClient.SendAsync(ackPayload, ackPayload.Length, result.RemoteEndPoint);
packetIdCache.SaveLatestPacketId(packet.AvlHeader.Imei, packet.AvlHeader.AvlPacketId);
}
}
finally
{
RemoveCompletedHandlerTasks();
_logger.LogInformation($"Awaiting {_handlerTasks.Count} handler tasks");
await Task.WhenAll(_handlerTasks);
}
}
private void RemoveCompletedHandlerTasks() => _handlerTasks.RemoveAll(t => t.IsCompleted);
private void HandlePacketInBackground(Codec8UdpPacket parsedPacket)
{
RemoveCompletedHandlerTasks();
_handlerTasks.Add(Handlepacket(parsedPacket));
}
private async Task Handlepacket(Codec8UdpPacket parsedPacket)
{
try
{
await _dataHandler.Handle(parsedPacket);
}
catch (Exception e)
{
_logger.LogError(e, "Handler exception");
}
}
}