Skip to content

Commit

Permalink
feat(iot-dev): Make SDK thread names configurable (#1720)
Browse files Browse the repository at this point in the history
- By default, don't change the existing behavior where threads are given unique prefix/postfix so users can correlate threads to clients
- Allow users to opt out of these unique thread names
- Allow users to provide custom prefix and postfix to thread names

#1715
  • Loading branch information
timtay-microsoft authored Jun 6, 2023
1 parent e438f2e commit ba8f982
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
import com.microsoft.azure.sdk.iot.provisioning.security.SecurityProviderSymmetricKey;
import com.microsoft.azure.sdk.iot.provisioning.security.SecurityProviderTpm;
import com.microsoft.azure.sdk.iot.provisioning.security.SecurityProviderX509;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.*;
import lombok.extern.slf4j.Slf4j;

import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -86,6 +83,14 @@ public final class ClientConfiguration
@Setter(AccessLevel.PACKAGE)
private int sendInterval = DEFAULT_SEND_INTERVAL_IN_MILLISECONDS;

@Getter
private String threadNameSuffix = null;

@Getter
private String threadNamePrefix = null;

private boolean useIdentifiableThreadNames = true;

private IotHubAuthenticationProvider authenticationProvider;

/**
Expand Down Expand Up @@ -215,6 +220,9 @@ private void setClientOptionValues(ClientOptions clientOptions)
this.amqpOpenDeviceSessionsTimeout = clientOptions != null && clientOptions.getAmqpDeviceSessionTimeout() != 0 ? clientOptions.getAmqpDeviceSessionTimeout() : DEFAULT_AMQP_OPEN_DEVICE_SESSIONS_TIMEOUT_IN_SECONDS;
this.proxySettings = clientOptions != null && clientOptions.getProxySettings() != null ? clientOptions.getProxySettings() : null;
this.sendInterval = clientOptions != null && clientOptions.getSendInterval() != 0 ? clientOptions.getSendInterval() : DEFAULT_SEND_INTERVAL_IN_MILLISECONDS;
this.threadNamePrefix = clientOptions != null ? clientOptions.getThreadNamePrefix() : null;
this.threadNameSuffix = clientOptions != null ? clientOptions.getThreadNameSuffix() : null;
this.useIdentifiableThreadNames = clientOptions == null || clientOptions.isUsingIdentifiableThreadNames();

if (proxySettings != null)
{
Expand Down Expand Up @@ -632,6 +640,12 @@ public AuthType getAuthenticationType()
}
}

public boolean isUsingIdentifiableThreadNames()
{
// Using a manually written method here to override the name that Lombok would have given it
return this.useIdentifiableThreadNames;
}

/**
* Sets the device operation timeout
* @param timeout the amount of time, in milliseconds, that a given device operation can last before expiring
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,41 @@ public final class ClientOptions
@Getter
@Builder.Default
private final int receiveInterval = RECEIVE_PERIOD_MILLIS;

/**
* The prefix that will be applied to the names of all threads created by this client. If
* {@link #useIdentifiableThreadNames} is set to true, then this value is ignored and this client will create the
* prefix for you.
*/
@Getter
@Builder.Default
private final String threadNamePrefix = null;

/**
* The suffix that will be applied to the names of all threads created by this client. If
* {@link #useIdentifiableThreadNames} is set to true, then this value is ignored and this client will create the
* suffix for you.
*/
@Getter
@Builder.Default
private final String threadNameSuffix = null;

/**
* If true, all threads created by this client will use names that are unique. This is useful in applications that manage
* multiple device/module clients and want to be able to correlate logs to a particular client. In addition,
* the {@link #threadNamePrefix} and {@link #threadNameSuffix} values will be ignored.
*
* If false, all threads created by this client will use simple names that describe the thread's purpose, but are
* indistinguishable from the same threads created by a different client instance. However, users may still alter
* these thread names by providing values for the {@link #threadNamePrefix} and {@link #threadNameSuffix}.
*/
@Builder.Default
private final boolean useIdentifiableThreadNames = true;


public boolean isUsingIdentifiableThreadNames()
{
// Using a manually written method here to override the name that Lombok would have given it
return this.useIdentifiableThreadNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ final class DeviceIO implements IotHubConnectionStatusChangeCallback

this.state = IotHubConnectionStatus.DISCONNECTED;

this.sendTask = new IotHubSendTask(this.transport);
this.receiveTask = new IotHubReceiveTask(this.transport);
this.reconnectTask = new IotHubReconnectTask(this.transport);
this.sendTask = new IotHubSendTask(this.transport, config.isUsingIdentifiableThreadNames(), config.getThreadNamePrefix(), config.getThreadNameSuffix());
this.receiveTask = new IotHubReceiveTask(this.transport, config.isUsingIdentifiableThreadNames(), config.getThreadNamePrefix(), config.getThreadNameSuffix());
this.reconnectTask = new IotHubReconnectTask(this.transport, config.isUsingIdentifiableThreadNames(), config.getThreadNamePrefix(), config.getThreadNameSuffix());
}

DeviceIO(
Expand All @@ -76,13 +76,16 @@ final class DeviceIO implements IotHubConnectionStatusChangeCallback
SSLContext sslContext,
ProxySettings proxySettings,
int keepAliveInterval,
int sendInterval)
int sendInterval,
boolean useIdentifiableThreadNames,
String threadNamePrefix,
String threadNameSuffix)
{
this.state = IotHubConnectionStatus.DISCONNECTED;
this.transport = new IotHubTransport(hostName, protocol, sslContext, proxySettings, this, keepAliveInterval, sendInterval);
this.sendTask = new IotHubSendTask(this.transport);
this.receiveTask = new IotHubReceiveTask(this.transport);
this.reconnectTask = new IotHubReconnectTask(this.transport);
this.transport = new IotHubTransport(hostName, protocol, sslContext, proxySettings, this, keepAliveInterval, sendInterval, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix);
this.sendTask = new IotHubSendTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix);
this.receiveTask = new IotHubReceiveTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix);
this.reconnectTask = new IotHubReconnectTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public MultiplexingClient(String hostName, IotHubClientProtocol protocol, Multip
int sendMessagesPerThread = options != null ? options.getMaxMessagesSentPerSendInterval() : DEFAULT_MAX_MESSAGES_TO_SEND_PER_THREAD;
int keepAliveInterval = options != null ? options.getKeepAliveInterval() : DEFAULT_KEEP_ALIVE_INTERVAL_IN_SECONDS;
int sendInterval = (int) (options != null ? options.getSendInterval() : DEFAULT_SEND_INTERVAL_IN_MILLISECONDS);
String threadNamePrefix = options != null ? options.getThreadNamePrefix() : null;
String threadNameSuffix = options != null ? options.getThreadNameSuffix() : null;
boolean useIdentifiableThreadNames = options == null || options.isUsingIdentifiableThreadNames();

if (sendPeriod < 0)
{
Expand All @@ -127,7 +130,7 @@ else if (receivePeriod == 0) //default builder value for this option, signals th

// Optional settings from MultiplexingClientOptions
SSLContext sslContext = options != null ? options.getSslContext() : null;
this.deviceIO = new DeviceIO(hostName, protocol, sslContext, proxySettings, keepAliveInterval, sendInterval);
this.deviceIO = new DeviceIO(hostName, protocol, sslContext, proxySettings, keepAliveInterval, sendInterval, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix);
this.deviceIO.setMaxNumberOfMessagesSentPerSendThread(sendMessagesPerThread);
this.deviceIO.setSendPeriodInMilliseconds(sendPeriod);
this.deviceIO.setReceivePeriodInMilliseconds(receivePeriod);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,40 @@ public final class MultiplexingClientOptions
@Getter
@Builder.Default
public final int keepAliveInterval = DEFAULT_KEEP_ALIVE_INTERVAL_IN_SECONDS;

/**
* The prefix that will be applied to the names of all threads created by this client. If
* {@link #useIdentifiableThreadNames} is set to true, then this value is ignored and this client will create the
* prefix for you.
*/
@Getter
@Builder.Default
private final String threadNamePrefix = null;

/**
* The suffix that will be applied to the names of all threads created by this client. If
* {@link #useIdentifiableThreadNames} is set to true, then this value is ignored and this client will create the
* suffix for you.
*/
@Getter
@Builder.Default
private final String threadNameSuffix = null;

/**
* If true, all threads created by this client will use names that are unique. This is useful in applications that manage
* multiple device/module clients and want to be able to correlate logs to a particular client. In addition,
* the {@link #threadNamePrefix} and {@link #threadNameSuffix} values will be ignored.
*
* If false, all threads created by this client will use simple names that describe the thread's purpose, but are
* indistinguishable from the same threads created by a different client instance. However, users may still alter
* these thread names by providing values for the {@link #threadNamePrefix} and {@link #threadNameSuffix}.
*/
@Builder.Default
private final boolean useIdentifiableThreadNames = true;

public boolean isUsingIdentifiableThreadNames()
{
// Using a manually written method here to override the name that Lombok would have given it
return this.useIdentifiableThreadNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public final class IotHubReceiveTask implements Runnable
{
private static final String THREAD_NAME = "azure-iot-sdk-IotHubReceiveTask";
private final IotHubTransport transport;
private final String threadNamePrefix;
private final String threadNameSuffix;
private final boolean useIdentifiableThreadNames;

// This lock is used to communicate state between this thread and the IoTHubTransport layer. This thread will
// wait until a message has been received in that layer before continuing. This means that if the transport layer
Expand All @@ -25,7 +28,7 @@ public final class IotHubReceiveTask implements Runnable
// layer's responsibility to notify this thread when a message has been received so that this thread can handle it.
private final Semaphore receiveThreadSemaphore;

public IotHubReceiveTask(IotHubTransport transport)
public IotHubReceiveTask(IotHubTransport transport, boolean useIdentifiableThreadNames, String threadNamePrefix, String threadNameSuffix)
{
if (transport == null)
{
Expand All @@ -34,13 +37,35 @@ public IotHubReceiveTask(IotHubTransport transport)

this.transport = transport;
this.receiveThreadSemaphore = this.transport.getReceiveThreadSemaphore();
this.useIdentifiableThreadNames = useIdentifiableThreadNames;
this.threadNamePrefix = threadNamePrefix;
this.threadNameSuffix = threadNameSuffix;
}

public void run()
{
String deviceClientId = this.transport.getDeviceClientUniqueIdentifier();
String connectionId = transport.getTransportConnectionId();
String threadName = deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME;
String threadName = "";
if (this.useIdentifiableThreadNames)
{
String deviceClientId = this.transport.getDeviceClientUniqueIdentifier();
String connectionId = transport.getTransportConnectionId();
threadName += deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME;
}
else
{
if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty())
{
threadName += this.threadNamePrefix;
}

threadName += THREAD_NAME;

if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty())
{
threadName += this.threadNameSuffix;
}
}

Thread.currentThread().setName(threadName);

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@ public final class IotHubReconnectTask implements Runnable
{
private static final String THREAD_NAME = "azure-iot-sdk-IotHubReconnectTask";
private final IotHubTransport transport;
private final String threadNamePrefix;
private final String threadNameSuffix;
private final boolean useIdentifiableThreadNames;

// This lock is used to communicate state between this thread and the IoTHubTransport layer. This thread will
// wait until a disconnection event occurs in that layer before continuing. This means that if the transport layer
// has no connectivity problems, then this thread will do nothing and cost nothing.
private final Semaphore reconnectThreadSemaphore;

public IotHubReconnectTask(IotHubTransport transport)
public IotHubReconnectTask(IotHubTransport transport, boolean useIdentifiableThreadNames, String threadNamePrefix, String threadNameSuffix)
{
if (transport == null)
{
Expand All @@ -30,13 +33,35 @@ public IotHubReconnectTask(IotHubTransport transport)

this.transport = transport;
this.reconnectThreadSemaphore = this.transport.getReconnectThreadSemaphore();
this.useIdentifiableThreadNames = useIdentifiableThreadNames;
this.threadNamePrefix = threadNamePrefix;
this.threadNameSuffix = threadNameSuffix;
}

public void run()
{
String deviceClientId = this.transport.getDeviceClientUniqueIdentifier();
String connectionId = transport.getTransportConnectionId();
String threadName = deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME;
String threadName = "";
if (this.useIdentifiableThreadNames)
{
String deviceClientId = this.transport.getDeviceClientUniqueIdentifier();
String connectionId = transport.getTransportConnectionId();
threadName += deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME;
}
else
{
if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty())
{
threadName += this.threadNamePrefix;
}

threadName += THREAD_NAME;

if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty())
{
threadName += this.threadNameSuffix;
}
}

Thread.currentThread().setName(threadName);

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ public final class IotHubSendTask implements Runnable
{
private static final String THREAD_NAME = "azure-iot-sdk-IotHubSendTask";
private final IotHubTransport transport;
private final String threadNamePrefix;
private final String threadNameSuffix;
private final boolean useIdentifiableThreadNames;

// This lock is used to communicate state between this thread and the IoTHubTransport layer. This thread will
// wait until a message or callback is queued in that layer before continuing. This means that if the transport layer
Expand All @@ -25,7 +28,7 @@ public final class IotHubSendTask implements Runnable
// so that this thread can handle it.
private final Semaphore sendThreadSemaphore;

public IotHubSendTask(IotHubTransport transport)
public IotHubSendTask(IotHubTransport transport, boolean useIdentifiableThreadNames, String threadNamePrefix, String threadNameSuffix)
{
if (transport == null)
{
Expand All @@ -34,13 +37,35 @@ public IotHubSendTask(IotHubTransport transport)

this.transport = transport;
this.sendThreadSemaphore = this.transport.getSendThreadSemaphore();
this.useIdentifiableThreadNames = useIdentifiableThreadNames;
this.threadNamePrefix = threadNamePrefix;
this.threadNameSuffix = threadNameSuffix;
}

public void run()
{
String deviceClientId = this.transport.getDeviceClientUniqueIdentifier();
String connectionId = transport.getTransportConnectionId();
String threadName = deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME;
String threadName = "";
if (this.useIdentifiableThreadNames)
{
String deviceClientId = this.transport.getDeviceClientUniqueIdentifier();
String connectionId = transport.getTransportConnectionId();
threadName += deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME;
}
else
{
if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty())
{
threadName += this.threadNamePrefix;
}

threadName += THREAD_NAME;

if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty())
{
threadName += this.threadNameSuffix;
}
}

Thread.currentThread().setName(threadName);

try
Expand Down
Loading

0 comments on commit ba8f982

Please sign in to comment.