Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retain IEEE 1609.2 Security Headers #65

Merged
merged 14 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 329 additions & 1 deletion docs/data-flow-diagrams/ODE Data Flow Overview.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/ODE Data Flow Overview.drawio.png
dmccoystephenson marked this conversation as resolved.
Show resolved Hide resolved
dmccoystephenson marked this conversation as resolved.
Show resolved Hide resolved
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
97 changes: 53 additions & 44 deletions docs/data-flow-diagrams/README.md

Large diffs are not rendered by default.

187 changes: 186 additions & 1 deletion docs/data-flow-diagrams/bsm/BSM Data Flow.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/bsm/BSM Data Flow.drawio.png
dmccoystephenson marked this conversation as resolved.
Show resolved Hide resolved
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
161 changes: 160 additions & 1 deletion docs/data-flow-diagrams/map/MAP Data Flow.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/map/MAP Data Flow.drawio.png
dmccoystephenson marked this conversation as resolved.
Show resolved Hide resolved
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
221 changes: 220 additions & 1 deletion docs/data-flow-diagrams/spat/SPAT Data Flow.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/spat/SPAT Data Flow.drawio.png
dmccoystephenson marked this conversation as resolved.
Show resolved Hide resolved
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
316 changes: 315 additions & 1 deletion docs/data-flow-diagrams/tim/TIM Data Flow.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/tim/TIM Data Flow.drawio.png
dmccoystephenson marked this conversation as resolved.
Show resolved Hide resolved
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
37 changes: 36 additions & 1 deletion jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public class OdeProperties implements EnvironmentAware {
private Set<String> kafkaTopicsDisabledSet = new HashSet<>();

// BSM
private final String BSM_START_FLAG = "0014";
private String kafkaTopicOdeBsmPojo = "topic.OdeBsmPojo";
private String kafkaTopicOdeBsmJson = "topic.OdeBsmJson";
private String kafkaTopicOdeBsmRxPojo = "topic.OdeBsmRxPojo";
Expand All @@ -130,6 +131,7 @@ public class OdeProperties implements EnvironmentAware {
private int bsmBufferSize = 500;

// TIM
private final String TIM_START_FLAG = "001f";
dmccoystephenson marked this conversation as resolved.
Show resolved Hide resolved
private String kafkaTopicOdeTimJson = "topic.OdeTimJson";
private String kafkaTopicOdeDNMsgJson = "topic.OdeDNMsgJson";
private String kafkaTopicOdeTimRxJson = "topic.OdeTimRxJson";
Expand All @@ -142,6 +144,7 @@ public class OdeProperties implements EnvironmentAware {
private int timBufferSize = 500;

//SPAT
private final String SPAT_START_FLAG = "0013";
private String kafkaTopicOdeSpatTxPojo = "topic.OdeSpatTxPojo";
private String kafkaTopicOdeSpatPojo = "topic.OdeSpatPojo";
private String kafkaTopicOdeSpatJson = "topic.OdeSpatJson";
Expand All @@ -150,30 +153,34 @@ public class OdeProperties implements EnvironmentAware {
private String kafkaTopicFilteredOdeSpatJson = "topic.FilteredOdeSpatJson";
private String kafkaTopicOdeRawEncodedSPATJson = "topic.OdeRawEncodedSPATJson";
private int spatReceiverPort = 44910;
private int spatBufferSize = 1000;
private int spatBufferSize = 500;

//SSM
private final String SSM_START_FLAG = "001e";
private String kafkaTopicOdeSsmPojo = "topic.OdeSsmPojo";
private String kafkaTopicOdeSsmJson = "topic.OdeSsmJson";
private String kafkaTopicOdeRawEncodedSSMJson = "topic.OdeRawEncodedSSMJson";
private int ssmReceiverPort = 44900;
private int ssmBufferSize = 500;

//SRM
private final String SRM_START_FLAG = "001d";
private String kafkaTopicOdeSrmTxPojo = "topic.OdeSrmTxPojo";
private String kafkaTopicOdeSrmJson = "topic.OdeSrmJson";
private String kafkaTopicOdeRawEncodedSRMJson = "topic.OdeRawEncodedSRMJson";
private int srmReceiverPort = 44930;
private int srmBufferSize = 500;

//MAP
private final String MAP_START_FLAG = "0012";
private String kafkaTopicOdeRawEncodedMAPJson = "topic.OdeRawEncodedMAPJson";
private String kafkaTopicOdeMapTxPojo = "topic.OdeMapTxPojo";
private String kafkaTopicOdeMapJson = "topic.OdeMapJson";
private int mapReceiverPort = 44920;
private int mapBufferSize = 2048;

// PSM
private final String PSM_START_FLAG = "0020";
private String kafkaTopicOdeRawEncodedPSMJson = "topic.OdeRawEncodedPSMJson";
private String kafkaTopicOdePsmTxPojo = "topic.OdePsmTxPojo";
private String kafkaTopicOdePsmJson = "topic.OdePsmJson";
Expand Down Expand Up @@ -385,6 +392,10 @@ public void setVerboseJson(Boolean verboseJson) {
this.verboseJson = verboseJson;
}

public String getBsmStartFlag() {
return BSM_START_FLAG;
}

public int getBsmReceiverPort() {
return bsmReceiverPort;
}
Expand All @@ -401,6 +412,10 @@ public void setBsmBufferSize(int bsmBufferSize) {
this.bsmBufferSize = bsmBufferSize;
}

public String getTimStartFlag() {
return TIM_START_FLAG;
}

public int getTimReceiverPort() {
return timReceiverPort;
}
Expand All @@ -417,6 +432,10 @@ public void setTimBufferSize(int timBufferSize) {
this.timBufferSize = timBufferSize;
}

public String getSsmStartFlag() {
return SSM_START_FLAG;
}

public int getSsmReceiverPort() {
return ssmReceiverPort;
}
Expand All @@ -433,6 +452,10 @@ public void setSsmBufferSize(int ssmBufferSize) {
this.ssmBufferSize = ssmBufferSize;
}

public String getSrmStartFlag() {
return SRM_START_FLAG;
}

public int getSrmReceiverPort() {
return srmReceiverPort;
}
Expand All @@ -449,6 +472,10 @@ public void setSrmBufferSize(int srmBufferSize) {
this.srmBufferSize = srmBufferSize;
}

public String getSpatStartFlag() {
return SPAT_START_FLAG;
}

public int getSpatReceiverPort() {
return spatReceiverPort;
}
Expand All @@ -465,6 +492,10 @@ public void setSpatBufferSize(int spatBufferSize) {
this.spatBufferSize = spatBufferSize;
}

public String getMapStartFlag() {
return MAP_START_FLAG;
}

public int getMapReceiverPort() {
return mapReceiverPort;
}
Expand All @@ -481,6 +512,10 @@ public void setMapBufferSize(int mapBufferSize) {
this.mapBufferSize = mapBufferSize;
}

public String getPsmStartFlag() {
return PSM_START_FLAG;
}

public int getPsmReceiverPort() {
return psmReceiverPort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import us.dot.its.jpo.ode.importer.parser.LogFileParser;
import us.dot.its.jpo.ode.importer.parser.RxMsgFileParser;
import us.dot.its.jpo.ode.importer.parser.SpatLogFileParser;
import us.dot.its.jpo.ode.model.Asn1Encoding;
import us.dot.its.jpo.ode.model.Asn1Encoding.EncodingRule;
import us.dot.its.jpo.ode.model.OdeAsn1Data;
import us.dot.its.jpo.ode.model.OdeAsn1Payload;
import us.dot.its.jpo.ode.model.OdeBsmMetadata;
Expand Down Expand Up @@ -156,76 +154,67 @@ public boolean isSpatRecord() {

private void publishList(XmlUtils xmlUtils, List<OdeData> dataList) throws JsonProcessingException {
serialId.setBundleSize(dataList.size());

for (OdeData odeData : dataList) {
OdeLogMetadata msgMetadata = (OdeLogMetadata) odeData.getMetadata();
OdeMsgPayload msgPayload = (OdeMsgPayload) odeData.getPayload();
msgMetadata.setSerialId(serialId);

if (isDriverAlertRecord()) {
logger.debug("Publishing a driverAlert.");

publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicDriverAlertJson());
} else if (isBsmRecord()) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson());
} else if (isSpatRecord()) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson());
} else {
if (isBsmRecord()) {
logger.debug("Publishing a BSM");
} else if (isSpatRecord()) {
logger.debug("Publishing a Spat");
} else {
logger.debug("Publishing a TIM or MAP");
}

if (!(isSpatRecord() && msgMetadata instanceof OdeSpatMetadata
&& !((OdeSpatMetadata) msgMetadata).getIsCertPresent())) {
if (checkHeader(msgPayload) == "Ieee1609Dot2Data") {
Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER);
msgMetadata.addEncoding(msgEncoding);
}
// Determine the message type (MAP or TIM are the current other options)
String messageType = determineMessageType(msgPayload);
if (messageType == "MAP") {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson());
} else if (messageType == "TIM") {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson());
}

Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",
EncodingRule.UPER);
msgMetadata.addEncoding(unsecuredDataEncoding);

publisher.publish(xmlUtils.toXml(odeData),
publisher.getOdeProperties().getKafkaTopicAsn1DecoderInput());
dmccoystephenson marked this conversation as resolved.
Show resolved Hide resolved
}

serialId.increment();
}
}


/**
* Checks the header of the OdeMsgPayload and determines the encoding rule to be used in the Asn1DecoderInput XML.
* The payload is checked for various message start flags. It will add the encoding rule to the Asn1DecoderInput XML
* to tell the ASN1 codec to extract data from the header into the output message.
* Determines the message type based off the most likely start flag
*
* @param payload The OdeMsgPayload to check the header of.
* @return The encoding rule to be used in the Asn1DecoderInput XML.
* @param payload The OdeMsgPayload to check the content of.
*/
public String checkHeader(OdeMsgPayload payload) {
JSONObject payloadJson;
String header = null;
public String determineMessageType(OdeMsgPayload payload) {
String messageType = "";
try {
payloadJson = JsonUtils.toJSONObject(payload.getData().toJson());
String hexPacket = payloadJson.getString("bytes");

for (String key : msgStartFlags.keySet()) {
String startFlag = msgStartFlags.get(key);
int startIndex = hexPacket.toLowerCase().indexOf(startFlag);
logger.debug("Start index for " + key + "(" + startFlag + ")" + " is: " + startIndex);
if (startIndex <= 20 && startIndex != 0 && startIndex != -1) {
logger.debug("Message has supported Ieee1609Dot2Data header, adding encoding rule to Asn1DecoderInput XML");
header = "Ieee1609Dot2Data";
break;
JSONObject payloadJson = JsonUtils.toJSONObject(payload.getData().toJson());
String hexString = payloadJson.getString("bytes").toLowerCase();

HashMap<String, Integer> flagIndexes = new HashMap<String, Integer>();
flagIndexes.put("MAP", hexString.indexOf(msgStartFlags.get("MAP")));
flagIndexes.put("TIM", hexString.indexOf(msgStartFlags.get("TIM")));

int lowestIndex = Integer.MAX_VALUE;
for (String key : flagIndexes.keySet()) {
if (flagIndexes.get(key) == -1) {
logger.debug("This message is not of type " + key);
continue;
dmccoystephenson marked this conversation as resolved.
Show resolved Hide resolved
}
if (flagIndexes.get(key) < lowestIndex) {
Michael7371 marked this conversation as resolved.
Show resolved Hide resolved
messageType = key;
lowestIndex = flagIndexes.get(key);
}
logger.debug("Payload JSON: " + payloadJson);
}
} catch (JsonUtilsException e) {
logger.error("JsonUtilsException while checking message header. Stacktrace: " + e.toString());

}
return header;
return messageType;
}

// This method will check if the next character is a newline character (0x0A in hex or 10 in converted decimal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ public class PayloadParser extends LogFileParser {
private static Logger logger = LoggerFactory.getLogger(PayloadParser.class);
private static HashMap<String, String> msgStartFlags = new HashMap<String, String>();

// Maximum header size for 1609 payload headers
private static final int HEADER_SIZE_1609 = 20;

// Maximum header size for WSMP payload headers
private static final int HEADER_SIZE_WSMP = 35;

public static final int PAYLOAD_LENGTH = 2;

protected short payloadLength;
Expand All @@ -63,7 +57,6 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F
if (status != ParserStatus.COMPLETE)
return status;
short length = CodecUtils.bytesToShort(readBuffer, 0, PAYLOAD_LENGTH, ByteOrder.LITTLE_ENDIAN);
logger.debug("Payload length is: " + length);
setPayloadLength(length);
}

Expand All @@ -72,7 +65,7 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F
status = parseStep(bis, getPayloadLength());
if (status != ParserStatus.COMPLETE)
return status;
setPayload(removeHeader(Arrays.copyOf(readBuffer, getPayloadLength())));
setPayload(stripDot3Header(Arrays.copyOf(readBuffer, getPayloadLength())));
}

resetStep();
Expand Down Expand Up @@ -104,43 +97,41 @@ public LogFileParser setPayload(byte[] payload) {
return this;
}

@Override
public void writeTo(OutputStream os) throws IOException {
os.write(CodecUtils.shortToBytes(payloadLength, ByteOrder.LITTLE_ENDIAN));
os.write(payload, 0, payloadLength);
}
@Override
public void writeTo(OutputStream os) throws IOException {
os.write(CodecUtils.shortToBytes(payloadLength, ByteOrder.LITTLE_ENDIAN));
os.write(payload, 0, payloadLength);
}


// Removes the 1609.2 header but will keep the 1609.1 header
public byte[] removeHeader(byte[] packet) {
String hexPacket = HexUtils.toHexString(packet);
/* Strips the 1609.3 and unsigned 1609.2 headers if they are present.
Will return the payload with a signed 1609.2 header if it is present.
Otherwise, returns just the payload. */
public byte[] stripDot3Header(byte[] packet) {
String hexString = HexUtils.toHexString(packet);
String hexPacketParsed = "";
for (String key : msgStartFlags.keySet()) {
String startFlag = msgStartFlags.get(key);
int startIndex = hexPacket.toLowerCase().indexOf(startFlag);
if (hexPacketParsed.equals("")) {
logger.debug("Start index for: " + key + " is: " + startIndex);
if (startIndex == -1) {
logger.debug("Message does not have header for: " + key);
continue;
} else if (startIndex <= HEADER_SIZE_1609) {
logger.debug("Message has supported header. startIndex: " + startIndex + " msgFlag: " + startFlag);
hexPacketParsed = hexPacket;
// If the header type is WSMP, the header will be stripped from the payload.
} else if (startIndex > HEADER_SIZE_1609 && startIndex < HEADER_SIZE_WSMP) {
int trueStartIndex = HEADER_SIZE_1609
+ hexPacket.substring(HEADER_SIZE_1609, hexPacket.length()).indexOf(startFlag);
logger.debug("Found payload start at: " + trueStartIndex);
hexPacketParsed = hexPacket.substring(trueStartIndex, hexPacket.length());
}
}

for (String start_flag : msgStartFlags.values()) {
int payloadStartIndex = hexString.indexOf(start_flag);
if (payloadStartIndex == -1)
continue;

String headers = hexString.substring(0, payloadStartIndex);
String payload = hexString.substring(payloadStartIndex, hexString.length());
// Look for the index of the start flag of a signed 1609.2 header, if one exists
int signedDot2StartIndex = headers.indexOf("038100");
if (signedDot2StartIndex == -1)
hexPacketParsed = payload;
else
hexPacketParsed = headers.substring(signedDot2StartIndex, headers.length()) + payload;
Michael7371 marked this conversation as resolved.
Show resolved Hide resolved
break;
}

if (hexPacketParsed.equals("")) {
hexPacketParsed = hexPacket;
logger.debug("Could not identify a Header in the following packet: " + hexPacketParsed);
} else {
logger.debug("Payload hex: " + hexPacketParsed);
hexPacketParsed = hexString;
logger.debug("Packet is not a BSM, TIM or Map message: " + hexPacketParsed);
}

return HexUtils.fromHexString(hexPacketParsed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@ public abstract class AbstractAsn1DecodeMessageJSON extends AbstractSubscriberPr
private Logger logger = LoggerFactory.getLogger(this.getClass());

protected StringPublisher codecPublisher;
protected String payload_start_flag;

public AbstractAsn1DecodeMessageJSON() {
super();
}

public AbstractAsn1DecodeMessageJSON(StringPublisher codecPublisher) {
public AbstractAsn1DecodeMessageJSON(StringPublisher codecPublisher, String payload_start_flag) {
super();
this.codecPublisher = codecPublisher;
this.payload_start_flag = payload_start_flag;
}

// Strips the IEEE 1609.2 security header (if it exists) and returns the payload
protected String stripDot2Header(String hexString) {
hexString = hexString.toLowerCase();
return hexString.substring(hexString.indexOf(payload_start_flag), hexString.length());
payneBrandon marked this conversation as resolved.
Show resolved Hide resolved
}

protected void publishEncodedMessageToAsn1Decoder(OdeData odeData) {
Expand Down
Loading
Loading