Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
Added DetectedObject and Sensor
Updated Infrastructure Registration to include Sensors
Added interact for both DetectedObject and Sensor registration
Removed Infrastructure Time interface and moved functionality to Infrastructure Instance manager
  • Loading branch information
paulbourelly999 committed Jul 27, 2023
1 parent 443a82a commit 69a8ca8
Show file tree
Hide file tree
Showing 19 changed files with 1,219 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package org.eclipse.mosaic.fed.infrastructure.ambassador;

import org.eclipse.mosaic.interactions.sensor.Sensor;
import org.eclipse.mosaic.interactions.sensor.SensorRegistration;
import org.eclipse.mosaic.lib.geo.CartesianPoint;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.ArrayList;

/**
* InfrastructureInstance class represents a physical instance of an
Expand All @@ -35,29 +38,36 @@ public class InfrastructureInstance {
private InetAddress targetAddress;
private int rxMessagePort;
private int timeSyncPort;
private int simulatedInteractionPort;
private CartesianPoint location = null;
private DatagramSocket rxMsgsSocket = null;
private ArrayList<Sensor> sensors;

/**
* Constructor for InfrastructureInstance
*
* @param infrastructureId the ID of the infrastructure node
* @param targetAddress the target IP address of the infrastructure node
* @param rxMessagePort the receive message port of the infrastructure node
* @param timeSyncPort the time synchronization port of the infrastructure
* @param timeSyncPort the tiyhuuuuuuuuuuuuuuuuuuuuuu----------me synchronization port of the infrastructure
* node
* @param simulatedInteractionPort
* @param location the location of the infrastructure node in the
* simulated environment
*/
public InfrastructureInstance(String infrastructureId, InetAddress targetAddress,
int rxMessagePort, int timeSyncPort, CartesianPoint location) {
*/
public InfrastructureInstance(String infrastructureId, InetAddress targetAddress, int rxMessagePort,
int timeSyncPort, int simulatedInteractionPort, CartesianPoint location, ArrayList<Sensor> sensors) {
this.infrastructureId = infrastructureId;
this.targetAddress = targetAddress;
this.rxMessagePort = rxMessagePort;
this.timeSyncPort = timeSyncPort;
this.simulatedInteractionPort = simulatedInteractionPort;
this.location = location;
this.sensors = sensors;
}



/**
* Returns the target IP address of the infrastructure node
*
Expand Down Expand Up @@ -148,6 +158,15 @@ public void setTimeSyncPort(int timeSyncPort) {
this.timeSyncPort = timeSyncPort;
}

public boolean containsSensor(String sensorId) {
for (Sensor sensor : sensors) {
if (sensor.getSensorId().equals(sensorId) ) {
return true;
}
}
return false;
}

/**
* Creates a DatagramSocket object and binds it to this infrastructure
* instance's receive message port
Expand Down Expand Up @@ -190,4 +209,12 @@ public void sendTimeSyncMsgs(byte[] data) throws IOException {
rxMsgsSocket.send(packet);

}

public void sendInteraction(byte[] data) throws IOException {
if (rxMsgsSocket == null) {
throw new IllegalStateException("Attempted to send data before opening socket");
}
DatagramPacket packet = new DatagramPacket(data, data.length, targetAddress, simulatedInteractionPort);
rxMsgsSocket.send(packet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@

package org.eclipse.mosaic.fed.infrastructure.ambassador;

import gov.dot.fhwa.saxton.CarmaV2xMessage;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.eclipse.mosaic.interactions.communication.V2xMessageTransmission;
import org.eclipse.mosaic.interactions.sensor.DetectedObject;
import org.eclipse.mosaic.interactions.sensor.Sensor;
import org.eclipse.mosaic.lib.enums.AdHocChannel;
import org.eclipse.mosaic.lib.geo.GeoCircle;
import org.eclipse.mosaic.lib.geo.CartesianPoint;
import org.eclipse.mosaic.lib.geo.GeoCircle;
import org.eclipse.mosaic.lib.objects.addressing.AdHocMessageRoutingBuilder;
import org.eclipse.mosaic.lib.objects.v2x.ExternalV2xContent;
import org.eclipse.mosaic.lib.objects.v2x.ExternalV2xMessage;
import org.eclipse.mosaic.lib.objects.v2x.MessageRouting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import com.google.gson.Gson;

import gov.dot.fhwa.saxton.CarmaV2xMessage;

/**
* Session management class for Infrastructure instances communicating with
Expand Down Expand Up @@ -65,7 +71,10 @@ public void onNewRegistration(InfrastructureRegistrationMessage registration) {
InetAddress.getByName(registration.getRxMessageIpAddress()),
registration.getRxMessagePort(),
registration.getTimeSyncPort(),
registration.getLocation());
registration.getSimulatedInteractionPort(),
registration.getLocation(),
registration.getSensors());

} catch (UnknownHostException e) {
log.error("Failed to create infrastructure instance with ID '{}' due to an unknown host exception: {}",
registration.getInfrastructureId(), e.getMessage());
Expand All @@ -91,9 +100,9 @@ public void onNewRegistration(InfrastructureRegistrationMessage registration) {
*
*/
private void newInfrastructureInstance(String infrastructureId, InetAddress rxMessageIpAddress, int rxMessagePort,
int timeSyncPort, CartesianPoint location) {
int timeSyncPort, int simulatedInteractionPort, CartesianPoint location, ArrayList<Sensor> sensors) {
InfrastructureInstance tmp = new InfrastructureInstance(infrastructureId, rxMessageIpAddress, rxMessagePort,
timeSyncPort, location);
timeSyncPort, simulatedInteractionPort, location, sensors);
try {
tmp.bind();
log.info("New Infrastructure instance '{}' registered with Infrastructure Instance Manager.", infrastructureId);
Expand All @@ -105,6 +114,7 @@ private void newInfrastructureInstance(String infrastructureId, InetAddress rxMe
managedInstances.put(infrastructureId, tmp);
}


/**
* Callback to be invoked when CARMA Platform receives a V2X Message from the NS-3 simulation
* @param sourceAddr The V2X Message received
Expand Down Expand Up @@ -135,7 +145,7 @@ public V2xMessageTransmission onV2XMessageTx(InetAddress sourceAddr, CarmaV2xMes
}

/**
* Callback to be invoked when CARMA Platform receives a V2X Message from the NS-3 simulation
* Callback to be invoked when an RSU receives a V2X Message from the NS-3 simulation
* @param rxMsg The V2X Message received
* @param rxRsuId The Host ID of the vehicle receiving the data
* @throws RuntimeException If the socket used to communicate with the platform experiences failure
Expand All @@ -153,6 +163,59 @@ public void onV2XMessageRx(byte[] rxMsg, String rxRsuId) {
}
}

/**
* Callback to be invoked when a infrastructure instance receives a simulated object detection from
* a registered simulated sensors.
* @param detection
* @param sensorId
*/
public void onObjectDetectionInteraction(DetectedObject detection) {
for (InfrastructureInstance instance : managedInstances.values()) {
if ( instance.containsSensor(detection.getSensorId()) ) {
try {
instance.sendInteraction(encodeObjectDetection(detection));
// Assuming each sensor would only ever be registered to a single infrastructure instance
// break out of loop.
break;
}
catch( IOException e ) {
log.error("Error occured: {}", e);
}
}
}
}

private byte[] encodeTimeMessage(InfrastructureTimeMessage message ) {
return asJson(message).getBytes();
}

private String asJson( Object obj) {
Gson gson = new Gson();
return gson.toJson(obj);
}
private byte[] encodeObjectDetection(DetectedObject detection) {
return asJson(detection).getBytes();
}


/**
* This function is used to send out encoded timestep update to all registered
* instances the manager has on the managed instances map
*
* @param message This time message is used to store current seq and timestep
* from the ambassador side
* @throws IOException
*/
public void onTimeStepUpdate(InfrastructureTimeMessage message) throws IOException {
if (managedInstances.size() == 0) {
log.debug("There are no registered instances");
}

for (InfrastructureInstance currentInstance : managedInstances.values()) {
currentInstance.sendTimeSyncMsgs(encodeTimeMessage(message));
}
}

/**
* External helper function to allow the ambassador to check if a given vehicle
* ID is a registered CARMA Platform instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.eclipse.mosaic.interactions.communication.V2xMessageReception;
import org.eclipse.mosaic.interactions.communication.V2xMessageTransmission;
import org.eclipse.mosaic.interactions.mapping.RsuRegistration;
import org.eclipse.mosaic.interactions.sensor.DetectedObject;
import org.eclipse.mosaic.interactions.sensor.DetectedObjectInteraction;
import org.eclipse.mosaic.lib.enums.AdHocChannel;
import org.eclipse.mosaic.lib.geo.GeoPoint;
import org.eclipse.mosaic.lib.misc.Tuple;
Expand All @@ -41,6 +43,8 @@
import org.eclipse.mosaic.rti.api.InternalFederateException;
import org.eclipse.mosaic.rti.api.parameters.AmbassadorParameter;

import com.google.gson.Gson;

import javax.xml.bind.DatatypeConverter;
import java.io.IOException;
import java.net.Inet4Address;
Expand Down Expand Up @@ -72,8 +76,6 @@ public class InfrastructureMessageAmbassador extends AbstractFederateAmbassador
private Thread v2xMessageBackgroundThread;

private InfrastructureInstanceManager infrastructureInstanceManager = new InfrastructureInstanceManager();
private InfrastructureTimeInterface infrastructureTimeInterface = new InfrastructureTimeInterface(
infrastructureInstanceManager);

private int timeSyncSeq = 0;

Expand Down Expand Up @@ -154,8 +156,16 @@ public void processInteraction(Interaction interaction) throws InternalFederateE
if (interaction.getTypeId().equals(InfrastructureV2xMessageReception.TYPE_ID)) {
this.receiveInteraction((InfrastructureV2xMessageReception) interaction);
}
if (interaction.getTypeId().equals(DetectedObjectInteraction.TYPE_ID)) {
this.receiveDetectedObjectInteraction((DetectedObjectInteraction) interaction);
}
}


private synchronized void receiveDetectedObjectInteraction( DetectedObjectInteraction interaction) {
log.trace("Process Detected Object Interaction {}", interaction.toString());
infrastructureInstanceManager.onObjectDetectionInteraction(interaction.getDetectedObject());
}
/**
* Extract external message from received
* {@link InfrastructureV2xMessageReception} interaction.
Expand Down Expand Up @@ -191,6 +201,7 @@ private synchronized void receiveV2xReceptionInteraction(V2xMessageReception int
}
}


/**
*
* Creates an Ad-Hoc configuration object to represent the configuration of the
Expand Down Expand Up @@ -308,7 +319,7 @@ public synchronized void processTimeAdvanceGrant(long time) throws InternalFeder
timeSyncMessage.setSeq(timeSyncSeq);
// nanoseconds to milliseconds for InfrastructureTimeMessage
timeSyncMessage.setTimestep(currentSimulationTime/1000000);
infrastructureTimeInterface.onTimeStepUpdate(timeSyncMessage);
infrastructureInstanceManager.onTimeStepUpdate(timeSyncMessage);

// TODO: Handle any queued V2X message receiver's received messages

Expand Down
Loading

0 comments on commit 69a8ca8

Please sign in to comment.