Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addition of Expiration Date meta data for tracking in the SDX #532

Merged
merged 11 commits into from
Feb 5, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public enum TimeToLive {
private String deliverystart;
private String deliverystop;
private String recordId = null;
private String estimatedRemovalDate;

public OdeGeoRegion getServiceRegion() {
return serviceRegion;
Expand Down Expand Up @@ -88,6 +89,14 @@ public void setRecordId(String recordId) {
this.recordId = recordId;
}

public String getEstimatedRemovalDate(){
return estimatedRemovalDate;
}

public void setEstimatedRemovalDate(String estimatedRemovalDate){
this.estimatedRemovalDate = estimatedRemovalDate;
}

@Override
public int hashCode() {
final int prime = 31;
Expand All @@ -98,6 +107,7 @@ public int hashCode() {
result = prime * result + ((recordId == null) ? 0 : recordId.hashCode());
result = prime * result + ((serviceRegion == null) ? 0 : serviceRegion.hashCode());
result = prime * result + ((ttl == null) ? 0 : ttl.hashCode());
result = prime * result + ((estimatedRemovalDate == null) ? 0 : estimatedRemovalDate.hashCode());
return result;
}

Expand Down Expand Up @@ -137,6 +147,11 @@ public boolean equals(Object obj) {
return false;
if (ttl != other.ttl)
return false;
if (estimatedRemovalDate == null) {
if (other.estimatedRemovalDate != null)
return false;
} else if (!estimatedRemovalDate.equals(other.estimatedRemovalDate))
return false;
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,27 @@ public Asn1CommandManager(OdeProperties odeProperties) {

}

public void depositToSdw(String asdBytes) throws Asn1CommandManagerException {
public void depositToSdw(String depositObj) throws Asn1CommandManagerException {

if (this.odeProperties.shouldDepositSdwMessagesOverWebsocket()) {
try {
depositor.deposit(asdBytes);
depositor.deposit(depositObj);

logger.info("Deposited message to SDW directly via websocket");
logger.debug("Message deposited: {}", asdBytes);
logger.debug("Message deposited: {}", depositObj);
EventLogger.logger.info("Deposited message to SDW directly via websocket");
EventLogger.logger.debug("Message deposited: {}", asdBytes);
EventLogger.logger.debug("Message deposited: {}", depositObj);
} catch (DdsRequestManagerException e) {
String msg = "Failed to deposit message to SDW";
throw new Asn1CommandManagerException(msg, e);
}
} else {
stringMessageProducer.send(this.getDepositTopic(), null, asdBytes);
stringMessageProducer.send(this.getDepositTopic(), null, depositObj);

logger.info("Published message to SDW deposit topic");
EventLogger.logger.info("Published message to SDW deposit topic");
logger.debug("Message deposited: {}", asdBytes);
EventLogger.logger.debug("Message deposited: {}", asdBytes);
logger.debug("Message deposited: {}", depositObj);
EventLogger.logger.debug("Message deposited: {}", depositObj);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public class Asn1EncodedDataRouter extends AbstractSubscriberProcessor<String, S

private static final String BYTES = "bytes";

private static final String MESSAGE_FRAME = "MessageFrame";
private static final String MESSAGE_FRAME = "MessageFrame";

private static final String ERROR_ON_DDS_DEPOSIT = "Error on DDS deposit.";
private static final String ERROR_ON_DDS_DEPOSIT = "Error on DDS deposit.";

public static class Asn1EncodedDataRouterException extends Exception {
public static class Asn1EncodedDataRouterException extends Exception {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -97,23 +97,23 @@ public Object process(String consumedData) {
if (request.has(TimTransmogrifier.RSUS_STRING)) {
JSONObject rsusIn = (JSONObject) request.get(TimTransmogrifier.RSUS_STRING);
if (rsusIn.has(TimTransmogrifier.RSUS_STRING)) {
Object rsu = rsusIn.get(TimTransmogrifier.RSUS_STRING);
JSONArray rsusOut = new JSONArray();
if (rsu instanceof JSONArray) {
logger.debug("Multiple RSUs exist in the request: {}", request);
JSONArray rsusInArray = (JSONArray) rsu;
for (int i = 0; i < rsusInArray.length(); i++) {
rsusOut.put(rsusInArray.get(i));
}
request.put(TimTransmogrifier.RSUS_STRING, rsusOut);
} else if (rsu instanceof JSONObject) {
logger.debug("Single RSU exists in the request: {}", request);
rsusOut.put(rsu);
request.put(TimTransmogrifier.RSUS_STRING, rsusOut);
} else {
logger.debug("No RSUs exist in the request: {}", request);
request.remove(TimTransmogrifier.RSUS_STRING);
}
Object rsu = rsusIn.get(TimTransmogrifier.RSUS_STRING);
JSONArray rsusOut = new JSONArray();
if (rsu instanceof JSONArray) {
logger.debug("Multiple RSUs exist in the request: {}", request);
JSONArray rsusInArray = (JSONArray) rsu;
for (int i = 0; i < rsusInArray.length(); i++) {
rsusOut.put(rsusInArray.get(i));
}
request.put(TimTransmogrifier.RSUS_STRING, rsusOut);
} else if (rsu instanceof JSONObject) {
logger.debug("Single RSU exists in the request: {}", request);
rsusOut.put(rsu);
request.put(TimTransmogrifier.RSUS_STRING, rsusOut);
} else {
logger.debug("No RSUs exist in the request: {}", request);
request.remove(TimTransmogrifier.RSUS_STRING);
}
}
}

Expand All @@ -123,7 +123,7 @@ public Object process(String consumedData) {
processEncodedTim(servicerequest, consumedObj);
} else {
throw new Asn1EncodedDataRouterException("Invalid or missing '"
+ TimTransmogrifier.REQUEST_STRING + "' object in the encoder response");
+ TimTransmogrifier.REQUEST_STRING + "' object in the encoder response");
}
} catch (Exception e) {
String msg = "Error in processing received message from ASN.1 Encoder module: " + consumedData;
Expand Down Expand Up @@ -183,47 +183,47 @@ public void processEncodedTim(ServiceRequest request, JSONObject consumedObj) {
if (odeProperties.dataSigningEnabled()) {
logger.debug("Sending message for signature! ");
String base64EncodedTim = CodecUtils.toBase64(
CodecUtils.fromHex(hexEncodedTim));
CodecUtils.fromHex(hexEncodedTim));
JSONObject matadataObjs = consumedObj.getJSONObject(AppContext.METADATA_STRING);
// get max duration time and convert from minutes to milliseconds (unsigned
// integer valid 0 to 2^32-1 in units of
// milliseconds.) from metadata
// integer valid 0 to 2^32-1 in units of
// milliseconds.) from metadata
int maxDurationTime = Integer.valueOf(matadataObjs.get("maxDurationTime").toString()) * 60 * 1000;
String timpacketID = matadataObjs.getString("odePacketID");
String timStartDateTime = matadataObjs.getString("odeTimStartDateTime");
String timpacketID = matadataObjs.getString("odePacketID");
String timStartDateTime = matadataObjs.getString("odeTimStartDateTime");
String signedResponse = asn1CommandManager.sendForSignature(base64EncodedTim,maxDurationTime);
try {
hexEncodedTim = CodecUtils.toHex(
CodecUtils.fromBase64(
JsonUtils.toJSONObject(JsonUtils.toJSONObject(signedResponse).getString("result")).getString("message-signed")));
CodecUtils.fromBase64(
JsonUtils.toJSONObject(JsonUtils.toJSONObject(signedResponse).getString("result")).getString("message-signed")));

JSONObject TimWithExpiration = new JSONObject();
TimWithExpiration.put("packetID", timpacketID);
TimWithExpiration.put("startDateTime", timStartDateTime);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
try {
JSONObject jsonResult = JsonUtils
.toJSONObject((JsonUtils.toJSONObject(signedResponse).getString("result")));
// messageExpiry uses unit of seconds
long messageExpiry = Long.valueOf(jsonResult.getString("message-expiry"));
TimWithExpiration.put("expirationDate", dateFormat.format(new Date(messageExpiry * 1000)));
} catch (Exception e) {
logger.error("Unable to get expiration date from signed messages response {}", e);
TimWithExpiration.put("expirationDate", "null");
}

try {
Date parsedtimTimeStamp = dateFormat.parse(timStartDateTime);
Date requiredExpirationDate = new Date();
requiredExpirationDate.setTime(parsedtimTimeStamp.getTime() + maxDurationTime);
TimWithExpiration.put("requiredExpirationDate", dateFormat.format(requiredExpirationDate));
} catch (Exception e) {
logger.error("Unable to parse requiredExpirationDate {}", e);
TimWithExpiration.put("requiredExpirationDate", "null");
}
//publish to Tim expiration kafka
stringMsgProducer.send(odeProperties.getKafkaTopicSignedOdeTimJsonExpiration(), null,
TimWithExpiration.toString());
TimWithExpiration.put("packetID", timpacketID);
TimWithExpiration.put("startDateTime", timStartDateTime);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
try {
JSONObject jsonResult = JsonUtils
.toJSONObject((JsonUtils.toJSONObject(signedResponse).getString("result")));
// messageExpiry uses unit of seconds
long messageExpiry = Long.valueOf(jsonResult.getString("message-expiry"));
TimWithExpiration.put("expirationDate", dateFormat.format(new Date(messageExpiry * 1000)));
} catch (Exception e) {
logger.error("Unable to get expiration date from signed messages response {}", e);
TimWithExpiration.put("expirationDate", "null");
}

try {
Date parsedtimTimeStamp = dateFormat.parse(timStartDateTime);
Date requiredExpirationDate = new Date();
requiredExpirationDate.setTime(parsedtimTimeStamp.getTime() + maxDurationTime);
TimWithExpiration.put("requiredExpirationDate", dateFormat.format(requiredExpirationDate));
} catch (Exception e) {
logger.error("Unable to parse requiredExpirationDate {}", e);
TimWithExpiration.put("requiredExpirationDate", "null");
}
//publish to Tim expiration kafka
stringMsgProducer.send(odeProperties.getKafkaTopicSignedOdeTimJsonExpiration(), null,
TimWithExpiration.toString());

} catch (JsonUtilsException e1) {
logger.error("Unable to parse signed message response {}", e1);
Expand All @@ -232,7 +232,7 @@ public void processEncodedTim(ServiceRequest request, JSONObject consumedObj) {

if (null != request.getSnmp() && null != request.getRsus() && null != hexEncodedTim) {
logger.info("Sending message to RSUs...");
asn1CommandManager.sendToRsus(request, hexEncodedTim);
asn1CommandManager.sendToRsus(request, hexEncodedTim);
}

if (request.getSdw() != null) {
Expand All @@ -254,10 +254,13 @@ public void processEncodedTim(ServiceRequest request, JSONObject consumedObj) {
// Case 3
JSONObject asdObj = dataObj.getJSONObject(Asn1CommandManager.ADVISORY_SITUATION_DATA_STRING);
try {
asn1CommandManager.depositToSdw(asdObj.getString(BYTES));
JSONObject deposit = new JSONObject();
deposit.put("estimatedRemovalDate", request.getSdw().getEstimatedRemovalDate());
deposit.put("encodedMsg", asdObj.getString(BYTES));
asn1CommandManager.depositToSdw(deposit.toString());
} catch (JSONException | Asn1CommandManagerException e) {
String msg = ERROR_ON_DDS_DEPOSIT;
logger.error(msg, e);
String msg = ERROR_ON_DDS_DEPOSIT;
logger.error(msg, e);
}
} else {
logger.debug("Unsigned ASD received. Depositing it to SDW.");
Expand All @@ -283,39 +286,42 @@ public void processEncodedTimUnsecured(ServiceRequest request, JSONObject consum
logger.error("ASD structure present in metadata but not in JSONObject!");
}

if (null != asdObj) {
String asdBytes = asdObj.getString(BYTES);

// Deposit to DDS
String ddsMessage = "";
try {
asn1CommandManager.depositToSdw(asdBytes);
ddsMessage = "\"dds_deposit\":{\"success\":\"true\"}";
logger.info("DDS deposit successful.");
} catch (Exception e) {
ddsMessage = "\"dds_deposit\":{\"success\":\"false\"}";
String msg = ERROR_ON_DDS_DEPOSIT;
logger.error(msg, e);
EventLogger.logger.error(msg, e);
}

responseList.put("ddsMessage", ddsMessage);
} else if (logger.isErrorEnabled()) { // Added to avoid Sonar's "Invoke method(s) only conditionally." code smell
String msg = "ASN.1 Encoder did not return ASD encoding {}";
EventLogger.logger.error(msg, consumedObj.toString());
logger.error(msg, consumedObj.toString());
}
if (null != asdObj) {
String asdBytes = asdObj.getString(BYTES);

// Deposit to DDS
String ddsMessage = "";
try {
JSONObject deposit = new JSONObject();
deposit.put("estimatedRemovalDate", request.getSdw().getEstimatedRemovalDate());
deposit.put("encodedMsg", asdBytes);
asn1CommandManager.depositToSdw(deposit.toString());
ddsMessage = "\"dds_deposit\":{\"success\":\"true\"}";
logger.info("DDS deposit successful.");
} catch (Exception e) {
ddsMessage = "\"dds_deposit\":{\"success\":\"false\"}";
String msg = ERROR_ON_DDS_DEPOSIT;
logger.error(msg, e);
EventLogger.logger.error(msg, e);
}

responseList.put("ddsMessage", ddsMessage);
} else if (logger.isErrorEnabled()) { // Added to avoid Sonar's "Invoke method(s) only conditionally." code smell
String msg = "ASN.1 Encoder did not return ASD encoding {}";
EventLogger.logger.error(msg, consumedObj.toString());
logger.error(msg, consumedObj.toString());
}
}

if (dataObj.has(MESSAGE_FRAME)) {
JSONObject mfObj = dataObj.getJSONObject(MESSAGE_FRAME);
String encodedTim = mfObj.getString(BYTES);
logger.debug("Encoded message - phase 2: {}", encodedTim);

// only send message to rsu if snmp, rsus, and message frame fields are present
if (null != request.getSnmp() && null != request.getRsus() && null != encodedTim) {
logger.debug("Encoded message phase 3: {}", encodedTim);
asn1CommandManager.sendToRsus(request, encodedTim);
// only send message to rsu if snmp, rsus, and message frame fields are present
if (null != request.getSnmp() && null != request.getRsus() && null != encodedTim) {
logger.debug("Encoded message phase 3: {}", encodedTim);
asn1CommandManager.sendToRsus(request, encodedTim);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.text.ParseException;

import org.json.JSONObject;
import org.junit.jupiter.api.Test;

import mockit.Capturing;
Expand Down Expand Up @@ -58,6 +59,15 @@ public void testPackageSignedTimIntoAsd() {
testAsn1CommandManager.packageSignedTimIntoAsd(injectableOdeTravelerInputData.getRequest(), "message");
}

@Test
public void depositToSDWJsonShouldCallMessageProducer() throws Asn1CommandManagerException {
JSONObject deposit = new JSONObject();
deposit.put("estimatedRemovalDate", "2023-11-04T17:47:11-05:00");
deposit.put("encodedMsg", "message");

testAsn1CommandManager.depositToSdw(deposit.toString());
}

@Test
public void depositToSDWShouldCallMessageProducer() throws Asn1CommandManagerException {
testAsn1CommandManager.depositToSdw("message");
Expand All @@ -66,14 +76,14 @@ public void depositToSDWShouldCallMessageProducer() throws Asn1CommandManagerExc
@Test
public void testSendToRsus(@Mocked OdeTravelerInputData mockOdeTravelerInputData)
throws DdsRequestManagerException, IOException, ParseException {

testAsn1CommandManager.sendToRsus(mockOdeTravelerInputData.getRequest(), "message");
}

@Test
public void testSendToRsusSnmpException(@Mocked OdeTravelerInputData mockOdeTravelerInputData)
throws DdsRequestManagerException, IOException, ParseException {

testAsn1CommandManager.sendToRsus(mockOdeTravelerInputData.getRequest(), "message");
}

Expand Down
Loading