Skip to content

Commit

Permalink
Merge pull request #554 from John-Wiens/generic-receiver
Browse files Browse the repository at this point in the history
Generic receiver
  • Loading branch information
dan-du-car authored Sep 25, 2024
2 parents bb847a3 + 89b819c commit 2222b23
Show file tree
Hide file tree
Showing 16 changed files with 546 additions and 328 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ services:
- "44920:44920/udp"
- "44930:44930/udp"
- "44940:44940/udp"
- "44990:44990/udp"
- "5555:5555/udp"
- "6666:6666/udp"
environment:
Expand Down
20 changes: 20 additions & 0 deletions jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ public class OdeProperties implements EnvironmentAware {
private String kafkaTopicOdePsmJson = "topic.OdePsmJson";
private int psmReceiverPort = 44940;
private int psmBufferSize = 500;

// Generic Receiver
private int genericReceiverPort = 44990;
private int genericBufferSize = 2000;

// DriverAlerts
private String kafkaTopicDriverAlertJson = "topic.OdeDriverAlertJson";
Expand Down Expand Up @@ -470,6 +474,22 @@ public void setPsmBufferSize(int psmBufferSize) {
this.psmBufferSize = psmBufferSize;
}

public int getGenericReceiverPort() {
return genericReceiverPort;
}

public void setGenericReceiverPort(int genericReceiverPort) {
this.genericReceiverPort = genericReceiverPort;
}

public int getGenericBufferSize() {
return genericBufferSize;
}

public void setGenericBufferSize(int psmBufferSize) {
this.genericBufferSize = genericBufferSize;
}

public void setUploadLocationRoot(String uploadLocationRoot) {
this.uploadLocationRoot = uploadLocationRoot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,19 +164,25 @@ private void publishList(XmlUtils xmlUtils, List<OdeData> dataList) throws JsonP
} else {
// Determine the message type (MAP, TIM, SSM, SRM, or PSM)
String messageType = UperUtil.determineMessageType(msgPayload);
if (messageType == "MAP") {
if (messageType.equals("MAP")) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson());
} else if (messageType == "TIM") {
} else if(messageType.equals("SPAT")){
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson());
} else if (messageType.equals("TIM")) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson());
} else if (messageType == "SSM") {
} else if (messageType.equals("BSM")) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson());
} else if (messageType.equals("SSM")) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSSMJson());
} else if (messageType == "SRM") {
} else if (messageType.equals("SRM")) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSRMJson());
} else if (messageType == "PSM") {
} else if (messageType.equals("PSM")) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedPSMJson());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package us.dot.its.jpo.ode.udp;

import java.net.DatagramSocket;
import java.net.DatagramPacket;
import java.net.SocketException;

import org.apache.tomcat.util.buf.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import us.dot.its.jpo.ode.OdeProperties;
import us.dot.its.jpo.ode.model.OdeAsn1Payload;
import us.dot.its.jpo.ode.uper.UperUtil;

public abstract class AbstractUdpReceiverPublisher implements Runnable {

Expand Down Expand Up @@ -57,24 +53,6 @@ public AbstractUdpReceiverPublisher(OdeProperties odeProps, int port, int buffer
}
}

public OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil.SupportedMessageTypes msgType) {
String startFlag = UperUtil.getStartFlag(msgType);
// extract the actual packet from the buffer
byte[] payload = packet.getData();
if (payload == null)
return null;
// convert bytes to hex string and verify identity
String payloadHexString = HexUtils.toHexString(payload).toLowerCase();
if (payloadHexString.indexOf(startFlag) == -1)
return null;

logger.debug("Full {} packet: {}", msgType, payloadHexString);
payloadHexString = UperUtil.stripDot3Header(payloadHexString, startFlag);
logger.debug("Stripped {} packet: {}", msgType, payloadHexString);

OdeAsn1Payload timPayload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString));

return timPayload;
}


}
238 changes: 238 additions & 0 deletions jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package us.dot.its.jpo.ode.udp;

import java.net.DatagramPacket;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

import org.apache.tomcat.util.buf.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import us.dot.its.jpo.ode.model.OdeAsn1Data;
import us.dot.its.jpo.ode.model.OdeAsn1Payload;
import us.dot.its.jpo.ode.model.OdeBsmMetadata;
import us.dot.its.jpo.ode.model.OdeBsmMetadata.BsmSource;
import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType;
import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode;
import us.dot.its.jpo.ode.model.OdeLogMsgMetadataLocation;
import us.dot.its.jpo.ode.model.OdeMapMetadata;
import us.dot.its.jpo.ode.model.OdeMapMetadata.MapSource;
import us.dot.its.jpo.ode.model.OdeMsgMetadata.GeneratedBy;
import us.dot.its.jpo.ode.model.OdePsmMetadata;
import us.dot.its.jpo.ode.model.OdePsmMetadata.PsmSource;
import us.dot.its.jpo.ode.model.OdeSpatMetadata;
import us.dot.its.jpo.ode.model.OdeSpatMetadata.SpatSource;
import us.dot.its.jpo.ode.model.OdeSrmMetadata;
import us.dot.its.jpo.ode.model.OdeSrmMetadata.SrmSource;
import us.dot.its.jpo.ode.model.OdeSsmMetadata;
import us.dot.its.jpo.ode.model.OdeSsmMetadata.SsmSource;
import us.dot.its.jpo.ode.model.OdeTimMetadata;
import us.dot.its.jpo.ode.model.ReceivedMessageDetails;
import us.dot.its.jpo.ode.model.RxSource;
import us.dot.its.jpo.ode.uper.UperUtil;
import us.dot.its.jpo.ode.util.JsonUtils;

public class UdpHexDecoder {

private static Logger logger = LoggerFactory.getLogger(UdpHexDecoder.class);

public static OdeAsn1Payload getPayloadHexString(DatagramPacket packet, UperUtil.SupportedMessageTypes msgType) {
String startFlag = UperUtil.getStartFlag(msgType);
// extract the actual packet from the buffer
byte[] payload = packet.getData();
if (payload == null)
return null;
// convert bytes to hex string and verify identity
String payloadHexString = HexUtils.toHexString(payload).toLowerCase();
if (payloadHexString.indexOf(startFlag) == -1)
return null;

logger.debug("Full {} packet: {}", msgType, payloadHexString);

payloadHexString = UperUtil.stripTrailingZeros(UperUtil.stripDot3Header(payloadHexString, startFlag)).toLowerCase();
logger.debug("Stripped {} packet: {}", msgType, payloadHexString);

OdeAsn1Payload odePayload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString));

return odePayload;
}

public static String buildJsonMapFromPacket(DatagramPacket packet){
String senderIp = packet.getAddress().getHostAddress();
int senderPort = packet.getPort();
logger.debug("Packet received from {}:{}", senderIp, senderPort);

// Create OdeMsgPayload and OdeLogMetadata objects and populate them
OdeAsn1Payload mapPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.MAP);
if (mapPayload == null)
return null;
OdeMapMetadata mapMetadata = new OdeMapMetadata(mapPayload);

// Add header data for the decoding process
mapMetadata.setOdeReceivedAt(getUtcTimeString());

mapMetadata.setOriginIp(senderIp);
mapMetadata.setMapSource(MapSource.RSU);
mapMetadata.setRecordType(RecordType.mapTx);
mapMetadata.setRecordGeneratedBy(GeneratedBy.RSU);
mapMetadata.setSecurityResultCode(SecurityResultCode.success);

return JsonUtils.toJson(new OdeAsn1Data(mapMetadata, mapPayload), false);
}

public static String buildJsonSpatFromPacket(DatagramPacket packet){
String senderIp = packet.getAddress().getHostAddress();
int senderPort = packet.getPort();
logger.debug("Packet received from {}:{}", senderIp, senderPort);

// Create OdeMsgPayload and OdeLogMetadata objects and populate them
OdeAsn1Payload spatPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SPAT);
if (spatPayload == null)
return null;
OdeSpatMetadata spatMetadata = new OdeSpatMetadata(spatPayload);

// Add header data for the decoding process
spatMetadata.setOdeReceivedAt(getUtcTimeString());

spatMetadata.setOriginIp(senderIp);
spatMetadata.setSpatSource(SpatSource.RSU);
spatMetadata.setRecordType(RecordType.spatTx);
spatMetadata.setRecordGeneratedBy(GeneratedBy.RSU);
spatMetadata.setSecurityResultCode(SecurityResultCode.success);


return JsonUtils.toJson(new OdeAsn1Data(spatMetadata, spatPayload), false);
}

public static String buildJsonTimFromPacket(DatagramPacket packet){

String senderIp = packet.getAddress().getHostAddress();
int senderPort = packet.getPort();
logger.debug("Packet received from {}:{}", senderIp, senderPort);

// Create OdeMsgPayload and OdeLogMetadata objects and populate them
OdeAsn1Payload timPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.TIM);
if (timPayload == null)
return null;
OdeTimMetadata timMetadata = new OdeTimMetadata(timPayload);

// Add header data for the decoding process
timMetadata.setOdeReceivedAt(getUtcTimeString());

timMetadata.setOriginIp(senderIp);
timMetadata.setRecordType(RecordType.timMsg);
timMetadata.setRecordGeneratedBy(GeneratedBy.RSU);
timMetadata.setSecurityResultCode(SecurityResultCode.success);
return JsonUtils.toJson(new OdeAsn1Data(timMetadata, timPayload), false);
}

public static String buildJsonBsmFromPacket(DatagramPacket packet){
String senderIp = packet.getAddress().getHostAddress();
int senderPort = packet.getPort();
logger.debug("Packet received from {}:{}", senderIp, senderPort);

OdeAsn1Payload bsmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.BSM);
if (bsmPayload == null)
return null;
OdeBsmMetadata bsmMetadata = new OdeBsmMetadata(bsmPayload);

// Set BSM Metadata values that can be assumed from the UDP endpoint
bsmMetadata.setOdeReceivedAt(getUtcTimeString());

ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails();
OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation(
"unavailable",
"unavailable",
"unavailable",
"unavailable",
"unavailable");
receivedMessageDetails.setRxSource(RxSource.RSU);
receivedMessageDetails.setLocationData(locationData);
bsmMetadata.setReceivedMessageDetails(receivedMessageDetails);

bsmMetadata.setOriginIp(senderIp);
bsmMetadata.setBsmSource(BsmSource.EV);
bsmMetadata.setRecordType(RecordType.bsmTx);
bsmMetadata.setRecordGeneratedBy(GeneratedBy.OBU);
bsmMetadata.setSecurityResultCode(SecurityResultCode.success);

return JsonUtils.toJson(new OdeAsn1Data(bsmMetadata, bsmPayload), false);
}

public static String buildJsonSsmFromPacket(DatagramPacket packet){
String senderIp = packet.getAddress().getHostAddress();
int senderPort = packet.getPort();
logger.debug("Packet received from {}:{}", senderIp, senderPort);

// Create OdeMsgPayload and OdeLogMetadata objects and populate them
OdeAsn1Payload ssmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SSM);
if (ssmPayload == null)
return null;
OdeSsmMetadata ssmMetadata = new OdeSsmMetadata(ssmPayload);

// Add header data for the decoding process
ssmMetadata.setOdeReceivedAt(getUtcTimeString());

ssmMetadata.setOriginIp(senderIp);
ssmMetadata.setSsmSource(SsmSource.RSU);
ssmMetadata.setRecordType(RecordType.ssmTx);
ssmMetadata.setRecordGeneratedBy(GeneratedBy.RSU);
ssmMetadata.setSecurityResultCode(SecurityResultCode.success);

return JsonUtils.toJson(new OdeAsn1Data(ssmMetadata, ssmPayload), false);
}

public static String buildJsonSrmFromPacket(DatagramPacket packet){
String senderIp = packet.getAddress().getHostAddress();
int senderPort = packet.getPort();
logger.debug("Packet received from {}:{}", senderIp, senderPort);

// Create OdeMsgPayload and OdeLogMetadata objects and populate them
OdeAsn1Payload srmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.SRM);
if (srmPayload == null)
return null;
OdeSrmMetadata srmMetadata = new OdeSrmMetadata(srmPayload);

// Add header data for the decoding process
srmMetadata.setOdeReceivedAt(getUtcTimeString());

srmMetadata.setOriginIp(senderIp);
srmMetadata.setSrmSource(SrmSource.RSU);
srmMetadata.setRecordType(RecordType.srmTx);
srmMetadata.setRecordGeneratedBy(GeneratedBy.OBU);
srmMetadata.setSecurityResultCode(SecurityResultCode.success);

return JsonUtils.toJson(new OdeAsn1Data(srmMetadata, srmPayload), false);
}

public static String buildJsonPsmFromPacket(DatagramPacket packet){
String senderIp = packet.getAddress().getHostAddress();
int senderPort = packet.getPort();
logger.debug("Packet received from {}:{}", senderIp, senderPort);

// Create OdeMsgPayload and OdeLogMetadata objects and populate them
OdeAsn1Payload psmPayload = getPayloadHexString(packet, UperUtil.SupportedMessageTypes.PSM);
if (psmPayload == null)
return null;
OdePsmMetadata psmMetadata = new OdePsmMetadata(psmPayload);
// Add header data for the decoding process
psmMetadata.setOdeReceivedAt(getUtcTimeString());

psmMetadata.setOriginIp(senderIp);
psmMetadata.setPsmSource(PsmSource.RSU);
psmMetadata.setRecordType(RecordType.psmTx);
psmMetadata.setRecordGeneratedBy(GeneratedBy.UNKNOWN);
psmMetadata.setSecurityResultCode(SecurityResultCode.success);

return JsonUtils.toJson(new OdeAsn1Data(psmMetadata, psmPayload), false);
}

public static String getUtcTimeString(){
ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC);
String timestamp = utc.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"));
return timestamp;
}


}
Loading

0 comments on commit 2222b23

Please sign in to comment.