Merge pull request #116 from usdot-jpo-ode/dedup-spat
Dedup spat
John-Wiens authored Nov 13, 2024
2 parents 6636834 + fac6ee3 commit 39f7b64
Showing 10 changed files with 336 additions and 11 deletions.
@@ -881,7 +881,7 @@ public Properties createStreamProperties(String name) {

// Enable Compression
streamProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
- streamProps.put(ProducerConfig.LINGER_MS_CONFIG, getLingerMs());
+ streamProps.put(ProducerConfig.LINGER_MS_CONFIG, getKafkaLingerMs());

if (confluentCloudEnabled) {
streamProps.put("ssl.endpoint.identification.algorithm", "https");
DeduplicatorProperties.java
@@ -97,6 +97,13 @@ public class DeduplicatorProperties implements EnvironmentAware {
private String confluentKey = null;
private String confluentSecret = null;

+ // Processed SPaT Configuration
+ private String kafkaTopicProcessedSpat;
+ private String kafkaTopicDeduplicatedProcessedSpat;
+ private boolean enableProcessedSpatDeduplication;
+ private String kafkaStateStoreProcessedSpatName = "ProcessedSpat-store";


private int lingerMs = 0;


@@ -257,7 +264,7 @@ public Properties createStreamProperties(String name) {
// streamProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);

streamProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
- streamProps.put(ProducerConfig.LINGER_MS_CONFIG, getLingerMs());
+ streamProps.put(ProducerConfig.LINGER_MS_CONFIG, getKafkaLingerMs());

if (confluentCloudEnabled) {
streamProps.put("ssl.endpoint.identification.algorithm", "https");
@@ -406,6 +413,21 @@ public void setKafkaBrokers(String kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
}

@Value("${kafkaTopicProcessedSpat}")
public void setKafkaTopicProcessedSpat(String kafkaTopicProcessedSpat) {
this.kafkaTopicProcessedSpat = kafkaTopicProcessedSpat;
}

@Value("${kafkaTopicDeduplicatedProcessedSpat}")
public void setKafkaTopicDeduplicatedProcessedSpat(String kafkaTopicDeduplicatedProcessedSpat) {
this.kafkaTopicDeduplicatedProcessedSpat = kafkaTopicDeduplicatedProcessedSpat;
}

@Value("${enableProcessedSpatDeduplication}")
public void setEnableProcessedSpatDeduplication(boolean enableProcessedSpatDeduplication) {
this.enableProcessedSpatDeduplication = enableProcessedSpatDeduplication;
}

@Override
public void setEnvironment(Environment environment) {
env = environment;
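The matching getters that the rest of this PR calls — isEnableProcessedSpatDeduplication() in the service controller, getKafkaStateStoreProcessedSpatName() in the processor, and the topic getters in the topology — are not visible in this collapsed hunk. Presumably they follow the class's existing accessor pattern, along the lines of this sketch:

public String getKafkaTopicProcessedSpat() {
    return kafkaTopicProcessedSpat;
}

public String getKafkaTopicDeduplicatedProcessedSpat() {
    return kafkaTopicDeduplicatedProcessedSpat;
}

public boolean isEnableProcessedSpatDeduplication() {
    return enableProcessedSpatDeduplication;
}

public String getKafkaStateStoreProcessedSpatName() {
    return kafkaStateStoreProcessedSpatName;
}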
DeduplicatorServiceController.java
@@ -21,6 +21,7 @@
import us.dot.its.jpo.deduplicator.deduplicator.topologies.OdeRawEncodedTimDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.ProcessedMapDeduplicatorTopology;
import us.dot.its.jpo.deduplicator.deduplicator.topologies.ProcessedMapWktDeduplicatorTopology;
+ import us.dot.its.jpo.deduplicator.deduplicator.topologies.ProcessedSpatDeduplicatorTopology;

@Controller
@DependsOn("createKafkaTopics")
@@ -85,6 +86,14 @@ public DeduplicatorServiceController(final DeduplicatorProperties props,
odeRawEncodedTimDeduplicatorTopology.start();
}

+ if(props.isEnableProcessedSpatDeduplication()){
+     ProcessedSpatDeduplicatorTopology processedSpatDeduplicatorTopology = new ProcessedSpatDeduplicatorTopology(
+         props,
+         props.createStreamProperties("ProcessedSpatDeduplicator")
+     );
+     processedSpatDeduplicatorTopology.start();
+ }

if(props.isEnableOdeBsmDeduplication()){
BsmDeduplicatorTopology bsmDeduplicatorTopology = new BsmDeduplicatorTopology(props);
bsmDeduplicatorTopology.start();
64 changes: 64 additions & 0 deletions jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/ProcessedSpatProcessor.java
@@ -0,0 +1,64 @@
package us.dot.its.jpo.deduplicator.deduplicator.processors;


import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;

import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
import us.dot.its.jpo.geojsonconverter.pojos.spat.MovementEvent;
import us.dot.its.jpo.geojsonconverter.pojos.spat.MovementState;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;

public class ProcessedSpatProcessor extends DeduplicationProcessor<ProcessedSpat>{

DeduplicatorProperties props;

public ProcessedSpatProcessor(DeduplicatorProperties props){
this.props = props;
this.storeName = props.getKafkaStateStoreProcessedSpatName();
}


@Override
public Instant getMessageTime(ProcessedSpat message) {
return message.getUtcTimeStamp().toInstant();
}

@Override
public boolean isDuplicate(ProcessedSpat lastMessage, ProcessedSpat newMessage) {

Instant newValueTime = getMessageTime(newMessage);
Instant oldValueTime = getMessageTime(lastMessage);

if(newValueTime.minus(Duration.ofMinutes(1)).isAfter(oldValueTime)){
return false;
}else{
HashMap<Integer, List<MovementEvent>> lastMessageStates = new HashMap<>();
for(MovementState state: lastMessage.getStates()){
lastMessageStates.put(state.getSignalGroup(), state.getStateTimeSpeed());
}

if(lastMessageStates.size() != newMessage.getStates().size()){
    return false; // messages cannot be duplicates if they contain a different number of signal groups
}

for(MovementState state: newMessage.getStates()){
    List<MovementEvent> lastMessageState = lastMessageStates.get(state.getSignalGroup());

    if(lastMessageState == null){
        return false; // messages cannot be duplicates if they have different signal groups
    }

    if(lastMessageState.size() != state.getStateTimeSpeed().size()){
        return false; // guard against an index out of bounds when the movement event lists differ in length
    }

    for(int i = 0; i < state.getStateTimeSpeed().size(); i++){
        if(state.getStateTimeSpeed().get(i).getEventState() != lastMessageState.get(i).getEventState()){
            return false; // a signal group's light has changed, so the SPaTs are different
        }
    }
}
}
}
return true;
}
}
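For illustration, a minimal sketch (not part of this commit) of how isDuplicate classifies two consecutive SPaTs. The setter names, the ZonedDateTime timestamp type, and the J2735MovementPhaseState constant are assumptions inferred from the getters used above:

// Two SPaTs 30 seconds apart with identical movement events for signal group 2.
MovementEvent event = new MovementEvent();
event.setEventState(J2735MovementPhaseState.PROTECTED_MOVEMENT_ALLOWED); // assumed enum constant

MovementState state = new MovementState();
state.setSignalGroup(2);
state.setStateTimeSpeed(List.of(event));

ProcessedSpat earlier = new ProcessedSpat();
earlier.setUtcTimeStamp(ZonedDateTime.parse("2024-11-13T12:00:00Z"));
earlier.setStates(List.of(state));

ProcessedSpat later = new ProcessedSpat();
later.setUtcTimeStamp(ZonedDateTime.parse("2024-11-13T12:00:30Z"));
later.setStates(List.of(state));

ProcessedSpatProcessor processor = new ProcessedSpatProcessor(props); // props: a DeduplicatorProperties instance
processor.isDuplicate(earlier, later); // true: identical event states, under a minute apart
// Moving the second timestamp past one minute would return false regardless of the states.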
22 changes: 22 additions & 0 deletions jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/suppliers/ProcessedSpatProcessorSupplier.java
@@ -0,0 +1,22 @@
package us.dot.its.jpo.deduplicator.deduplicator.processors.suppliers;

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
import us.dot.its.jpo.deduplicator.deduplicator.processors.ProcessedSpatProcessor;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;

public class ProcessedSpatProcessorSupplier implements ProcessorSupplier<String, ProcessedSpat, String, ProcessedSpat> {

DeduplicatorProperties props;

public ProcessedSpatProcessorSupplier(DeduplicatorProperties props){
this.props = props;
}

@Override
public Processor<String, ProcessedSpat, String, ProcessedSpat> get() {
return new ProcessedSpatProcessor(props);
}
}
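Kafka Streams calls get() once per stream task, so each task deduplicates with its own processor instance against its own partition of the state store. A sketch of that contract (assumed test scaffolding, not part of this commit):

ProcessedSpatProcessorSupplier supplier = new ProcessedSpatProcessorSupplier(props);
// Each call must return a fresh processor; sharing one instance across tasks would mix per-task state.
assert supplier.get() != supplier.get();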
@@ -7,15 +7,11 @@
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

- import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmRsuIdKey;
- import us.dot.its.jpo.conflictmonitor.monitor.utils.BsmUtils;
import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
import us.dot.its.jpo.deduplicator.deduplicator.processors.suppliers.OdeMapJsonProcessorSupplier;
import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIdPartitioner;
import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;
import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes;
- import us.dot.its.jpo.ode.model.OdeBsmData;
import us.dot.its.jpo.ode.model.OdeMapData;
import us.dot.its.jpo.ode.model.OdeMapMetadata;
import us.dot.its.jpo.ode.model.OdeMapPayload;
@@ -95,8 +91,6 @@ public Topology buildTopology() {
return newKey.toString();
}).repartition(Repartitioned.with(Serdes.String(), JsonSerdes.OdeMap()));

- // KStream<String, OdeMapData> repartitionedStream = mapRekeyedStream.repartition();

KStream<String, OdeMapData> deduplicatedStream = mapRekeyedStream.process(new OdeMapJsonProcessorSupplier(props), props.getKafkaStateStoreOdeMapJsonName());

deduplicatedStream.to(props.getKafkaTopicDeduplicatedOdeMapJson(), Produced.with(Serdes.String(), JsonSerdes.OdeMap()));
@@ -106,13 +100,13 @@ public Topology buildTopology() {
}

public void stop() {
logger.info("Stopping Map deduplicator Socket Broadcast Topology.");
logger.info("Stopping Map Deduplicator Socket Broadcast Topology.");
if (streams != null) {
streams.close();
streams.cleanUp();
streams = null;
}
logger.info("Stopped Map deduplicator Socket Broadcast Topology.");
logger.info("Stopped Map Deduplicator Socket Broadcast Topology.");
}

StateListener stateListener;
97 changes: 97 additions & 0 deletions jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/ProcessedSpatDeduplicatorTopology.java
@@ -0,0 +1,97 @@
package us.dot.its.jpo.deduplicator.deduplicator.topologies;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

import us.dot.its.jpo.deduplicator.DeduplicatorProperties;
import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
import us.dot.its.jpo.deduplicator.deduplicator.processors.suppliers.ProcessedSpatProcessorSupplier;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes;

import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class ProcessedSpatDeduplicatorTopology {

private static final Logger logger = LoggerFactory.getLogger(ProcessedSpatDeduplicatorTopology.class);

Topology topology;
KafkaStreams streams;
String inputTopic;
String outputTopic;
Properties streamsProperties;
ObjectMapper objectMapper;
DateTimeFormatter formatter = DateTimeFormatter.ISO_INSTANT;
DeduplicatorProperties props;


public ProcessedSpatDeduplicatorTopology(DeduplicatorProperties props, Properties streamsProperties){
this.props = props;
this.streamsProperties = streamsProperties;
this.objectMapper = DateJsonMapper.getInstance();
}



public void start() {
if (streams != null && streams.state().isRunningOrRebalancing()) {
throw new IllegalStateException("Start called while streams is already running.");
}
Topology topology = buildTopology();
streams = new KafkaStreams(topology, streamsProperties);
if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler);
if (stateListener != null) streams.setStateListener(stateListener);
logger.info("Starting Map Deduplicator Topology");
streams.start();
}

public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();

KStream<String, ProcessedSpat> inputStream = builder.stream(props.getKafkaTopicProcessedSpat(), Consumed.with(Serdes.String(), JsonSerdes.ProcessedSpat()));

builder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(props.getKafkaStateStoreProcessedSpatName()),
Serdes.String(), JsonSerdes.ProcessedSpat()));

KStream<String, ProcessedSpat> deduplicatedStream = inputStream.process(new ProcessedSpatProcessorSupplier(props), props.getKafkaStateStoreProcessedSpatName());

deduplicatedStream.print(Printed.toSysOut());

deduplicatedStream.to(props.getKafkaTopicDeduplicatedProcessedSpat(), Produced.with(Serdes.String(), JsonSerdes.ProcessedSpat()));

return builder.build();

}

public void stop() {
logger.info("Stopping Processed SPaT deduplicator Socket Broadcast Topology.");
if (streams != null) {
streams.close();
streams.cleanUp();
streams = null;
}
logger.info("Stopped Processed SPaT deduplicator Socket Broadcast Topology.");
}

StateListener stateListener;
public void registerStateListener(StateListener stateListener) {
this.stateListener = stateListener;
}

StreamsUncaughtExceptionHandler exceptionHandler;
public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
}
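For reference, a hedged sketch of wiring this topology up outside the service controller. The REPLACE_THREAD recovery policy and the listener bodies are illustrative assumptions, not part of this commit:

ProcessedSpatDeduplicatorTopology topology = new ProcessedSpatDeduplicatorTopology(
        props,
        props.createStreamProperties("ProcessedSpatDeduplicator"));
// Optionally restart the stream thread on uncaught exceptions (assumed policy).
topology.registerUncaughtExceptionHandler(exception ->
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
topology.registerStateListener((newState, oldState) ->
        logger.info("ProcessedSpat deduplicator state: {} -> {}", oldState, newState));
topology.start();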
11 changes: 11 additions & 0 deletions jpo-deduplicator/src/main/resources/application.yaml
@@ -47,6 +47,11 @@ odeBsmMaximumTimeDelta: 10000 # Milliseconds
odeBsmMaximumPositionDelta: 1 # Meter
odeBsmAlwaysIncludeAtSpeed: 1 # Meter / Second

+ # Processed SPaT Configuration
+ kafkaTopicProcessedSpat: topic.ProcessedSpat
+ kafkaTopicDeduplicatedProcessedSpat: topic.DeduplicatedProcessedSpat
+ enableProcessedSpatDeduplication: true


# Amount of time to wait to try to increase batching
kafka.linger_ms: 50
@@ -81,5 +86,11 @@ kafka.topics:
- name: ${kafkaTopicDeduplicatedOdeTimJson}
cleanupPolicy: delete
retentionMs: 300000
+ - name: ${kafkaTopicProcessedSpat}
+   cleanupPolicy: delete
+   retentionMs: 300000
+ - name: ${kafkaTopicDeduplicatedProcessedSpat}
+   cleanupPolicy: delete
+   retentionMs: 300000

