Skip to content

Commit

Permalink
Add logic to connect and retry connection for CARLA Ambassador (#189)
Browse files Browse the repository at this point in the history
<!-- Thanks for the contribution, this is awesome. -->

# PR Details
## Description
Add functionality to CARLA Ambassador to, on initialization, attempt to
connect to CARLA CDASim Adapter, with some retry limit. This avoids
deployment timing issues that occasionally cause the CARLA Ambassador to
drop Sensor Registrations if the CARLA CDA Sim Adapter is not up yet.

**NOTE** This PR requires closing of
(usdot-fhwa-stol/carla-sensor-lib#2)
<!--- Describe your changes in detail -->

## Related GitHub Issue

<!--- This project only accepts pull requests related to open GitHub
issues or Jira Keys -->
<!--- If suggesting a new feature or change, please discuss it in an
issue first -->
<!--- If fixing a bug, there should be an issue describing it with steps
to reproduce -->
<!--- Please DO NOT name partially fixed issues, instead open an issue
specific to this fix -->
<!--- Please link to the issue here: -->

## Related Jira Key
[CDAR-667
](https://usdot-carma.atlassian.net/browse/CDAR-667)<!-- e.g. CAR-123
-->

## Motivation and Context
Improve robustness of deployment by reattempting failing connection on
startup, that could fail purely due to CDASim deployment timing issues.
<!--- Why is this change required? What problem does it solve? -->

## How Has This Been Tested?
Sim Computer
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->

## Types of changes

<!--- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->

- [x] Defect fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that cause existing functionality
to change)

## Checklist:

<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->

- [ ] I have added any new packages to the sonar-scanner.properties file
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the
[**CONTRIBUTING**](https://github.com/usdot-fhwa-stol/carma-platform/blob/develop/Contributing.md)
document.
- [ ] I have added tests to cover my changes.
- [ ] All new and existing tests passed.
  • Loading branch information
paulbourelly999 authored Jan 11, 2024
1 parent 734e4d0 commit 9ac3c4e
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,23 @@ public void initialize(long startTime, long endTime) throws InternalFederateExce
log.error("Error during advanceTime request", e);
throw new InternalFederateException(e);
}
// Start the CARLA simulator
startCarlaLocal();
//initialize CarlaXmlRpcClient
//set the connected server URL
try{
if (carlaXmlRpcClient== null) {
URL xmlRpcServerUrl = new URL(carlaConfig.carlaCDASimAdapterUrl);
carlaXmlRpcClient = new CarlaXmlRpcClient(xmlRpcServerUrl);
}
carlaXmlRpcClient.initialize();

}
catch (MalformedURLException m)
{
log.error("Errors occurred with {}", m.getMessage());
throw new InternalFederateException("Carla Ambassador initialization failed due to CARLA CDA Sim Adapter"
+ "connection! Check carla_config.json!", m);
}
// Start the CARLA simulator
startCarlaLocal();


}

/**
Expand Down Expand Up @@ -352,7 +353,10 @@ public synchronized void processTimeAdvanceGrant(long time) throws InternalFeder
}

try {

if ( time == 0 ) {
// Try to connect to CARLA CDA Sim Adapter on first timestep
carlaXmlRpcClient.connect(60);
}
// if the simulation step received from CARLA, advance CARLA federate local
// simulation time
if (isSimulationStep) {
Expand All @@ -376,12 +380,17 @@ public synchronized void processTimeAdvanceGrant(long time) throws InternalFeder
for (DetectedObjectInteraction detectionInteraction: detectedObjectInteractions) {
this.rti.triggerInteraction(detectionInteraction);
}
} catch (IllegalValueException e) {
log.error("Error during advanceTime(" + time + ")", e);
}
catch (IllegalValueException e) {
log.error("Failed to process advance time grant due to : ", e);
}
catch (XmlRpcException e ) {
throw new InternalFederateException("Failed to process advance time grant due to CARLA CDA Sim "
+ "Adapter connection! Check carla_config.json!", e);
}
catch (XmlRpcException e) {
log.error("Failed to connect to CARLA Adapter : ", e);
carlaXmlRpcClient.closeConnection();
catch (InterruptedException e) {
log.error("Failed to process advance time grant due to failed thread sleep!", e);
Thread.currentThread().interrupt();
}
}

Expand Down Expand Up @@ -411,6 +420,7 @@ public void finishSimulation() throws InternalFederateException {
connectionProcess.waitFor(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Something went wrong when stopping a process", e);
Thread.currentThread().interrupt();
} finally {
connectionProcess.destroy();
}
Expand Down Expand Up @@ -524,6 +534,7 @@ else if (interaction.getTypeId().equals(DetectorRegistration.TYPE_ID)) {
/**
* Method to call XMLRPC method to create sensor on reception of DetectionRegistration interactions.
* @param interaction Interaction triggered by Ambassadors attempting to create sensors in CARLA.
* @throws InterruptedException
*/
private void receiveInteraction(DetectorRegistration interaction) {
try {
Expand All @@ -532,7 +543,6 @@ private void receiveInteraction(DetectorRegistration interaction) {
}
catch(XmlRpcException e) {
log.error("Error occurred attempting to create sensor : {}\n{}", interaction.getDetector(), e);
carlaXmlRpcClient.closeConnection();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,47 @@
*/
public class CarlaXmlRpcClient{

private boolean isConnected;
private static final String CREATE_SENSOR = "create_simulated_semantic_lidar_sensor";
private static final String GET_DETECTED_OBJECTS = "get_detected_objects";
private static final String CONNECT ="connect";

private XmlRpcClient client;
private URL xmlRpcServerUrl;
private final Logger log = LoggerFactory.getLogger(this.getClass());


public CarlaXmlRpcClient(URL xmlRpcServerUrl) {
this.xmlRpcServerUrl = xmlRpcServerUrl;
}


/**
* Initialize XmlRpcClient.
* @param xmlRpcServerUrl
*/
public void initialize()
{
XmlRpcClientConfigImpl config = new XmlRpcClientConfigImpl();
config.setServerURL(xmlRpcServerUrl);
// Set reply and connection timeout (both in ms)
config.setReplyTimeout(6000);
config.setConnectionTimeout(10000);
client = new XmlRpcClient();
client.setConfig(config);
isConnected = true;
}


public void connect(int retryAttempts) throws XmlRpcException, InterruptedException{
boolean connected = false;
int currentAttempt = 1;
while( !connected && retryAttempts >= currentAttempt ) {
try {
log.info("Attempting to connect to CARLA CDA Sim Adapter ... ");
Object[] params = new Object[]{};
client.execute(CONNECT, params);
connected = true;
}
catch(XmlRpcException e) {
log.error("Connection attempt {} to connect to CARLA CDA Sim Adapter failed!", currentAttempt, e);
// Sleep for 1 second betweeen attempts
Thread.sleep(1000);
currentAttempt++;
}
}
if (!connected) {
throw new XmlRpcException("Failed to connect to XML RPC Server with config " + client.getConfig() + " !");
}
log.info("Connected successfully to CARLA CDA Sim Adapter!");
}
/**
* Calls CARLA CDA Sim Adapter create_sensor XMLRPC method and logs sensor ID of created sensor.
* @param registration DetectorRegistration interaction used to create sensor.
Expand All @@ -70,15 +84,10 @@ public void initialize()
public void createSensor(DetectorRegistration registration) throws XmlRpcException{
List<Double> location = Arrays.asList(registration.getDetector().getLocation().getX(), registration.getDetector().getLocation().getY(), registration.getDetector().getLocation().getZ());
List<Double> orientation = Arrays.asList(registration.getDetector().getOrientation().getPitch(), registration.getDetector().getOrientation().getRoll(), registration.getDetector().getOrientation().getYaw());

if (isConnected) {
Object[] params = new Object[]{registration.getInfrastructureId(), registration.getDetector().getSensorId(), location, orientation};
Object result = client.execute(CREATE_SENSOR, params);
log.info((String)result);
}
else {
log.warn("XMLRpcClient is not connected to CARLA Adapter!");
}
Object[] params = new Object[]{registration.getInfrastructureId(), registration.getDetector().getSensorId(), location, orientation};
Object result = client.execute(CREATE_SENSOR, params);
log.info((String)result);

}
/**
* Calls CARLA CDA Sim Adapter get_detected_objects XMLRPC method and returns an array of DetectedObject.
Expand All @@ -88,25 +97,12 @@ public void createSensor(DetectorRegistration registration) throws XmlRpcExcepti
* @throws XmlRpcException if XMLRPC call fails or connection is lost.
*/
public DetectedObject[] getDetectedObjects(String infrastructureId ,String sensorId) throws XmlRpcException{
if (isConnected) {
Object[] params = new Object[]{infrastructureId, sensorId};
Object result = client.execute(GET_DETECTED_OBJECTS, params);
log.debug("Detections from infrastructure {} sensor {} : {}", infrastructureId, sensorId, result);
String jsonResult = (String)result;
Gson gson = new Gson();
return gson.fromJson(jsonResult,DetectedObject[].class);
}
else {
throw new XmlRpcException("XMLRpcClient is not connected to CARLA Adapter!");

}
}
/**
* Method to set isConnected field to false. Does not actually close the underlying http connection session but
* is used to avoid repeated timeouts/exceptions on misconfiguration of XMLRPC client.
*/
public void closeConnection() {
log.warn("Closing XML RPC Client connection in CARLA Ambassador!");
isConnected = false;
Object[] params = new Object[]{infrastructureId, sensorId};
Object result = client.execute(GET_DETECTED_OBJECTS, params);
log.debug("Detections from infrastructure {} sensor {} : {}", infrastructureId, sensorId, result);
String jsonResult = (String)result;
Gson gson = new Gson();
return gson.fromJson(jsonResult,DetectedObject[].class);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ public class CarlaConfiguration implements Serializable {
public String carlaCDASimAdapterUrl;



}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.junit.rules.TemporaryFolder;
import org.mockito.internal.util.reflection.FieldSetter;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -115,7 +116,8 @@ public void initialize() throws Throwable {
ambassador.initialize(0, 100 * TIME.SECOND);
// ASSERT
verify(rtiMock, times(1)).requestAdvanceTime(eq(0L), eq(0L), eq((byte) 1));
verify(carlaXmlRpcClientMock, times(1)).initialize();


}

@Test
Expand Down Expand Up @@ -179,11 +181,15 @@ public void processTimeAdvanceGrantException() throws InternalFederateException,

when(carlaXmlRpcClientMock.getDetectedObjects(registration.getInfrastructureId(), registration.getDetector().getSensorId() )).thenThrow(XmlRpcException.class);
// Verify that when exceptiopn is thrown by CarlaXmlRpcClient, no interactions are trigger and exception is caught
ambassador.processTimeAdvanceGrant(100);
try {
ambassador.processTimeAdvanceGrant(0);
}catch (Exception e) {
assertEquals(InternalFederateException.class, e.getClass());
assertEquals(XmlRpcException.class, e.getCause().getClass());
}

verify(carlaXmlRpcClientMock, times(1)).getDetectedObjects(registration.getInfrastructureId(), registration.getDetector().getSensorId());
verify(rtiMock, times(0)).triggerInteraction(any(DetectedObjectInteraction.class));
verify(carlaXmlRpcClientMock, times(1)).closeConnection();

}

Expand All @@ -207,9 +213,6 @@ public void processDetectorRegistrationInteractionException() throws XmlRpcExcep
ambassador.processInteraction(registration);

verify(carlaXmlRpcClientMock, times(1)).createSensor(registration);
verify(carlaXmlRpcClientMock, times(1)).closeConnection();


}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package org.eclipse.mosaic.fed.carla.carlaconnect;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -39,13 +43,14 @@ public class CarlaXmlRpcClientTest {
* @throws MalformedURLException
*/
@Before
public void setup() throws NoSuchFieldException, MalformedURLException{
public void setup() throws NoSuchFieldException, MalformedURLException, XmlRpcException{
mockClient = mock(XmlRpcClient.class);
URL xmlRpcServerUrl = new URL("http://test_url");
URL xmlRpcServerUrl = new URL("http://127.0.0.1:8090/RPC2");
carlaConnection = new CarlaXmlRpcClient(xmlRpcServerUrl);
carlaConnection.initialize();
// Set mock after initialize since initialize overwrites member
when( mockClient.execute(eq("connect"), any(Object[].class))).thenReturn(true);
FieldSetter.setField(carlaConnection, carlaConnection.getClass().getDeclaredField("client"), mockClient);

// Set mock after initialize since initialize overwrites member
}

/**
Expand All @@ -68,6 +73,25 @@ public void testCreateSensor() throws XmlRpcException {
// Verify following method was called on mock
verify( mockClient, times(1)).execute("create_simulated_semantic_lidar_sensor", params);
}
@Test
public void testConnect() throws XmlRpcException, InterruptedException{
carlaConnection.connect(2);
verify(mockClient, times(1)).execute( eq("connect"), any(Object[].class));
}

@Test
public void testConnectRetry() {
try {
when(mockClient.execute(eq("connect"), any(Object[].class))).thenThrow(new XmlRpcException(""));
carlaConnection.connect(10);
verify(mockClient, times(10)).execute( eq("connect"), any(Object[].class));
}
catch (Exception e) {
String message = e.getMessage();
assertTrue(message.startsWith("Failed to connect to XML RPC Server with config"));
assertEquals(XmlRpcException.class,e.getClass());
}
}

/**
* Test GetDectedObjects
Expand Down Expand Up @@ -194,41 +218,4 @@ public void testGetDetectedObjects() throws XmlRpcException {

}

/**
* Test close connection
* @throws XmlRpcException
*/
@Test
public void testCloseConnection() throws XmlRpcException {
// Create request params
// Create Detector Registration
Detector detector = new Detector("sensorID1", DetectorType.SEMANTIC_LIDAR, new Orientation( 0.0,0.0,0.0), CartesianPoint.ORIGO);
DetectorRegistration registration = new DetectorRegistration(0, detector, "rsu_2");
List<Double> location = Arrays.asList(registration.getDetector().getLocation().getX(), registration.getDetector().getLocation().getY(), registration.getDetector().getLocation().getZ());
List<Double> orientation = Arrays.asList(registration.getDetector().getOrientation().getPitch(), registration.getDetector().getOrientation().getRoll(), registration.getDetector().getOrientation().getYaw());
Object[] params = new Object[]{registration.getInfrastructureId(), registration.getDetector().getSensorId(), location, orientation};
// Tell mock to return sensor ID when following method is called with following parameters
when( mockClient.execute("create_simulated_semantic_lidar_sensor", params)).thenReturn(registration.getSenderId());
Object[] get_detected_object_params = new Object[]{registration.getInfrastructureId(), registration.getDetector().getSensorId()};
// Tell mock to return sensor ID when following method is called with following parameters
when( mockClient.execute("get_detected_objects", get_detected_object_params)).thenReturn("");
carlaConnection.closeConnection();
try {
carlaConnection.createSensor(registration);
}
catch( Exception e ) {
assertEquals(XmlRpcException.class, e.getClass());
assertEquals("XMLRpcClient is not connected to CARLA Adapter!", e.getMessage());
}
try {
carlaConnection.getDetectedObjects(registration.getInfrastructureId(), registration.getDetector().getSensorId());
}
catch( Exception e ) {
assertEquals(XmlRpcException.class, e.getClass());
assertEquals("XMLRpcClient is not connected to CARLA Adapter!", e.getMessage());
}
// Assert execute is never called on XMLRPC Client after connection is closed.
verify( mockClient, times(0)).execute(any(String.class), any(Object[].class));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@
"bridgePath": "./scenarios/Town04_10/carla; bridge.bat",
"carlaConnectionPort": 8913,
"carlaCDASimAdapterUrl":"http://127.0.0.1:8090/RPC2"

}

0 comments on commit 9ac3c4e

Please sign in to comment.