Add Generated TMC TIM Topic #100

Merged · 16 commits · Nov 6, 2024
1 change: 1 addition & 0 deletions .devcontainer/post-create.sh
@@ -17,6 +17,7 @@ bin/kafka-topics.sh --create --topic "topic.OdeRawEncodedBSMJson" --bootstrap-se

# TIM
bin/kafka-topics.sh --create --topic "topic.OdeTimJson" --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic "topic.OdeTimJsonTMCFiltered" --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic "topic.OdeTimBroadcastJson" --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic "topic.J2735TimBroadcastJson" --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
bin/kafka-topics.sh --create --topic "topic.OdeTIMCertExpirationTimeJson" --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
261 changes: 153 additions & 108 deletions docs/data-flow-diagrams/tim/TIM Data Flow.drawio

Large diffs are not rendered by default.

Binary file modified docs/data-flow-diagrams/tim/TIM Data Flow.drawio.png
11 changes: 11 additions & 0 deletions jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeProperties.java
@@ -117,6 +117,7 @@ public class OdeProperties implements EnvironmentAware {

// TIM
private String kafkaTopicOdeTimJson = "topic.OdeTimJson";
private String kafkaTopicOdeTimJsonTMCFiltered = "topic.OdeTimJsonTMCFiltered";
private String kafkaTopicOdeDNMsgJson = "topic.OdeDNMsgJson";
private String kafkaTopicOdeTimRxJson = "topic.OdeTimRxJson";
private String kafkaTopicOdeTimBroadcastPojo = "topic.OdeTimBroadcastPojo";
@@ -228,6 +229,8 @@ void initialize() {
}
kafkaBrokers = dockerIp + ":" + DEFAULT_KAFKA_PORT;

logger.info("Kafka Brokers: {}", kafkaBrokers);

// URI for the security services /sign endpoint
if (securitySvcsSignatureUri == null) {
securitySvcsSignatureUri = "http://" + dockerIp + ":" + securitySvcsPort + "/"
@@ -590,6 +593,14 @@ public void setKafkaTopicOdeTimJson(String kafkaTopicOdeTimJson) {
this.kafkaTopicOdeTimJson = kafkaTopicOdeTimJson;
}

public String getKafkaTopicOdeTimJsonTMCFiltered() {
return kafkaTopicOdeTimJsonTMCFiltered;
}

public void setKafkaTopicOdeTimJsonTMCFiltered(String kafkaTopicOdeTimJsonTMCFiltered) {
this.kafkaTopicOdeTimJsonTMCFiltered = kafkaTopicOdeTimJsonTMCFiltered;
}

public String getUploadLocationObuLog() {
return uploadLocationObuLogLog;
}
90 changes: 90 additions & 0 deletions jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/OdeTimJsonTopology.java
@@ -0,0 +1,90 @@
package us.dot.its.jpo.ode;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;


public class OdeTimJsonTopology {

private static final Logger logger = LoggerFactory.getLogger(OdeTimJsonTopology.class);

private final Properties streamsProperties = new Properties();
private OdeProperties odeProperties = new OdeProperties();
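// Package-visible and static so tests can inject a mock and all instances share one streams app.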
static KafkaStreams streams;

public OdeTimJsonTopology(OdeProperties odeProps) {
this.odeProperties = odeProps;
if (this.odeProperties.getKafkaBrokers() != null) {
this.streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "KeyedOdeTimJson");
this.streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, odeProperties.getKafkaBrokers());
this.streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
this.streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

String kafkaType = System.getenv("KAFKA_TYPE");
if (kafkaType != null && kafkaType.equals("CONFLUENT")) {
addConfluentProperties(this.streamsProperties);
}
} else {
logger.error("Kafka Brokers not set in OdeProperties");
}
}

public void start() {
if (streams != null && streams.state().isRunningOrRebalancing()) {
throw new IllegalStateException("Start called while streams is already running.");
} else {
if (streams == null) {
streams = new KafkaStreams(buildTopology(), streamsProperties);
}
logger.info("Starting Ode Tim Json Topology");
streams.start();
}
}

public void stop() {
if (streams != null) {
logger.info("Stopping Ode Tim Json Topology");
streams.close();
}
}

public boolean isRunning() {
return streams != null && streams.state().isRunningOrRebalancing();
}

public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
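// Materialize topic.OdeTimJson into an in-memory key-value store ("timjson-store"),
// keyed by streamId, so the latest TIM JSON per key is available to interactive queries.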
builder.table("topic.OdeTimJson", Materialized.<String, String>as(Stores.inMemoryKeyValueStore("timjson-store")));
return builder.build();
}

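// Interactive query against the materialized store; returns the latest TIM JSON
// for the given key, or null if no record with that key has been consumed.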
public String query(String uuid) {
return (String) streams.store(StoreQueryParameters.fromNameAndType("timjson-store", QueryableStoreTypes.keyValueStore())).get(uuid);
}

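// Configures SASL/PLAIN authentication for Confluent Cloud from the CONFLUENT_KEY and CONFLUENT_SECRET environment variables.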
private void addConfluentProperties(Properties properties) {
String username = System.getenv("CONFLUENT_KEY");
String password = System.getenv("CONFLUENT_SECRET");

if (username != null && password != null) {
String auth = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";";
this.streamsProperties.put("sasl.jaas.config", auth);
} else {
logger.error("Environment variables CONFLUENT_KEY and CONFLUENT_SECRET are not set. Set these in the .env file to use Confluent Cloud");
}
}
}
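For context, here is a minimal sketch of how the new topology is intended to be consumed. This is a hypothetical caller, not part of the PR; it assumes an OdeProperties instance whose kafkaBrokers field is already populated.

package us.dot.its.jpo.ode;

// Hypothetical usage sketch only; the class and method names below are illustrative.
public class OdeTimJsonTopologyUsageSketch {
    public static String lookUpTim(OdeProperties odeProperties, String streamId) {
        OdeTimJsonTopology topology = new OdeTimJsonTopology(odeProperties);
        if (!topology.isRunning()) {
            // Builds the KTable over topic.OdeTimJson and materializes "timjson-store".
            topology.start();
        }
        // Fetch the latest TIM JSON recorded under this streamId (null if absent).
        return topology.query(streamId);
    }
}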
jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/Asn1EncodedDataRouter.java
@@ -18,13 +18,15 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import us.dot.its.jpo.ode.OdeProperties;
import us.dot.its.jpo.ode.OdeTimJsonTopology;
import us.dot.its.jpo.ode.context.AppContext;
import us.dot.its.jpo.ode.eventlog.EventLogger;
import us.dot.its.jpo.ode.model.OdeAsn1Data;
@@ -60,6 +62,7 @@ public Asn1EncodedDataRouterException(String string) {

private OdeProperties odeProperties;
private MessageProducer<String, String> stringMsgProducer;
private static OdeTimJsonTopology odeTimJsonTopology = null;
private Asn1CommandManager asn1CommandManager;
private boolean dataSigningEnabledRSU;
private boolean dataSigningEnabledSDW;
@@ -80,6 +83,14 @@ public Asn1EncodedDataRouter(OdeProperties odeProperties) {
this.dataSigningEnabledSDW = System.getenv("DATA_SIGNING_ENABLED_SDW") != null && !System.getenv("DATA_SIGNING_ENABLED_SDW").isEmpty()
? Boolean.parseBoolean(System.getenv("DATA_SIGNING_ENABLED_SDW"))
: true;

// Initialize and start the OdeTimJsonTopology if it is not already running
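// (The field is static so that every Asn1EncodedDataRouter instance shares a single Kafka Streams application.)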
if (odeTimJsonTopology == null) {
odeTimJsonTopology = new OdeTimJsonTopology(odeProperties);
if (!odeTimJsonTopology.isRunning()) {
odeTimJsonTopology.start();
}
}
}

@Override
@@ -169,6 +180,7 @@ public ServiceRequest getServicerequest(JSONObject consumedObj) {
public void processEncodedTim(ServiceRequest request, JSONObject consumedObj) {

JSONObject dataObj = consumedObj.getJSONObject(AppContext.PAYLOAD_STRING).getJSONObject(AppContext.DATA_STRING);
JSONObject metadataObj = consumedObj.getJSONObject(AppContext.METADATA_STRING);

// CASE 1: no SDW in metadata (SNMP deposit only)
// - sign MF
@@ -226,6 +238,9 @@ public void processEncodedTim(ServiceRequest request, JSONObject consumedObj) {
hexEncodedTim = signTIM(hexEncodedTim, consumedObj);
}

// Deposit encoded & signed TIM to TMC-filtered topic if TMC-generated
depositToFilteredTopic(metadataObj, hexEncodedTim, request);

if (request.getSdw() != null) {
// Case 2 only

@@ -402,4 +417,32 @@ private String stripHeader(String encodedUnsignedTim) {
toReturn = encodedUnsignedTim.substring(index);
return toReturn;
}

private void depositToFilteredTopic(JSONObject metadataObj, String hexEncodedTim, ServiceRequest request) {
try {
String generatedBy = metadataObj.getString("recordGeneratedBy");
String streamId = metadataObj.getJSONObject("serialId").getString("streamId");
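// Only TMC-generated TIMs are republished to the filtered topic; all other messages pass through untouched.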
if (generatedBy.equalsIgnoreCase("TMC")) {
try {
String timString = odeTimJsonTopology.query(streamId);

if (timString != null) {
// Set ASN1 data in TIM metadata
JSONObject timJSON = new JSONObject(timString);
JSONObject metadataJSON = timJSON.getJSONObject("metadata");
metadataJSON.put("asn1", hexEncodedTim);
timJSON.put("metadata", metadataJSON);

// Send the message w/ asn1 data to the TMC-filtered topic
stringMsgProducer.send(odeProperties.getKafkaTopicOdeTimJsonTMCFiltered(), null, timJSON.toString());
}
} catch (Exception e) {
logger.error("Error while updating TIM: {}", e.getMessage());
}
}
} catch (Exception e) {
logger.error("Error while fetching recordGeneratedBy field: {}", e.getMessage());
}
}
}
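For reference, a sketch of the record shape this deposits onto topic.OdeTimJsonTMCFiltered. The field names shown are the ones this method touches; the values and the omitted payload contents are illustrative assumptions, not taken from this PR:

{
  "metadata": {
    "recordGeneratedBy": "TMC",
    "serialId": { "streamId": "<uuid>" },
    "asn1": "<hex-encoded, signed TIM>"
  },
  "payload": { }
}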
jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/traveler/TimDepositController.java
@@ -57,6 +57,7 @@
import us.dot.its.jpo.ode.util.XmlUtils;
import us.dot.its.jpo.ode.wrapper.MessageProducer;
import us.dot.its.jpo.ode.wrapper.serdes.OdeTimSerializer;
import java.util.UUID;

@RestController
public class TimDepositController {
@@ -251,7 +252,8 @@ public synchronized ResponseEntity<String> depositTim(String jsonString, Request
// publish Broadcast TIM to a J2735 compliant topic.
stringMsgProducer.send(odeProperties.getKafkaTopicJ2735TimBroadcastJson(), null, obfuscatedj2735Tim);
// publish J2735 TIM also to the general un-filtered TIM topic,
// keyed by streamId so the OdeTimJson KTable can later be queried by that key
stringMsgProducer.send(odeProperties.getKafkaTopicOdeTimJson(), serialIdJ2735.getStreamId(), obfuscatedj2735Tim);

serialIdOde.increment();
serialIdJ2735.increment();
jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/OdePropertiesTest.java
@@ -93,6 +93,7 @@ public void testSettersAndGetters() {
String testKafkaTopicAsn1EncoderOutput = "testKafkaTopicAsn1EncoderOutput";
String testKafkaTopicOdeDNMsgJson = "testKafkaTopicOdeDNMsgJson";
String testKafkaTopicOdeTimJson = "testKafkaTopicOdeTimJson";
String testKafkaTopicOdeTimJsonTMCFiltered = "testKafkaTopicOdeTimJsonTMCFiltered";
String testKafkaTopicOdeBsmDuringEventPojo = "testKafkaTopicOdeBsmDuringEventPojo";
String testKafkaTopicOdeBsmRxPojo = "testKafkaTopicOdeBsmRxPojo";
String testKafkaTopicOdeBsmTxPojo = "testKafkaTopicOdeBsmTxPojo";
@@ -137,6 +138,7 @@ public void testSettersAndGetters() {
testOdeProperties.setKafkaTopicAsn1EncoderOutput(testKafkaTopicAsn1EncoderOutput);
testOdeProperties.setKafkaTopicOdeDNMsgJson(testKafkaTopicOdeDNMsgJson);
testOdeProperties.setKafkaTopicOdeTimJson(testKafkaTopicOdeTimJson);
testOdeProperties.setKafkaTopicOdeTimJsonTMCFiltered(testKafkaTopicOdeTimJsonTMCFiltered);
testOdeProperties.setKafkaTopicOdeBsmDuringEventPojo(testKafkaTopicOdeBsmDuringEventPojo);
testOdeProperties.setKafkaTopicOdeBsmRxPojo(testKafkaTopicOdeBsmRxPojo);
testOdeProperties.setKafkaTopicOdeBsmTxPojo(testKafkaTopicOdeBsmTxPojo);
@@ -199,6 +201,8 @@ public void testSettersAndGetters() {
testOdeProperties.getKafkaTopicOdeDNMsgJson());
assertEquals("Incorrect testKafkaTopicOdeTimJson", testKafkaTopicOdeTimJson,
testOdeProperties.getKafkaTopicOdeTimJson());
assertEquals("Incorrect testKafkaTopicOdeTimJsonTMCFiltered", testKafkaTopicOdeTimJsonTMCFiltered,
testOdeProperties.getKafkaTopicOdeTimJsonTMCFiltered());
assertEquals("Incorrect testKafkaTopicOdeBsmDuringEventPojo", testKafkaTopicOdeBsmDuringEventPojo,
testOdeProperties.getKafkaTopicOdeBsmDuringEventPojo());
assertEquals("Incorrect testKafkaTopicOdeBsmRxPojo", testKafkaTopicOdeBsmRxPojo,
99 changes: 99 additions & 0 deletions jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/OdeTimJsonTopologyTest.java
@@ -0,0 +1,99 @@
package us.dot.its.jpo.ode;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;


public class OdeTimJsonTopologyTest {

private OdeTimJsonTopology odeTimJsonTopology;
private KafkaStreams mockStreams;
private ReadOnlyKeyValueStore<String, String> mockStore;
private OdeProperties mockOdeProps;

@BeforeEach
public void setUp() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException {
mockOdeProps = mock(OdeProperties.class);
odeTimJsonTopology = new OdeTimJsonTopology(mockOdeProps);
mockStreams = mock(KafkaStreams.class);
mockStore = mock(ReadOnlyKeyValueStore.class);

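// Inject the mock into the package-visible static field so the topology under test uses it.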
OdeTimJsonTopology.streams = mockStreams;
}

@AfterEach
public void tearDown() {
OdeTimJsonTopology.streams = null;
}

@Test
public void testStart() {
when(mockStreams.state()).thenReturn(KafkaStreams.State.NOT_RUNNING);
doNothing().when(mockStreams).start();

odeTimJsonTopology.start();

verify(mockStreams).start();
}

@Test
public void testStartWhenAlreadyRunning() {
when(mockStreams.state()).thenReturn(KafkaStreams.State.RUNNING);

IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
odeTimJsonTopology.start();
});

assertEquals("Start called while streams is already running.", exception.getMessage());
}

@Test
public void testStop() {
doNothing().when(mockStreams).close();

odeTimJsonTopology.stop();

verify(mockStreams).close();
}

@Test
public void testIsRunning() {
when(mockStreams.state()).thenReturn(KafkaStreams.State.RUNNING);

assertTrue(odeTimJsonTopology.isRunning());
}

@Test
public void testIsNotRunning() {
when(mockStreams.state()).thenReturn(KafkaStreams.State.NOT_RUNNING);

assertFalse(odeTimJsonTopology.isRunning());
}

@Test
public void testBuildTopology() {
Topology topology = odeTimJsonTopology.buildTopology();
assertNotNull(topology);
}

@Test
public void testQuery() {
String uuid = "test-uuid";
String expectedValue = "test-value";

when(mockStreams.store(any(StoreQueryParameters.class))).thenReturn(mockStore);
when(mockStore.get(uuid)).thenReturn(expectedValue);

String result = odeTimJsonTopology.query(uuid);

assertEquals(expectedValue, result);
}
}