diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java index 98c57adf7..b32674343 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java @@ -119,6 +119,7 @@ public class OdeProperties implements EnvironmentAware { private Set kafkaTopicsDisabledSet = new HashSet<>(); // BSM + private final String BSM_START_FLAG = "0014"; private String kafkaTopicOdeBsmPojo = "topic.OdeBsmPojo"; private String kafkaTopicOdeBsmJson = "topic.OdeBsmJson"; private String kafkaTopicOdeBsmRxPojo = "topic.OdeBsmRxPojo"; @@ -130,6 +131,7 @@ public class OdeProperties implements EnvironmentAware { private int bsmBufferSize = 500; // TIM + private final String TIM_START_FLAG = "001f"; private String kafkaTopicOdeTimJson = "topic.OdeTimJson"; private String kafkaTopicOdeDNMsgJson = "topic.OdeDNMsgJson"; private String kafkaTopicOdeTimRxJson = "topic.OdeTimRxJson"; @@ -142,6 +144,7 @@ public class OdeProperties implements EnvironmentAware { private int timBufferSize = 500; //SPAT + private final String SPAT_START_FLAG = "0013"; private String kafkaTopicOdeSpatTxPojo = "topic.OdeSpatTxPojo"; private String kafkaTopicOdeSpatPojo = "topic.OdeSpatPojo"; private String kafkaTopicOdeSpatJson = "topic.OdeSpatJson"; @@ -150,9 +153,10 @@ public class OdeProperties implements EnvironmentAware { private String kafkaTopicFilteredOdeSpatJson = "topic.FilteredOdeSpatJson"; private String kafkaTopicOdeRawEncodedSPATJson = "topic.OdeRawEncodedSPATJson"; private int spatReceiverPort = 44910; - private int spatBufferSize = 1000; + private int spatBufferSize = 500; //SSM + private final String SSM_START_FLAG = "001e"; private String kafkaTopicOdeSsmPojo = "topic.OdeSsmPojo"; private String kafkaTopicOdeSsmJson = "topic.OdeSsmJson"; private String kafkaTopicOdeRawEncodedSSMJson = "topic.OdeRawEncodedSSMJson"; @@ -160,6 +164,7 @@ public class OdeProperties implements EnvironmentAware { private int ssmBufferSize = 500; //SRM + private final String SRM_START_FLAG = "001d"; private String kafkaTopicOdeSrmTxPojo = "topic.OdeSrmTxPojo"; private String kafkaTopicOdeSrmJson = "topic.OdeSrmJson"; private String kafkaTopicOdeRawEncodedSRMJson = "topic.OdeRawEncodedSRMJson"; @@ -167,6 +172,7 @@ public class OdeProperties implements EnvironmentAware { private int srmBufferSize = 500; //MAP + private final String MAP_START_FLAG = "0012"; private String kafkaTopicOdeRawEncodedMAPJson = "topic.OdeRawEncodedMAPJson"; private String kafkaTopicOdeMapTxPojo = "topic.OdeMapTxPojo"; private String kafkaTopicOdeMapJson = "topic.OdeMapJson"; @@ -174,6 +180,7 @@ public class OdeProperties implements EnvironmentAware { private int mapBufferSize = 2048; // PSM + private final String PSM_START_FLAG = "0020"; private String kafkaTopicOdeRawEncodedPSMJson = "topic.OdeRawEncodedPSMJson"; private String kafkaTopicOdePsmTxPojo = "topic.OdePsmTxPojo"; private String kafkaTopicOdePsmJson = "topic.OdePsmJson"; @@ -385,6 +392,10 @@ public void setVerboseJson(Boolean verboseJson) { this.verboseJson = verboseJson; } + public String getBsmStartFlag() { + return BSM_START_FLAG; + } + public int getBsmReceiverPort() { return bsmReceiverPort; } @@ -401,6 +412,10 @@ public void setBsmBufferSize(int bsmBufferSize) { this.bsmBufferSize = bsmBufferSize; } + public String getTimStartFlag() { + return TIM_START_FLAG; + } + public int getTimReceiverPort() { return timReceiverPort; } @@ -417,6 +432,10 @@ public void setTimBufferSize(int timBufferSize) { this.timBufferSize = timBufferSize; } + public String getSsmStartFlag() { + return SSM_START_FLAG; + } + public int getSsmReceiverPort() { return ssmReceiverPort; } @@ -433,6 +452,10 @@ public void setSsmBufferSize(int ssmBufferSize) { this.ssmBufferSize = ssmBufferSize; } + public String getSrmStartFlag() { + return SRM_START_FLAG; + } + public int getSrmReceiverPort() { return srmReceiverPort; } @@ -449,6 +472,10 @@ public void setSrmBufferSize(int srmBufferSize) { this.srmBufferSize = srmBufferSize; } + public String getSpatStartFlag() { + return SPAT_START_FLAG; + } + public int getSpatReceiverPort() { return spatReceiverPort; } @@ -465,6 +492,10 @@ public void setSpatBufferSize(int spatBufferSize) { this.spatBufferSize = spatBufferSize; } + public String getMapStartFlag() { + return MAP_START_FLAG; + } + public int getMapReceiverPort() { return mapReceiverPort; } @@ -481,6 +512,10 @@ public void setMapBufferSize(int mapBufferSize) { this.mapBufferSize = mapBufferSize; } + public String getPsmStartFlag() { + return PSM_START_FLAG; + } + public int getPsmReceiverPort() { return psmReceiverPort; } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java index 2b229ec7d..2af4828b8 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/stream/LogFileToAsn1CodecPublisher.java @@ -156,76 +156,57 @@ public boolean isSpatRecord() { private void publishList(XmlUtils xmlUtils, List dataList) throws JsonProcessingException { serialId.setBundleSize(dataList.size()); + for (OdeData odeData : dataList) { OdeLogMetadata msgMetadata = (OdeLogMetadata) odeData.getMetadata(); OdeMsgPayload msgPayload = (OdeMsgPayload) odeData.getPayload(); msgMetadata.setSerialId(serialId); if (isDriverAlertRecord()) { - logger.debug("Publishing a driverAlert."); - publisher.publish(JsonUtils.toJson(odeData, false), publisher.getOdeProperties().getKafkaTopicDriverAlertJson()); + } else if (isBsmRecord()) { + publisher.publish(JsonUtils.toJson(odeData, false), + publisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson()); + } else if (isSpatRecord()) { + publisher.publish(JsonUtils.toJson(odeData, false), + publisher.getOdeProperties().getKafkaTopicOdeRawEncodedSPATJson()); } else { - if (isBsmRecord()) { - logger.debug("Publishing a BSM"); - } else if (isSpatRecord()) { - logger.debug("Publishing a Spat"); - } else { - logger.debug("Publishing a TIM or MAP"); - } - - if (!(isSpatRecord() && msgMetadata instanceof OdeSpatMetadata - && !((OdeSpatMetadata) msgMetadata).getIsCertPresent())) { - if (checkHeader(msgPayload) == "Ieee1609Dot2Data") { - Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER); - msgMetadata.addEncoding(msgEncoding); - } + // Determine if it is a MAP or a TIM message + String msgtype = isMapOrTim(msgPayload); + if (msgtype == "MAP") { + publisher.publish(JsonUtils.toJson(odeData, false), + publisher.getOdeProperties().getKafkaTopicOdeRawEncodedMAPJson()); + } else if (msgtype == "TIM") { + publisher.publish(JsonUtils.toJson(odeData, false), + publisher.getOdeProperties().getKafkaTopicOdeRawEncodedTIMJson()); } - - Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame", - EncodingRule.UPER); - msgMetadata.addEncoding(unsecuredDataEncoding); - - publisher.publish(xmlUtils.toXml(odeData), - publisher.getOdeProperties().getKafkaTopicAsn1DecoderInput()); } - serialId.increment(); } + + serialId.increment(); } - /** - * Checks the header of the OdeMsgPayload and determines the encoding rule to be used in the Asn1DecoderInput XML. - * The payload is checked for various message start flags. It will add the encoding rule to the Asn1DecoderInput XML - * to tell the ASN1 codec to extract data from the header into the output message. + * Checks the payload to see if the message is a Map or TIM message * - * @param payload The OdeMsgPayload to check the header of. - * @return The encoding rule to be used in the Asn1DecoderInput XML. + * @param payload The OdeMsgPayload to check the content of. */ - public String checkHeader(OdeMsgPayload payload) { - JSONObject payloadJson; - String header = null; + public String isMapOrTim(OdeMsgPayload payload) { try { - payloadJson = JsonUtils.toJSONObject(payload.getData().toJson()); - String hexPacket = payloadJson.getString("bytes"); - - for (String key : msgStartFlags.keySet()) { - String startFlag = msgStartFlags.get(key); - int startIndex = hexPacket.toLowerCase().indexOf(startFlag); - logger.debug("Start index for " + key + "(" + startFlag + ")" + " is: " + startIndex); - if (startIndex <= 20 && startIndex != 0 && startIndex != -1) { - logger.debug("Message has supported Ieee1609Dot2Data header, adding encoding rule to Asn1DecoderInput XML"); - header = "Ieee1609Dot2Data"; - break; - } - logger.debug("Payload JSON: " + payloadJson); - } + JSONObject payloadJson = JsonUtils.toJSONObject(payload.getData().toJson()); + String hexString = payloadJson.getString("bytes").toLowerCase(); + int mapStartIndex = hexString.indexOf(msgStartFlags.get("MAP")); + int timStartIndex = hexString.indexOf(msgStartFlags.get("TIM")); + + if (mapStartIndex != -1) + return "MAP"; + else if (timStartIndex != -1) + return "TIM"; } catch (JsonUtilsException e) { logger.error("JsonUtilsException while checking message header. Stacktrace: " + e.toString()); - } - return header; + return ""; } // This method will check if the next character is a newline character (0x0A in hex or 10 in converted decimal) diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/PayloadParser.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/PayloadParser.java index 2436c3025..c3467aced 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/PayloadParser.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/parser/PayloadParser.java @@ -33,12 +33,6 @@ public class PayloadParser extends LogFileParser { private static Logger logger = LoggerFactory.getLogger(PayloadParser.class); private static HashMap msgStartFlags = new HashMap(); - // Maximum header size for 1609 payload headers - private static final int HEADER_SIZE_1609 = 20; - - // Maximum header size for WSMP payload headers - private static final int HEADER_SIZE_WSMP = 35; - public static final int PAYLOAD_LENGTH = 2; protected short payloadLength; @@ -63,7 +57,6 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F if (status != ParserStatus.COMPLETE) return status; short length = CodecUtils.bytesToShort(readBuffer, 0, PAYLOAD_LENGTH, ByteOrder.LITTLE_ENDIAN); - logger.debug("Payload length is: " + length); setPayloadLength(length); } @@ -72,7 +65,7 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F status = parseStep(bis, getPayloadLength()); if (status != ParserStatus.COMPLETE) return status; - setPayload(removeHeader(Arrays.copyOf(readBuffer, getPayloadLength()))); + setPayload(stripDot3Header(Arrays.copyOf(readBuffer, getPayloadLength()))); } resetStep(); @@ -104,43 +97,41 @@ public LogFileParser setPayload(byte[] payload) { return this; } - @Override - public void writeTo(OutputStream os) throws IOException { - os.write(CodecUtils.shortToBytes(payloadLength, ByteOrder.LITTLE_ENDIAN)); - os.write(payload, 0, payloadLength); - } + @Override + public void writeTo(OutputStream os) throws IOException { + os.write(CodecUtils.shortToBytes(payloadLength, ByteOrder.LITTLE_ENDIAN)); + os.write(payload, 0, payloadLength); + } - // Removes the 1609.2 header but will keep the 1609.1 header - public byte[] removeHeader(byte[] packet) { - String hexPacket = HexUtils.toHexString(packet); + /* Strips the 1609.3 and unsigned 1609.2 headers if they are present. + Will return the payload with a signed 1609.2 header if it is present. + Otherwise, returns just the payload. */ + public byte[] stripDot3Header(byte[] packet) { + String hexString = HexUtils.toHexString(packet); String hexPacketParsed = ""; - for (String key : msgStartFlags.keySet()) { - String startFlag = msgStartFlags.get(key); - int startIndex = hexPacket.toLowerCase().indexOf(startFlag); - if (hexPacketParsed.equals("")) { - logger.debug("Start index for: " + key + " is: " + startIndex); - if (startIndex == -1) { - logger.debug("Message does not have header for: " + key); - continue; - } else if (startIndex <= HEADER_SIZE_1609) { - logger.debug("Message has supported header. startIndex: " + startIndex + " msgFlag: " + startFlag); - hexPacketParsed = hexPacket; - // If the header type is WSMP, the header will be stripped from the payload. - } else if (startIndex > HEADER_SIZE_1609 && startIndex < HEADER_SIZE_WSMP) { - int trueStartIndex = HEADER_SIZE_1609 - + hexPacket.substring(HEADER_SIZE_1609, hexPacket.length()).indexOf(startFlag); - logger.debug("Found payload start at: " + trueStartIndex); - hexPacketParsed = hexPacket.substring(trueStartIndex, hexPacket.length()); - } - } + + for (String start_flag : msgStartFlags.values()) { + int payloadStartIndex = hexString.indexOf(start_flag); + if (payloadStartIndex == -1) + continue; + + String headers = hexString.substring(0, payloadStartIndex); + String payload = hexString.substring(payloadStartIndex, hexString.length()); + // Look for the index of the start flag of a signed 1609.2 header, if one exists + int signedDot2StartIndex = headers.indexOf("038100"); + if (signedDot2StartIndex == -1) + hexPacketParsed = payload; + else + hexPacketParsed = headers.substring(signedDot2StartIndex, headers.length()) + payload; + break; } + if (hexPacketParsed.equals("")) { - hexPacketParsed = hexPacket; - logger.debug("Could not identify a Header in the following packet: " + hexPacketParsed); - } else { - logger.debug("Payload hex: " + hexPacketParsed); + hexPacketParsed = hexString; + logger.debug("Packet is not a BSM, TIM or Map message: " + hexPacketParsed); } + return HexUtils.fromHexString(hexPacketParsed); } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/AbstractAsn1DecodeMessageJSON.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/AbstractAsn1DecodeMessageJSON.java index 1fe61439d..1cff45330 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/AbstractAsn1DecodeMessageJSON.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/AbstractAsn1DecodeMessageJSON.java @@ -14,14 +14,21 @@ public abstract class AbstractAsn1DecodeMessageJSON extends AbstractSubscriberPr private Logger logger = LoggerFactory.getLogger(this.getClass()); protected StringPublisher codecPublisher; + protected String payload_start_flag; public AbstractAsn1DecodeMessageJSON() { super(); } - public AbstractAsn1DecodeMessageJSON(StringPublisher codecPublisher) { + public AbstractAsn1DecodeMessageJSON(StringPublisher codecPublisher, String payload_start_flag) { super(); this.codecPublisher = codecPublisher; + this.payload_start_flag = payload_start_flag; + } + + // Strips the IEEE 1609.2 security header (if it exists) and returns the payload + protected String stripDot2Header(String hexString) { + return hexString.substring(hexString.indexOf(payload_start_flag), hexString.length()); } protected void publishEncodedMessageToAsn1Decoder(OdeData odeData) { diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/Asn1DecodeBSMJSON.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/Asn1DecodeBSMJSON.java index 9abf490ca..6bd39b696 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/Asn1DecodeBSMJSON.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/Asn1DecodeBSMJSON.java @@ -1,12 +1,12 @@ package us.dot.its.jpo.ode.services.asn1.message; -import java.util.Set; - -import org.json.JSONArray; +import org.apache.tomcat.util.buf.HexUtils; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; + import us.dot.its.jpo.ode.OdeProperties; import us.dot.its.jpo.ode.coder.StringPublisher; import us.dot.its.jpo.ode.model.Asn1Encoding; @@ -14,91 +14,34 @@ import us.dot.its.jpo.ode.model.OdeAsn1Data; import us.dot.its.jpo.ode.model.OdeAsn1Payload; import us.dot.its.jpo.ode.model.OdeBsmMetadata; -import us.dot.its.jpo.ode.model.OdeBsmMetadata.BsmSource; -import us.dot.its.jpo.ode.model.OdeData; -import us.dot.its.jpo.ode.model.OdeHexByteArray; -import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; -import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; -import us.dot.its.jpo.ode.model.OdeLogMsgMetadataLocation; -import us.dot.its.jpo.ode.model.OdeMsgPayload; -import us.dot.its.jpo.ode.model.ReceivedMessageDetails; -import us.dot.its.jpo.ode.model.RxSource; /*** * Encoded message Processor */ public class Asn1DecodeBSMJSON extends AbstractAsn1DecodeMessageJSON { - private static final String BSMContentType = "BsmMessageContent"; - private Logger logger = LoggerFactory.getLogger(this.getClass()); - + private ObjectMapper objectMapper = new ObjectMapper(); public Asn1DecodeBSMJSON(OdeProperties odeProps) { - super(new StringPublisher(odeProps)); + super(new StringPublisher(odeProps), odeProps.getBsmStartFlag()); } @Override protected Object process(String consumedData) { - - OdeData odeData = null; - OdeMsgPayload payload = null; - try { - logger.info("Processing BSM data"); - logger.debug("BSM data: {}", consumedData); - JSONObject rawJSONObject = new JSONObject(consumedData); - Set keys = rawJSONObject.keySet(); - for (Object key : keys) - { - //Send encoded BSM content to Codec service to decode BSM - if (key != null && key.toString().equals(BSMContentType)) { - /**process consumed data { "BsmMessageContent": [{ "metadata": { "utctimestamp:"2020-11-30T23:45:24.913657Z" }, "payload":"001480CF4B950C400022D2666E923D1EA6D4E28957BD55FFFFF001C758FD7E67D07F7FFF8000000002020218E1C1004A40196FBC042210115C030EF1408801021D4074CE7E1848101C5C0806E8E1A50101A84056EE8A1AB4102B840A9ADA21B9010259C08DEE1C1C560FFDDBFC070C0222210018BFCE309623120FFE9BFBB10C8238A0FFDC3F987114241610009BFB7113024780FFAC3F95F13A26800FED93FDD51202C5E0FE17BF9B31202FBAFFFEC87FC011650090019C70808440C83207873800000000001095084081C903447E31C12FC0"}]} - */ - OdeBsmMetadata metadata = null; - - JSONArray rawBSMJsonContentArray = rawJSONObject.getJSONArray(BSMContentType); - for (int i = 0; i < rawBSMJsonContentArray.length(); i++) { - JSONObject rawBSMJsonContent = (JSONObject) rawBSMJsonContentArray.get(i); - String encodedPayload = rawBSMJsonContent.get("payload").toString(); - JSONObject rawmetadata = (JSONObject) rawBSMJsonContent.get("metadata"); - logger.debug("RAW BSM: {}", encodedPayload); - // construct payload - payload = new OdeAsn1Payload(new OdeHexByteArray(encodedPayload)); - - // construct metadata - metadata = new OdeBsmMetadata(payload); - metadata.setOdeReceivedAt(rawmetadata.getString("utctimestamp")); - metadata.setRecordType(RecordType.bsmTx); - metadata.setSecurityResultCode(SecurityResultCode.success); - if (rawmetadata.has("originRsu")) - metadata.setOriginIp(rawmetadata.getString("originRsu")); - - // construct metadata: receivedMessageDetails - ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails(); - receivedMessageDetails.setRxSource(RxSource.RV); - - // construct metadata: locationData - OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation(); - receivedMessageDetails.setLocationData(locationData); + JSONObject rawBsmJsonObject = new JSONObject(consumedData); - metadata.setReceivedMessageDetails(receivedMessageDetails); - metadata.setBsmSource(BsmSource.RV); + String jsonStringMetadata = rawBsmJsonObject.get("metadata").toString(); + OdeBsmMetadata metadata = objectMapper.readValue(jsonStringMetadata, OdeBsmMetadata.class); - Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame", - EncodingRule.UPER); - metadata.addEncoding(unsecuredDataEncoding); + Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame", EncodingRule.UPER); + metadata.addEncoding(unsecuredDataEncoding); - // construct odeData - odeData = new OdeAsn1Data(metadata, payload); + String payloadHexString = ((JSONObject)((JSONObject) rawBsmJsonObject.get("payload")).get("data")).getString("bytes"); + payloadHexString = super.stripDot2Header(payloadHexString); + OdeAsn1Payload payload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString)); - publishEncodedMessageToAsn1Decoder(odeData); - } - - } - else { - logger.error("Error received invalid key from consumed message"); - } - } + publishEncodedMessageToAsn1Decoder(new OdeAsn1Data(metadata, payload)); } catch (Exception e) { logger.error("Error publishing to Asn1DecoderInput: {}", e.getMessage()); } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java index a4c4745ba..77067c2c4 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/AbstractUdpReceiverPublisher.java @@ -52,4 +52,19 @@ public AbstractUdpReceiverPublisher(OdeProperties odeProps, int port, int buffer logger.error("Error creating socket with port " + this.port, e); } } + + /* Strips the 1609.3 and unsigned 1609.2 headers if they are present. + Will return the payload with a signed 1609.2 header if it is present. + Otherwise, returns just the payload. */ + protected String stripDot3Header(String hexString, String payload_start_flag) { + int payloadStartIndex = hexString.indexOf(payload_start_flag); + String headers = hexString.substring(0, payloadStartIndex); + String payload = hexString.substring(payloadStartIndex, hexString.length()); + // Look for the index of the start flag of a signed 1609.2 header + int signedDot2StartIndex = headers.indexOf("038100"); + if (signedDot2StartIndex == -1) + return payload; + else + return headers.substring(signedDot2StartIndex, headers.length()) + payload; + } } \ No newline at end of file diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmComparator.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmComparator.java deleted file mode 100644 index 769ec2a72..000000000 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmComparator.java +++ /dev/null @@ -1,23 +0,0 @@ -package us.dot.its.jpo.ode.udp.bsm; - -import java.util.Comparator; - -import us.dot.its.jpo.ode.plugin.j2735.J2735Bsm; - -/** - * Comparator for the priority queue to keep the chronological order of bsms - */ -public class BsmComparator implements Comparator { - - @Override - public int compare(J2735Bsm x, J2735Bsm y) { - // TODO - determine message arrival time - // for now we are using the BSM's time offset property - - int xt = x.getCoreData().getSecMark(); - int yt = y.getCoreData().getSecMark(); - - return Integer.compare(xt, yt); - } - -} diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java index a61cfff3a..0a089be33 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/bsm/BsmReceiver.java @@ -4,27 +4,30 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import org.json.JSONArray; -import org.json.JSONObject; import org.apache.tomcat.util.buf.HexUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import us.dot.its.jpo.ode.coder.StringPublisher; import us.dot.its.jpo.ode.OdeProperties; +import us.dot.its.jpo.ode.coder.StringPublisher; +import us.dot.its.jpo.ode.model.OdeAsn1Data; +import us.dot.its.jpo.ode.model.OdeAsn1Payload; +import us.dot.its.jpo.ode.model.OdeBsmMetadata; +import us.dot.its.jpo.ode.model.OdeBsmMetadata.BsmSource; +import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; +import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; +import us.dot.its.jpo.ode.model.OdeLogMsgMetadataLocation; +import us.dot.its.jpo.ode.model.ReceivedMessageDetails; +import us.dot.its.jpo.ode.model.RxSource; import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; +import us.dot.its.jpo.ode.util.JsonUtils; public class BsmReceiver extends AbstractUdpReceiverPublisher { private static Logger logger = LoggerFactory.getLogger(BsmReceiver.class); - private static final String BSM_START_FLAG = "0014"; // these bytes indicate - // start of BSM payload - private static final int HEADER_MINIMUM_SIZE = 20; // WSMP headers are at - // least 20 bytes long - private StringPublisher bsmPublisher; @Autowired @@ -51,7 +54,7 @@ public void run() { do { try { - logger.debug("Waiting for UDP packets..."); + logger.debug("Waiting for UDP BSM packets..."); socket.receive(packet); if (packet.getLength() > 0) { senderIp = packet.getAddress().getHostAddress(); @@ -59,65 +62,50 @@ public void run() { logger.debug("Packet received from {}:{}", senderIp, senderPort); // extract the actualPacket from the buffer - byte[] payload = removeHeader(packet.getData()); + byte[] payload = packet.getData(); if (payload == null) continue; - String payloadHexString = HexUtils.toHexString(payload); - logger.debug("Packet: {}", payloadHexString); - - // Add header data for the decoding process - ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); - String timestamp = utc.format(DateTimeFormatter.ISO_INSTANT); - - JSONObject metadataObject = new JSONObject(); - metadataObject.put("utctimestamp", timestamp); - metadataObject.put("originRsu", senderIp); - JSONObject messageObject = new JSONObject(); - messageObject.put("metadata", metadataObject); - messageObject.put("payload", payloadHexString); - - JSONArray messageList = new JSONArray(); - messageList.put(messageObject); + // convert bytes to hex string and verify identity + String payloadHexString = HexUtils.toHexString(payload); + if (payloadHexString.indexOf(odeProperties.getBsmStartFlag()) == -1) + continue; + logger.debug("Full BSM packet: {}", payloadHexString); + payloadHexString = super.stripDot3Header(payloadHexString, odeProperties.getBsmStartFlag()); + logger.debug("Stripped BSM packet: {}", payloadHexString); - JSONObject jsonObject = new JSONObject(); - jsonObject.put("BsmMessageContent", messageList); + // Create OdeMsgPayload and OdeLogMetadata objects and populate them + OdeAsn1Payload bsmPayload = new OdeAsn1Payload(HexUtils.fromHexString(payloadHexString)); + OdeBsmMetadata bsmMetadata = new OdeBsmMetadata(bsmPayload); - logger.debug("BSM JSON Object: {}", jsonObject.toString()); + // Set BSM Metadata values that can be assumed from the UDP endpoint + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ISO_INSTANT); + bsmMetadata.setOdeReceivedAt(timestamp); + + ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails(); + OdeLogMsgMetadataLocation locationData = new OdeLogMsgMetadataLocation( + "unavailable", + "unavailable", + "unavailable", + "unavailable", + "unavailable"); + receivedMessageDetails.setRxSource(RxSource.RSU); + receivedMessageDetails.setLocationData(locationData); + bsmMetadata.setReceivedMessageDetails(receivedMessageDetails); + + bsmMetadata.setOriginIp(senderIp); + bsmMetadata.setBsmSource(BsmSource.EV); + bsmMetadata.setRecordType(RecordType.bsmTx); + bsmMetadata.setSecurityResultCode(SecurityResultCode.success); // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic - this.bsmPublisher.publish(jsonObject.toString(), this.bsmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson()); + bsmPublisher.publish(JsonUtils.toJson(new OdeAsn1Data(bsmMetadata, bsmPayload), false), + bsmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedBSMJson()); } } catch (Exception e) { logger.error("Error receiving packet", e); } } while (!isStopped()); } - - /** - * Attempts to strip WSMP header bytes. If message starts with "0014", - * message is raw BSM. Otherwise, headers are >= 20 bytes, so look past that - * for start of payload BSM. - * - * @param packet - */ - public byte[] removeHeader(byte[] packet) { - String hexPacket = HexUtils.toHexString(packet); - - int startIndex = hexPacket.indexOf(BSM_START_FLAG); - if (startIndex == 0) { - logger.debug("Message is raw BSM with no headers."); - } else if (startIndex == -1) { - logger.error("Message contains no BSM start flag."); - return null; - } else { - // We likely found a message with a header, look past the first 20 - // bytes for the start of the BSM - int trueStartIndex = HEADER_MINIMUM_SIZE - + hexPacket.substring(HEADER_MINIMUM_SIZE, hexPacket.length()).indexOf(BSM_START_FLAG); - hexPacket = hexPacket.substring(trueStartIndex, hexPacket.length()); - } - - return HexUtils.fromHexString(hexPacket); - } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/upload/FileUploadController.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/upload/FileUploadController.java index 1f1bf852f..b3ab682dd 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/upload/FileUploadController.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/upload/FileUploadController.java @@ -34,7 +34,6 @@ import org.springframework.web.multipart.MultipartFile; import us.dot.its.jpo.ode.OdeProperties; -import us.dot.its.jpo.ode.exporter.StompStringExporter; import us.dot.its.jpo.ode.importer.ImporterDirectoryWatcher; import us.dot.its.jpo.ode.importer.ImporterDirectoryWatcher.ImporterFileType; import us.dot.its.jpo.ode.storage.StorageFileNotFoundException; @@ -42,8 +41,6 @@ @RestController public class FileUploadController { - private static final String FILTERED_OUTPUT_TOPIC = "/topic/filtered_messages"; - private static final String UNFILTERED_OUTPUT_TOPIC = "/topic/unfiltered_messages"; private static Logger logger = LoggerFactory.getLogger(FileUploadController.class); @@ -51,7 +48,7 @@ public class FileUploadController { @Autowired public FileUploadController( - StorageService storageService, OdeProperties odeProperties, + StorageService storageService, OdeProperties odeProperties, SimpMessagingTemplate template) { super(); this.storageService = storageService; @@ -59,7 +56,7 @@ public FileUploadController( ExecutorService threadPool = Executors.newCachedThreadPool(); Path logPath = Paths.get(odeProperties.getUploadLocationRoot(), - odeProperties.getUploadLocationObuLog()); + odeProperties.getUploadLocationObuLog()); logger.debug("UPLOADER - BSM log file upload directory: {}", logPath); Path failurePath = Paths.get(odeProperties.getUploadLocationRoot(), "failed"); logger.debug("UPLOADER - Failure directory: {}", failurePath); @@ -68,20 +65,6 @@ public FileUploadController( // Create the importers that watch folders for new/modified files threadPool.submit(new ImporterDirectoryWatcher(odeProperties, logPath, backupPath, failurePath, ImporterFileType.LOG_FILE, odeProperties.getFileWatcherPeriod())); - - // Create unfiltered exporters - threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeBsmJson())); - threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeTimJson())); - threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeSpatJson())); - threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeMapJson())); - threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeSsmJson())); - threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeSrmJson())); - threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicDriverAlertJson())); - threadPool.submit(new StompStringExporter(odeProperties, UNFILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicOdeTimBroadcastJson())); - - // Create filtered exporters - threadPool.submit(new StompStringExporter(odeProperties, FILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicFilteredOdeBsmJson())); - threadPool.submit(new StompStringExporter(odeProperties, FILTERED_OUTPUT_TOPIC, template, odeProperties.getKafkaTopicFilteredOdeTimJson())); } @PostMapping("/upload/{type}") diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/bsm/BsmComparatorTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/bsm/BsmComparatorTest.java deleted file mode 100644 index 35decb55d..000000000 --- a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/bsm/BsmComparatorTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/******************************************************************************* - * Copyright 2018 572682 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy - * of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - ******************************************************************************/ -package us.dot.its.jpo.ode.udp.bsm; - -import static org.junit.Assert.assertEquals; - -import org.junit.jupiter.api.Test; - -import us.dot.its.jpo.ode.plugin.j2735.J2735Bsm; -import us.dot.its.jpo.ode.plugin.j2735.J2735BsmCoreData; -import us.dot.its.jpo.ode.udp.bsm.BsmComparator; - -public class BsmComparatorTest { - - @Test - public void shouldReturnXLessThanY() { - J2735BsmCoreData testDataX = new J2735BsmCoreData(); - testDataX.setSecMark(5); - J2735BsmCoreData testDataY = new J2735BsmCoreData(); - testDataY.setSecMark(6); - - J2735Bsm bsmX = new J2735Bsm(); - bsmX.setCoreData(testDataX); - - J2735Bsm bsmY = new J2735Bsm(); - bsmY.setCoreData(testDataY); - - BsmComparator testBsmComparator = new BsmComparator(); - assertEquals(-1, testBsmComparator.compare(bsmX, bsmY)); - } - - @Test - public void shouldReturnYLessThanX() { - J2735BsmCoreData testDataX = new J2735BsmCoreData(); - testDataX.setSecMark(6); - J2735BsmCoreData testDataY = new J2735BsmCoreData(); - testDataY.setSecMark(5); - - J2735Bsm bsmX = new J2735Bsm(); - bsmX.setCoreData(testDataX); - - J2735Bsm bsmY = new J2735Bsm(); - bsmY.setCoreData(testDataY); - - BsmComparator testBsmComparator = new BsmComparator(); - assertEquals(1, testBsmComparator.compare(bsmX, bsmY)); - } - - @Test - public void shouldReturnXEqualsY() { - J2735BsmCoreData testDataX = new J2735BsmCoreData(); - testDataX.setSecMark(5); - J2735BsmCoreData testDataY = new J2735BsmCoreData(); - testDataY.setSecMark(5); - - J2735Bsm bsmX = new J2735Bsm(); - bsmX.setCoreData(testDataX); - - J2735Bsm bsmY = new J2735Bsm(); - bsmY.setCoreData(testDataY); - - BsmComparator testBsmComparator = new BsmComparator(); - assertEquals(0, testBsmComparator.compare(bsmX, bsmY)); - } - -}