From 36de40d2bc3aaebfe2c53b6f68c2403bb5e54f3d Mon Sep 17 00:00:00 2001 From: Michael7371 <40476797+Michael7371@users.noreply.github.com> Date: Thu, 21 Sep 2023 15:20:56 -0600 Subject: [PATCH] initial commit with working ingest on bsm type messages --- .../stream/LogFileToAsn1CodecPublisher.java | 35 ++++++++++++--- .../ode/importer/parser/BsmLogFileParser.java | 7 ++- .../ode/importer/parser/LogFileParser.java | 2 +- .../ode/importer/parser/PayloadParser.java | 45 ++++++++++++++++++- scripts/tests/asn1_decoder_input.py | 19 ++++++++ 5 files changed, 97 insertions(+), 11 deletions(-) create mode 100644 scripts/tests/asn1_decoder_input.py diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java index 4d8768f01..f103045f1 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,7 @@ import us.dot.its.jpo.ode.model.SerialId; import us.dot.its.jpo.ode.util.JsonUtils; import us.dot.its.jpo.ode.util.XmlUtils; +import us.dot.its.jpo.ode.util.JsonUtils.JsonUtilsException; public class LogFileToAsn1CodecPublisher implements Asn1CodecPublisher { @@ -61,6 +63,7 @@ public LogFileToAsn1CodecPublisherException(String string, Exception e) { } protected static final Logger logger = LoggerFactory.getLogger(LogFileToAsn1CodecPublisher.class); + private static final String BSM_START_FLAG = "0014"; protected StringPublisher publisher; protected LogFileParser fileParser; @@ -146,6 +149,7 @@ private void publishList(XmlUtils xmlUtils, List dataList) throws JsonP serialId.setBundleSize(dataList.size()); for (OdeData odeData : dataList) { OdeLogMetadata msgMetadata = (OdeLogMetadata) odeData.getMetadata(); + OdeMsgPayload msgPayload = (OdeMsgPayload) odeData.getPayload(); msgMetadata.setSerialId(serialId); if (isDriverAlertRecord()) { @@ -165,14 +169,13 @@ private void publishList(XmlUtils xmlUtils, List dataList) throws JsonP } if(isSpatRecord() && msgMetadata instanceof OdeSpatMetadata - && !((OdeSpatMetadata)msgMetadata).getIsCertPresent() ) - { + && !((OdeSpatMetadata)msgMetadata).getIsCertPresent() ) { //Nothing: If Spat log file and IEEE1609Cert is not present, Skip the Ieee1609Dot2Data encoding - } - else - { - Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER); - msgMetadata.addEncoding(msgEncoding); + } else { + if (checkHeader(msgPayload) == "Ieee1609Dot2Data"){ + Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER); + msgMetadata.addEncoding(msgEncoding); + } } Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",EncodingRule.UPER); @@ -186,4 +189,22 @@ private void publishList(XmlUtils xmlUtils, List dataList) throws JsonP } } + public String checkHeader(OdeMsgPayload payload) { + JSONObject payloadJson; + String header = null; + try { + payloadJson = JsonUtils.toJSONObject(payload.getData().toJson()); + String hexPacket = payloadJson.getString("bytes"); + int startIndex = hexPacket.indexOf(BSM_START_FLAG); + logger.debug("checkHeader hexPacket: " + hexPacket + "\n startIndex:" + startIndex); + if (startIndex < 10 && startIndex != 0){ + header = "Ieee1609Dot2Data"; + } + } catch (JsonUtilsException e) { + logger.error("JsonUtilsException while checking message header. Stacktrace: " + e.toString()); + + } + return header; + } + } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/BsmLogFileParser.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/BsmLogFileParser.java index 392f10143..bc766cc42 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/BsmLogFileParser.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/BsmLogFileParser.java @@ -100,7 +100,12 @@ public void setBsmSource(BsmSource bsmSource) { public void setBsmSource(byte[] code) { try { - setBsmSource(BsmSource.values()[code[0]]); + if (code[0] != 10){ + setBsmSource(BsmSource.values()[code[0]]); + } else { + logger.debug("Removing newline character"); + setStep(1); + } } catch (Exception e) { logger.error("Invalid BsmSource: {}. Valid values are {}-{} inclusive", code, 0, BsmSource.values()); diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/LogFileParser.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/LogFileParser.java index f7189134c..07fe0059f 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/LogFileParser.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/LogFileParser.java @@ -118,7 +118,7 @@ public ParserStatus parseStep(BufferedInputStream bis, int length) throws FilePa try { bis.reset(); } catch (IOException ioe) { - throw new FileParserException("Error reseting Input Stream to marked position", ioe); + throw new FileParserException("Error resetting Input Stream to marked position", ioe); } } return ParserStatus.PARTIAL; diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/PayloadParser.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/PayloadParser.java index f9eb83291..8fd92bcc3 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/PayloadParser.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/PayloadParser.java @@ -21,10 +21,21 @@ import java.nio.ByteOrder; import java.util.Arrays; +import org.apache.tomcat.util.buf.HexUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import us.dot.its.jpo.ode.util.CodecUtils; public class PayloadParser extends LogFileParser { + private static Logger logger = LoggerFactory.getLogger(PayloadParser.class); + + private static final String BSM_START_FLAG = "0014"; // these bytes indicate + // start of BSM payload + private static final int HEADER_MINIMUM_SIZE = 20; // WSMP headers are at + // least 20 bytes long + public static final int PAYLOAD_LENGTH_LENGTH = 2; protected short payloadLength; @@ -44,7 +55,8 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F status = parseStep(bis, PAYLOAD_LENGTH_LENGTH); if (status != ParserStatus.COMPLETE) return status; - setPayloadLength(CodecUtils.bytesToShort(readBuffer, 0, PAYLOAD_LENGTH_LENGTH, ByteOrder.LITTLE_ENDIAN)); + short length = CodecUtils.bytesToShort(readBuffer, 0, PAYLOAD_LENGTH_LENGTH, ByteOrder.LITTLE_ENDIAN); + setPayloadLength(length); } // Step 10 - copy payload bytes @@ -52,7 +64,7 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F status = parseStep(bis, getPayloadLength()); if (status != ParserStatus.COMPLETE) return status; - setPayload(Arrays.copyOf(readBuffer, getPayloadLength())); + setPayload(removeHeader(Arrays.copyOf(readBuffer, getPayloadLength()))); } resetStep(); @@ -90,4 +102,33 @@ public void writeTo(OutputStream os) throws IOException { os.write(payload, 0, payloadLength); } + + /** + * Attempts to strip WSMP header bytes. If message starts with "0014", + * message is raw BSM. Otherwise, headers are >= 20 bytes, so look past that + * for start of payload BSM. + * + * @param packet + */ + public byte[] removeHeader(byte[] packet) { + String hexPacket = HexUtils.toHexString(packet); + + int startIndex = hexPacket.indexOf(BSM_START_FLAG); + if (startIndex == 0) { + logger.debug("Message is raw BSM with no headers."); + } else if (startIndex == -1) { + logger.error("Message contains no BSM start flag."); + logger.debug("Payload hex: " + hexPacket); + return null; + } else if (startIndex < 10){ + logger.error("Message has supported header."); + } else { + // We likely found a message with a header, look past the first 20 + // bytes for the start of the BSM + int trueStartIndex = HEADER_MINIMUM_SIZE + + hexPacket.substring(HEADER_MINIMUM_SIZE, hexPacket.length()).indexOf(BSM_START_FLAG); + hexPacket = hexPacket.substring(trueStartIndex, hexPacket.length()); + } + return HexUtils.fromHexString(hexPacket); + } } diff --git a/scripts/tests/asn1_decoder_input.py b/scripts/tests/asn1_decoder_input.py new file mode 100644 index 000000000..9a196afd5 --- /dev/null +++ b/scripts/tests/asn1_decoder_input.py @@ -0,0 +1,19 @@ +import socket +import time +import os +from kafka import KafkaProducer + +# Currently set to oim-dev environment's ODE +KAFKA_BROKER = "172.29.11.116:9092" # Replace with your Kafka broker address and port +KAFKA_TOPIC = "topic.Asn1DecoderInput" # Replace with the Kafka topic you want to publish to +producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER) + +# MESSAGE = "EVbsmTx.gzbsmTxsuccess40.5659199-105.0317731456.10.28268.275NArootIeee1609Dot2DataCOERunsecuredDataMessageFrameUPERus.dot.its.jpo.ode.model.OdeAsn1Payloadfffda5e3-f6fc-4007-9a19-8b1e53794754160772023-09-18T23:18:26.489467Z602018-05-01T15:13:55.396ZOBUfalseus.dot.its.jpo.ode.model.OdeHexByteArray0014465284a9ea8c4f2326e260f5965c652f25311414100070000000fdfa1fa1007fff80005a0fa0007cc040ff2b4037ef71fffc0fe6bc044afcbfffc0fe783e940e3bfffcfffec800400120000235da5fd7f72880afc46273f760137e80834179d2ce9abefb3aaddf73892ee65aba28109def53b15481d278542634a1d72d66c9c9d1eca86ef845f46ce16a9e755726247deb3cf024c3aa48532db346d3d50000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" +# MESSAGE = "RVbsmTxunknownRVrootIeee1609Dot2DataCOERunsecuredDataMessageFrameUPERus.dot.its.jpo.ode.model.OdeAsn1Payload615961ac-58f1-42ac-98e1-8abbb0d3b13110002023-09-19T17:07:30.847914Z60false172.30.0.1us.dot.its.jpo.ode.model.OdeHexByteArray0014464744A9EA8C6C8826E260F8965C653C253A1414100070000000FDFA1FA1007FFF80005A0FA0007CC040FF2E403B2F97FFFC0FE6EC047EFF1FFFC0FE7B3E974E5FFFFCFFFEC800400120000235DA5E0F396080AFC46273F760137E80837276D28AD605B71BA6F8F737702A2ABC0C566785DCC3DFAE23B414DE7144924C5FA5C4BDF2F389263EDB16E4108BDB930AA8F507C4E0418732E0805A707B6161" + +MESSAGE = "EVbsmTx_2023-09-18T22%3A41%3A25.374_2229401004593.gzbsmTxunknown40.4740337-104.96920391496.500NAunsecuredDataMessageFrameUPERus.dot.its.jpo.ode.model.OdeAsn1Payload7074ba3c-042b-48a3-b88e-1e6a82cfe46433026262023-09-18T23:51:35.120811Z602023-09-18T22:45:45.625ZOBUfalseus.dot.its.jpo.ode.model.OdeHexByteArray0014464744A9EA8C6C8826E260F8965C653C253A1414100070000000FDFA1FA1007FFF80005A0FA0007CC040FF2E403B2F97FFFC0FE6EC047EFF1FFFC0FE7B3E974E5FFFFCFFFEC800400120000235DA5E0F396080AFC46273F760137E80837276D28AD605B71BA6F8F737702A2ABC0C566785DCC3DFAE23B414DE7144924C5FA5C4BDF2F389263EDB16E4108BDB930AA8F507C4E0418732E0805A707B6161" + + +MESSAGE_BYTES = MESSAGE.encode('utf-8') +producer.send(KAFKA_TOPIC, value=MESSAGE_BYTES) +producer.close()