diff --git a/AdapterInterface/Adapter.cs b/AdapterInterface/Adapter.cs
index 0d737d56..a1aa7385 100644
--- a/AdapterInterface/Adapter.cs
+++ b/AdapterInterface/Adapter.cs
@@ -14,6 +14,7 @@
using Mtconnect.AdapterInterface.Contracts.Attributes;
using System.Diagnostics.Tracing;
using EventAttribute = Mtconnect.AdapterInterface.Contracts.Attributes.EventAttribute;
+using System.Threading;
namespace Mtconnect
{
@@ -124,11 +125,23 @@ public virtual double Heartbeat
/// Generic constructor of a new Adapter instance with basic AdapterOptions.
///
///
- public Adapter(AdapterOptions options)
+ /// Reference to the logger factory to handle logging.
+ public Adapter(AdapterOptions options, ILogger logger = null)
{
_options = options;
Heartbeat = options.Heartbeat;
CanEnqueueDataItems = options.CanEnqueueDataItems;
+
+ _logger = logger;
+ }
+
+ public bool Contains(string dataItemName)
+ {
+ return _dataItems.ContainsKey(dataItemName);
+ }
+ public bool Contains(DataItem dataItem)
+ {
+ return Contains(dataItem.Name);
}
///
@@ -156,6 +169,8 @@ public void AddDataItem(DataItem dataItem)
_dataItems[internalName].FormatValue = options?.Formatter;
if (!string.IsNullOrEmpty(options?.DataItemName) && options?.DataItemName != internalName)
_dataItems[internalName].Name = options?.DataItemName;
+
+ _logger?.LogTrace("Added DataItem {DataItemName}", _dataItems[internalName].Name);
}
private void Adapter_OnDataItemChanged(DataItem sender, DataItemChangedEventArgs e)
@@ -206,6 +221,7 @@ public void RemoveDataItem(DataItem dataItem)
///
public void Unavailable()
{
+ _logger?.LogTrace("Setting all DataItem values to UNAVAILABLE");
foreach (var kvp in _dataItems)
kvp.Value.Unavailable();
}
@@ -236,6 +252,8 @@ public void Begin()
/// Flag for identifying which s to send.
public virtual void Send(DataItemSendTypes sendType = DataItemSendTypes.Changed, string clientId = null)
{
+ _logger?.LogTrace("Sending {DataItemSendType} values", sendType.ToString());
+
var values = new List();
switch (sendType)
{
@@ -271,6 +289,8 @@ public virtual void Send(DataItemSendTypes sendType = DataItemSendTypes.Changed,
/// Collection of s to send values.
protected void Send(IEnumerable values, string clientId = null)
{
+ _logger?.LogTrace($"Sending {values.Count()} values");
+
var orderedValues = values.OrderBy(o => o.Timestamp).ToList();
var individualValues = values.Where(o => o.HasNewLine).ToList();
var multiplicityValues = orderedValues.Except(individualValues).ToList();
@@ -319,17 +339,17 @@ public virtual void AddAsset(Asset asset)
Write(sb.ToString());
}
- ///
- /// The heartbeat thread for a client. This thread receives data from a client, closes the socket when it fails, and handles communication timeouts when the client does not send a heartbeat within 2x the heartbeat frequency. When the heartbeat is not received, the client is assumed to be unresponsive and the connection is closed. Waits for one ping to be received before enforcing the timeout.
- ///
- /// The client we are communicating with.
- protected abstract void HeartbeatClient(object client);
+ /////
+ ///// The heartbeat thread for a client. This thread receives data from a client, closes the socket when it fails, and handles communication timeouts when the client does not send a heartbeat within 2x the heartbeat frequency. When the heartbeat is not received, the client is assumed to be unresponsive and the connection is closed. Waits for one ping to be received before enforcing the timeout.
+ /////
+ ///// The client we are communicating with.
+ //protected abstract void HeartbeatClient(object client);
///
/// Start the listener thread.
///
/// Flag for whether or not to also call .
- public abstract void Start(bool begin = true);
+ public abstract void Start(bool begin = true, CancellationToken token = default);
///
/// Starts the listener thread and the provided .
@@ -350,100 +370,17 @@ public void Start(IEnumerable sources, bool begin = true)
foreach (var source in sources)
{
- AddSource(source);
+ _sources.Add(source);
+ source.OnDataReceived += _source_OnDataReceived;
source.Start();
}
}
- private void AddSource(T source) where T : class, IAdapterSource
- {
- _sources.Add(source);
- source.OnDataReceived += _source_OnDataReceived;
-
- Type sourceType = source.GetType();
- AddDataItemsFromSource(sourceType);
- }
-
- ///
- /// Adds Data Items from properties in the that are decorated with the .
- ///
- /// Reference to the type to perform reflection on.
- /// A recursive prefix for the Data Item names.
- private void AddDataItemsFromSource(Type sourceType, string dataItemPrefix = "")
+ private void _source_OnDataReceived(IAdapterDataModel data, DataReceivedEventArgs e)
{
- // TODO: Cache the property map
- var properties = sourceType.GetProperties(BindingFlags.Instance | BindingFlags.Public)
- .ToArray();
- var dataItemProperties = properties
- .Where(o => o.GetCustomAttribute(typeof(DataItemAttribute)) != null)
- .ToArray();
- foreach (var property in dataItemProperties)
+ if (this.TryAddDataItems(data) && this.TryUpdateValues(data))
{
- var dataItemAttribute = property.GetCustomAttribute();
- string dataItemName = dataItemPrefix + dataItemAttribute.Name;
-
- switch (dataItemAttribute)
- {
- case DataItemPartialAttribute _:
- AddDataItemsFromSource(property.PropertyType, dataItemName);
- break;
- case EventAttribute _:
- AddDataItem(new Event(dataItemName));
- break;
- case SampleAttribute _:
- AddDataItem(new Sample(dataItemName));
- break;
- case ConditionAttribute _:
- AddDataItem(new Condition(dataItemName));
- break;
- case TimeSeriesAttribute _:
- AddDataItem(new TimeSeries(dataItemName));
- break;
- case MessageAttribute _:
- AddDataItem(new Message(dataItemName));
- break;
- default:
- break;
- }
- }
- }
-
- private void _source_OnDataReceived(object sender, DataReceivedEventArgs e)
- {
- updateValuesFromSource(sender);
-
- Send(DataItemSendTypes.Changed);
- }
-
- ///
- /// A recursive method for reflecting on the properties of the .
- ///
- /// Reference to the source object. This value will change throughout the recursion.
- /// As the Data Item Prefix
- private void updateValuesFromSource(object source, string dataItemPrefix = "")
- {
- // TODO: Cache the property map
- Type sourceType = source.GetType();
- var dataItemProperties = sourceType.GetProperties().Where(o => o.GetCustomAttribute(typeof(DataItemAttribute)) != null);
- foreach (var property in dataItemProperties)
- {
- var dataItemAttribute = property.GetCustomAttribute();
- string dataItemName = dataItemPrefix + dataItemAttribute.Name;
- switch (dataItemAttribute)
- {
- case DataItemPartialAttribute _:
- updateValuesFromSource(property.GetValue(source), dataItemName);
- break;
- case EventAttribute _:
- case SampleAttribute _:
- case ConditionAttribute _:
- case TimeSeriesAttribute _:
- case MessageAttribute _:
- this[dataItemName].Value = property.GetValue(source);
- break;
- default:
- break;
- }
+ Send(DataItemSendTypes.Changed);
}
}
@@ -455,6 +392,7 @@ public virtual void Stop()
foreach (var source in _sources)
{
source.Stop();
+ source.OnDataReceived -= _source_OnDataReceived;
}
}
}
diff --git a/AdapterInterface/AdapterExtensions.cs b/AdapterInterface/AdapterExtensions.cs
new file mode 100644
index 00000000..4d9a30f4
--- /dev/null
+++ b/AdapterInterface/AdapterExtensions.cs
@@ -0,0 +1,139 @@
+using Mtconnect.AdapterInterface.Contracts.Attributes;
+using Mtconnect.AdapterInterface.DataItems;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+
+namespace Mtconnect
+{
+ public static class AdapterExtensions
+ {
+ ///
+ /// Attempts to add a to the if the adapter does not already contain such a .
+ ///
+ /// Reference to the adapter to add the data item onto.
+ /// Reference to the data item to be added.
+ /// Flag for whether or not the data item has been added. Returns true if the data item has already been added.
+ public static bool TryAddDataItem(this Adapter adapter, DataItem dataItem)
+ {
+ if (adapter.Contains(dataItem)) return true;
+
+ adapter.AddDataItem(dataItem);
+
+ return true;
+ }
+
+ ///
+ /// Adds Data Items from properties in the that are decorated with the .
+ ///
+ /// Reference to the MTConnect to add the data items onto.
+ /// Reference to a data model containing s.
+ /// Flag for whether or not all decorated s were added to the adapter.
+ public static bool TryAddDataItems(this Adapter adapter, IAdapterDataModel model)
+ {
+ return adapter.TryAddDataItems(model.GetType());
+ }
+ private static PropertyInfo[] GetDataItemProperties(Type type)
+ {
+ return type.GetProperties(BindingFlags.Instance | BindingFlags.Public)
+ .Where(o => o.GetCustomAttribute(typeof(DataItemAttribute)) != null)
+ .ToArray();
+ }
+ private static Dictionary _dataItemProperties = new Dictionary();
+ private static bool TryAddDataItems(this Adapter adapter, Type dataModelType, string dataItemNamePrefix = "")
+ {
+ if (_dataItemProperties.TryGetValue(dataModelType, out PropertyInfo[] dataItemProperties)) return true;
+
+ dataItemProperties = GetDataItemProperties(dataModelType);
+ _dataItemProperties.Add(dataModelType, dataItemProperties);
+ bool allDataItemsAdded = true;
+
+
+ foreach (var property in dataItemProperties)
+ {
+ bool dataItemAdded = false;
+ var dataItemAttribute = property.GetCustomAttribute();
+ string dataItemName = dataItemNamePrefix + dataItemAttribute.Name;
+
+ switch (dataItemAttribute)
+ {
+ case DataItemPartialAttribute _:
+ dataItemAdded = adapter.TryAddDataItems(property.PropertyType, dataItemName);
+ break;
+ case EventAttribute _:
+ dataItemAdded = adapter.TryAddDataItem(new Event(dataItemName));
+ break;
+ case SampleAttribute _:
+ dataItemAdded = adapter.TryAddDataItem(new Sample(dataItemName));
+ break;
+ case ConditionAttribute _:
+ dataItemAdded = adapter.TryAddDataItem(new Condition(dataItemName));
+ break;
+ case TimeSeriesAttribute _:
+ dataItemAdded = adapter.TryAddDataItem(new TimeSeries(dataItemName));
+ break;
+ case MessageAttribute _:
+ dataItemAdded = adapter.TryAddDataItem(new Message(dataItemName));
+ break;
+ default:
+ dataItemAdded = false;
+ break;
+ }
+
+ if (!dataItemAdded) allDataItemsAdded = false;
+ }
+
+ return allDataItemsAdded;
+ }
+
+ ///
+ /// Attempts to update the s of the .
+ ///
+ /// Reference to the MTConnect to update the data items from.
+ /// Reference to the data model to update the s s from.
+ /// Flag for whether or not all values were updated from the .
+ public static bool TryUpdateValues(this Adapter adapter, IAdapterDataModel model)
+ {
+ return TryUpdateValues(adapter, model, string.Empty);
+ }
+ private static bool TryUpdateValues(this Adapter adapter, object model, string dataItemPrefix)
+ {
+ bool allDataItemsUpdated = true;
+
+ Type sourceType = model.GetType();
+
+ if (!_dataItemProperties.TryGetValue(sourceType, out PropertyInfo[] dataItemProperties))
+ {
+ dataItemProperties = GetDataItemProperties(sourceType);
+ }
+ foreach (var property in dataItemProperties)
+ {
+ bool dataItemUpdated = true;
+
+ var dataItemAttribute = property.GetCustomAttribute();
+ string dataItemName = dataItemPrefix + dataItemAttribute.Name;
+ switch (dataItemAttribute)
+ {
+ case DataItemPartialAttribute _:
+ dataItemUpdated = adapter.TryUpdateValues(property.GetValue(model), dataItemName);
+ break;
+ case EventAttribute _:
+ case SampleAttribute _:
+ case ConditionAttribute _:
+ case TimeSeriesAttribute _:
+ case MessageAttribute _:
+ adapter[dataItemName].Value = property.GetValue(model);
+ break;
+ default:
+ dataItemUpdated = false;
+ break;
+ }
+
+ if (!dataItemUpdated) allDataItemsUpdated = false;
+ }
+
+ return allDataItemsUpdated;
+ }
+ }
+}
\ No newline at end of file
diff --git a/AdapterInterface/AdapterInterface.csproj b/AdapterInterface/AdapterInterface.csproj
index f4a0da75..84664550 100644
--- a/AdapterInterface/AdapterInterface.csproj
+++ b/AdapterInterface/AdapterInterface.csproj
@@ -17,11 +17,11 @@
icon.jpg
git
MTConnect;Adapter;Interface
- Introduction of encryption of scripts within the App.config. These scripts can be used to transform the Data Item values.
+ Added extra logging and refactored DataItem modeling.
icon.ico
https://github.com/TrueAnalyticsSolutions/MtconnectCore.Adapter
$(ProjectUrl)
- 1.0.7-alpha
+ 1.0.8-alpha
diff --git a/AdapterInterface/IAdapterDataModel.cs b/AdapterInterface/IAdapterDataModel.cs
new file mode 100644
index 00000000..26f34a05
--- /dev/null
+++ b/AdapterInterface/IAdapterDataModel.cs
@@ -0,0 +1,11 @@
+using System.Threading;
+
+namespace Mtconnect
+{
+ ///
+ /// A generic interface for instantiating a source for a MTConnect Adapter.
+ ///
+ public interface IAdapterDataModel
+ {
+ }
+}
diff --git a/AdapterInterface/IAdapterSource.cs b/AdapterInterface/IAdapterSource.cs
index d72ffd26..967b3e75 100644
--- a/AdapterInterface/IAdapterSource.cs
+++ b/AdapterInterface/IAdapterSource.cs
@@ -1,15 +1,17 @@
-
+using System;
+using System.Threading;
+
namespace Mtconnect
{
///
/// Handler for ingesting data from a MTConnect Adapter source.
///
- /// Reference to the Adapter source.
+ /// Reference to the data model.
/// Event arguments containing data received from the MTConnect Adapter source.
- public delegate void DataReceivedHandler(object sender, DataReceivedEventArgs e);
+ public delegate void DataReceivedHandler(IAdapterDataModel data, DataReceivedEventArgs e);
///
- /// A generic interface for instantiating a source for a MTConnect Adapter.
+ /// A generic interface for classes used to derive s from use.
///
public interface IAdapterSource
{
@@ -21,10 +23,11 @@ public interface IAdapterSource
///
/// Instructs the Adapter source to begin collecting data.
///
- void Start();
+ /// Token for cancelling the startup method.
+ void Start(CancellationToken token = default);
///
- /// Instructs the Adapter source to stop collecting data.
+ /// Instructs the Adapter source to stop collecting and/or processing data.
///
void Stop();
}
diff --git a/MtconnectCore.TcpAdapter/TcpAdapter.cs b/MtconnectCore.TcpAdapter/TcpAdapter.cs
index 2750d142..687c6b61 100644
--- a/MtconnectCore.TcpAdapter/TcpAdapter.cs
+++ b/MtconnectCore.TcpAdapter/TcpAdapter.cs
@@ -1,10 +1,13 @@
using System;
using System.Collections;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
+using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
+using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Mtconnect.AdapterInterface;
using Mtconnect.AdapterInterface.Contracts;
@@ -15,55 +18,80 @@ namespace Mtconnect
///
/// An implementation of a MTConnect Adapter that publishes data thru a TCP stream.
///
- public sealed class TcpAdapter : Adapter
+ public sealed class TcpAdapter : Adapter, IDisposable
{
+ private bool _disposing = false;
+
+ ///
+ /// Event that fires when a message is received from a .
+ ///
+ public event TcpConnectionDataReceived ClientDataReceived;
+
///
/// The Port property to set and get the mPort. This will only take affect when the adapter is stopped.
///
public int Port { get; private set; } = 7878;
+ ///
+ /// The maximum number of kvp connections allowed to exist at any given point.
+ ///
+ public int MaxConnections { get; private set; } = 2;
+
+ private CancellationTokenSource _listerCancellationSource = new CancellationTokenSource();
///
/// The listening thread for new connections
///
- private Thread _listenerThread;
+ private Task _listenerThread;
///
- /// A list of all the client connections.
+ /// A list of all the kvp connections.
///
- private Dictionary _clients = new Dictionary();
+ private ConcurrentDictionary _clients { get; set; } = new ConcurrentDictionary();
///
- /// A count of client threads.
+ /// A count of tracked s.
///
- private CountdownEvent _activeClients = new CountdownEvent(1);
+ private CountdownEvent _activeClients { get; set; } = new CountdownEvent(1);
///
/// The server socket.
///
- private TcpListener _listener;
+ private TcpListener _listener { get; set; }
///
/// Constructs a new .
///
///
- public TcpAdapter(TcpAdapterOptions options) : base(options)
+ public TcpAdapter(TcpAdapterOptions options, ILogger logger = null) : base(options, logger)
{
Port = options.Port;
+ MaxConnections = options.MaxConcurrentConnections;
}
///
- public override void Start(bool begin = true)
+ public override void Start(bool begin = true, CancellationToken token = default)
{
if (State <= AdapterStates.NotStarted)
{
+ _logger?.LogInformation("Starting Adapter");
State = AdapterStates.Starting;
+ // Start TcpListener
_listener = new TcpListener(IPAddress.Any, Port);
_listener.Start();
- _listenerThread = new Thread(new ThreadStart(ListenForClients));
- _listenerThread.Start();
+ // Start before the _listenerThread because it relies on state being Busy
State = AdapterStates.Started;
+
+ // Setup task that listens for new clients
+ _listerCancellationSource.Token.Register(Stop);
+ _listenerThread = Task.Factory.StartNew(
+ ListenForClients,
+ _listerCancellationSource.Token,
+ TaskCreationOptions.LongRunning,
+ TaskScheduler.Default
+ );
+
}
if (begin) Begin();
@@ -76,19 +104,20 @@ public override void Stop()
if (State > AdapterStates.NotStarted)
{
+ _logger?.LogInformation("Stopping Adapter");
State = AdapterStates.Stopping;
- // Wait 2 seconds for the thread to exit.
- _listenerThread.Join((int)(2 * Heartbeat));
+ // Queue the _listerThread to cancel
+ _listerCancellationSource?.Cancel();
- foreach (Object obj in _clients)
+ // Dispose of all tracked clients and clear connections from memory.
+ foreach (var kvp in _clients)
{
- Stream client = (Stream)obj;
- client.Close();
+ kvp.Value.Dispose();
}
_clients.Clear();
- // Wait for all client threads to exit.
+ // Wait for all kvp threads to exit.
_activeClients.Wait(2000);
State = AdapterStates.Stopped;
@@ -111,38 +140,28 @@ public override void Send(DataItemSendTypes sendType = DataItemSendTypes.Changed
HasBegun = false;
}
-#if DEBUG
- ///
- /// For testing, add a io stream to the adapter.
- ///
- /// The client who sent the text
- /// A IO Stream
- public void addClientStream(string clientId, Stream aStream)
- {
- _clients.Add(clientId, aStream);
- Send(DataItemSendTypes.All, clientId);
- }
-#endif
-
///
protected override void Write(string message, string clientId = null)
{
_logger?.LogDebug("Sending message: {Message}", message);
-
- if (clientId == null)
+ lock(_clients)
{
- foreach (var kvp in _clients)
+ if (_clients.Any())
{
- lock (kvp.Value)
+ if (clientId == null)
{
- WriteToClient(kvp.Key, message);
+ foreach (var kvp in _clients)
+ {
+ kvp.Value.Write(message);
+ }
+ }
+ else if (_clients.TryGetValue(clientId, out var client) && client != null)
+ {
+ lock (client)
+ {
+ client.Write(message);
+ }
}
- }
- } else if (_clients.ContainsKey(clientId))
- {
- lock (_clients[clientId])
- {
- WriteToClient(clientId, message);
}
}
}
@@ -155,215 +174,134 @@ public void FlushAll()
{
foreach (var kvp in _clients)
kvp.Value.Flush();
-
- }
-
- ///
- /// Receive data from a client and implement heartbeat ping/pong protocol.
- ///
- /// The client who sent the text
- /// The line of text
- private bool Receive(string clientId, string line)
- {
- Stream clientStream = null;
- if (!_clients.TryGetValue(clientId, out clientStream))
- return false;
-
- bool heartbeat = false;
- // TODO: Implement const for * PING
- if (line.StartsWith("* PING") && Heartbeat > 0)
- {
- heartbeat = true;
- lock (clientStream)
- {
- _logger?.LogDebug("Received PING, sending PONG");
- WriteToClient(clientId, PONG);
- clientStream.Flush();
- }
- }
-
- return heartbeat;
}
-
- ///
- /// Send text to a client as a byte array. Handles execptions and remove the client from the list of clients if the write fails. Also makes sure the client connection is closed when it fails.
- ///
- /// Reference to the registered client id for the TCP stream.
- /// The message to send to the client.
- private void WriteToClient(string clientId, string message) => WriteToClient(clientId, Encoder.GetBytes(message.ToCharArray()));
///
- /// Send text to a client as a byte array. Handles execptions and remove the client from the list of clients if the write fails. Also makes sure the client connection is closed when it fails.
+ /// Listens for new TCP clients.
///
- /// The client to send the message to
- /// The message
- private void WriteToClient(string clientId, byte[] message)
- {
- Stream clientStream = null;
- try
- {
- if (_clients.TryGetValue(clientId, out clientStream))
- {
- clientStream.Write(message, 0, message.Length);
- } else
- {
- _logger?.LogWarning("Could not find client from id '{clientId}'", clientId);
- }
- }
- catch (Exception e)
- {
- // TODO: Convert to constant
- _logger?.LogError(e, "Failed to write to client stream");
- try
- {
- clientStream?.Close();
- }
- catch (Exception f)
- {
- _logger?.LogError(f, "Failed to close client stream");
- } finally
- {
- if (_clients.ContainsKey(clientId))
- _clients.Remove(clientId);
- }
- }
- }
-
- ///
- protected override void HeartbeatClient(object client)
+ private void ListenForClients()
{
- _activeClients.AddCount();
- TcpClient tcpClient = (TcpClient)client;
- NetworkStream clientStream = tcpClient.GetStream();
- string clientId = ((IPEndPoint)tcpClient.Client.RemoteEndPoint).Address.ToString();
-
- if (!_clients.ContainsKey(clientId))
- _clients.Add(clientId, null);
- _clients[clientId] = clientStream;
-
- ArrayList readList = new ArrayList();
- bool heartbeatActive = false;
-
- byte[] message = new byte[4096];
- int length = 0;
+ State = AdapterStates.Busy;
try
{
- while (State == AdapterStates.Busy && tcpClient.Connected)
+ while (State == AdapterStates.Busy)
{
- int bytesRead = 0;
-
- try
- {
- readList.Clear();
- readList.Add(tcpClient.Client);
- if (Heartbeat > 0 && heartbeatActive)
- Socket.Select(readList, null, null, (int)(Heartbeat * 2000));
- if (readList.Count == 0 && heartbeatActive)
- {
- _logger?.LogWarning("Heartbeat timed out, closing connection");
- break;
- }
-
- //blocks until a client sends a message
- bytesRead = clientStream.Read(message, length, 4096 - length);
- }
- catch (Exception e)
- {
- //a socket error has occured
- _logger?.LogError(e, "Failed to read heartbeat client message");
- break;
- }
+ if (!_listener.Pending()) continue;
- if (bytesRead == 0)
+ //blocks until a kvp has connected to the server
+ var client = new TcpConnection(_listener.AcceptTcpClient(), (int)Heartbeat);
+ if (_activeClients.CurrentCount >= MaxConnections)
{
- //the client has disconnected from the server
- _logger?.LogWarning("No bytes were read from heartbeat client");
- break;
+ _logger?.LogWarning(
+ "Denied connection to '{ClientId}', too many concurrent connections ({ActiveConnections}/{MaxConnections})",
+ client.ClientId,
+ _activeClients.CurrentCount,
+ MaxConnections
+ );
+ continue;
}
- // See if we have a line
- int pos = length;
- length += bytesRead;
- int eol = 0;
- for (int i = pos; i < length; i++)
+ if (!_clients.ContainsKey(client.ClientId) && _activeClients.TryAddCount())
{
- if (message[i] == '\n')
+ _logger?.LogInformation("New client connection '{ClientId}'", client.ClientId);
+ if (_clients.TryAdd(client.ClientId, client))
{
-
- String line = Encoder.GetString(message, eol, i);
- if (Receive(clientId, line)) heartbeatActive = true;
- eol = i + 1;
+ client.OnDisconnected += Client_OnConnectionDisconnected;
+ client.OnDataReceived += Client_OnReceivedData;
+ client.Connect();
+ // Issue command for underlying Adapter to send all DataItem current values to the newly added kvp
+ Send(DataItemSendTypes.All, client.ClientId);
+ } else
+ {
+ _activeClients.Signal(); // Undo try add
+ _logger?.LogError("Failed to add client '{ClientId}'", client.ClientId);
}
- }
-
- // Remove the lines that have been processed.
- if (eol > 0)
+ } else
{
- length = length - eol;
- // Shift the message array to remove the lines.
- if (length > 0)
- Array.Copy(message, eol, message, 0, length);
+ _logger?.LogWarning("Client '{ClientId}' already has an established connection", client.ClientId);
}
}
}
catch (Exception e)
{
- _logger?.LogError(e, "Failed to process heartbeat client");
+ _logger?.LogError(e, "Exception occurred while waiting for connection");
}
-
finally
{
- try
- {
- if (_clients.ContainsKey(clientId))
- {
- _clients.Remove(clientId);
- }
- tcpClient.Close();
- }
- catch (Exception e)
- {
- _logger?.LogError(e, "Failed to cleanup heartbeat client connection");
- }
- _activeClients.Signal();
+ State = AdapterStates.Started;
+ _listener.Stop();
}
}
+ private const string PING = "* PING";
///
- /// Listens for new TCP clients.
+ /// ReceiveClient data from a kvp and implement heartbeat ping/pong protocol.
///
- private void ListenForClients()
+ private bool Client_OnReceivedData(TcpConnection connection, string message)
{
- State = AdapterStates.Busy;
-
- try
+ bool heartbeat = false;
+ if (message.StartsWith(PING) && Heartbeat > 0)
{
- while (State == AdapterStates.Busy)
+ heartbeat = true;
+ lock (connection)
{
- //blocks until a client has connected to the server
- TcpClient tcpClient = _listener.AcceptTcpClient();
- string clientId = ((IPEndPoint)tcpClient.Client.RemoteEndPoint).Address.ToString();
-
- //create a thread to handle communication
- //with connected client
- Thread clientThread = new Thread(new ParameterizedThreadStart(HeartbeatClient));
- clientThread.Start(tcpClient);
-
- // Issue command for underlying Adapter to send all DataItem current values to the newly added client
- Send(DataItemSendTypes.All, clientId);
- clientThread.Join();
+ _logger?.LogInformation("Received PING from client {ClientId}, sending PONG", connection.ClientId);
+ connection.Write(PONG);
+ connection.Flush();
}
}
- catch (Exception e)
+
+ if (ClientDataReceived != null) ClientDataReceived(connection, message);
+
+ return heartbeat;
+ }
+
+ private void Client_OnConnectionDisconnected(TcpConnection connection, Exception ex = null)
+ {
+ if (!_clients.ContainsKey(connection.ClientId))
{
- _logger?.LogError(e, "Exception occurred while waiting for connection");
+ _logger?.LogWarning("Client '{ClientId}' is not tracked", connection.ClientId);
+ return;
}
- finally
+
+ lock (_clients)
{
- State = AdapterStates.Started;
- _listener.Stop();
+ if (ex == null)
+ {
+ _logger?.LogInformation("Client disconnected '{ClientId}'", connection.ClientId);
+ } else
+ {
+ _logger?.LogError("Client '{ClientId}' disconnected due to error: \r\n\t{Error}", connection.ClientId, ex);
+ }
+ if (_clients.TryRemove(connection.ClientId, out TcpConnection client))
+ {
+ if (_activeClients.Signal())
+ {
+ _logger?.LogInformation("No clients connected");
+ }
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ if (_disposing) return;
+ _disposing = true;
+
+ // Stop the TcpListener
+ _listener?.Stop();
+
+ // Dispose of all tracked clients and clear connections from memory.
+ foreach (var kvp in _clients)
+ {
+ kvp.Value.Dispose();
}
+ _clients.Clear();
+
+ // Dispose of the _listenerThread
+ _listenerThread?.Dispose();
+ _listerCancellationSource?.Dispose();
}
}
}
diff --git a/MtconnectCore.TcpAdapter/TcpAdapter.csproj b/MtconnectCore.TcpAdapter/TcpAdapter.csproj
index 7f6aabb1..408a34e3 100644
--- a/MtconnectCore.TcpAdapter/TcpAdapter.csproj
+++ b/MtconnectCore.TcpAdapter/TcpAdapter.csproj
@@ -18,8 +18,8 @@
https://github.com/TrueAnalyticsSolutions/MtconnectCore.Adapter
git
Mtconnect;Adapter;TCP;TAMS;
- An implementation of the generic MTConnect® Adapter library that listens for TCP clients and publishes data in a pipe-delimitted stream.
- 1.0.7-alpha
+ Added extra logging and refined TcpConnections.
+ 1.0.8-alpha
diff --git a/MtconnectCore.TcpAdapter/TcpAdapterOptions.cs b/MtconnectCore.TcpAdapter/TcpAdapterOptions.cs
index 32ed6892..2d2cac82 100644
--- a/MtconnectCore.TcpAdapter/TcpAdapterOptions.cs
+++ b/MtconnectCore.TcpAdapter/TcpAdapterOptions.cs
@@ -14,14 +14,20 @@ public sealed class TcpAdapterOptions : AdapterOptions
///
public int Port { get; private set; }
+ ///
+ /// The maximum number of connections allowed at any given point.
+ ///
+ public int MaxConcurrentConnections { get; private set; }
+
///
/// Constructs the most basic options for configuring a MTConnect Adapter.
///
///
///
- public TcpAdapterOptions(double heartbeat = 10_000, int port = 7878) : base(heartbeat)
+ public TcpAdapterOptions(double heartbeat = 10_000, int port = 7878, int maxConnections = 2) : base(heartbeat)
{
Port = port;
+ MaxConcurrentConnections = maxConnections;
}
public override Dictionary UpdateFromConfig()
@@ -33,6 +39,11 @@ public override Dictionary UpdateFromConfig()
Port = port;
}
+ if (adapterSettings.ContainsKey("maxConnections") && Int32.TryParse(adapterSettings["maxConnections"].ToString(), out int maxConnections))
+ {
+ MaxConcurrentConnections = maxConnections;
+ }
+
return adapterSettings;
}
}
diff --git a/MtconnectCore.TcpAdapter/TcpConnection.cs b/MtconnectCore.TcpAdapter/TcpConnection.cs
new file mode 100644
index 00000000..a6dd3bbc
--- /dev/null
+++ b/MtconnectCore.TcpAdapter/TcpConnection.cs
@@ -0,0 +1,194 @@
+using System;
+using System.Collections;
+using System.Net;
+using System.Net.Sockets;
+using System.Security.Policy;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Mtconnect
+{
+ public delegate void TcpConnectionConnected(TcpConnection connection);
+ public delegate void TcpConnectionDisconnected(TcpConnection connection, Exception ex = null);
+ public delegate bool TcpConnectionDataReceived(TcpConnection connection, string message);
+ public class TcpConnection : IDisposable
+ {
+ private bool _disposing { get; set; } = false;
+
+ ///
+ /// An event that fires when the underlying client stream is opened and connected.
+ ///
+ public event TcpConnectionConnected OnConnected;
+
+ ///
+ /// An event that fires when the underlying client stream is closed and disconnected.
+ ///
+ public event TcpConnectionDisconnected OnDisconnected;
+
+ ///
+ /// An event that fires when data is fully parsed from the underlying client stream. Note that a new line is used to determine the end of a full message.
+ ///
+ public event TcpConnectionDataReceived OnDataReceived;
+
+ ///
+ /// Maximum amount of binary data to receive at a time.
+ ///
+ private const int BUFFER_SIZE = 4096;
+
+ public ASCIIEncoding Encoder { get; set; } = new ASCIIEncoding();
+
+ ///
+ /// Reference to the address.
+ ///
+ public string ClientId { get; private set; }
+
+ ///
+ /// The period of time (in milliseconds) to timeout stream reading.
+ ///
+ public int Heartbeat { get; set; }
+
+ ///
+ /// Reference to the connection to the .
+ ///
+ private TcpClient _client { get; set; }
+
+ ///
+ /// Reference to the underlying client stream. Note, only available between and calls.
+ ///
+ private NetworkStream _stream { get; set; }
+
+ public TcpConnection(TcpClient client, int heartbeat = 1000)
+ {
+ _client = client;
+ Heartbeat = heartbeat;
+ IPEndPoint clientIp = (IPEndPoint)_client.Client.RemoteEndPoint;
+ ClientId = $"{clientIp.Address}:{clientIp.Port}";
+ }
+
+ ///
+ /// Connects the underlying client stream and begins receiving data.
+ ///
+ public void Connect()
+ {
+ // Disconnect before attempting to connect again to ensure resources are disposed of
+ if (_stream != null) Disconnect();
+
+ _stream = _client.GetStream();
+ Task.Run(() => receive());
+
+ if (OnConnected != null) OnConnected(this);
+ }
+
+ ///
+ /// Disconnects the underlying client stream and disposes of it. Note that this leaves the connection to the TCP client alone.
+ ///
+ public void Disconnect(Exception ex = null)
+ {
+ if (_stream == null) return;
+
+ _stream?.Close();
+ _stream?.Dispose();
+ _stream = null;
+
+ if (!_disposing && OnDisconnected != null) OnDisconnected(this, ex);
+ }
+
+ ///
+ /// Writes a message to the underlying client stream.
+ ///
+ /// Message to send.
+ public void Write(string message) => Write(Encoder.GetBytes(message));
+ ///
+ /// Writes a binary message to the underlying client stream.
+ ///
+ /// Message to send.
+ public void Write(byte[] message)
+ {
+ try
+ {
+ _stream?.Write(message, 0, message.Length);
+ }
+ catch (Exception ex)
+ {
+ Disconnect();
+ }
+ }
+
+ ///
+ /// Flushes the underlying client stream.
+ ///
+ public void Flush()
+ {
+ _stream?.Flush();
+ }
+
+ ///
+ /// Continuously reads messages from the underlying client stream.
+ ///
+ private void receive()
+ {
+ Exception ex = null;
+ bool heartbeatActive = false;
+
+ byte[] message = new byte[BUFFER_SIZE];
+ int length = 0;
+
+ ArrayList readList = new ArrayList();
+
+ while (_client.Connected)
+ {
+ if (!_stream.DataAvailable) continue;
+
+ int bytesRead = 0;
+
+ readList.Clear();
+ readList.Add(_client.Client);
+ if (Heartbeat > 0 && heartbeatActive)
+ Socket.Select(readList, null, null, (int)(Heartbeat * 2000));
+ if (readList.Count == 0 && heartbeatActive)
+ {
+ ex = new TimeoutException("Heartbeat timed out, closing connection");
+ break;
+ }
+ bytesRead = _stream.Read(message, length, BUFFER_SIZE - length);
+
+ // See if we have a line
+ int pos = length;
+ length += bytesRead;
+ int eol = 0;
+ for (int i = pos; i < length; i++)
+ {
+ if (message[i] == '\n')
+ {
+
+ String line = Encoder.GetString(message, eol, i);
+
+ if (OnDataReceived != null)
+ heartbeatActive = OnDataReceived(this, line);
+
+ eol = i + 1;
+ }
+ }
+
+ // Remove the lines that have been processed.
+ if (eol > 0)
+ {
+ length = length - eol;
+ // Shift the message array to remove the lines.
+ if (length > 0)
+ Array.Copy(message, eol, message, 0, length);
+ }
+ }
+
+ Disconnect(ex);
+ }
+
+ public void Dispose()
+ {
+ _disposing = true;
+ _client?.Dispose();
+ Disconnect();
+ _disposing = false;
+ }
+ }
+}
diff --git a/SampleAdapter/App.config b/SampleAdapter/App.config
index 4032290f..e16d950b 100644
--- a/SampleAdapter/App.config
+++ b/SampleAdapter/App.config
@@ -4,7 +4,8 @@
-
+
+
diff --git a/SampleAdapter/PCModel.cs b/SampleAdapter/PCModel.cs
index 27943f88..92592501 100644
--- a/SampleAdapter/PCModel.cs
+++ b/SampleAdapter/PCModel.cs
@@ -16,17 +16,7 @@ public class PCStatusMonitor : IAdapterSource
///
public event DataReceivedHandler? OnDataReceived;
- [Event("avail")]
- public string? Availability { get; set; }
-
- [Sample("xPos")]
- public int? XPosition { get; set; }
-
- [Sample("yPos")]
- public int? YPosition { get; set; }
-
- [Event("prog")]
- public string? WindowTitle { get; set; }
+ public PCStatus Model { get; private set; } = new PCStatus();
private System.Timers.Timer Timer = new System.Timers.Timer();
@@ -42,18 +32,18 @@ public PCStatusMonitor(int sampleRate = 50)
private void Timer_Elapsed(object? sender, System.Timers.ElapsedEventArgs e)
{
- Availability = "AVAILABLE";
+ Model.Availability = "AVAILABLE";
Point lpPoint;
if (WindowHandles.GetCursorPos(out lpPoint))
{
- XPosition = lpPoint.X;
- YPosition = lpPoint.Y;
+ Model.XPosition = lpPoint.X;
+ Model.YPosition = lpPoint.Y;
}
else
{
- XPosition = null;
- YPosition = null;
+ Model.XPosition = null;
+ Model.YPosition = null;
}
try
@@ -61,25 +51,25 @@ private void Timer_Elapsed(object? sender, System.Timers.ElapsedEventArgs e)
string activeWindowTitle = WindowHandles.GetActiveWindowTitle();
if (!string.IsNullOrEmpty(activeWindowTitle))
{
- WindowTitle = activeWindowTitle;
+ Model.WindowTitle = activeWindowTitle;
}
else
{
- WindowTitle = null;
+ Model.WindowTitle = null;
}
}
catch (Exception ex)
{
- WindowTitle = null;
+ Model.WindowTitle = null;
}
if (OnDataReceived != null)
{
- OnDataReceived(this, new DataReceivedEventArgs());
+ OnDataReceived(Model, new DataReceivedEventArgs());
}
}
- public void Start()
+ public void Start(CancellationToken token = default)
{
Timer.Start();
}
@@ -89,4 +79,18 @@ public void Stop()
Timer.Stop();
}
}
+ public class PCStatus : IAdapterDataModel
+ {
+ [Event("avail")]
+ public string? Availability { get; set; }
+
+ [Sample("xPos")]
+ public int? XPosition { get; set; }
+
+ [Sample("yPos")]
+ public int? YPosition { get; set; }
+
+ [Event("prog")]
+ public string? WindowTitle { get; set; }
+ }
}
\ No newline at end of file
diff --git a/SampleAdapter/Program.cs b/SampleAdapter/Program.cs
index 1f7fcae5..bb755d6b 100644
--- a/SampleAdapter/Program.cs
+++ b/SampleAdapter/Program.cs
@@ -1,5 +1,6 @@
// See https://aka.ms/new-console-template for more information
using ConsoulLibrary;
+using Microsoft.Extensions.Logging;
using Mtconnect;
using Mtconnect.AdapterInterface.DataItems;
using SampleAdapter.PC;
@@ -19,10 +20,15 @@ public static class Program
public static void Main(string[] args)
{
+ ConsoulLibrary.RenderOptions.WriteMode = RenderOptions.WriteModes.SuppressBlacklist;
+ ConsoulLibrary.RenderOptions.BlacklistColors.Add( ConsoleColor.Gray);
+ var loggerFactory = LoggerFactory.Create(o => { o.AddConsoulLogger();o.SetMinimumLevel(LogLevel.Debug); });
+ var logger = loggerFactory.CreateLogger();
+
var options = new TcpAdapterOptions();
options.UpdateFromConfig();
- Adapter = new TcpAdapter(options);
+ Adapter = new TcpAdapter(options, logger);
Adapter.Start(Model);
Consoul.Write("Reporting: AVAILABILITY, Mouse X-Position, Mouse Y-Position, Active Window Title");
diff --git a/SampleAdapter/SampleAdapter.csproj b/SampleAdapter/SampleAdapter.csproj
index 6a2d3975..3ac153f5 100644
--- a/SampleAdapter/SampleAdapter.csproj
+++ b/SampleAdapter/SampleAdapter.csproj
@@ -14,7 +14,8 @@
-
+
+