Skip to content

Commit

Permalink
Fix race condition when IoT message arrives immediately after subscri…
Browse files Browse the repository at this point in the history
…be (#2794)
  • Loading branch information
schuenadel authored Mar 11, 2022
1 parent 8d9005f commit 5d00416
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,9 @@ public void subscribeToTopic(final String topic, final AWSIotMqttQos qos,

if (null != mqttClient) {
try {
final AWSIotMqttTopic topicModel = new AWSIotMqttTopic(topic, qos, callback);
topicListeners.put(topic, topicModel);

if (subscriptionStatusCallback != null) {
mqttClient.subscribe(topic, qos.asInt(), null, new IMqttActionListener() {
@Override
Expand All @@ -1495,14 +1498,14 @@ public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
mqttClient.subscribe(topic, qos.asInt());
}
} catch (final MqttException e) {
if(subscriptionStatusCallback != null) {
topicListeners.remove(topic);

if (subscriptionStatusCallback != null) {
subscriptionStatusCallback.onFailure(e);
} else {
throw new AmazonClientException("Client error when subscribing.", e);
}
}
final AWSIotMqttTopic topicModel = new AWSIotMqttTopic(topic, qos, callback);
topicListeners.put(topic, topicModel);
}
}

Expand Down Expand Up @@ -2051,4 +2054,4 @@ enum AuthenticationMode {
public boolean getSessionPresent() {
return sessionPresent;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,48 @@ public void testSubscribeToTopic() throws Exception {
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
}

@Test
public void testSubscribeToTopicWithImmediateMessageArriving() throws Exception {
final MockMqttClient mockClient = new MockMqttClient();

AWSIotMqttManager testClient = new AWSIotMqttManager("test-client",
Region.getRegion(Regions.US_EAST_1), TEST_ENDPOINT_PREFIX);
testClient.setMqttClient(mockClient);

KeyStore testKeystore = AWSIotKeystoreHelper.getIotKeystore(CERT_ID, KEYSTORE_PATH,
KEYSTORE_NAME, KEYSTORE_PASSWORD);
testClient.connect(testKeystore, null);

TestNewMessageCallback mcb = new TestNewMessageCallback();

AWSIotMqttSubscriptionStatusCallback subscriptionStatusCallback = new AWSIotMqttSubscriptionStatusCallback() {
@Override
public void onSuccess() {
MqttMessage msg = new MqttMessage();
msg.setPayload("test payload".getBytes(StringUtils.UTF8));
try {
mockClient.mockCallback.messageArrived("unit/test/topic", msg);
} catch (Exception e) {
fail("Could not simulate arriving message: " + e);
}
}

@Override
public void onFailure(Throwable exception) {
fail("Subscribing failed while simulate arriving message: " + exception);
}
};

testClient.subscribeToTopic("unit/test/topic", AWSIotMqttQos.QOS0, subscriptionStatusCallback, mcb);

assertEquals(1, mockClient.subscribeCalls);
assertTrue(mockClient.mockSubscriptions.containsKey("unit/test/topic"));
assertEquals((Integer) 0, mockClient.mockSubscriptions.get("unit/test/topic"));

assertEquals(1, mcb.receivedMessages.size());
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
}

@Test(expected = IllegalArgumentException.class)
public void testSubscribeToTopicNullTopic() throws Exception {
MockMqttClient mockClient = new MockMqttClient();
Expand Down

0 comments on commit 5d00416

Please sign in to comment.