diff --git a/README.md b/README.md index 47320b6f4..8cff13859 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,7 @@ Supported message types: - SPaT - SRM - SSM +- PSM 1. Navigate to the [UDP sender Python scripts](<./scripts/tests/>) in the project. 2. Ensure the environment variable "DOCKER_HOST_IP" has been set in the shell that will be running the script. This must be set to the same IP that the ODE deployments are using. diff --git a/docker-compose.yml b/docker-compose.yml index 6f94ead50..f72610a36 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,7 +21,7 @@ services: KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_HOST_IP} KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" - KAFKA_CREATE_TOPICS: "topic.OdeBsmPojo:1:1,topic.OdeSpatTxPojo:1:1,topic.OdeSpatPojo:1:1,topic.OdeSpatJson:1:1,topic.FilteredOdeSpatJson:1:1,topic.OdeSpatRxJson:1:1,topic.OdeSpatRxPojo:1:1,topic.OdeBsmJson:1:1,topic.FilteredOdeBsmJson:1:1,topic.OdeTimJson:1:1,topic.OdeTimBroadcastJson:1:1,topic.J2735TimBroadcastJson:1:1,topic.OdeDriverAlertJson:1:1,topic.Asn1DecoderInput:1:1,topic.Asn1DecoderOutput:1:1,topic.Asn1EncoderInput:1:1,topic.Asn1EncoderOutput:1:1,topic.SDWDepositorInput:1:1,topic.OdeTIMCertExpirationTimeJson:1:1,topic.OdeRawEncodedBSMJson:1:1,topic.OdeRawEncodedSPATJson:1:1,topic.OdeRawEncodedTIMJson:1:1,topic.OdeRawEncodedMAPJson:1:1,topic.OdeMapTxPojo:1:1,topic.OdeMapJson:1:1,topic.OdeRawEncodedSSMJson:1:1,topic.OdeSsmPojo:1:1,topic.OdeSsmJson:1:1,topic.OdeRawEncodedSRMJson:1:1,topic.OdeSrmTxPojo:1:1,topic.OdeSrmJson:1:1" + KAFKA_CREATE_TOPICS: "topic.OdeBsmPojo:1:1,topic.OdeSpatTxPojo:1:1,topic.OdeSpatPojo:1:1,topic.OdeSpatJson:1:1,topic.FilteredOdeSpatJson:1:1,topic.OdeSpatRxJson:1:1,topic.OdeSpatRxPojo:1:1,topic.OdeBsmJson:1:1,topic.FilteredOdeBsmJson:1:1,topic.OdeTimJson:1:1,topic.OdeTimBroadcastJson:1:1,topic.J2735TimBroadcastJson:1:1,topic.OdeDriverAlertJson:1:1,topic.Asn1DecoderInput:1:1,topic.Asn1DecoderOutput:1:1,topic.Asn1EncoderInput:1:1,topic.Asn1EncoderOutput:1:1,topic.SDWDepositorInput:1:1,topic.OdeTIMCertExpirationTimeJson:1:1,topic.OdeRawEncodedBSMJson:1:1,topic.OdeRawEncodedSPATJson:1:1,topic.OdeRawEncodedTIMJson:1:1,topic.OdeRawEncodedMAPJson:1:1,topic.OdeMapTxPojo:1:1,topic.OdeMapJson:1:1,topic.OdeRawEncodedSSMJson:1:1,topic.OdeSsmPojo:1:1,topic.OdeSsmJson:1:1,topic.OdeRawEncodedSRMJson:1:1,topic.OdeSrmTxPojo:1:1,topic.OdeSrmJson:1:1,topic.OdeRawEncodedPSMJson:1:1,topic.OdePsmTxPojo:1:1,topic.OdePsmJson:1:1" KAFKA_DELETE_TOPIC_ENABLED: "true" KAFKA_CLEANUP_POLICY: "delete" # delete old logs KAFKA_LOG_RETENTION_HOURS: 2 @@ -53,6 +53,7 @@ services: - "44910:44910/udp" - "44920:44920/udp" - "44930:44930/udp" + - "44940:44940/udp" - "5555:5555/udp" - "6666:6666/udp" environment: diff --git a/docs/data-flow-diagrams/README.md b/docs/data-flow-diagrams/README.md index 4abd33494..b057a3df4 100644 --- a/docs/data-flow-diagrams/README.md +++ b/docs/data-flow-diagrams/README.md @@ -108,4 +108,13 @@ The purpose of these diagrams is to show: 1. The [ACM](https://github.com/usdot-jpo-ode/asn1_codec) pulls from the Asn1DecoderInput topic, decodes the SSM, and pushes it to the Asn1DecoderOutput topic. 1. The AsnCodecRouterServiceController class pulls from the Asn1DecoderOutput topic and passes the SSM to the Asn1DecodedDataRouter class. 1. The Asn1DecodedDataRouter class pushes the SSM to the OdeSsmTxPojo and OdeSsmJson topics. -1. The FileUploadController pulls from the OdeSsmJson topic and offloads the SSM. \ No newline at end of file +1. The FileUploadController pulls from the OdeSsmJson topic and offloads the SSM. + +### PSM Data Flow +1. The PSM comes in through the PsmReceiver class and is pushed to the OdeRawEncodedPSMJson topic. +1. The AsnCodecMessageServiceController class pulls from the OdeRawEncodedPSMJson topic and passes the PSM to the Asn1DecodePSMJSON class. +1. The Asn1DecodePSMJSON class pushes the PSM to the Asn1DecoderInput topic. +1. The [ACM](https://github.com/usdot-jpo-ode/asn1_codec) pulls from the Asn1DecoderInput topic, decodes the PSM, and pushes it to the Asn1DecoderOutput topic. +1. The AsnCodecRouterServiceController class pulls from the Asn1DecoderOutput topic and passes the PSM to the Asn1DecodedDataRouter class. +1. The Asn1DecodedDataRouter class pushes the PSM to the OdePsmTxPojo and OdePsmJson topics. +1. The FileUploadController pulls from the OdePsmJson topic and offloads the PSM. \ No newline at end of file diff --git a/docs/data-flow-diagrams/psm/PSM Data Flow.drawio b/docs/data-flow-diagrams/psm/PSM Data Flow.drawio new file mode 100644 index 000000000..9c3bf6f15 --- /dev/null +++ b/docs/data-flow-diagrams/psm/PSM Data Flow.drawio @@ -0,0 +1,121 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/data-flow-diagrams/psm/PSM Data Flow.drawio.png b/docs/data-flow-diagrams/psm/PSM Data Flow.drawio.png new file mode 100644 index 000000000..854c0ec2c Binary files /dev/null and b/docs/data-flow-diagrams/psm/PSM Data Flow.drawio.png differ diff --git a/docs/images/userguide/figure1.png b/docs/images/userguide/figure1.png index ff5ca5d5b..3bdb90bda 100644 Binary files a/docs/images/userguide/figure1.png and b/docs/images/userguide/figure1.png differ diff --git a/docs/schemas/README.md b/docs/schemas/README.md index f30782744..58da1e5a9 100644 --- a/docs/schemas/README.md +++ b/docs/schemas/README.md @@ -7,6 +7,7 @@ The jpo-ode supports receiving and decoding ASN1 messages from RSUs. The support - [topic.OdeSpatJson](../../jpo-ode-core/src/main/resources/schemas/schema-spat.json) - [topic.OdeSrmJson](../../jpo-ode-core/src/main/resources/schemas/schema-srm.json) - [topic.OdeSsmJson](../../jpo-ode-core/src/main/resources/schemas/schema-ssm.json) +- [topic.OdePsmJson](../../jpo-ode-core/src/main/resources/schemas/schema-psm.json) The output JSON of the ODE is complex but it is similar to the official standard of J2735 with some minor differences due to the form of their deserialized POJOs. To help implement proper data validation for the JSON output of the ODE into any data pipeline infrastructure, you may use the provided validation schemas within this directory. diff --git a/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdeLogMetadata.java b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdeLogMetadata.java index ffa206a34..3929cc7f8 100644 --- a/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdeLogMetadata.java +++ b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdeLogMetadata.java @@ -28,7 +28,7 @@ public class OdeLogMetadata extends OdeMsgMetadata { private static final long serialVersionUID = -8601265839394150140L; public enum RecordType { - bsmLogDuringEvent, rxMsg, dnMsg, bsmTx, driverAlert, mapTx, spatTx, ssmTx, srmTx, timMsg, unsupported + bsmLogDuringEvent, rxMsg, dnMsg, bsmTx, driverAlert, mapTx, spatTx, ssmTx, srmTx, timMsg, psmTx, unsupported } public enum SecurityResultCode { diff --git a/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdePsmData.java b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdePsmData.java new file mode 100644 index 000000000..294f58730 --- /dev/null +++ b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdePsmData.java @@ -0,0 +1,29 @@ +package us.dot.its.jpo.ode.model; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import static com.fasterxml.jackson.annotation.JsonTypeInfo.*; + +public class OdePsmData extends OdeData { + + private static final long serialVersionUID = 4944935387116447760L; + + public OdePsmData() { + super(); + } + + public OdePsmData(OdeMsgMetadata metadata, OdeMsgPayload payload) { + super(metadata, payload); + } + + @Override + @JsonTypeInfo(use = Id.CLASS, include = As.EXISTING_PROPERTY, defaultImpl = OdePsmMetadata.class) + public void setMetadata(OdeMsgMetadata metadata) { + super.setMetadata(metadata); + } + + @Override + @JsonTypeInfo(use = Id.CLASS, include = As.EXISTING_PROPERTY, defaultImpl = OdePsmPayload.class) + public void setPayload(OdeMsgPayload payload) { + super.setPayload(payload); + } +} diff --git a/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdePsmMetadata.java b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdePsmMetadata.java new file mode 100644 index 000000000..c090f1d1f --- /dev/null +++ b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdePsmMetadata.java @@ -0,0 +1,37 @@ +package us.dot.its.jpo.ode.model; + + +public class OdePsmMetadata extends OdeLogMetadata { + + private static final long serialVersionUID = 1L; + + public enum PsmSource { + RSU, V2X, MMITSS, unknown + } + + private PsmSource psmSource; + private String originIp; + + public PsmSource getPsmSource() { + return psmSource; + } + public OdePsmMetadata() { + super(); + } + + public OdePsmMetadata(OdeMsgPayload payload) { + super(payload); + } + + public void setPsmSource(PsmSource psmSource) { + this.psmSource = psmSource; + } + + public String getOriginIp() { + return originIp; + } + + public void setOriginIp(String originIp) { + this.originIp = originIp; + } +} diff --git a/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdePsmPayload.java b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdePsmPayload.java new file mode 100644 index 000000000..75f87399b --- /dev/null +++ b/jpo-ode-core/src/main/java/us/dot/its/jpo/ode/model/OdePsmPayload.java @@ -0,0 +1,29 @@ +package us.dot.its.jpo.ode.model; + +import com.fasterxml.jackson.annotation.*; + +import us.dot.its.jpo.ode.plugin.j2735.J2735PSM; + +public class OdePsmPayload extends OdeMsgPayload { + + private static final long serialVersionUID = 1L; + + public OdePsmPayload() { + this(new J2735PSM()); + } + + @JsonCreator + public OdePsmPayload( @JsonProperty("data") J2735PSM psm) { + super(psm); + this.setData(psm); + } + + @JsonProperty("data") + public J2735PSM getPsm() { + return (J2735PSM) getData(); + } + + public void setPsm(J2735PSM psm) { + setData(psm); + } +} diff --git a/jpo-ode-core/src/main/resources/schemas/schema-psm.json b/jpo-ode-core/src/main/resources/schemas/schema-psm.json new file mode 100644 index 000000000..6a78e1960 --- /dev/null +++ b/jpo-ode-core/src/main/resources/schemas/schema-psm.json @@ -0,0 +1,766 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "metadata": { + "type": "object", + "properties": { + "originIp": { + "type": "string" + }, + "psmSource": { + "type": "string" + }, + "logFileName": { + "type": "string" + }, + "recordType": { + "type": "string" + }, + "securityResultCode": { + "type": "string" + }, + "receivedMessageDetails": { + "type": "object", + "properties": { + "locationData": { + "type": "null" + }, + "rxSource": { + "type": "string" + } + }, + "required": [ + "rxSource" + ] + }, + "encodings": { + "type": "null" + }, + "payloadType": { + "type": "string" + }, + "serialId": { + "type": "object", + "properties": { + "streamId": { + "type": "string" + }, + "bundleSize": { + "type": "integer" + }, + "bundleId": { + "type": "integer" + }, + "recordId": { + "type": "integer" + }, + "serialNumber": { + "type": "integer" + } + }, + "required": [ + "streamId", + "bundleSize", + "bundleId", + "recordId", + "serialNumber" + ] + }, + "odeReceivedAt": { + "type": "string" + }, + "schemaVersion": { + "type": "integer" + }, + "maxDurationTime": { + "type": "integer" + }, + "odePacketID": { + "type": "string" + }, + "odeTimStartDateTime": { + "type": "string" + }, + "recordGeneratedAt": { + "type": "string" + }, + "recordGeneratedBy": { + "type": "null" + }, + "sanitized": { + "type": "boolean" + } + }, + "required": [ + "originIp", + "psmSource", + "logFileName", + "recordType", + "receivedMessageDetails", + "payloadType", + "serialId", + "odeReceivedAt", + "schemaVersion", + "maxDurationTime", + "odePacketID", + "odeTimStartDateTime", + "recordGeneratedAt", + "sanitized" + ] + }, + "payload": { + "type": "object", + "properties": { + "dataType": { + "type": "string" + }, + "data": { + "type": "object", + "properties": { + "basicType": { + "type": [ + "string", + "null" + ] + }, + "secMark": { + "type": [ + "integer", + "null" + ] + }, + "msgCnt": { + "type": [ + "integer", + "null" + ] + }, + "id": { + "type": [ + "string", + "null" + ] + }, + "position": { + "properties": { + "elevation": { + "type": [ + "number", + "null" + ] + }, + "latitude": { + "type": "number" + }, + "longitude": { + "type": "number" + } + }, + "required": [ + "latitude", + "longitude" + ], + "type": "object" + }, + "accuracy": { + "properties": { + "orientation": { + "type": [ + "number", + "null" + ] + }, + "semiMajor": { + "type": [ + "number", + "null" + ] + }, + "semiMinor": { + "type": [ + "number", + "null" + ] + } + }, + "required": [], + "type": "object" + }, + "speed": { + "type": "number" + }, + "heading": { + "type": "integer" + }, + "accelSet": { + "properties": { + "accelLat": { + "type": "integer" + }, + "accelLong": { + "type": "integer" + }, + "accelVert": { + "type": "integer" + }, + "accelYaw": { + "type": "integer" + } + }, + "required": [ + "accelLat", + "accelLong", + "accelVert", + "accelYaw" + ], + "type": "object" + }, + "pathHistory": { + "properties": { + "crumbData": { + "items": { + "properties": { + "elevationOffset": { + "type": "number" + }, + "heading": { + "type": [ + "number", + "null" + ] + }, + "latOffset": { + "type": "number" + }, + "lonOffset": { + "type": "number" + }, + "posAccuracy": { + "properties": { + "orientation": { + "type": "number" + }, + "semiMajor": { + "type": "number" + }, + "semiMinor": { + "type": "number" + } + }, + "required": [ + "semiMajor", + "semiMinor", + "orientation" + ], + "type": [ + "object", + "null" + ] + }, + "speed": { + "type": [ + "number", + "null" + ] + }, + "timeOffset": { + "type": "number" + } + }, + "required": [ + "elevationOffset", + "latOffset", + "lonOffset", + "timeOffset" + ], + "type": "object" + }, + "maxItems": 23, + "minItems": 1, + "type": "array" + }, + "currGNSSstatus": { + "properties": { + "aPDOPofUnder5": { + "type": "boolean" + }, + "baseStationType": { + "type": "boolean" + }, + "inViewOfUnder5": { + "type": "boolean" + }, + "isHealthy": { + "type": "boolean" + }, + "isMonitored": { + "type": "boolean" + }, + "localCorrectionsPresent": { + "type": "boolean" + }, + "networkCorrectionsPresent": { + "type": "boolean" + }, + "unavailable": { + "type": "boolean" + } + }, + "required": [ + "unavailable", + "isHealthy", + "isMonitored", + "baseStationType", + "aPDOPofUnder5", + "inViewOfUnder5", + "localCorrectionsPresent", + "networkCorrectionsPresent" + ], + "type": [ + "object", + "null" + ] + }, + "initialPosition": { + "properties": { + "heading": { + "type": [ + "number", + "null" + ] + }, + "posAccuracy": { + "properties": { + "orientation": { + "type": [ + "number", + "null" + ] + }, + "semiMajor": { + "type": [ + "number", + "null" + ] + }, + "semiMinor": { + "type": [ + "number", + "null" + ] + } + }, + "required": [], + "type": "object" + }, + "posConfidence": { + "properties": { + "elevation": { + "enum": [ + "UNAVAILABLE", + "ELEV_500_00", + "ELEV_200_00", + "ELEV_100_00", + "ELEV_050_00", + "ELEV_020_00", + "ELEV_010_00", + "ELEV_005_00", + "ELEV_002_00", + "ELEV_001_00", + "ELEV_000_50", + "ELEV_000_20", + "ELEV_000_10", + "ELEV_000_05", + "ELEV_000_02", + "ELEV_000_01" + ], + "type": "string" + }, + "pos": { + "enum": [ + "UNAVAILABLE", + "A500M", + "A200M", + "A100M", + "A50M", + "A20M", + "A10M", + "A5M", + "A2M", + "A1M", + "A50CM", + "A20CM", + "A10CM", + "A5CM", + "A2CM", + "A1CM" + ], + "type": "string" + } + }, + "required": [ + "pos", + "elevation" + ], + "type": [ + "object", + "null" + ] + }, + "position": { + "properties": { + "elevation": { + "type": [ + "number", + "null" + ] + }, + "latitude": { + "type": "number" + }, + "longitude": { + "type": "number" + } + }, + "required": [ + "latitude", + "longitude" + ], + "type": "object" + }, + "speed": { + "properties": { + "speed": { + "type": "number" + }, + "transmission": { + "enum": [ + "NEUTRAL", + "PARK", + "FORWARDGEARS", + "REVERSEGEARS", + "RESERVED1", + "RESERVED2", + "RESERVED3", + "UNAVAILABLE" + ], + "type": "string" + } + }, + "required": [ + "speed", + "transmission" + ], + "type": [ + "object", + "null" + ] + }, + "speedConfidence": { + "properties": { + "heading": { + "enum": [ + "UNAVAILABLE", + "PREC10DEG", + "PREC05DEG", + "PREC01DEG", + "PREC0_1DEG", + "PREC0_05DEG", + "PREC0_01DEG", + "PREC0_0125DEG" + ], + "type": "string" + }, + "speed": { + "enum": [ + "UNAVAILABLE", + "PREC100MS", + "PREC10MS", + "PREC5MS", + "PREC1MS", + "PREC0_1MS", + "PREC0_05MS", + "PREC0_01MS" + ], + "type": "string" + }, + "throttle": { + "enum": [ + "UNAVAILABLE", + "PREC10PERCENT", + "PREC1PERCENT", + "PREC0_5PERCENT" + ], + "type": "string" + } + }, + "required": [ + "heading", + "speed", + "throttle" + ], + "type": [ + "object", + "null" + ] + }, + "timeConfidence": { + "enum": [ + "UNAVAILABLE", + "TIME_100_000", + "TIME_050_000", + "TIME_020_000", + "TIME_010_000", + "TIME_002_000", + "TIME_001_000", + "TIME_000_500", + "TIME_000_200", + "TIME_000_100", + "TIME_000_050", + "TIME_000_020", + "TIME_000_010", + "TIME_000_005", + "TIME_000_002", + "TIME_000_001", + "TIME_000_000_5", + "TIME_000_000_2", + "TIME_000_000_1", + "TIME_000_000_05", + "TIME_000_000_02", + "TIME_000_000_01", + "TIME_000_000_005", + "TIME_000_000_002", + "TIME_000_000_001", + "TIME_000_000_000_5", + "TIME_000_000_000_2", + "TIME_000_000_000_1", + "TIME_000_000_000_05", + "TIME_000_000_000_02", + "TIME_000_000_000_01", + "TIME_000_000_000_005", + "TIME_000_000_000_002", + "TIME_000_000_000_001", + "TIME_000_000_000_000_5", + "TIME_000_000_000_000_2", + "TIME_000_000_000_000_1", + "TIME_000_000_000_000_05", + "TIME_000_000_000_000_02", + "TIME_000_000_000_000_01", + null + ], + "type": [ + "string", + "null" + ] + }, + "utcTime": { + "properties": { + "day": { + "type": [ + "integer", + "null" + ] + }, + "hour": { + "type": [ + "integer", + "null" + ] + }, + "minute": { + "type": [ + "integer", + "null" + ] + }, + "month": { + "type": [ + "integer", + "null" + ] + }, + "offset": { + "type": [ + "integer", + "null" + ] + }, + "second": { + "type": [ + "integer", + "null" + ] + }, + "year": { + "type": [ + "integer", + "null" + ] + } + }, + "required": [], + "type": [ + "object", + "null" + ] + } + }, + "required": [ + "position" + ], + "type": [ + "object", + "null" + ] + } + }, + "required": [ + "crumbData" + ], + "type": [ + "object", + "null" + ] + }, + "pathPrediction": { + "properties": { + "confidence": { + "type": "number" + }, + "radiusOfCurve": { + "type": "number" + } + }, + "required": [ + "confidence", + "radiusOfCurve" + ], + "type": [ + "object", + "null" + ] + }, + "propulsion": { + "properties": { + "human": { + "type": [ + "string", + "null" + ] + }, + "animal": { + "type": [ + "string", + "null" + ] + }, + "motor": { + "type": [ + "string", + "null" + ] + } + }, + "required": [], + "type": [ + "object", + "null" + ] + }, + "useState": { + "type": [ + "string", + "null" + ] + }, + "crossRequest": { + "type": [ + "boolean", + "null" + ] + }, + "crossState": { + "type": [ + "boolean", + "null" + ] + }, + "clusterSize": { + "type": [ + "string", + "null" + ] + }, + "clusterRadius": { + "type": [ + "integer", + "null" + ] + }, + "activityType": { + "type": [ + "string", + "null" + ] + }, + "activitySubType": { + "type": [ + "string", + "null" + ] + }, + "assistType": { + "type": [ + "string", + "null" + ] + }, + "sizing": { + "type": [ + "string", + "null" + ] + }, + "attachment": { + "type": [ + "string", + "null" + ] + }, + "attachmentRadius": { + "type": [ + "integer", + "null" + ] + }, + "animalType": { + "type": [ + "string", + "null" + ] + } + }, + "required": [ + "basicType", + "secMark", + "msgCnt", + "id", + "position", + "accuracy", + "speed", + "heading" + ] + } + }, + "required": [ + "dataType", + "data" + ] + } + }, + "required": [ + "metadata", + "payload" + ] +} \ No newline at end of file diff --git a/jpo-ode-core/src/test/java/us/dot/its/jpo/ode/model/OdePsmDataTest.java b/jpo-ode-core/src/test/java/us/dot/its/jpo/ode/model/OdePsmDataTest.java new file mode 100644 index 000000000..40b5cc405 --- /dev/null +++ b/jpo-ode-core/src/test/java/us/dot/its/jpo/ode/model/OdePsmDataTest.java @@ -0,0 +1,47 @@ +package us.dot.its.jpo.ode.model; + +import com.fasterxml.jackson.databind.JsonNode; +import com.networknt.schema.JsonSchema; +import com.networknt.schema.JsonSchemaFactory; +import com.networknt.schema.SpecVersion; +import com.networknt.schema.ValidationMessage; +import java.util.Set; + +import org.junit.Test; +import static org.junit.Assert.*; + +import us.dot.its.jpo.ode.util.JsonUtils; + +public class OdePsmDataTest { + + final String json = "{\"metadata\":{\"logFileName\":\"\",\"maxDurationTime\":0,\"odePacketID\":\"\",\"odeReceivedAt\":\"2023-09-21T15:30:14.926500Z\",\"odeTimStartDateTime\":\"\",\"originIp\":\"192.168.16.1\",\"payloadType\":\"us.dot.its.jpo.ode.model.OdePsmPayload\",\"psmSource\":\"RSU\",\"receivedMessageDetails\":{\"rxSource\":\"NA\"},\"recordGeneratedAt\":\"\",\"recordType\":\"psmTx\",\"sanitized\":false,\"schemaVersion\":6,\"securityResultCode\":\"success\",\"serialId\":{\"bundleId\":0,\"bundleSize\":1,\"recordId\":0,\"serialNumber\":0,\"streamId\":\"06cc1c17-e331-4806-a8ee-456b98c6517b\"}},\"payload\":{\"data\":{\"accuracy\":{\"orientation\":44.9951935489,\"semiMajor\":1.0,\"semiMinor\":1.0},\"basicType\":\"aPEDESTRIAN\",\"heading\":8898,\"id\":\"24779D7E\",\"msgCnt\":26,\"position\":{\"latitude\":40.2397377,\"longitude\":-74.2761437},\"secMark\":3564,\"speed\":0},\"dataType\":\"us.dot.its.jpo.ode.plugin.j2735.J2735PSM\"}}"; + + @Test + public void shouldDeserializeJson() { + final var deserialized = (OdePsmData)JsonUtils.fromJson(json, OdePsmData.class); + assertNotNull(deserialized); + assertTrue(deserialized.getMetadata() instanceof OdePsmMetadata); + assertTrue(deserialized.getPayload() instanceof OdePsmPayload); + + } + + @Test + public void serializationShouldNotAddClassProperty() { + final var deserialized = (OdePsmData)JsonUtils.fromJson(json, OdePsmData.class); + final String serialized = deserialized.toJson(false); + assertFalse(serialized.contains("@class")); + } + + @Test + public void shouldValidateJson() throws Exception { + final var deserialized = (OdePsmData)JsonUtils.fromJson(json, OdePsmData.class); + final String serialized = deserialized.toJson(false); + + // Load json schema from resource + JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4); + final JsonSchema schema = factory.getSchema(getClass().getClassLoader().getResource("schemas/schema-psm.json").toURI()); + final JsonNode node = (JsonNode)JsonUtils.fromJson(serialized, JsonNode.class); + Set validationMessages = schema.validate(node); + assertEquals(String.format("Json validation errors: %s", validationMessages), 0, validationMessages.size()); + } +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735AnimalPropelledType.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735AnimalPropelledType.java new file mode 100644 index 000000000..39bd049d9 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735AnimalPropelledType.java @@ -0,0 +1,8 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735AnimalPropelledType { + UNAVAILABLE, + OTHERTYPES, + ANIMALMOUNTED, + ANIMALDRAWNCARRIAGE +} \ No newline at end of file diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735AnimalType.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735AnimalType.java new file mode 100644 index 000000000..eb43e2c2f --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735AnimalType.java @@ -0,0 +1,8 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735AnimalType { + UNAVAILABLE, + SERVICEUSE, + PET, + FARM +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735Attachment.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735Attachment.java new file mode 100644 index 000000000..f17777cdd --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735Attachment.java @@ -0,0 +1,11 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735Attachment { + UNAVAILABLE, + STROLLER, + BICYCLETRAILER, + CART, + WHEELCHAIR, + OTHERWALKASSISTATTACHMENTS, + PET +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735DSRCmsgID.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735DSRCmsgID.java index 957ff2a54..69a8b0f7b 100644 --- a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735DSRCmsgID.java +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735DSRCmsgID.java @@ -20,7 +20,7 @@ public enum J2735DSRCmsgID { - BasicSafetyMessage(20), TravelerInformation(31), SPATMessage(19), MAPMessage(18), SSMMessage(30), SRMMessage(29); + BasicSafetyMessage(20), TravelerInformation(31), SPATMessage(19), MAPMessage(18), SSMMessage(30), SRMMessage(29), PersonalSafetyMessage(32); private int msgID; diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735HumanPropelledType.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735HumanPropelledType.java new file mode 100644 index 000000000..33d497fd3 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735HumanPropelledType.java @@ -0,0 +1,11 @@ +package us.dot.its.jpo.ode.plugin.j2735; + + +public enum J2735HumanPropelledType { + UNAVAILABLE, + OTHERTYPES, + ONFOOT, + SKATEBOARD, + PUSHORKICKSCOOTER, + WHEELCHAIR +} \ No newline at end of file diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735MotorizedPropelledType.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735MotorizedPropelledType.java new file mode 100644 index 000000000..6d2236d9f --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735MotorizedPropelledType.java @@ -0,0 +1,10 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735MotorizedPropelledType { + UNAVAILABLE, + OTHERTYPES, + WHEELCHAIR, + BICYCLE, + SCOOTER, + SELFBALANCINGDEVICE +} \ No newline at end of file diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735NumberOfParticipantsInCluster.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735NumberOfParticipantsInCluster.java new file mode 100644 index 000000000..a482657e2 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735NumberOfParticipantsInCluster.java @@ -0,0 +1,8 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735NumberOfParticipantsInCluster { + UNAVAILABLE, + SMALL, + MEDIUM, + LARGE +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PSM.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PSM.java new file mode 100644 index 000000000..59727542d --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PSM.java @@ -0,0 +1,237 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +import us.dot.its.jpo.ode.plugin.asn1.Asn1Object; + +public class J2735PSM extends Asn1Object { + + private static final long serialVersionUID = 1L; + + // REQUIRED parameters + private J2735PersonalDeviceUserType basicType; + private Integer secMark; + private Integer msgCnt; + private String id; + private OdePosition3D position; + private J2735PositionalAccuracy accuracy; + private Integer speed; + private Integer heading; + + // OPTIONAL parameters + private J2735AccelerationSet4Way accelSet; + private J2735PathHistory pathHistory; + private J2735PropelledInformation propulsion; + private J2735BitString useState; + private Boolean crossRequest; + private Boolean crossState; + private J2735NumberOfParticipantsInCluster clusterSize; + private Integer clusterRadius; + private J2735PublicSafetyEventResponderWorkerType eventResponderType; + private J2735PublicSafetyAndRoadWorkerActivity activityType; + private J2735PublicSafetyDirectingTrafficSubType activitySubType; + private J2735PersonalAssistive assistType; + private J2735UserSizeAndBehaviour sizing; + private J2735Attachment attachment; + private Integer attachmentRadius; + private J2735AnimalType animalType; + + public J2735PersonalDeviceUserType getBasicType() { + return this.basicType; + } + + public void setBasicType(J2735PersonalDeviceUserType basicType) { + this.basicType = basicType; + } + + public Integer getSecMark() { + return this.secMark; + } + + public void setSecMark(Integer secMark) { + this.secMark = secMark; + } + + public Integer getMsgCnt() { + return this.msgCnt; + } + + public void setMsgCnt(Integer msgCnt) { + this.msgCnt = msgCnt; + } + + public String getId() { + return this.id; + } + + public void setId(String id) { + this.id = id; + } + + public OdePosition3D getPosition() { + return this.position; + } + + public void setPosition(OdePosition3D position) { + this.position = position; + } + + public J2735PositionalAccuracy getAccuracy() { + return this.accuracy; + } + + public void setAccuracy(J2735PositionalAccuracy accuracy) { + this.accuracy = accuracy; + } + + public Integer getSpeed() { + return this.speed; + } + + public void setSpeed(Integer speed) { + this.speed = speed; + } + + public Integer getHeading() { + return this.heading; + } + + public void setHeading(Integer heading) { + this.heading = heading; + } + + public J2735AccelerationSet4Way getAccelSet() { + return this.accelSet; + } + + public void setAccelSet(J2735AccelerationSet4Way accelSet) { + this.accelSet = accelSet; + } + + public J2735PathHistory getPathHistory() { + return this.pathHistory; + } + + public void setPathHistory(J2735PathHistory pathHistory) { + this.pathHistory = pathHistory; + } + + public J2735PropelledInformation getPropulsion() { + return this.propulsion; + } + + public void setPropulsion(J2735PropelledInformation propulsion) { + this.propulsion = propulsion; + } + + public J2735BitString getUseState() { + return this.useState; + } + + public void setUseState(J2735BitString useState) { + this.useState = useState; + } + + public Boolean isCrossRequest() { + return this.crossRequest; + } + + public Boolean getCrossRequest() { + return this.crossRequest; + } + + public void setCrossRequest(Boolean crossRequest) { + this.crossRequest = crossRequest; + } + + public Boolean isCrossState() { + return this.crossState; + } + + public Boolean getCrossState() { + return this.crossState; + } + + public void setCrossState(Boolean crossState) { + this.crossState = crossState; + } + + public J2735NumberOfParticipantsInCluster getClusterSize() { + return this.clusterSize; + } + + public void setClusterSize(J2735NumberOfParticipantsInCluster clusterSize) { + this.clusterSize = clusterSize; + } + + public Integer getClusterRadius() { + return this.clusterRadius; + } + + public void setClusterRadius(Integer clusterRadius) { + this.clusterRadius = clusterRadius; + } + + public J2735PublicSafetyEventResponderWorkerType getEventResponderType() { + return this.eventResponderType; + } + + public void setEventResponderType(J2735PublicSafetyEventResponderWorkerType eventResponderType) { + this.eventResponderType = eventResponderType; + } + + public J2735PublicSafetyAndRoadWorkerActivity getActivityType() { + return this.activityType; + } + + public void setActivityType(J2735PublicSafetyAndRoadWorkerActivity activityType) { + this.activityType = activityType; + } + + public J2735PublicSafetyDirectingTrafficSubType getActivitySubType() { + return this.activitySubType; + } + + public void setActivitySubType(J2735PublicSafetyDirectingTrafficSubType activitySubType) { + this.activitySubType = activitySubType; + } + + public J2735PersonalAssistive getAssistType() { + return this.assistType; + } + + public void setAssistType(J2735PersonalAssistive assistType) { + this.assistType = assistType; + } + + public J2735UserSizeAndBehaviour getSizing() { + return this.sizing; + } + + public void setSizing(J2735UserSizeAndBehaviour sizing) { + this.sizing = sizing; + } + + public J2735Attachment getAttachment() { + return this.attachment; + } + + public void setAttachment(J2735Attachment attachment) { + this.attachment = attachment; + } + + public Integer getAttachmentRadius() { + return this.attachmentRadius; + } + + public void setAttachmentRadius(Integer attachmentRadius) { + this.attachmentRadius = attachmentRadius; + } + + public J2735AnimalType getAnimalType() { + return this.animalType; + } + + public void setAnimalType(J2735AnimalType animalType) { + this.animalType = animalType; + } + +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PersonalAssistive.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PersonalAssistive.java new file mode 100644 index 000000000..bce2b9f15 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PersonalAssistive.java @@ -0,0 +1,10 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735PersonalAssistive { + UNAVAILABLE, + OTHERTYPE, + VISION, + HEARING, + MOVEMENT, + COGNITION +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PersonalDeviceUsageState.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PersonalDeviceUsageState.java new file mode 100644 index 000000000..e9da047d5 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PersonalDeviceUsageState.java @@ -0,0 +1,13 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735PersonalDeviceUsageState { + UNAVAILABLE, + OTHER, + IDLE, + LISTENINGTOAUDIO, + TYPING, + CALLING, + PLAYINGGAMES, + READING, + VIEWING +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PersonalDeviceUserType.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PersonalDeviceUserType.java new file mode 100644 index 000000000..ed5ef9c61 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PersonalDeviceUserType.java @@ -0,0 +1,9 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735PersonalDeviceUserType { + unavailable, + aPEDESTRIAN, + aPEDALCYCLIST, + aPUBLICSAFETYWORKER, + anANIMAL +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PropelledInformation.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PropelledInformation.java new file mode 100644 index 000000000..abe9f3502 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PropelledInformation.java @@ -0,0 +1,37 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +import us.dot.its.jpo.ode.plugin.asn1.Asn1Object; + +public class J2735PropelledInformation extends Asn1Object { + + private static final long serialVersionUID = 1L; + + private J2735HumanPropelledType human; + private J2735AnimalPropelledType animal; + private J2735MotorizedPropelledType motor; + + public J2735HumanPropelledType getHuman() { + return this.human; + } + + public void setHuman(J2735HumanPropelledType human) { + this.human = human; + } + + public J2735AnimalPropelledType getAnimal() { + return this.animal; + } + + public void setAnimal(J2735AnimalPropelledType animal) { + this.animal = animal; + } + + public J2735MotorizedPropelledType getMotor() { + return this.motor; + } + + public void setMotor(J2735MotorizedPropelledType motor) { + this.motor = motor; + } + +} \ No newline at end of file diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PublicSafetyAndRoadWorkerActivity.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PublicSafetyAndRoadWorkerActivity.java new file mode 100644 index 000000000..c2f574110 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PublicSafetyAndRoadWorkerActivity.java @@ -0,0 +1,10 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735PublicSafetyAndRoadWorkerActivity { + UNAVAILABLE, + WORKINGONROAD, + SETTINGUPCLOSURES, + RESPONDINGTOEVENTS, + DIRECTINGTRAFFIC, + OTHERACTIVITIES +} \ No newline at end of file diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PublicSafetyDirectingTrafficSubType.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PublicSafetyDirectingTrafficSubType.java new file mode 100644 index 000000000..73baf7f5c --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PublicSafetyDirectingTrafficSubType.java @@ -0,0 +1,10 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735PublicSafetyDirectingTrafficSubType { + UNAVAILABLE, + POLICEANDTRAFFICOFFICERS, + TRAFFICCONTROLPERSONS, + CIVILDEFENSENATIONALGUARDMILITARYPOLICE, + EMERGENCYORGANIZATIONPERSONNEL, + HIGHWAYSERVICEVEHICLEPERSONNEL +} \ No newline at end of file diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PublicSafetyEventResponderWorkerType.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PublicSafetyEventResponderWorkerType.java new file mode 100644 index 000000000..59632aefb --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735PublicSafetyEventResponderWorkerType.java @@ -0,0 +1,12 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735PublicSafetyEventResponderWorkerType { + UNAVAILABLE, + TOWOPERATER, + FIREANDEMSWORKER, + ADOTWORKER, + LAWENFORCEMENT, + HAZMATRESPONDER, + ANIMALCONTROLWORKER, + OTHERPERSONNEL +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735UserSizeAndBehaviour.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735UserSizeAndBehaviour.java new file mode 100644 index 000000000..609050888 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/J2735UserSizeAndBehaviour.java @@ -0,0 +1,9 @@ +package us.dot.its.jpo.ode.plugin.j2735; + +public enum J2735UserSizeAndBehaviour { + UNAVAILABLE, + SMALLSTATURE, + LARGESTATURE, + ERRATICMOVING, + SLOWMOVING +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/builders/PSMBuilder.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/builders/PSMBuilder.java new file mode 100644 index 000000000..c3fefae44 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/builders/PSMBuilder.java @@ -0,0 +1,164 @@ +package us.dot.its.jpo.ode.plugin.j2735.builders; + +import com.fasterxml.jackson.databind.JsonNode; + +import us.dot.its.jpo.ode.plugin.j2735.J2735NumberOfParticipantsInCluster; +import us.dot.its.jpo.ode.plugin.j2735.J2735PSM; +import us.dot.its.jpo.ode.plugin.j2735.J2735PersonalAssistive; +import us.dot.its.jpo.ode.plugin.j2735.J2735PersonalDeviceUsageState; +import us.dot.its.jpo.ode.plugin.j2735.J2735PersonalDeviceUserType; +import us.dot.its.jpo.ode.plugin.j2735.J2735PublicSafetyAndRoadWorkerActivity; +import us.dot.its.jpo.ode.plugin.j2735.J2735PublicSafetyDirectingTrafficSubType; +import us.dot.its.jpo.ode.plugin.j2735.J2735PublicSafetyEventResponderWorkerType; +import us.dot.its.jpo.ode.plugin.j2735.J2735UserSizeAndBehaviour; +import us.dot.its.jpo.ode.plugin.j2735.OdePosition3D; +import us.dot.its.jpo.ode.plugin.j2735.J2735AnimalType; +import us.dot.its.jpo.ode.plugin.j2735.J2735Attachment; + +public class PSMBuilder { + + private PSMBuilder() { + throw new UnsupportedOperationException(); + } + + public static J2735PSM genericPSM(JsonNode PSMMessage) { + J2735PSM genericPSM = new J2735PSM(); + JsonNode basicType = PSMMessage.get("basicType"); + if (basicType != null) { + genericPSM.setBasicType(J2735PersonalDeviceUserType.valueOf(basicType.fields().next().getKey())); + } + + JsonNode secMark = PSMMessage.get("secMark"); + if (secMark != null) { + genericPSM.setSecMark(secMark.asInt()); + } + + JsonNode msgCnt = PSMMessage.get("msgCnt"); + if (msgCnt != null) { + genericPSM.setMsgCnt(msgCnt.asInt()); + } + + JsonNode id = PSMMessage.get("id"); + if (id != null) { + genericPSM.setId(id.asText().replaceAll("\\s", "")); + } + + JsonNode position = PSMMessage.get("position"); + if (position != null) { + OdePosition3D positionObj= new OdePosition3D(); + if(position.get("lat") != null) + { + positionObj.setLatitude(LatitudeBuilder.genericLatitude(position.get("lat"))); + } + + if(position.get("long") != null) + { + positionObj.setLongitude(LongitudeBuilder.genericLongitude(position.get("long"))); + } + + if(position.get("elevation") != null) + { + positionObj.setElevation(ElevationBuilder.genericElevation(position.get("elevation"))); + } + genericPSM.setPosition(positionObj); + } + + JsonNode accuracy = PSMMessage.get("accuracy"); + if (accuracy != null) { + genericPSM.setAccuracy(PositionalAccuracyBuilder.genericPositionalAccuracy(accuracy)); + } + + JsonNode speed = PSMMessage.get("speed"); + if (speed != null) { + genericPSM.setSpeed(speed.asInt()); + } + + JsonNode heading = PSMMessage.get("heading"); + if (heading != null) { + genericPSM.setHeading(heading.asInt()); + } + + // Optional Parameters Begins here: + JsonNode accelSet = PSMMessage.get("accelSet"); + if (accelSet != null) { + genericPSM.setAccelSet(AccelerationSet4WayBuilder.genericAccelerationSet4Way(accelSet)); + } + + JsonNode pathHistory = PSMMessage.get("pathHistory"); + if (pathHistory != null) { + genericPSM.setPathHistory(PathHistoryBuilder.genericPathHistory(pathHistory)); + } + + JsonNode propulsion = PSMMessage.get("propulsion"); + if (propulsion != null) { + genericPSM.setPropulsion(PropelledInformationBuilder.genericPropelledInformation(propulsion)); + } + + JsonNode useState = PSMMessage.get("useState"); + if (useState != null) { + genericPSM.setUseState(BitStringBuilder.genericBitString(useState,J2735PersonalDeviceUsageState.values())); + } + + JsonNode crossRequest = PSMMessage.get("crossRequest"); + if (crossRequest != null) { + genericPSM.setCrossRequest(crossRequest.asBoolean()); + } + + JsonNode crossState = PSMMessage.get("crossState"); + if (crossState != null) { + genericPSM.setCrossState(crossState.asBoolean()); + } + + JsonNode clusterSize = PSMMessage.get("clusterSize"); + if (crossState != null) { + genericPSM.setClusterSize(J2735NumberOfParticipantsInCluster.valueOf(clusterSize.fields().next().getKey().toUpperCase())); + } + + JsonNode clusterRadius = PSMMessage.get("clusterRadius"); + if (clusterRadius != null) { + genericPSM.setClusterRadius(clusterRadius.asInt()); + } + + JsonNode eventResponderType = PSMMessage.get("eventResponderType"); + if (eventResponderType != null) { + genericPSM.setEventResponderType(J2735PublicSafetyEventResponderWorkerType.valueOf(eventResponderType.fields().next().getKey().toUpperCase())); + } + + JsonNode activityType = PSMMessage.get("activityType"); + if (activityType != null) { + genericPSM.setActivityType(J2735PublicSafetyAndRoadWorkerActivity.valueOf(activityType.fields().next().getKey().toUpperCase())); + } + + JsonNode activitySubType = PSMMessage.get("activitySubType"); + if (activitySubType != null) { + genericPSM.setActivitySubType(J2735PublicSafetyDirectingTrafficSubType.valueOf(activitySubType.fields().next().getKey().toUpperCase())); + } + + JsonNode assistType = PSMMessage.get("assistType"); + if (assistType != null) { + genericPSM.setAssistType(J2735PersonalAssistive.valueOf(assistType.fields().next().getKey().toUpperCase())); + } + + JsonNode sizing = PSMMessage.get("sizing"); + if (sizing != null) { + genericPSM.setSizing(J2735UserSizeAndBehaviour.valueOf(assistType.fields().next().getKey().toUpperCase())); + } + + JsonNode attachment = PSMMessage.get("attachment"); + if (attachment != null) { + genericPSM.setAttachment(J2735Attachment.valueOf(attachment.fields().next().getKey().toUpperCase())); + } + + JsonNode attachmentRadius = PSMMessage.get("attachmentRadius"); + if (attachmentRadius != null) { + genericPSM.setAttachmentRadius(attachmentRadius.asInt()); + } + + JsonNode animalType = PSMMessage.get("animalType"); + if (animalType != null) { + genericPSM.setAnimalType(J2735AnimalType.valueOf(animalType.fields().next().getKey().toUpperCase())); + } + + return genericPSM; + } +} diff --git a/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/builders/PropelledInformationBuilder.java b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/builders/PropelledInformationBuilder.java new file mode 100644 index 000000000..be4553472 --- /dev/null +++ b/jpo-ode-plugins/src/main/java/us/dot/its/jpo/ode/plugin/j2735/builders/PropelledInformationBuilder.java @@ -0,0 +1,38 @@ +package us.dot.its.jpo.ode.plugin.j2735.builders; + +import com.fasterxml.jackson.databind.JsonNode; + +import us.dot.its.jpo.ode.plugin.j2735.J2735HumanPropelledType; +import us.dot.its.jpo.ode.plugin.j2735.J2735AnimalPropelledType; +import us.dot.its.jpo.ode.plugin.j2735.J2735MotorizedPropelledType; + +import us.dot.its.jpo.ode.plugin.j2735.J2735PropelledInformation; + +public class PropelledInformationBuilder { + + private PropelledInformationBuilder() { + throw new UnsupportedOperationException(); + } + + public static J2735PropelledInformation genericPropelledInformation(JsonNode propelledInformation) { + J2735PropelledInformation pi = new J2735PropelledInformation(); + + JsonNode human = propelledInformation.get("human"); + if (human != null){ + pi.setHuman(J2735HumanPropelledType.valueOf(human.asText().toUpperCase())); + } + + JsonNode animal = propelledInformation.get("animal"); + if (animal != null){ + pi.setAnimal(J2735AnimalPropelledType.valueOf(animal.asText().toUpperCase())); + } + + JsonNode motor = propelledInformation.get("motor"); + if (motor != null){ + pi.setMotor(J2735MotorizedPropelledType.valueOf(motor.asText().toUpperCase())); + } + + return pi; + } + +} diff --git a/jpo-ode-plugins/src/test/java/us/dot/its/jpo/ode/plugin/j2735/builders/PropelledInformationBuilderTest.java b/jpo-ode-plugins/src/test/java/us/dot/its/jpo/ode/plugin/j2735/builders/PropelledInformationBuilderTest.java new file mode 100644 index 000000000..acfab385b --- /dev/null +++ b/jpo-ode-plugins/src/test/java/us/dot/its/jpo/ode/plugin/j2735/builders/PropelledInformationBuilderTest.java @@ -0,0 +1,82 @@ +/******************************************************************************* + * Copyright 2018 572682 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy + * of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + ******************************************************************************/ +package us.dot.its.jpo.ode.plugin.j2735.builders; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import com.fasterxml.jackson.databind.JsonNode; + +import us.dot.its.jpo.ode.plugin.j2735.J2735PropelledInformation; + +import us.dot.its.jpo.ode.util.XmlUtils; +import us.dot.its.jpo.ode.util.XmlUtils.XmlUtilsException; + +public class PropelledInformationBuilderTest { + + @Test + public void shouldTranslatePropelledinformationHuman() { + JsonNode jsonPropulsion = null; + try { + jsonPropulsion = XmlUtils.toObjectNode( + "SKATEBOARDUNAVAILABLEUNAVAILABLE"); + } catch (XmlUtilsException e) { + fail("XML parsing error:" + e); + } + + J2735PropelledInformation actualPropulsion = PropelledInformationBuilder.genericPropelledInformation(jsonPropulsion.get("propulsion")); + + assertNotNull(actualPropulsion); + String expected = "{\"human\":\"SKATEBOARD\",\"animal\":\"UNAVAILABLE\",\"motor\":\"UNAVAILABLE\"}"; + assertEquals(expected , actualPropulsion.toString()); + } + + @Test + public void shouldTranslatePropelledinformationAnimal() { + JsonNode jsonPropulsion = null; + try { + jsonPropulsion = XmlUtils.toObjectNode( + "UNAVAILABLEANIMALDRAWNCARRIAGEUNAVAILABLE"); + } catch (XmlUtilsException e) { + fail("XML parsing error:" + e); + } + + J2735PropelledInformation actualPropulsion = PropelledInformationBuilder.genericPropelledInformation(jsonPropulsion.get("propulsion")); + + assertNotNull(actualPropulsion); + String expected = "{\"human\":\"UNAVAILABLE\",\"animal\":\"ANIMALDRAWNCARRIAGE\",\"motor\":\"UNAVAILABLE\"}"; + assertEquals(expected , actualPropulsion.toString()); + } + + @Test + public void shouldTranslatePropelledinformationMotor() { + JsonNode jsonPropulsion = null; + try { + jsonPropulsion = XmlUtils.toObjectNode( + "UNAVAILABLEUNAVAILABLESELFBALANCINGDEVICE"); + } catch (XmlUtilsException e) { + fail("XML parsing error:" + e); + } + + J2735PropelledInformation actualPropulsion = PropelledInformationBuilder.genericPropelledInformation(jsonPropulsion.get("propulsion")); + + assertNotNull(actualPropulsion); + String expected = "{\"human\":\"UNAVAILABLE\",\"animal\":\"UNAVAILABLE\",\"motor\":\"SELFBALANCINGDEVICE\"}"; + assertEquals(expected , actualPropulsion.toString()); + } + +} diff --git a/jpo-ode-plugins/src/test/java/us/dot/its/jpo/ode/plugin/j2735/builders/PsmBuilderTest.java b/jpo-ode-plugins/src/test/java/us/dot/its/jpo/ode/plugin/j2735/builders/PsmBuilderTest.java new file mode 100644 index 000000000..8f1f5510c --- /dev/null +++ b/jpo-ode-plugins/src/test/java/us/dot/its/jpo/ode/plugin/j2735/builders/PsmBuilderTest.java @@ -0,0 +1,48 @@ +/******************************************************************************* + * Copyright 2018 572682 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy + * of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + ******************************************************************************/ +package us.dot.its.jpo.ode.plugin.j2735.builders; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import com.fasterxml.jackson.databind.JsonNode; + +import us.dot.its.jpo.ode.plugin.j2735.J2735PSM; +import us.dot.its.jpo.ode.util.XmlUtils; +import us.dot.its.jpo.ode.util.XmlUtils.XmlUtilsException; + +public class PsmBuilderTest { + + @Test + public void shouldTranslatePSM() { + + JsonNode jsonPsm = null; + try { + jsonPsm = XmlUtils.toObjectNode( + "MessageFrame3235642624779D7E402397377-7427614372020819108898"); + } catch (XmlUtilsException e) { + fail("XML parsing error:" + e); + } + + J2735PSM actualPsm = PSMBuilder.genericPSM(jsonPsm.findValue("PersonalSafetyMessage")); + + assertNotNull(actualPsm); + String expected = "{\"basicType\":\"aPEDESTRIAN\",\"secMark\":3564,\"msgCnt\":26,\"id\":\"24779D7E\",\"position\":{\"latitude\":40.2397377,\"longitude\":-74.2761437},\"accuracy\":{\"semiMajor\":1.00,\"semiMinor\":1.00,\"orientation\":44.9951935489},\"speed\":0,\"heading\":8898}"; + assertEquals(expected , actualPsm.toString()); + } + +} diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java index 1058a22f5..bf21c071a 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java @@ -172,6 +172,13 @@ public class OdeProperties implements EnvironmentAware { private String kafkaTopicOdeMapJson = "topic.OdeMapJson"; private int mapReceiverPort = 44920; private int mapBufferSize = 2048; + + // PSM + private String kafkaTopicOdeRawEncodedPSMJson = "topic.OdeRawEncodedPSMJson"; + private String kafkaTopicOdePsmTxPojo = "topic.OdePsmTxPojo"; + private String kafkaTopicOdePsmJson = "topic.OdePsmJson"; + private int psmReceiverPort = 44940; + private int psmBufferSize = 500; // DriverAlerts private String kafkaTopicDriverAlertJson = "topic.OdeDriverAlertJson"; @@ -474,6 +481,22 @@ public void setMapBufferSize(int mapBufferSize) { this.mapBufferSize = mapBufferSize; } + public int getPsmReceiverPort() { + return psmReceiverPort; + } + + public void setPsmReceiverPort(int psmReceiverPort) { + this.psmReceiverPort = psmReceiverPort; + } + + public int getPsmBufferSize() { + return psmBufferSize; + } + + public void setPsmBufferSize(int psmBufferSize) { + this.psmBufferSize = psmBufferSize; + } + public String getDdsCasUrl() { return ddsCasUrl; } @@ -869,6 +892,34 @@ public void setKafkaTopicOdeMapJson(String kafkaTopicOdeMapJson) { this.kafkaTopicOdeMapJson = kafkaTopicOdeMapJson; } + public String getKafkaTopicOdeRawEncodedPSMJson() { + return kafkaTopicOdeRawEncodedPSMJson; + } + + public void setKafkaTopicOdeRawEncodedPSMJson(String kafkaTopicOdeRawEncodedPSMJson) { + this.kafkaTopicOdeRawEncodedPSMJson = kafkaTopicOdeRawEncodedPSMJson; + } + + + public String getKafkaTopicOdePsmTxPojo() { + return kafkaTopicOdePsmTxPojo; + } + + + public void setKafkaTopicOdePsmTxPojo(String kafkaTopicOdePsmTxPojo) { + this.kafkaTopicOdePsmTxPojo = kafkaTopicOdePsmTxPojo; + } + + + public String getKafkaTopicOdePsmJson() { + return kafkaTopicOdePsmJson; + } + + + public void setKafkaTopicOdePsmJson(String kafkaTopicOdePsmJson) { + this.kafkaTopicOdePsmJson = kafkaTopicOdePsmJson; + } + public String getKafkaTopicOdeRawEncodedSSMJson() { return kafkaTopicOdeRawEncodedSSMJson; } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/OdePsmDataCreatorHelper.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/OdePsmDataCreatorHelper.java new file mode 100644 index 000000000..a72228336 --- /dev/null +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/coder/OdePsmDataCreatorHelper.java @@ -0,0 +1,59 @@ +package us.dot.its.jpo.ode.coder; + +import java.io.IOException; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import us.dot.its.jpo.ode.context.AppContext; +import us.dot.its.jpo.ode.model.OdePsmData; +import us.dot.its.jpo.ode.model.OdePsmMetadata; +import us.dot.its.jpo.ode.model.OdePsmPayload; +import us.dot.its.jpo.ode.model.ReceivedMessageDetails; +import us.dot.its.jpo.ode.model.RxSource; +import us.dot.its.jpo.ode.plugin.j2735.builders.PSMBuilder; +import us.dot.its.jpo.ode.util.JsonUtils; +import us.dot.its.jpo.ode.util.XmlUtils; +import us.dot.its.jpo.ode.util.XmlUtils.XmlUtilsException; + +public class OdePsmDataCreatorHelper { + + public OdePsmDataCreatorHelper() { + } + + public static OdePsmData createOdePsmData(String consumedData) throws XmlUtilsException { + ObjectNode consumed = XmlUtils.toObjectNode(consumedData); + + JsonNode metadataNode = consumed.findValue(AppContext.METADATA_STRING); + if (metadataNode instanceof ObjectNode) { + ObjectNode object = (ObjectNode) metadataNode; + object.remove(AppContext.ENCODINGS_STRING); + + // Psm header file does not have a location and use predefined set required + // RxSource + ReceivedMessageDetails receivedMessageDetails = new ReceivedMessageDetails(); + receivedMessageDetails.setRxSource(RxSource.NA); + ObjectMapper objectPsmper = new ObjectMapper(); + JsonNode jsonNode; + try { + jsonNode = objectPsmper.readTree(receivedMessageDetails.toJson()); + object.set(AppContext.RECEIVEDMSGDETAILS_STRING, jsonNode); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + OdePsmMetadata metadata = (OdePsmMetadata) JsonUtils.fromJson(metadataNode.toString(), OdePsmMetadata.class); + + if (metadata.getSchemaVersion() <= 4) { + metadata.setReceivedMessageDetails(null); + } + + OdePsmPayload payload = new OdePsmPayload(PSMBuilder.genericPSM(consumed.findValue("PersonalSafetyMessage"))); + return new OdePsmData(metadata, payload); + } +} diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/Asn1DecodedDataRouter.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/Asn1DecodedDataRouter.java index 451c595df..f2b9c393d 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/Asn1DecodedDataRouter.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/Asn1DecodedDataRouter.java @@ -25,6 +25,7 @@ import us.dot.its.jpo.ode.coder.OdeSpatDataCreatorHelper; import us.dot.its.jpo.ode.coder.OdeSsmDataCreatorHelper; import us.dot.its.jpo.ode.coder.OdeSrmDataCreatorHelper; +import us.dot.its.jpo.ode.coder.OdePsmDataCreatorHelper; import us.dot.its.jpo.ode.context.AppContext; import us.dot.its.jpo.ode.model.OdeAsn1Data; import us.dot.its.jpo.ode.model.OdeBsmData; @@ -47,6 +48,7 @@ public class Asn1DecodedDataRouter extends AbstractSubscriberProcessor mapProducer; private MessageProducer ssmProducer; private MessageProducer srmProducer; + private MessageProducer psmProducer; public Asn1DecodedDataRouter(OdeProperties odeProps) { super(); @@ -63,6 +65,8 @@ public Asn1DecodedDataRouter(OdeProperties odeProps) { odeProps.getKafkaProducerType(), odeProperties.getKafkaTopicsDisabledSet()); this.srmProducer = MessageProducer.defaultStringMessageProducer(odeProps.getKafkaBrokers(), odeProps.getKafkaProducerType(), odeProperties.getKafkaTopicsDisabledSet()); + this.psmProducer = MessageProducer.defaultStringMessageProducer(odeProps.getKafkaBrokers(), + odeProps.getKafkaProducerType(), odeProperties.getKafkaTopicsDisabledSet()); } @Override @@ -135,6 +139,14 @@ public Object process(String consumedData) { // Send all SRMs also to OdeSrmJson srmProducer.send(odeProperties.getKafkaTopicOdeSrmJson(), getRecord().key(), odeSrmData); logger.debug("Submitted to SRM Pojo topic"); + } else if (messageId == J2735DSRCmsgID.PersonalSafetyMessage.getMsgID()) { + String odePsmData = OdePsmDataCreatorHelper.createOdePsmData(consumedData).toString(); + if (recordType == RecordType.psmTx) { + psmProducer.send(odeProperties.getKafkaTopicOdePsmTxPojo(), getRecord().key(), odePsmData); + } + // Send all PSMs also to OdePsmJson + psmProducer.send(odeProperties.getKafkaTopicOdePsmJson(), getRecord().key(), odePsmData); + logger.debug("Submitted to PSM Pojo topic"); } } catch (Exception e) { logger.error("Failed to route received data: " + consumedData, e); diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/Asn1DecodePSMJSON.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/Asn1DecodePSMJSON.java new file mode 100644 index 000000000..59b519c2e --- /dev/null +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/Asn1DecodePSMJSON.java @@ -0,0 +1,95 @@ +package us.dot.its.jpo.ode.services.asn1.message; + +import java.util.Set; + +import org.json.JSONArray; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import us.dot.its.jpo.ode.OdeProperties; +import us.dot.its.jpo.ode.coder.StringPublisher; +import us.dot.its.jpo.ode.model.Asn1Encoding; +import us.dot.its.jpo.ode.model.Asn1Encoding.EncodingRule; +import us.dot.its.jpo.ode.model.OdeAsn1Data; +import us.dot.its.jpo.ode.model.OdeAsn1Payload; +import us.dot.its.jpo.ode.model.OdeData; +import us.dot.its.jpo.ode.model.OdeHexByteArray; +import us.dot.its.jpo.ode.model.OdeLogMetadata.RecordType; +import us.dot.its.jpo.ode.model.OdeLogMetadata.SecurityResultCode; +import us.dot.its.jpo.ode.model.OdePsmMetadata; +import us.dot.its.jpo.ode.model.OdePsmMetadata.PsmSource; +import us.dot.its.jpo.ode.model.OdeMsgPayload; + +/*** + * Encoded message Processor + */ +public class Asn1DecodePSMJSON extends AbstractAsn1DecodeMessageJSON { + private static final String PSMContentType = "PsmMessageContent"; + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + + + public Asn1DecodePSMJSON(OdeProperties odeProps) { + super(new StringPublisher(odeProps)); + } + + @Override + protected Object process(String consumedData) { + + OdeData odeData = null; + OdeMsgPayload payload = null; + + try { + logger.info("Processing PSM data"); + logger.debug("PSM data: {}", consumedData); + JSONObject rawJSONObject = new JSONObject(consumedData); + Set keys = rawJSONObject.keySet(); + for (Object key : keys) + { + //Send encoded PSM content to Codec service to decode PSM + if (key != null && key.toString().equals(PSMContentType)) { + OdePsmMetadata metadata = null; + + JSONArray rawPSMJsonContentArray = rawJSONObject.getJSONArray(PSMContentType); + for (int i = 0; i < rawPSMJsonContentArray.length(); i++) { + JSONObject rawPSMJsonContent = (JSONObject) rawPSMJsonContentArray.get(i); + String encodedPayload = rawPSMJsonContent.get("payload").toString(); + JSONObject rawmetadata = (JSONObject) rawPSMJsonContent.get("metadata"); + logger.debug("RAW PSM: {}", encodedPayload); + // construct payload + payload = new OdeAsn1Payload(new OdeHexByteArray(encodedPayload)); + + // construct metadata + metadata = new OdePsmMetadata(payload); + metadata.setOdeReceivedAt(rawmetadata.getString("utctimestamp")); + metadata.setOriginIp(rawmetadata.getString("originRsu")); + metadata.setRecordType(RecordType.psmTx); + metadata.setSecurityResultCode(SecurityResultCode.success); + + if (rawmetadata.getString("source").equals("RSU")) + metadata.setPsmSource(PsmSource.RSU); + else + metadata.setPsmSource(PsmSource.V2X); + + Asn1Encoding unsecuredDataEncoding = new Asn1Encoding("unsecuredData", "MessageFrame", + EncodingRule.UPER); + metadata.addEncoding(unsecuredDataEncoding); + + // construct odeData + odeData = new OdeAsn1Data(metadata, payload); + + publishEncodedMessageToAsn1Decoder(odeData); + } + + } + else { + logger.error("Error received invalid key from consumed message"); + } + } + } catch (Exception e) { + logger.error("Error publishing to Asn1DecoderInput: {}", e.getMessage()); + } + return null; + } +} diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/AsnCodecMessageServiceController.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/AsnCodecMessageServiceController.java index f05c8a95a..e7954ca46 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/AsnCodecMessageServiceController.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/message/AsnCodecMessageServiceController.java @@ -72,7 +72,15 @@ public AsnCodecMessageServiceController(OdeProperties odeProps) { Asn1DecodeMAPJSON asn1DecodeMAPSON = new Asn1DecodeMAPJSON(odeProps); MessageConsumer asn1RawMAPJSONConsumer = MessageConsumer.defaultStringMessageConsumer( odeProps.getKafkaBrokers(), this.getClass().getSimpleName(), asn1DecodeMAPSON); - asn1RawMAPJSONConsumer.setName("asn1DecodeMAPSON"); + asn1RawMAPJSONConsumer.setName("asn1DecodeMAPJSON"); asn1DecodeMAPSON.start(asn1RawMAPJSONConsumer, odeProps.getKafkaTopicOdeRawEncodedMAPJson()); + + //PSM + logger.info("Send encoded PSM to ASN.1 Decoder"); + Asn1DecodePSMJSON asn1DecodePSMSON = new Asn1DecodePSMJSON(odeProps); + MessageConsumer asn1RawPSMJSONConsumer = MessageConsumer.defaultStringMessageConsumer( + odeProps.getKafkaBrokers(), this.getClass().getSimpleName(), asn1DecodePSMSON); + asn1RawPSMJSONConsumer.setName("asn1DecodePSMJSON"); + asn1DecodePSMSON.start(asn1RawPSMJSONConsumer, odeProps.getKafkaTopicOdeRawEncodedPSMJson()); } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java index fbbec3aca..f16c657bd 100644 --- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/controller/UdpServicesController.java @@ -12,6 +12,7 @@ import us.dot.its.jpo.ode.udp.srm.SrmReceiver; import us.dot.its.jpo.ode.udp.spat.SpatReceiver; import us.dot.its.jpo.ode.udp.map.MapReceiver; +import us.dot.its.jpo.ode.udp.psm.PsmReceiver; /** * Centralized UDP service dispatcher. @@ -49,6 +50,9 @@ public UdpServicesController(OdeProperties odeProps) { // MAP internal port rm.submit(new MapReceiver(odeProps)); + // PSM internal port + rm.submit(new PsmReceiver(odeProps)); + logger.debug("UDP receiver services started."); } } diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java new file mode 100644 index 000000000..4cea6ae6a --- /dev/null +++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/psm/PsmReceiver.java @@ -0,0 +1,126 @@ +package us.dot.its.jpo.ode.udp.psm; + +import java.net.DatagramPacket; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import org.json.JSONArray; +import org.json.JSONObject; + +import org.apache.tomcat.util.buf.HexUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import us.dot.its.jpo.ode.coder.StringPublisher; +import us.dot.its.jpo.ode.OdeProperties; +import us.dot.its.jpo.ode.udp.AbstractUdpReceiverPublisher; + +public class PsmReceiver extends AbstractUdpReceiverPublisher { + private static Logger logger = LoggerFactory.getLogger(PsmReceiver.class); + + private static final String PSM_START_FLAG = "0020"; // these bytes indicate + // start of PSM payload + private static final int HEADER_MINIMUM_SIZE = 6; // WSMP headers are at + // least 20 bytes long + + private StringPublisher psmPublisher; + + @Autowired + public PsmReceiver(OdeProperties odeProps) { + this(odeProps, odeProps.getPsmReceiverPort(), odeProps.getPsmBufferSize()); + + this.psmPublisher = new StringPublisher(odeProps); + } + + public PsmReceiver(OdeProperties odeProps, int port, int bufferSize) { + super(odeProps, port, bufferSize); + + this.psmPublisher = new StringPublisher(odeProps); + } + + @Override + public void run() { + + logger.debug("PSM UDP Receiver Service started."); + + byte[] buffer = new byte[bufferSize]; + + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + + do { + try { + logger.debug("Waiting for UDP PSM packets..."); + socket.receive(packet); + if (packet.getLength() > 0) { + senderIp = packet.getAddress().getHostAddress(); + senderPort = packet.getPort(); + logger.debug("Packet received from {}:{}", senderIp, senderPort); + + // extract the actualPacket from the buffer + byte[] payload = removeHeader(packet.getData()); + if (payload == null) + continue; + String payloadHexString = HexUtils.toHexString(payload); + logger.debug("Packet: {}", payloadHexString); + + // Add header data for the decoding process + ZonedDateTime utc = ZonedDateTime.now(ZoneOffset.UTC); + String timestamp = utc.format(DateTimeFormatter.ISO_INSTANT); + + JSONObject metadataObject = new JSONObject(); + metadataObject.put("utctimestamp", timestamp); + metadataObject.put("originRsu", senderIp); + metadataObject.put("source", "RSU"); + + JSONObject messageObject = new JSONObject(); + messageObject.put("metadata", metadataObject); + messageObject.put("payload", payloadHexString); + + JSONArray messageList = new JSONArray(); + messageList.put(messageObject); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("PsmMessageContent", messageList); + + logger.debug("PSM JSON Object: {}", jsonObject.toString()); + + // Submit JSON to the OdeRawEncodedMessageJson Kafka Topic + this.psmPublisher.publish(jsonObject.toString(), + this.psmPublisher.getOdeProperties().getKafkaTopicOdeRawEncodedPSMJson()); + } + } catch (Exception e) { + logger.error("Error receiving packet", e); + } + } while (!isStopped()); + } + + /** + * Attempts to strip WSMP header bytes. If message starts with "0013", message + * is raw SPAT. Otherwise, headers are >= 20 bytes, so look past that for start + * of payload SPAT. + * + * @param packet + */ + public byte[] removeHeader(byte[] packet) { + String hexPacket = HexUtils.toHexString(packet); + logger.debug("PSM packet: {}", hexPacket); + + int startIndex = hexPacket.indexOf(PSM_START_FLAG); + if (startIndex == 0) { + logger.debug("Message is raw PSM with no headers."); + } else if (startIndex == -1) { + logger.error("Message contains no PSM start flag."); + return null; + } else { + // We likely found a message with a header, look past the first 20 + // bytes for the start of the PSM + int trueStartIndex = HEADER_MINIMUM_SIZE + + hexPacket.substring(HEADER_MINIMUM_SIZE, hexPacket.length()).indexOf(PSM_START_FLAG); + hexPacket = hexPacket.substring(trueStartIndex, hexPacket.length()); + } + logger.debug("PSM packet substring: {}", hexPacket); + + return HexUtils.fromHexString(hexPacket); + } +} diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/coder/OdePsmDataCreatorHelperTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/coder/OdePsmDataCreatorHelperTest.java new file mode 100644 index 000000000..843edc837 --- /dev/null +++ b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/coder/OdePsmDataCreatorHelperTest.java @@ -0,0 +1,30 @@ +package us.dot.its.jpo.ode.coder; + +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; + +import us.dot.its.jpo.ode.model.OdePsmData; +import us.dot.its.jpo.ode.util.XmlUtils.XmlUtilsException; + +public class OdePsmDataCreatorHelperTest { + @Test + public void testConstructor() { + OdePsmDataCreatorHelper helper = new OdePsmDataCreatorHelper(); + assertNotNull(helper); + } + + @Test + public void testCreateOdeSrmData() { + String consumedData = "psmTxsuccessunsecuredDataMessageFrameUPERus.dot.its.jpo.ode.model.OdeAsn1Payload884b5a67-b92b-4b54-a6cd-456d342d691610002023-09-21T16:57:41.776089Z60falseRSU192.168.32.1MessageFrame3235642624779D7E402397377-7427614372020819108898"; + + OdePsmData psmData; + try { + psmData = OdePsmDataCreatorHelper.createOdePsmData(consumedData); + assertNotNull(psmData); + } catch (XmlUtilsException e) { + e.printStackTrace(); + } + + } +} diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/services/asn1/message/Asn1DecodePSMJSONTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/services/asn1/message/Asn1DecodePSMJSONTest.java new file mode 100644 index 000000000..e70868742 --- /dev/null +++ b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/services/asn1/message/Asn1DecodePSMJSONTest.java @@ -0,0 +1,51 @@ +package us.dot.its.jpo.ode.services.asn1.message; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.json.JSONException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; + +import us.dot.its.jpo.ode.OdeProperties; +import us.dot.its.jpo.ode.model.OdeAsn1Payload; +import us.dot.its.jpo.ode.model.OdeData; +import us.dot.its.jpo.ode.model.OdeHexByteArray; +import us.dot.its.jpo.ode.model.OdePsmMetadata; +import us.dot.its.jpo.ode.util.XmlUtils.XmlUtilsException; + +public class Asn1DecodePSMJSONTest { + + private final String json = "{\"PsmMessageContent\": [{ \"metadata\": { \"utctimestamp:\"2020-11-30T23:45:24.913657Z\" }, \"payload\":\"011d0000201a0000024ea0dc91de75f84da102c23f042dc41414ffff0006ba1000270000000111f7ca7986010000\"}]}"; + private JSONObject jsonObj = new JSONObject(); + + @Test + public void testConstructor() { + OdeProperties properties = new OdeProperties(); + properties.setKafkaBrokers("localhost:9092"); + assertEquals(properties.getKafkaTopicOdePsmJson(), "topic.OdePsmJson"); + } + + @Test + public void testProcessPsmJson() throws XmlUtilsException, JSONException { + OdeProperties properties = new OdeProperties(); + properties.setKafkaBrokers("localhost:9092"); + Asn1DecodePSMJSON testDecodePsmJson = new Asn1DecodePSMJSON(properties); + assertEquals(testDecodePsmJson.process(json), null); + + // metadata + OdeData obj = new OdeData(); + OdePsmMetadata jsonMetadataObj = new OdePsmMetadata(); + jsonMetadataObj.setOdeReceivedAt("2020-11-30T23:45:24.913657Z"); + jsonObj.put("metadata", jsonMetadataObj); + + // payload + String encodedPayload = "011d0000201a0000024ea0dc91de75f84da102c23f042dc41414ffff0006ba1000270000000111f7ca7986010000"; + obj.setMetadata(jsonMetadataObj); + obj.setPayload(new OdeAsn1Payload(new OdeHexByteArray(encodedPayload))); + + assertEquals("{\"bytes\":\"011d0000201a0000024ea0dc91de75f84da102c23f042dc41414ffff0006ba1000270000000111f7ca7986010000\"}", + obj.getPayload().getData().toJson()); + + } + +} diff --git a/scripts/tests/udpsender_psm.py b/scripts/tests/udpsender_psm.py new file mode 100644 index 000000000..c0ef020b7 --- /dev/null +++ b/scripts/tests/udpsender_psm.py @@ -0,0 +1,20 @@ +import socket +import time +import os + +# Currently set to oim-dev environment's ODE +# UDP_IP = os.getenv('DOCKER_HOST_IP') +UDP_IP = os.getenv('DOCKER_HOST_IP') +UDP_PORT = 44940 +MESSAGE = "011d0000201a0000021bd86891de75f84da101c13f042e2214141fff00022c2000270000000163b2cc7986010000" + +print("UDP target IP:", UDP_IP) +print("UDP target port:", UDP_PORT) +#print("message:", MESSAGE) + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP + +while True: + time.sleep(5) + print("sending PSM every 5 second") + sock.sendto(bytes.fromhex(MESSAGE), (UDP_IP, UDP_PORT))