Skip to content

Commit

Permalink
1609.2 headers saved in kafka and log offload and UDP endpoints pipel…
Browse files Browse the repository at this point in the history
…ines merged. Currently only support BSM for both
  • Loading branch information
drewjj committed Mar 14, 2024
1 parent 22e0986 commit 87623c9
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 339 deletions.
37 changes: 36 additions & 1 deletion jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public class OdeProperties implements EnvironmentAware {
private Set<String> kafkaTopicsDisabledSet = new HashSet<>();

// BSM
private final String BSM_START_FLAG = "0014";
private String kafkaTopicOdeBsmPojo = "topic.OdeBsmPojo";
private String kafkaTopicOdeBsmJson = "topic.OdeBsmJson";
private String kafkaTopicOdeBsmRxPojo = "topic.OdeBsmRxPojo";
Expand All @@ -130,6 +131,7 @@ public class OdeProperties implements EnvironmentAware {
private int bsmBufferSize = 500;

// TIM
private final String TIM_START_FLAG = "001f";
private String kafkaTopicOdeTimJson = "topic.OdeTimJson";
private String kafkaTopicOdeDNMsgJson = "topic.OdeDNMsgJson";
private String kafkaTopicOdeTimRxJson = "topic.OdeTimRxJson";
Expand All @@ -142,6 +144,7 @@ public class OdeProperties implements EnvironmentAware {
private int timBufferSize = 500;

//SPAT
private final String SPAT_START_FLAG = "0013";
private String kafkaTopicOdeSpatTxPojo = "topic.OdeSpatTxPojo";
private String kafkaTopicOdeSpatPojo = "topic.OdeSpatPojo";
private String kafkaTopicOdeSpatJson = "topic.OdeSpatJson";
Expand All @@ -150,30 +153,34 @@ public class OdeProperties implements EnvironmentAware {
private String kafkaTopicFilteredOdeSpatJson = "topic.FilteredOdeSpatJson";
private String kafkaTopicOdeRawEncodedSPATJson = "topic.OdeRawEncodedSPATJson";
private int spatReceiverPort = 44910;
private int spatBufferSize = 1000;
private int spatBufferSize = 500;

//SSM
private final String SSM_START_FLAG = "001e";
private String kafkaTopicOdeSsmPojo = "topic.OdeSsmPojo";
private String kafkaTopicOdeSsmJson = "topic.OdeSsmJson";
private String kafkaTopicOdeRawEncodedSSMJson = "topic.OdeRawEncodedSSMJson";
private int ssmReceiverPort = 44900;
private int ssmBufferSize = 500;

//SRM
private final String SRM_START_FLAG = "001d";
private String kafkaTopicOdeSrmTxPojo = "topic.OdeSrmTxPojo";
private String kafkaTopicOdeSrmJson = "topic.OdeSrmJson";
private String kafkaTopicOdeRawEncodedSRMJson = "topic.OdeRawEncodedSRMJson";
private int srmReceiverPort = 44930;
private int srmBufferSize = 500;

//MAP
private final String MAP_START_FLAG = "0012";
private String kafkaTopicOdeRawEncodedMAPJson = "topic.OdeRawEncodedMAPJson";
private String kafkaTopicOdeMapTxPojo = "topic.OdeMapTxPojo";
private String kafkaTopicOdeMapJson = "topic.OdeMapJson";
private int mapReceiverPort = 44920;
private int mapBufferSize = 2048;

// PSM
private final String PSM_START_FLAG = "0020";
private String kafkaTopicOdeRawEncodedPSMJson = "topic.OdeRawEncodedPSMJson";
private String kafkaTopicOdePsmTxPojo = "topic.OdePsmTxPojo";
private String kafkaTopicOdePsmJson = "topic.OdePsmJson";
Expand Down Expand Up @@ -385,6 +392,10 @@ public void setVerboseJson(Boolean verboseJson) {
this.verboseJson = verboseJson;
}

public String getBsmStartFlag() {
return BSM_START_FLAG;
}

public int getBsmReceiverPort() {
return bsmReceiverPort;
}
Expand All @@ -401,6 +412,10 @@ public void setBsmBufferSize(int bsmBufferSize) {
this.bsmBufferSize = bsmBufferSize;
}

public String getTimStartFlag() {
return TIM_START_FLAG;
}

public int getTimReceiverPort() {
return timReceiverPort;
}
Expand All @@ -417,6 +432,10 @@ public void setTimBufferSize(int timBufferSize) {
this.timBufferSize = timBufferSize;
}

public String getSsmStartFlag() {
return SSM_START_FLAG;
}

public int getSsmReceiverPort() {
return ssmReceiverPort;
}
Expand All @@ -433,6 +452,10 @@ public void setSsmBufferSize(int ssmBufferSize) {
this.ssmBufferSize = ssmBufferSize;
}

public String getSrmStartFlag() {
return SRM_START_FLAG;
}

public int getSrmReceiverPort() {
return srmReceiverPort;
}
Expand All @@ -449,6 +472,10 @@ public void setSrmBufferSize(int srmBufferSize) {
this.srmBufferSize = srmBufferSize;
}

public String getSpatStartFlag() {
return SPAT_START_FLAG;
}

public int getSpatReceiverPort() {
return spatReceiverPort;
}
Expand All @@ -465,6 +492,10 @@ public void setSpatBufferSize(int spatBufferSize) {
this.spatBufferSize = spatBufferSize;
}

public String getMapStartFlag() {
return MAP_START_FLAG;
}

public int getMapReceiverPort() {
return mapReceiverPort;
}
Expand All @@ -481,6 +512,10 @@ public void setMapBufferSize(int mapBufferSize) {
this.mapBufferSize = mapBufferSize;
}

public String getPsmStartFlag() {
return PSM_START_FLAG;
}

public int getPsmReceiverPort() {
return psmReceiverPort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,76 +156,57 @@ public boolean isSpatRecord() {

private void publishList(XmlUtils xmlUtils, List<OdeData> dataList) throws JsonProcessingException {
serialId.setBundleSize(dataList.size());

for (OdeData odeData : dataList) {
OdeLogMetadata msgMetadata = (OdeLogMetadata) odeData.getMetadata();
OdeMsgPayload msgPayload = (OdeMsgPayload) odeData.getPayload();
msgMetadata.setSerialId(serialId);

if (isDriverAlertRecord()) {
logger.debug("Publishing a driverAlert.");

publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicDriverAlertJson());
} else if (isBsmRecord()) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson());
} else if (isSpatRecord()) {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson());
} else {
if (isBsmRecord()) {
logger.debug("Publishing a BSM");
} else if (isSpatRecord()) {
logger.debug("Publishing a Spat");
} else {
logger.debug("Publishing a TIM or MAP");
}

if (!(isSpatRecord() && msgMetadata instanceof OdeSpatMetadata
&& !((OdeSpatMetadata) msgMetadata).getIsCertPresent())) {
if (checkHeader(msgPayload) == "Ieee1609Dot2Data") {
Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER);
msgMetadata.addEncoding(msgEncoding);
}
// Determine if it is a MAP or a TIM message
String msgtype = isMapOrTim(msgPayload);
if (msgtype == "MAP") {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson());
} else if (msgtype == "TIM") {
publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson());
}

Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",
EncodingRule.UPER);
msgMetadata.addEncoding(unsecuredDataEncoding);

publisher.publish(xmlUtils.toXml(odeData),
publisher.getOdeProperties().getKafkaTopicAsn1DecoderInput());
}
serialId.increment();
}

serialId.increment();
}


/**
* Checks the header of the OdeMsgPayload and determines the encoding rule to be used in the Asn1DecoderInput XML.
* The payload is checked for various message start flags. It will add the encoding rule to the Asn1DecoderInput XML
* to tell the ASN1 codec to extract data from the header into the output message.
* Checks the payload to see if the message is a Map or TIM message
*
* @param payload The OdeMsgPayload to check the header of.
* @return The encoding rule to be used in the Asn1DecoderInput XML.
* @param payload The OdeMsgPayload to check the content of.
*/
public String checkHeader(OdeMsgPayload payload) {
JSONObject payloadJson;
String header = null;
public String isMapOrTim(OdeMsgPayload payload) {
try {
payloadJson = JsonUtils.toJSONObject(payload.getData().toJson());
String hexPacket = payloadJson.getString("bytes");

for (String key : msgStartFlags.keySet()) {
String startFlag = msgStartFlags.get(key);
int startIndex = hexPacket.toLowerCase().indexOf(startFlag);
logger.debug("Start index for " + key + "(" + startFlag + ")" + " is: " + startIndex);
if (startIndex <= 20 && startIndex != 0 && startIndex != -1) {
logger.debug("Message has supported Ieee1609Dot2Data header, adding encoding rule to Asn1DecoderInput XML");
header = "Ieee1609Dot2Data";
break;
}
logger.debug("Payload JSON: " + payloadJson);
}
JSONObject payloadJson = JsonUtils.toJSONObject(payload.getData().toJson());
String hexString = payloadJson.getString("bytes").toLowerCase();
int mapStartIndex = hexString.indexOf(msgStartFlags.get("MAP"));
int timStartIndex = hexString.indexOf(msgStartFlags.get("TIM"));

if (mapStartIndex != -1)
return "MAP";
else if (timStartIndex != -1)
return "TIM";
} catch (JsonUtilsException e) {
logger.error("JsonUtilsException while checking message header. Stacktrace: " + e.toString());

}
return header;
return "";
}

// This method will check if the next character is a newline character (0x0A in hex or 10 in converted decimal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ public class PayloadParser extends LogFileParser {
private static Logger logger = LoggerFactory.getLogger(PayloadParser.class);
private static HashMap<String, String> msgStartFlags = new HashMap<String, String>();

// Maximum header size for 1609 payload headers
private static final int HEADER_SIZE_1609 = 20;

// Maximum header size for WSMP payload headers
private static final int HEADER_SIZE_WSMP = 35;

public static final int PAYLOAD_LENGTH = 2;

protected short payloadLength;
Expand All @@ -63,7 +57,6 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F
if (status != ParserStatus.COMPLETE)
return status;
short length = CodecUtils.bytesToShort(readBuffer, 0, PAYLOAD_LENGTH, ByteOrder.LITTLE_ENDIAN);
logger.debug("Payload length is: " + length);
setPayloadLength(length);
}

Expand All @@ -72,7 +65,7 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F
status = parseStep(bis, getPayloadLength());
if (status != ParserStatus.COMPLETE)
return status;
setPayload(removeHeader(Arrays.copyOf(readBuffer, getPayloadLength())));
setPayload(stripDot3Header(Arrays.copyOf(readBuffer, getPayloadLength())));
}

resetStep();
Expand Down Expand Up @@ -104,43 +97,41 @@ public LogFileParser setPayload(byte[] payload) {
return this;
}

@Override
public void writeTo(OutputStream os) throws IOException {
os.write(CodecUtils.shortToBytes(payloadLength, ByteOrder.LITTLE_ENDIAN));
os.write(payload, 0, payloadLength);
}
@Override
public void writeTo(OutputStream os) throws IOException {
os.write(CodecUtils.shortToBytes(payloadLength, ByteOrder.LITTLE_ENDIAN));
os.write(payload, 0, payloadLength);
}


// Removes the 1609.2 header but will keep the 1609.1 header
public byte[] removeHeader(byte[] packet) {
String hexPacket = HexUtils.toHexString(packet);
/* Strips the 1609.3 and unsigned 1609.2 headers if they are present.
Will return the payload with a signed 1609.2 header if it is present.
Otherwise, returns just the payload. */
public byte[] stripDot3Header(byte[] packet) {
String hexString = HexUtils.toHexString(packet);
String hexPacketParsed = "";
for (String key : msgStartFlags.keySet()) {
String startFlag = msgStartFlags.get(key);
int startIndex = hexPacket.toLowerCase().indexOf(startFlag);
if (hexPacketParsed.equals("")) {
logger.debug("Start index for: " + key + " is: " + startIndex);
if (startIndex == -1) {
logger.debug("Message does not have header for: " + key);
continue;
} else if (startIndex <= HEADER_SIZE_1609) {
logger.debug("Message has supported header. startIndex: " + startIndex + " msgFlag: " + startFlag);
hexPacketParsed = hexPacket;
// If the header type is WSMP, the header will be stripped from the payload.
} else if (startIndex > HEADER_SIZE_1609 && startIndex < HEADER_SIZE_WSMP) {
int trueStartIndex = HEADER_SIZE_1609
+ hexPacket.substring(HEADER_SIZE_1609, hexPacket.length()).indexOf(startFlag);
logger.debug("Found payload start at: " + trueStartIndex);
hexPacketParsed = hexPacket.substring(trueStartIndex, hexPacket.length());
}
}

for (String start_flag : msgStartFlags.values()) {
int payloadStartIndex = hexString.indexOf(start_flag);
if (payloadStartIndex == -1)
continue;

String headers = hexString.substring(0, payloadStartIndex);
String payload = hexString.substring(payloadStartIndex, hexString.length());
// Look for the index of the start flag of a signed 1609.2 header, if one exists
int signedDot2StartIndex = headers.indexOf("038100");
if (signedDot2StartIndex == -1)
hexPacketParsed = payload;
else
hexPacketParsed = headers.substring(signedDot2StartIndex, headers.length()) + payload;
break;
}

if (hexPacketParsed.equals("")) {
hexPacketParsed = hexPacket;
logger.debug("Could not identify a Header in the following packet: " + hexPacketParsed);
} else {
logger.debug("Payload hex: " + hexPacketParsed);
hexPacketParsed = hexString;
logger.debug("Packet is not a BSM, TIM or Map message: " + hexPacketParsed);
}

return HexUtils.fromHexString(hexPacketParsed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,21 @@ public abstract class AbstractAsn1DecodeMessageJSON extends AbstractSubscriberPr
private Logger logger = LoggerFactory.getLogger(this.getClass());

protected StringPublisher codecPublisher;
protected String payload_start_flag;

public AbstractAsn1DecodeMessageJSON() {
super();
}

public AbstractAsn1DecodeMessageJSON(StringPublisher codecPublisher) {
public AbstractAsn1DecodeMessageJSON(StringPublisher codecPublisher, String payload_start_flag) {
super();
this.codecPublisher = codecPublisher;
this.payload_start_flag = payload_start_flag;
}

// Strips the IEEE 1609.2 security header (if it exists) and returns the payload
protected String stripDot2Header(String hexString) {
return hexString.substring(hexString.indexOf(payload_start_flag), hexString.length());
}

protected void publishEncodedMessageToAsn1Decoder(OdeData odeData) {
Expand Down
Loading

0 comments on commit 87623c9

Please sign in to comment.