Skip to content

Commit

Permalink
Merge branch 'udp-security-headers' into Fix/timestamp-issue
Browse files Browse the repository at this point in the history
  • Loading branch information
mwodahl committed Apr 8, 2024
2 parents 7a8deb0 + 917d760 commit aa6c3d6
Show file tree
Hide file tree
Showing 41 changed files with 2,154 additions and 1,385 deletions.
330 changes: 329 additions & 1 deletion docs/data-flow-diagrams/ODE Data Flow Overview.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/ODE Data Flow Overview.drawio.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
97 changes: 53 additions & 44 deletions docs/data-flow-diagrams/README.md

Large diffs are not rendered by default.

187 changes: 186 additions & 1 deletion docs/data-flow-diagrams/bsm/BSM Data Flow.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/bsm/BSM Data Flow.drawio.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
161 changes: 160 additions & 1 deletion docs/data-flow-diagrams/map/MAP Data Flow.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/map/MAP Data Flow.drawio.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
221 changes: 220 additions & 1 deletion docs/data-flow-diagrams/spat/SPAT Data Flow.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/spat/SPAT Data Flow.drawio.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
316 changes: 315 additions & 1 deletion docs/data-flow-diagrams/tim/TIM Data Flow.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/tim/TIM Data Flow.drawio.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
37 changes: 36 additions & 1 deletion jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public class OdeProperties implements EnvironmentAware {
private Set<String> kafkaTopicsDisabledSet = new HashSet<>();

// BSM
private final String BSM_START_FLAG = "0014";
private String kafkaTopicOdeBsmPojo = "topic.OdeBsmPojo";
private String kafkaTopicOdeBsmJson = "topic.OdeBsmJson";
private String kafkaTopicOdeBsmRxPojo = "topic.OdeBsmRxPojo";
Expand All @@ -130,6 +131,7 @@ public class OdeProperties implements EnvironmentAware {
private int bsmBufferSize = 500;

// TIM
private final String TIM_START_FLAG = "001f";
private String kafkaTopicOdeTimJson = "topic.OdeTimJson";
private String kafkaTopicOdeDNMsgJson = "topic.OdeDNMsgJson";
private String kafkaTopicOdeTimRxJson = "topic.OdeTimRxJson";
Expand All @@ -142,6 +144,7 @@ public class OdeProperties implements EnvironmentAware {
private int timBufferSize = 500;

//SPAT
private final String SPAT_START_FLAG = "0013";
private String kafkaTopicOdeSpatTxPojo = "topic.OdeSpatTxPojo";
private String kafkaTopicOdeSpatPojo = "topic.OdeSpatPojo";
private String kafkaTopicOdeSpatJson = "topic.OdeSpatJson";
Expand All @@ -150,30 +153,34 @@ public class OdeProperties implements EnvironmentAware {
private String kafkaTopicFilteredOdeSpatJson = "topic.FilteredOdeSpatJson";
private String kafkaTopicOdeRawEncodedSPATJson = "topic.OdeRawEncodedSPATJson";
private int spatReceiverPort = 44910;
private int spatBufferSize = 1000;
private int spatBufferSize = 500;

//SSM
private final String SSM_START_FLAG = "001e";
private String kafkaTopicOdeSsmPojo = "topic.OdeSsmPojo";
private String kafkaTopicOdeSsmJson = "topic.OdeSsmJson";
private String kafkaTopicOdeRawEncodedSSMJson = "topic.OdeRawEncodedSSMJson";
private int ssmReceiverPort = 44900;
private int ssmBufferSize = 500;

//SRM
private final String SRM_START_FLAG = "001d";
private String kafkaTopicOdeSrmTxPojo = "topic.OdeSrmTxPojo";
private String kafkaTopicOdeSrmJson = "topic.OdeSrmJson";
private String kafkaTopicOdeRawEncodedSRMJson = "topic.OdeRawEncodedSRMJson";
private int srmReceiverPort = 44930;
private int srmBufferSize = 500;

//MAP
private final String MAP_START_FLAG = "0012";
private String kafkaTopicOdeRawEncodedMAPJson = "topic.OdeRawEncodedMAPJson";
private String kafkaTopicOdeMapTxPojo = "topic.OdeMapTxPojo";
private String kafkaTopicOdeMapJson = "topic.OdeMapJson";
private int mapReceiverPort = 44920;
private int mapBufferSize = 2048;

// PSM
private final String PSM_START_FLAG = "0020";
private String kafkaTopicOdeRawEncodedPSMJson = "topic.OdeRawEncodedPSMJson";
private String kafkaTopicOdePsmTxPojo = "topic.OdePsmTxPojo";
private String kafkaTopicOdePsmJson = "topic.OdePsmJson";
Expand Down Expand Up @@ -385,6 +392,10 @@ public void setVerboseJson(Boolean verboseJson) {
this.verboseJson = verboseJson;
}

public String getBsmStartFlag() {
return BSM_START_FLAG;
}

public int getBsmReceiverPort() {
return bsmReceiverPort;
}
Expand All @@ -401,6 +412,10 @@ public void setBsmBufferSize(int bsmBufferSize) {
this.bsmBufferSize = bsmBufferSize;
}

public String getTimStartFlag() {
return TIM_START_FLAG;
}

public int getTimReceiverPort() {
return timReceiverPort;
}
Expand All @@ -417,6 +432,10 @@ public void setTimBufferSize(int timBufferSize) {
this.timBufferSize = timBufferSize;
}

public String getSsmStartFlag() {
return SSM_START_FLAG;
}

public int getSsmReceiverPort() {
return ssmReceiverPort;
}
Expand All @@ -433,6 +452,10 @@ public void setSsmBufferSize(int ssmBufferSize) {
this.ssmBufferSize = ssmBufferSize;
}

public String getSrmStartFlag() {
return SRM_START_FLAG;
}

public int getSrmReceiverPort() {
return srmReceiverPort;
}
Expand All @@ -449,6 +472,10 @@ public void setSrmBufferSize(int srmBufferSize) {
this.srmBufferSize = srmBufferSize;
}

public String getSpatStartFlag() {
return SPAT_START_FLAG;
}

public int getSpatReceiverPort() {
return spatReceiverPort;
}
Expand All @@ -465,6 +492,10 @@ public void setSpatBufferSize(int spatBufferSize) {
this.spatBufferSize = spatBufferSize;
}

public String getMapStartFlag() {
return MAP_START_FLAG;
}

public int getMapReceiverPort() {
return mapReceiverPort;
}
Expand All @@ -481,6 +512,10 @@ public void setMapBufferSize(int mapBufferSize) {
this.mapBufferSize = mapBufferSize;
}

public String getPsmStartFlag() {
return PSM_START_FLAG;
}

public int getPsmReceiverPort() {
return psmReceiverPort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import us.dot.its.jpo.ode.importer.parser.LogFileParser;
import us.dot.its.jpo.ode.importer.parser.RxMsgFileParser;
import us.dot.its.jpo.ode.importer.parser.SpatLogFileParser;
import us.dot.its.jpo.ode.model.Asn1Encoding;
import us.dot.its.jpo.ode.model.Asn1Encoding.EncodingRule;
import us.dot.its.jpo.ode.model.OdeAsn1Data;
import us.dot.its.jpo.ode.model.OdeAsn1Payload;
import us.dot.its.jpo.ode.model.OdeBsmMetadata;
Expand Down Expand Up @@ -156,76 +154,67 @@ public boolean isSpatRecord() {

private void publishList(XmlUtils xmlUtils, List<OdeData> dataList) throws JsonProcessingException {
serialId.setBundleSize(dataList.size());

for (OdeData odeData : dataList) {
OdeLogMetadata msgMetadata = (OdeLogMetadata) odeData.getMetadata();
OdeMsgPayload msgPayload = (OdeMsgPayload) odeData.getPayload();
msgMetadata.setSerialId(serialId);

if (isDriverAlertRecord()) {
logger.debug("Publishing a driverAlert.");

publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicDriverAlertJson());
} else if (isBsmRecord()) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson());
} else if (isSpatRecord()) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson());
} else {
if (isBsmRecord()) {
logger.debug("Publishing a BSM");
} else if (isSpatRecord()) {
logger.debug("Publishing a Spat");
} else {
logger.debug("Publishing a TIM or MAP");
}

if (!(isSpatRecord() && msgMetadata instanceof OdeSpatMetadata
&& !((OdeSpatMetadata) msgMetadata).getIsCertPresent())) {
if (checkHeader(msgPayload) == "Ieee1609Dot2Data") {
Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER);
msgMetadata.addEncoding(msgEncoding);
}
// Determine the message type (MAP or TIM are the current other options)
String messageType = determineMessageType(msgPayload);
if (messageType == "MAP") {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson());
} else if (messageType == "TIM") {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson());
}

Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",
EncodingRule.UPER);
msgMetadata.addEncoding(unsecuredDataEncoding);

publisher.publish(xmlUtils.toXml(odeData),
publisher.getOdeProperties().getKafkaTopicAsn1DecoderInput());
}

serialId.increment();
}
}


/**
* Checks the header of the OdeMsgPayload and determines the encoding rule to be used in the Asn1DecoderInput XML.
* The payload is checked for various message start flags. It will add the encoding rule to the Asn1DecoderInput XML
* to tell the ASN1 codec to extract data from the header into the output message.
* Determines the message type based off the most likely start flag
*
* @param payload The OdeMsgPayload to check the header of.
* @return The encoding rule to be used in the Asn1DecoderInput XML.
* @param payload The OdeMsgPayload to check the content of.
*/
public String checkHeader(OdeMsgPayload payload) {
JSONObject payloadJson;
String header = null;
public String determineMessageType(OdeMsgPayload payload) {
String messageType = "";
try {
payloadJson = JsonUtils.toJSONObject(payload.getData().toJson());
String hexPacket = payloadJson.getString("bytes");

for (String key : msgStartFlags.keySet()) {
String startFlag = msgStartFlags.get(key);
int startIndex = hexPacket.toLowerCase().indexOf(startFlag);
logger.debug("Start index for " + key + "(" + startFlag + ")" + " is: " + startIndex);
if (startIndex <= 20 && startIndex != 0 && startIndex != -1) {
logger.debug("Message has supported Ieee1609Dot2Data header, adding encoding rule to Asn1DecoderInput XML");
header = "Ieee1609Dot2Data";
break;
JSONObject payloadJson = JsonUtils.toJSONObject(payload.getData().toJson());
String hexString = payloadJson.getString("bytes").toLowerCase();

HashMap<String, Integer> flagIndexes = new HashMap<String, Integer>();
flagIndexes.put("MAP", hexString.indexOf(msgStartFlags.get("MAP")));
flagIndexes.put("TIM", hexString.indexOf(msgStartFlags.get("TIM")));

int lowestIndex = Integer.MAX_VALUE;
for (String key : flagIndexes.keySet()) {
if (flagIndexes.get(key) == -1) {
logger.debug("This message is not of type " + key);
continue;
}
if (flagIndexes.get(key) < lowestIndex) {
messageType = key;
lowestIndex = flagIndexes.get(key);
}
logger.debug("Payload JSON: " + payloadJson);
}
} catch (JsonUtilsException e) {
logger.error("JsonUtilsException while checking message header. Stacktrace: " + e.toString());

}
return header;
return messageType;
}

// This method will check if the next character is a newline character (0x0A in hex or 10 in converted decimal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ public class PayloadParser extends LogFileParser {
private static Logger logger = LoggerFactory.getLogger(PayloadParser.class);
private static HashMap<String, String> msgStartFlags = new HashMap<String, String>();

// Maximum header size for 1609 payload headers
private static final int HEADER_SIZE_1609 = 20;

// Maximum header size for WSMP payload headers
private static final int HEADER_SIZE_WSMP = 35;

public static final int PAYLOAD_LENGTH = 2;

protected short payloadLength;
Expand All @@ -63,7 +57,6 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F
if (status != ParserStatus.COMPLETE)
return status;
short length = CodecUtils.bytesToShort(readBuffer, 0, PAYLOAD_LENGTH, ByteOrder.LITTLE_ENDIAN);
logger.debug("Payload length is: " + length);
setPayloadLength(length);
}

Expand All @@ -72,7 +65,7 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F
status = parseStep(bis, getPayloadLength());
if (status != ParserStatus.COMPLETE)
return status;
setPayload(removeHeader(Arrays.copyOf(readBuffer, getPayloadLength())));
setPayload(stripDot3Header(Arrays.copyOf(readBuffer, getPayloadLength())));
}

resetStep();
Expand Down Expand Up @@ -104,43 +97,41 @@ public LogFileParser setPayload(byte[] payload) {
return this;
}

@Override
public void writeTo(OutputStream os) throws IOException {
os.write(CodecUtils.shortToBytes(payloadLength, ByteOrder.LITTLE_ENDIAN));
os.write(payload, 0, payloadLength);
}
@Override
public void writeTo(OutputStream os) throws IOException {
os.write(CodecUtils.shortToBytes(payloadLength, ByteOrder.LITTLE_ENDIAN));
os.write(payload, 0, payloadLength);
}


// Removes the 1609.2 header but will keep the 1609.1 header
public byte[] removeHeader(byte[] packet) {
String hexPacket = HexUtils.toHexString(packet);
/* Strips the 1609.3 and unsigned 1609.2 headers if they are present.
Will return the payload with a signed 1609.2 header if it is present.
Otherwise, returns just the payload. */
public byte[] stripDot3Header(byte[] packet) {
String hexString = HexUtils.toHexString(packet);
String hexPacketParsed = "";
for (String key : msgStartFlags.keySet()) {
String startFlag = msgStartFlags.get(key);
int startIndex = hexPacket.toLowerCase().indexOf(startFlag);
if (hexPacketParsed.equals("")) {
logger.debug("Start index for: " + key + " is: " + startIndex);
if (startIndex == -1) {
logger.debug("Message does not have header for: " + key);
continue;
} else if (startIndex <= HEADER_SIZE_1609) {
logger.debug("Message has supported header. startIndex: " + startIndex + " msgFlag: " + startFlag);
hexPacketParsed = hexPacket;
// If the header type is WSMP, the header will be stripped from the payload.
} else if (startIndex > HEADER_SIZE_1609 && startIndex < HEADER_SIZE_WSMP) {
int trueStartIndex = HEADER_SIZE_1609
+ hexPacket.substring(HEADER_SIZE_1609, hexPacket.length()).indexOf(startFlag);
logger.debug("Found payload start at: " + trueStartIndex);
hexPacketParsed = hexPacket.substring(trueStartIndex, hexPacket.length());
}
}

for (String start_flag : msgStartFlags.values()) {
int payloadStartIndex = hexString.indexOf(start_flag);
if (payloadStartIndex == -1)
continue;

String headers = hexString.substring(0, payloadStartIndex);
String payload = hexString.substring(payloadStartIndex, hexString.length());
// Look for the index of the start flag of a signed 1609.2 header, if one exists
int signedDot2StartIndex = headers.indexOf("038100");
if (signedDot2StartIndex == -1)
hexPacketParsed = payload;
else
hexPacketParsed = headers.substring(signedDot2StartIndex, headers.length()) + payload;
break;
}

if (hexPacketParsed.equals("")) {
hexPacketParsed = hexPacket;
logger.debug("Could not identify a Header in the following packet: " + hexPacketParsed);
} else {
logger.debug("Payload hex: " + hexPacketParsed);
hexPacketParsed = hexString;
logger.debug("Packet is not a BSM, TIM or Map message: " + hexPacketParsed);
}

return HexUtils.fromHexString(hexPacketParsed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@ public abstract class AbstractAsn1DecodeMessageJSON extends AbstractSubscriberPr
private Logger logger = LoggerFactory.getLogger(this.getClass());

protected StringPublisher codecPublisher;
protected String payload_start_flag;

public AbstractAsn1DecodeMessageJSON() {
super();
}

public AbstractAsn1DecodeMessageJSON(StringPublisher codecPublisher) {
public AbstractAsn1DecodeMessageJSON(StringPublisher codecPublisher, String payload_start_flag) {
super();
this.codecPublisher = codecPublisher;
this.payload_start_flag = payload_start_flag;
}

// Strips the IEEE 1609.2 security header (if it exists) and returns the payload
protected String stripDot2Header(String hexString) {
hexString = hexString.toLowerCase();
return hexString.substring(hexString.indexOf(payload_start_flag), hexString.length());
}

protected void publishEncodedMessageToAsn1Decoder(OdeData odeData) {
Expand Down
Loading

0 comments on commit aa6c3d6

Please sign in to comment.