Skip to content

Commit

Permalink
initial commit with working ingest on bsm type messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael7371 committed Sep 21, 2023
1 parent 4cf0908 commit 36de40d
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.List;

import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -47,6 +48,7 @@
import us.dot.its.jpo.ode.model.SerialId;
import us.dot.its.jpo.ode.util.JsonUtils;
import us.dot.its.jpo.ode.util.XmlUtils;
import us.dot.its.jpo.ode.util.JsonUtils.JsonUtilsException;

public class LogFileToAsn1CodecPublisher implements Asn1CodecPublisher {

Expand All @@ -61,6 +63,7 @@ public LogFileToAsn1CodecPublisherException(String string, Exception e) {
}

protected static final Logger logger = LoggerFactory.getLogger(LogFileToAsn1CodecPublisher.class);
private static final String BSM_START_FLAG = "0014";

protected StringPublisher publisher;
protected LogFileParser fileParser;
Expand Down Expand Up @@ -146,6 +149,7 @@ private void publishList(XmlUtils xmlUtils, List<OdeData> dataList) throws JsonP
serialId.setBundleSize(dataList.size());
for (OdeData odeData : dataList) {
OdeLogMetadata msgMetadata = (OdeLogMetadata) odeData.getMetadata();
OdeMsgPayload msgPayload = (OdeMsgPayload) odeData.getPayload();
msgMetadata.setSerialId(serialId);

if (isDriverAlertRecord()) {
Expand All @@ -165,14 +169,13 @@ private void publishList(XmlUtils xmlUtils, List<OdeData> dataList) throws JsonP
}

if(isSpatRecord() && msgMetadata instanceof OdeSpatMetadata
&& !((OdeSpatMetadata)msgMetadata).getIsCertPresent() )
{
&& !((OdeSpatMetadata)msgMetadata).getIsCertPresent() ) {
//Nothing: If Spat log file and IEEE1609Cert is not present, Skip the Ieee1609Dot2Data encoding
}
else
{
Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER);
msgMetadata.addEncoding(msgEncoding);
} else {
if (checkHeader(msgPayload) == "Ieee1609Dot2Data"){
Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER);
msgMetadata.addEncoding(msgEncoding);
}
}

Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",EncodingRule.UPER);
Expand All @@ -186,4 +189,22 @@ private void publishList(XmlUtils xmlUtils, List<OdeData> dataList) throws JsonP
}
}

public String checkHeader(OdeMsgPayload payload) {
JSONObject payloadJson;
String header = null;
try {
payloadJson = JsonUtils.toJSONObject(payload.getData().toJson());
String hexPacket = payloadJson.getString("bytes");
int startIndex = hexPacket.indexOf(BSM_START_FLAG);
logger.debug("checkHeader hexPacket: " + hexPacket + "\n startIndex:" + startIndex);
if (startIndex < 10 && startIndex != 0){
header = "Ieee1609Dot2Data";
}
} catch (JsonUtilsException e) {
logger.error("JsonUtilsException while checking message header. Stacktrace: " + e.toString());

}
return header;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ public void setBsmSource(BsmSource bsmSource) {

public void setBsmSource(byte[] code) {
try {
setBsmSource(BsmSource.values()[code[0]]);
if (code[0] != 10){
setBsmSource(BsmSource.values()[code[0]]);
} else {
logger.debug("Removing newline character");
setStep(1);
}
} catch (Exception e) {
logger.error("Invalid BsmSource: {}. Valid values are {}-{} inclusive",
code, 0, BsmSource.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public ParserStatus parseStep(BufferedInputStream bis, int length) throws FilePa
try {
bis.reset();
} catch (IOException ioe) {
throw new FileParserException("Error reseting Input Stream to marked position", ioe);
throw new FileParserException("Error resetting Input Stream to marked position", ioe);
}
}
return ParserStatus.PARTIAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,21 @@
import java.nio.ByteOrder;
import java.util.Arrays;

import org.apache.tomcat.util.buf.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import us.dot.its.jpo.ode.util.CodecUtils;

public class PayloadParser extends LogFileParser {

private static Logger logger = LoggerFactory.getLogger(PayloadParser.class);

private static final String BSM_START_FLAG = "0014"; // these bytes indicate
// start of BSM payload
private static final int HEADER_MINIMUM_SIZE = 20; // WSMP headers are at
// least 20 bytes long

public static final int PAYLOAD_LENGTH_LENGTH = 2;

protected short payloadLength;
Expand All @@ -44,15 +55,16 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F
status = parseStep(bis, PAYLOAD_LENGTH_LENGTH);
if (status != ParserStatus.COMPLETE)
return status;
setPayloadLength(CodecUtils.bytesToShort(readBuffer, 0, PAYLOAD_LENGTH_LENGTH, ByteOrder.LITTLE_ENDIAN));
short length = CodecUtils.bytesToShort(readBuffer, 0, PAYLOAD_LENGTH_LENGTH, ByteOrder.LITTLE_ENDIAN);
setPayloadLength(length);
}

// Step 10 - copy payload bytes
if (getStep() == 1) {
status = parseStep(bis, getPayloadLength());
if (status != ParserStatus.COMPLETE)
return status;
setPayload(Arrays.copyOf(readBuffer, getPayloadLength()));
setPayload(removeHeader(Arrays.copyOf(readBuffer, getPayloadLength())));
}

resetStep();
Expand Down Expand Up @@ -90,4 +102,33 @@ public void writeTo(OutputStream os) throws IOException {
os.write(payload, 0, payloadLength);
}


/**
* Attempts to strip WSMP header bytes. If message starts with "0014",
* message is raw BSM. Otherwise, headers are >= 20 bytes, so look past that
* for start of payload BSM.
*
* @param packet
*/
public byte[] removeHeader(byte[] packet) {
String hexPacket = HexUtils.toHexString(packet);

int startIndex = hexPacket.indexOf(BSM_START_FLAG);
if (startIndex == 0) {
logger.debug("Message is raw BSM with no headers.");
} else if (startIndex == -1) {
logger.error("Message contains no BSM start flag.");
logger.debug("Payload hex: " + hexPacket);
return null;
} else if (startIndex < 10){
logger.error("Message has supported header.");
} else {
// We likely found a message with a header, look past the first 20
// bytes for the start of the BSM
int trueStartIndex = HEADER_MINIMUM_SIZE
+ hexPacket.substring(HEADER_MINIMUM_SIZE, hexPacket.length()).indexOf(BSM_START_FLAG);
hexPacket = hexPacket.substring(trueStartIndex, hexPacket.length());
}
return HexUtils.fromHexString(hexPacket);
}
}
19 changes: 19 additions & 0 deletions scripts/tests/asn1_decoder_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import socket
import time
import os
from kafka import KafkaProducer

# Currently set to oim-dev environment's ODE
KAFKA_BROKER = "172.29.11.116:9092" # Replace with your Kafka broker address and port
KAFKA_TOPIC = "topic.Asn1DecoderInput" # Replace with the Kafka topic you want to publish to
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER)

# MESSAGE = "<OdeAsn1Data><metadata><bsmSource>EV</bsmSource><logFileName>bsmTx.gz</logFileName><recordType>bsmTx</recordType><securityResultCode>success</securityResultCode><receivedMessageDetails><locationData><latitude>40.5659199</latitude><longitude>-105.031773</longitude><elevation>1456.1</elevation><speed>0.28</speed><heading>268.275</heading></locationData><rxSource>NA</rxSource></receivedMessageDetails><encodings><encodings><elementName>root</elementName><elementType>Ieee1609Dot2Data</elementType><encodingRule>COER</encodingRule></encodings><encodings><elementName>unsecuredData</elementName><elementType>MessageFrame</elementType><encodingRule>UPER</encodingRule></encodings></encodings><payloadType>us.dot.its.jpo.ode.model.OdeAsn1Payload</payloadType><serialId><streamId>fffda5e3-f6fc-4007-9a19-8b1e53794754</streamId><bundleSize>16</bundleSize><bundleId>0</bundleId><recordId>7</recordId><serialNumber>7</serialNumber></serialId><odeReceivedAt>2023-09-18T23:18:26.489467Z</odeReceivedAt><schemaVersion>6</schemaVersion><maxDurationTime>0</maxDurationTime><recordGeneratedAt>2018-05-01T15:13:55.396Z</recordGeneratedAt><recordGeneratedBy>OBU</recordGeneratedBy><sanitized>false</sanitized><odePacketID/><odeTimStartDateTime/><originIp/></metadata><payload><dataType>us.dot.its.jpo.ode.model.OdeHexByteArray</dataType><data><bytes>0014465284a9ea8c4f2326e260f5965c652f25311414100070000000fdfa1fa1007fff80005a0fa0007cc040ff2b4037ef71fffc0fe6bc044afcbfffc0fe783e940e3bfffcfffec800400120000235da5fd7f72880afc46273f760137e80834179d2ce9abefb3aaddf73892ee65aba28109def53b15481d278542634a1d72d66c9c9d1eca86ef845f46ce16a9e755726247deb3cf024c3aa48532db346d3d50000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000</bytes></data></payload></OdeAsn1Data>"
# MESSAGE = "<OdeAsn1Data><metadata><bsmSource>RV</bsmSource><logFileName/><recordType>bsmTx</recordType><securityResultCode>unknown</securityResultCode><receivedMessageDetails><locationData><latitude/><longitude/><elevation/><speed/><heading/></locationData><rxSource>RV</rxSource></receivedMessageDetails><encodings><encodings><elementName>root</elementName><elementType>Ieee1609Dot2Data</elementType><encodingRule>COER</encodingRule></encodings><encodings><elementName>unsecuredData</elementName><elementType>MessageFrame</elementType><encodingRule>UPER</encodingRule></encodings></encodings><payloadType>us.dot.its.jpo.ode.model.OdeAsn1Payload</payloadType><serialId><streamId>615961ac-58f1-42ac-98e1-8abbb0d3b131</streamId><bundleSize>1</bundleSize><bundleId>0</bundleId><recordId>0</recordId><serialNumber>0</serialNumber></serialId><odeReceivedAt>2023-09-19T17:07:30.847914Z</odeReceivedAt><schemaVersion>6</schemaVersion><maxDurationTime>0</maxDurationTime><recordGeneratedAt/><recordGeneratedBy/><sanitized>false</sanitized><odePacketID/><odeTimStartDateTime/><originIp>172.30.0.1</originIp></metadata><payload><dataType>us.dot.its.jpo.ode.model.OdeHexByteArray</dataType><data><bytes>0014464744A9EA8C6C8826E260F8965C653C253A1414100070000000FDFA1FA1007FFF80005A0FA0007CC040FF2E403B2F97FFFC0FE6EC047EFF1FFFC0FE7B3E974E5FFFFCFFFEC800400120000235DA5E0F396080AFC46273F760137E80837276D28AD605B71BA6F8F737702A2ABC0C566785DCC3DFAE23B414DE7144924C5FA5C4BDF2F389263EDB16E4108BDB930AA8F507C4E0418732E0805A707B6161</bytes></data></payload></OdeAsn1Data>"

MESSAGE = "<OdeAsn1Data><metadata><bsmSource>EV</bsmSource><logFileName>bsmTx_2023-09-18T22%3A41%3A25.374_2229401004593.gz</logFileName><recordType>bsmTx</recordType><securityResultCode>unknown</securityResultCode><receivedMessageDetails><locationData><latitude>40.4740337</latitude><longitude>-104.9692039</longitude><elevation>1496.5</elevation><speed>0</speed><heading>0</heading></locationData><rxSource>NA</rxSource></receivedMessageDetails><encodings><encodings><elementName>unsecuredData</elementName><elementType>MessageFrame</elementType><encodingRule>UPER</encodingRule></encodings></encodings><payloadType>us.dot.its.jpo.ode.model.OdeAsn1Payload</payloadType><serialId><streamId>7074ba3c-042b-48a3-b88e-1e6a82cfe464</streamId><bundleSize>33</bundleSize><bundleId>0</bundleId><recordId>26</recordId><serialNumber>26</serialNumber></serialId><odeReceivedAt>2023-09-18T23:51:35.120811Z</odeReceivedAt><schemaVersion>6</schemaVersion><maxDurationTime>0</maxDurationTime><recordGeneratedAt>2023-09-18T22:45:45.625Z</recordGeneratedAt><recordGeneratedBy>OBU</recordGeneratedBy><sanitized>false</sanitized><odePacketID/><odeTimStartDateTime/><originIp/></metadata><payload><dataType>us.dot.its.jpo.ode.model.OdeHexByteArray</dataType><data><bytes>0014464744A9EA8C6C8826E260F8965C653C253A1414100070000000FDFA1FA1007FFF80005A0FA0007CC040FF2E403B2F97FFFC0FE6EC047EFF1FFFC0FE7B3E974E5FFFFCFFFEC800400120000235DA5E0F396080AFC46273F760137E80837276D28AD605B71BA6F8F737702A2ABC0C566785DCC3DFAE23B414DE7144924C5FA5C4BDF2F389263EDB16E4108BDB930AA8F507C4E0418732E0805A707B6161</bytes></data></payload></OdeAsn1Data>"


MESSAGE_BYTES = MESSAGE.encode('utf-8')
producer.send(KAFKA_TOPIC, value=MESSAGE_BYTES)
producer.close()

0 comments on commit 36de40d

Please sign in to comment.