Skip to content

Commit

Permalink
Merge pull request #45 from CDOT-CV/log-offloading-updates
Browse files Browse the repository at this point in the history
Commsignia Log Offloading Support
  • Loading branch information
payneBrandon authored Dec 26, 2023
2 parents 65dcdb4 + 57da3fd commit 760c581
Show file tree
Hide file tree
Showing 18 changed files with 233 additions and 106 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Once the ODE is deployed and running locally, you may access the ODE's demonstra
- [WYDOT Log Records](data/wydotLogRecords.h)
3. Press `Upload` button to upload the file to ODE.

Upload records within the files must be embedding BSM and/or TIM messages wrapped in J2735 MessageFrame and ASN.1 UPER encoded, wrapped in IEEE 1609.2 envelope and ASN.1 COER encoded binary format. Please review the files in the [data](data) folder for samples of each supported type. By uploading a valid data file, you will be able to observe the decoded messages contained within the file appear in the web UI page while connected to the WebSocket interface.
Upload records within the files can be embedding BSM, MAP and/or TIM messages wrapped in J2735 MessageFrame and ASN.1 UPER encoded, wrapped in IEEE 1609.2 envelope and ASN.1 COER encoded binary format. Log processing of files that contain messages with WSMP headers within the ASN.1 UPER encoded messages is supported but the header will be removed before processing. Please review the files in the [data](data) folder for samples of each supported type. By uploading a valid data file, you will be able to observe the decoded messages contained within the file appear in the web UI page while connected to the WebSocket interface.

Another way data can be uploaded to the ODE is through copying the file to the location specified by the `ode.uploadLocationRoot/ode.uploadLocationObuLog`property. If not specified, Default locations would be `uploads/bsmlog`sub-directory off of the location where ODE is launched.

Expand Down
Binary file added data/bsmLogDuringEvent_commsignia.gz
Binary file not shown.
Binary file added data/bsmTx_commsignia.gz
Binary file not shown.
Binary file added data/rxMsg_commsignia_map.gz
Binary file not shown.
Binary file added data/rxMsg_commsignia_tim.gz
Binary file not shown.
Binary file added data/rxMsg_map_and_tim.gz
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
package us.dot.its.jpo.ode.coder.stream;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -47,6 +50,7 @@
import us.dot.its.jpo.ode.model.SerialId;
import us.dot.its.jpo.ode.util.JsonUtils;
import us.dot.its.jpo.ode.util.XmlUtils;
import us.dot.its.jpo.ode.util.JsonUtils.JsonUtilsException;

public class LogFileToAsn1CodecPublisher implements Asn1CodecPublisher {

Expand All @@ -61,6 +65,7 @@ public LogFileToAsn1CodecPublisherException(String string, Exception e) {
}

protected static final Logger logger = LoggerFactory.getLogger(LogFileToAsn1CodecPublisher.class);
protected static HashMap<String, String> msgStartFlags = new HashMap<String, String>();

protected StringPublisher publisher;
protected LogFileParser fileParser;
Expand All @@ -69,6 +74,9 @@ public LogFileToAsn1CodecPublisherException(String string, Exception e) {
public LogFileToAsn1CodecPublisher(StringPublisher dataPub) {
this.publisher = dataPub;
this.serialId = new SerialId();
msgStartFlags.put("BSM", "0014");
msgStartFlags.put("TIM", "001f");
msgStartFlags.put("MAP", "0012");
}

public List<OdeData> publish(BufferedInputStream bis, String fileName, ImporterFileType fileType)
Expand All @@ -77,7 +85,7 @@ public List<OdeData> publish(BufferedInputStream bis, String fileName, ImporterF
ParserStatus status;

List<OdeData> dataList = new ArrayList<>();
if (fileType == ImporterFileType.LEAR_LOG_FILE) {
if (fileType == ImporterFileType.LOG_FILE) {
fileParser = LogFileParser.factory(fileName);

do {
Expand All @@ -92,6 +100,7 @@ public List<OdeData> publish(BufferedInputStream bis, String fileName, ImporterF
} else {
logger.error("Failed to decode ASN.1 data");
}
bis = removeNextNewLineCharacter(bis);
} catch (Exception e) {
throw new LogFileToAsn1CodecPublisherException("Error parsing or publishing data.", e);
}
Expand Down Expand Up @@ -146,38 +155,34 @@ private void publishList(XmlUtils xmlUtils, List<OdeData> dataList) throws JsonP
serialId.setBundleSize(dataList.size());
for (OdeData odeData : dataList) {
OdeLogMetadata msgMetadata = (OdeLogMetadata) odeData.getMetadata();
OdeMsgPayload msgPayload = (OdeMsgPayload) odeData.getPayload();
msgMetadata.setSerialId(serialId);

if (isDriverAlertRecord()) {
logger.debug("Publishing a driverAlert.");

publisher.publish(JsonUtils.toJson(odeData, false),
publisher.getOdeProperties().getKafkaTopicDriverAlertJson());
}
else
{
} else {
if (isBsmRecord()) {
logger.debug("Publishing a BSM");
} else if(isSpatRecord()) {
} else if (isSpatRecord()) {
logger.debug("Publishing a Spat");
}else {
logger.debug("Publishing a TIM");
} else {
logger.debug("Publishing a TIM or MAP");
}

if(isSpatRecord() && msgMetadata instanceof OdeSpatMetadata
&& !((OdeSpatMetadata)msgMetadata).getIsCertPresent() )
{
//Nothing: If Spat log file and IEEE1609Cert is not present, Skip the Ieee1609Dot2Data encoding
}
else
{
Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER);
msgMetadata.addEncoding(msgEncoding);

if (!(isSpatRecord() && msgMetadata instanceof OdeSpatMetadata
&& !((OdeSpatMetadata) msgMetadata).getIsCertPresent())) {
if (checkHeader(msgPayload) == "Ieee1609Dot2Data") {
Asn1Encoding msgEncoding = new Asn1Encoding("root", "Ieee1609Dot2Data", EncodingRule.COER);
msgMetadata.addEncoding(msgEncoding);
}
}

Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",EncodingRule.UPER);

Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame",
EncodingRule.UPER);
msgMetadata.addEncoding(unsecuredDataEncoding);


publisher.publish(xmlUtils.toXml(odeData),
publisher.getOdeProperties().getKafkaTopicAsn1DecoderInput());
Expand All @@ -186,4 +191,43 @@ private void publishList(XmlUtils xmlUtils, List<OdeData> dataList) throws JsonP
}
}

public String checkHeader(OdeMsgPayload payload) {
JSONObject payloadJson;
String header = null;
try {
payloadJson = JsonUtils.toJSONObject(payload.getData().toJson());
String hexPacket = payloadJson.getString("bytes");

for (String key : msgStartFlags.keySet()) {
String startFlag = msgStartFlags.get(key);
int startIndex = hexPacket.toLowerCase().indexOf(startFlag);
logger.debug("Start index for " + key + "(" + startFlag + ")" + " is: " + startIndex);
if (startIndex <= 20 && startIndex != 0 && startIndex != -1) {
logger.debug("Message has supported Ieee1609Dot2Data header, adding encoding rule to Asn1DecoderInput XML");
header = "Ieee1609Dot2Data";
break;
}
logger.debug("Payload JSON: " + payloadJson);
}
} catch (JsonUtilsException e) {
logger.error("JsonUtilsException while checking message header. Stacktrace: " + e.toString());

}
return header;
}

// This method will check if the next character is a newline character (0x0A in hex or 10 in converted decimal)
// or if the next character does not contain a newline character it will put that character back into the buffered input stream
public BufferedInputStream removeNextNewLineCharacter(BufferedInputStream bis) {
try {
bis.mark(1);
int nextByte = bis.read();
if (nextByte != 10) { // If the next byte is not a newline
bis.reset(); // Reset the stream back to the most recent mark
}
} catch (IOException e) {
e.printStackTrace();
}
return bis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class ImporterDirectoryWatcher implements Runnable {

public enum ImporterFileType {
LEAR_LOG_FILE, JSON_FILE
LOG_FILE, JSON_FILE
}

private static final Logger logger = LoggerFactory.getLogger(ImporterDirectoryWatcher.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F
return status;
setBsmSource(readBuffer);
}



if (getStep() == 2) {
status = nextStep(bis, fileName, locationParser);
if (status != ParserStatus.COMPLETE)
return status;
}

if (getStep() == 3) {
status = nextStep(bis, fileName, timeParser);
if (status != ParserStatus.COMPLETE)
Expand All @@ -79,7 +78,7 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F
if (status != ParserStatus.COMPLETE)
return status;
}

resetStep();
status = ParserStatus.COMPLETE;

Expand All @@ -101,16 +100,17 @@ public void setBsmSource(BsmSource bsmSource) {
public void setBsmSource(byte[] code) {
try {
setBsmSource(BsmSource.values()[code[0]]);

} catch (Exception e) {
logger.error("Invalid BsmSource: {}. Valid values are {}-{} inclusive",
code, 0, BsmSource.values());
logger.error("Invalid BsmSource: {}. Valid values are {}-{} inclusive",
code, 0, BsmSource.values());
setBsmSource(BsmSource.unknown);
}
}

@Override
public void writeTo(OutputStream os) throws IOException {
os.write((byte)bsmSource.ordinal());
super.writeTo(os);
}
@Override
public void writeTo(OutputStream os) throws IOException {
os.write((byte) bsmSource.ordinal());
super.writeTo(os);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.OutputStream;

public class DriverAlertFileParser extends LogFileParser {

private String alert;
Expand All @@ -32,7 +31,7 @@ public DriverAlertFileParser() {

@Override
public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws FileParserException {

ParserStatus status;
try {
status = super.parseFile(bis, fileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public ParserStatus parseStep(BufferedInputStream bis, int length) throws FilePa
try {
bis.reset();
} catch (IOException ioe) {
throw new FileParserException("Error reseting Input Stream to marked position", ioe);
throw new FileParserException("Error resetting Input Stream to marked position", ioe);
}
}
return ParserStatus.PARTIAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,31 @@
import java.io.OutputStream;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.HashMap;

import org.apache.tomcat.util.buf.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import us.dot.its.jpo.ode.util.CodecUtils;

public class PayloadParser extends LogFileParser {

public static final int PAYLOAD_LENGTH_LENGTH = 2;
private static Logger logger = LoggerFactory.getLogger(PayloadParser.class);
private static HashMap<String, String> msgStartFlags = new HashMap<String, String>();

private static final int HEADER_SIZE = 20;
public static final int PAYLOAD_LENGTH = 2;

protected short payloadLength;
protected byte[] payload;
protected String payloadType;

public PayloadParser() {
super();
msgStartFlags.put("BSM", "0014");
msgStartFlags.put("TIM", "001f");
msgStartFlags.put("MAP", "0012");
}

@Override
Expand All @@ -41,18 +54,20 @@ public ParserStatus parseFile(BufferedInputStream bis, String fileName) throws F
try {
// parse payload length
if (getStep() == 0) {
status = parseStep(bis, PAYLOAD_LENGTH_LENGTH);
status = parseStep(bis, PAYLOAD_LENGTH);
if (status != ParserStatus.COMPLETE)
return status;
setPayloadLength(CodecUtils.bytesToShort(readBuffer, 0, PAYLOAD_LENGTH_LENGTH, ByteOrder.LITTLE_ENDIAN));
short length = CodecUtils.bytesToShort(readBuffer, 0, PAYLOAD_LENGTH, ByteOrder.LITTLE_ENDIAN);
logger.debug("Payload length is: " + length);
setPayloadLength(length);
}

// Step 10 - copy payload bytes
if (getStep() == 1) {
status = parseStep(bis, getPayloadLength());
if (status != ParserStatus.COMPLETE)
return status;
setPayload(Arrays.copyOf(readBuffer, getPayloadLength()));
setPayload(removeHeader(Arrays.copyOf(readBuffer, getPayloadLength())));
}

resetStep();
Expand Down Expand Up @@ -90,4 +105,37 @@ public void writeTo(OutputStream os) throws IOException {
os.write(payload, 0, payloadLength);
}



public byte[] removeHeader(byte[] packet) {
String hexPacket = HexUtils.toHexString(packet);
String hexPacketParsed = "";
for (String key : msgStartFlags.keySet()) {
String startFlag = msgStartFlags.get(key);
int startIndex = hexPacket.toLowerCase().indexOf(startFlag);
if (hexPacketParsed == ""){
logger.debug("Start index for: " + key + " is: " + startIndex);
if (startIndex == -1) {
logger.debug("Message does not have header for: " + key);
break;
} else if (startIndex <= HEADER_SIZE) {
logger.debug("Message has supported header. startIndex: " + startIndex + " msgFlag: " + startFlag);
hexPacketParsed = hexPacket;
// Using a value of 35 as the largest index from preliminary testing data.
} else if (startIndex > HEADER_SIZE && startIndex < 35) {
int trueStartIndex = HEADER_SIZE
+ hexPacket.substring(HEADER_SIZE, hexPacket.length()).indexOf(startFlag);
logger.debug("Found payload start at: " + trueStartIndex);
hexPacketParsed = hexPacket.substring(trueStartIndex, hexPacket.length());
}
}
}
if (hexPacketParsed == ""){
hexPacketParsed = hexPacket;
logger.debug("Could not identify a Header in the following packet: " + hexPacketParsed);
} else {
logger.debug("Payload hex: " + hexPacketParsed);
}
return HexUtils.fromHexString(hexPacketParsed);
}
}
Loading

0 comments on commit 760c581

Please sign in to comment.