From cec2165b29a9d0e9c7df9ac4ec2fdcef20e61c33 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Thu, 19 Oct 2023 16:49:48 -0700 Subject: [PATCH] fix(iot-dev): Fix 2 multiplexing scenario bugs - Fix bug where a closing multiplexed device would report its state as ```Disconnected``` followed by ```Disconnected_retrying``` and then finally ```Disconnected``` - Fix bug where a multiplexed device that has its device session closed didn't cancel all of its outgoing messages --- .../iot/device/transport/IotHubTransport.java | 56 ++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java index f6104e05c3..2989540157 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java @@ -410,7 +410,9 @@ public void onMultiplexedDeviceSessionEstablished(String connectionId, String de @Override public void onMultiplexedDeviceSessionLost(TransportException e, String connectionId, String deviceId, boolean shouldReconnect) { - if (connectionId.equals(this.iotHubTransportConnection.getConnectionId())) + if (connectionId.equals(this.iotHubTransportConnection.getConnectionId()) + && multiplexedDeviceConnectionStates.containsKey(deviceId) + && multiplexedDeviceConnectionStates.get(deviceId).getConnectionStatus() == IotHubConnectionStatus.CONNECTED) { log.debug("The device session in the multiplexed connection to the IoT Hub has been lost for device {}", deviceId); if (shouldReconnect) @@ -1388,6 +1390,7 @@ private void singleDeviceReconnectAttemptAsync(String deviceSessionToReconnect) { this.updateStatus(IotHubConnectionStatus.DISCONNECTED, IotHubConnectionStatusChangeReason.RETRY_EXPIRED, transportException, deviceSessionToReconnect); log.debug("Reconnection for device {} was abandoned due to the operation timeout", deviceSessionToReconnect); + return; } multiplexedDeviceState.incrementReconnectionAttemptNumber(); @@ -1406,6 +1409,7 @@ private void singleDeviceReconnectAttemptAsync(String deviceSessionToReconnect) { this.updateStatus(IotHubConnectionStatus.DISCONNECTED, IotHubConnectionStatusChangeReason.RETRY_EXPIRED, transportException, deviceSessionToReconnect); log.debug("Reconnection for device {} was abandoned due to the retry policy", deviceSessionToReconnect); + return; } log.trace("Attempting to reconnect device session: attempt {}", multiplexedDeviceState.getReconnectionAttemptNumber()); @@ -1422,6 +1426,7 @@ private void singleDeviceReconnectAttemptAsync(String deviceSessionToReconnect) { this.updateStatus(IotHubConnectionStatus.DISCONNECTED, this.exceptionToStatusChangeReason(transportException), transportException, deviceSessionToReconnect); log.error("Reconnection for device {} was abandoned due to encountering a non-retryable exception", deviceSessionToReconnect, transportException); + return; } } } @@ -1766,6 +1771,55 @@ private void updateStatus(IotHubConnectionStatus newConnectionStatus, IotHubConn else if (newConnectionStatus == IotHubConnectionStatus.DISCONNECTED) { correlationCallbackCleanupThread.interrupt(); + finalizeMultiplexedDevicesMessages(deviceId); + } + } + + // When a multiplexed device has its session closed (expected or otherwise) all outgoing and pending + // messages should be removed and callbacks should be executed for each message to notify the user + // that the message was cancelled. + private void finalizeMultiplexedDevicesMessages(String deviceId) + { + //Check waiting packets, remove any that have expired. + IotHubTransportPacket packet = this.waitingPacketsQueue.poll(); + Queue packetsToAddBackIntoWaitingPacketsQueue = new LinkedBlockingQueue<>(); + while (packet != null) + { + if (packet.getMessage().getConnectionDeviceId().equals(deviceId)) + { + packet.setStatus(IotHubStatusCode.MESSAGE_CANCELLED_ONCLOSE); + this.addToCallbackQueue(packet); + } + else + { + //message didn't belong to this device, requeue it + packetsToAddBackIntoWaitingPacketsQueue.add(packet); + } + + packet = this.waitingPacketsQueue.poll(); + } + + //Requeue all the messages that belong to other devices. + this.waitingPacketsQueue.addAll(packetsToAddBackIntoWaitingPacketsQueue); + + //Check in progress messages + synchronized (this.inProgressMessagesLock) + { + List messageIdsThatBelongToDisconnectedDevice = new ArrayList<>(); + for (String messageId : this.inProgressPackets.keySet()) + { + if (this.inProgressPackets.get(messageId).getMessage().getConnectionDeviceId().equals(deviceId)) + { + messageIdsThatBelongToDisconnectedDevice.add(messageId); + } + } + + for (String messageId : messageIdsThatBelongToDisconnectedDevice) + { + IotHubTransportPacket expiredPacket = this.inProgressPackets.remove(messageId); + expiredPacket.setStatus(IotHubStatusCode.MESSAGE_CANCELLED_ONCLOSE); + this.addToCallbackQueue(expiredPacket); + } } }