newMap;
+ private String message;
+
+ public MapRevisionCounterEvent(){
+ super("MapRevisionCounter");
+ }
+
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/SpatRevisionCounterEvent.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/SpatRevisionCounterEvent.java
new file mode 100644
index 00000000..044cd4e2
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/SpatRevisionCounterEvent.java
@@ -0,0 +1,23 @@
+package us.dot.its.jpo.conflictmonitor.monitor.models.events;
+
+import lombok.EqualsAndHashCode;
+import lombok.Generated;
+import lombok.Getter;
+import lombok.Setter;
+import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
+
+@Getter
+@Setter
+@EqualsAndHashCode(callSuper=true)
+@Generated
+public class SpatRevisionCounterEvent extends Event{
+
+ private ProcessedSpat previousSpat;
+ private ProcessedSpat newSpat;
+ private String message;
+
+ public SpatRevisionCounterEvent(){
+ super("SpatRevisionCounter");
+ }
+
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/StopLineStopEvent.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/StopLineStopEvent.java
index 06401bef..732e5222 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/StopLineStopEvent.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/StopLineStopEvent.java
@@ -30,6 +30,7 @@ public class StopLineStopEvent extends Event{
private double timeStoppedDuringRed;
private double timeStoppedDuringYellow;
private double timeStoppedDuringGreen;
+ private double timeStoppedDuringDark;
public StopLineStopEvent(){
super("StopLineStop");
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/BaseTimestampDeltaEvent.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/BaseTimestampDeltaEvent.java
new file mode 100644
index 00000000..30231e74
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/BaseTimestampDeltaEvent.java
@@ -0,0 +1,36 @@
+package us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.EqualsAndHashCode;
+import lombok.Generated;
+import lombok.Getter;
+import lombok.Setter;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.Event;
+
+@Getter
+@Setter
+@EqualsAndHashCode(callSuper = true)
+@Generated
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class BaseTimestampDeltaEvent extends Event {
+
+ public BaseTimestampDeltaEvent(String eventType, String inputType) {
+ super(eventType);
+ this.inputType = inputType;
+ }
+
+ final String inputType;
+
+ /**
+ * The source RSU device ID
+ */
+ String source;
+
+ /**
+ * The timestamp difference
+ */
+ TimestampDelta delta;
+
+
+
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/MapTimestampDeltaEvent.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/MapTimestampDeltaEvent.java
new file mode 100644
index 00000000..5076645c
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/MapTimestampDeltaEvent.java
@@ -0,0 +1,12 @@
+package us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta;
+
+import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
+
+public class MapTimestampDeltaEvent
+ extends BaseTimestampDeltaEvent {
+
+ public MapTimestampDeltaEvent() {
+ super("MapTimestampDelta", ProcessedMap.class.getTypeName());
+ }
+
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/SpatTimestampDeltaEvent.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/SpatTimestampDeltaEvent.java
new file mode 100644
index 00000000..5dcad789
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/SpatTimestampDeltaEvent.java
@@ -0,0 +1,10 @@
+package us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta;
+
+import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
+
+public class SpatTimestampDeltaEvent
+ extends BaseTimestampDeltaEvent {
+ public SpatTimestampDeltaEvent() {
+ super("SpatTimestampDelta", ProcessedSpat.class.getName());
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/TimestampDelta.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/TimestampDelta.java
new file mode 100644
index 00000000..45f565b8
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/events/timestamp_delta/TimestampDelta.java
@@ -0,0 +1,73 @@
+package us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import lombok.*;
+import lombok.extern.slf4j.Slf4j;
+import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@Generated
+@Slf4j
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TimestampDelta {
+
+ /**
+ * The configured max delta (absolute value)
+ */
+ int maxDeltaMillis;
+
+ public void setMaxDeltaMillis(int maxDeltaMillis) {
+ this.maxDeltaMillis = Math.abs(maxDeltaMillis);
+ }
+
+ /**
+ * The ODE ingest timestamp
+ */
+ long odeIngestTimestampMillis;
+
+ /**
+ * The timestamp extracted from the message
+ */
+ long messageTimestampMillis;
+
+ /**
+ * @return The actual timestamp delta, signed
+ */
+ public long getDeltaMillis() {
+ return odeIngestTimestampMillis - messageTimestampMillis;
+ }
+
+ /**
+ * @return The magnitude of the delta, always positive.
+ */
+ public long getAbsDeltaMillis() {
+ return Math.abs(getDeltaMillis());
+ }
+
+ public boolean emitEvent() {
+ // If OdeReceivedAt is earlier than message timestamp, always emit an event
+ // otherwise emit one if the lag delta exceeds the max
+ return isOdeIngestBeforeMessageTimestamp() || isDeltaGreaterThanMax();
+ }
+
+ public boolean isOdeIngestBeforeMessageTimestamp() {
+ return getDeltaMillis() < 0;
+ }
+
+ public boolean isDeltaGreaterThanMax() {
+ return getAbsDeltaMillis() > maxDeltaMillis;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return DateJsonMapper.getInstance().writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ log.error("Exception serializing TimestampDelta Event to JSON", e);
+ }
+ return "";
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/Notification.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/Notification.java
index e505c328..cc699646 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/Notification.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/Notification.java
@@ -17,6 +17,8 @@
import lombok.Getter;
import lombok.Setter;
import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.app_health.KafkaStreamsAnomalyNotification;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta.MapTimestampDeltaNotification;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta.SpatTimestampDeltaNotification;
import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
/**
@@ -38,6 +40,8 @@
@JsonSubTypes.Type(value = SignalStateConflictNotification.class, name = "SignalStateConflictNotification"),
@JsonSubTypes.Type(value = TimeChangeDetailsNotification.class, name = "TimeChangeDetailsNotification"),
@JsonSubTypes.Type(value = KafkaStreamsAnomalyNotification.class, name = "AppHealthNotification"),
+ @JsonSubTypes.Type(value = MapTimestampDeltaNotification.class, name = "MapTimestampDeltaNotification"),
+ @JsonSubTypes.Type(value = SpatTimestampDeltaNotification.class, name = "SpatTimestampDeltaNotification")
})
@Getter
@Setter
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/timestamp_delta/BaseTimestampDeltaNotification.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/timestamp_delta/BaseTimestampDeltaNotification.java
new file mode 100644
index 00000000..a99d00f0
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/timestamp_delta/BaseTimestampDeltaNotification.java
@@ -0,0 +1,49 @@
+package us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta;
+
+import lombok.Getter;
+import lombok.Setter;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.ProcessingTimePeriod;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.Notification;
+
+/**
+ * Base class for a timestamp delta notification
+ */
+@Getter
+@Setter
+public abstract class BaseTimestampDeltaNotification extends Notification {
+
+ public BaseTimestampDeltaNotification(String notificationType) {
+ super(notificationType);
+ }
+
+ ProcessingTimePeriod timePeriod;
+ long numberOfEvents;
+
+ /**
+ * Minimum delta in milliseconds, signed.
+ * If this is less than 0, indicates there were events with OdeReceivedAt earlier
+ * than the message timestamp.
+ */
+ long minDeltaMillis;
+
+ /**
+ * Maximum delta in milliseconds, signed.
+ */
+ long maxDeltaMillis;
+
+ /**
+ * The median magnitude of the delta in milliseconds.
+ */
+ double absMedianDeltaMillis;
+
+
+ @Override
+ public String getUniqueId() {
+ return String.format("%s_%d_%d_%d_%d",
+ this.getNotificationType(),
+ this.getRoadRegulatorID(),
+ this.getIntersectionID(),
+ getTimePeriod() != null ? getTimePeriod().getBeginTimestamp() : 0,
+ getTimePeriod() != null ? getTimePeriod().getEndTimestamp() : 0);
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/timestamp_delta/MapTimestampDeltaNotification.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/timestamp_delta/MapTimestampDeltaNotification.java
new file mode 100644
index 00000000..5fc65ae0
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/timestamp_delta/MapTimestampDeltaNotification.java
@@ -0,0 +1,11 @@
+package us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta;
+
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.MapTimestampDeltaEvent;
+
+public class MapTimestampDeltaNotification
+ extends BaseTimestampDeltaNotification {
+
+ public MapTimestampDeltaNotification() {
+ super("MapTimestampDeltaNotification");
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/timestamp_delta/SpatTimestampDeltaNotification.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/timestamp_delta/SpatTimestampDeltaNotification.java
new file mode 100644
index 00000000..1e4e8085
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/models/notifications/timestamp_delta/SpatTimestampDeltaNotification.java
@@ -0,0 +1,9 @@
+package us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta;
+
+public class SpatTimestampDeltaNotification
+ extends BaseTimestampDeltaNotification {
+
+ public SpatTimestampDeltaNotification() {
+ super("SpatTimestampDeltaNotification");
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/processors/timestamp_deltas/BaseTimestampDeltaNotificationProcessor.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/processors/timestamp_deltas/BaseTimestampDeltaNotificationProcessor.java
new file mode 100644
index 00000000..787e3f1f
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/processors/timestamp_deltas/BaseTimestampDeltaNotificationProcessor.java
@@ -0,0 +1,164 @@
+package us.dot.its.jpo.conflictmonitor.monitor.processors.timestamp_deltas;
+
+import com.google.common.primitives.Doubles;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.slf4j.Logger;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.ProcessingTimePeriod;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.BaseTimestampDeltaEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.TimestampDelta;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta.BaseTimestampDeltaNotification;
+import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public abstract class BaseTimestampDeltaNotificationProcessor
+ extends ContextualProcessor {
+
+ abstract protected Logger getLogger();
+ abstract protected TNotification constructNotification();
+ abstract protected String getNotificationHeading();
+ abstract protected String getNotificationText();
+
+ final Duration retentionTime;
+ final String eventStoreName;
+ final String keyStoreName;
+
+ VersionedKeyValueStore eventStore;
+
+ // Store to keep track of all the keys. Needed because Versioned state stores don't support range queries yet.
+ KeyValueStore keyStore;
+
+ Cancellable punctuatorCancellationToken;
+
+ public BaseTimestampDeltaNotificationProcessor(final Duration retentionTime, final String eventStoreName,
+ final String keyStoreName) {
+ this.retentionTime = retentionTime;
+ this.eventStoreName = eventStoreName;
+ this.keyStoreName = keyStoreName;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ try {
+ super.init(context);
+ eventStore = context.getStateStore(eventStoreName);
+ keyStore = context.getStateStore(keyStoreName);
+ punctuatorCancellationToken = context.schedule(retentionTime, PunctuationType.WALL_CLOCK_TIME, this::punctuate);
+ } catch (Exception e) {
+ getLogger().error("Error initializing MapTimestampDeltaNotificationProcessor");
+ }
+ }
+
+ @Override
+ public void process(Record record) {
+ var key = record.key();
+ var value = record.value();
+ var timestamp = record.timestamp();
+ // Ignore tombstones
+ if (value == null) return;
+ keyStore.put(key, true);
+ eventStore.put(key, value, timestamp);
+ }
+
+ private void punctuate(final long timestamp) {
+ final Instant toTime = Instant.now();
+ final Instant fromTime = toTime.minus(retentionTime);
+
+ // Check every intersection for notifications
+ List keysToClean = new ArrayList<>();
+ try (var iterator = keyStore.all()) {
+ while (iterator.hasNext()) {
+ KeyValue keyValue = iterator.next();
+ RsuIntersectionKey key = keyValue.key;
+ assessmentForIntersection(key, fromTime, toTime, timestamp);
+ keysToClean.add(key);
+ }
+ } catch (Exception ex) {
+ getLogger().error("Error in punctuate method", ex);
+ }
+
+ // Clean up the store
+ for (RsuIntersectionKey key : keysToClean) {
+ keyStore.delete(key);
+ }
+ }
+
+ // Read stored events for one intersection, calculate statistics, and emit notifications
+ private void assessmentForIntersection(RsuIntersectionKey key, Instant fromTime, Instant toTime, long timestamp) {
+ var versionedQuery =
+ MultiVersionedKeyQuery.withKey(key)
+ .fromTime(fromTime)
+ .withAscendingTimestamps();
+ QueryResult> result =
+ eventStore.query(versionedQuery, PositionBound.unbounded(), new QueryConfig(false));
+ VersionedRecordIterator resultIterator = result.getResult();
+
+ SummaryStatistics stats = new SummaryStatistics();
+ DescriptiveStatistics absStats = new DescriptiveStatistics();
+ while (resultIterator.hasNext()) {
+ VersionedRecord record = resultIterator.next();
+ long recordTimestamp = record.timestamp();
+ Instant recordInstant = Instant.ofEpochMilli(recordTimestamp);
+ // Shouldn't happen but check timestamps, in case of stream-time vs clock time issue
+ if (recordInstant.isBefore(fromTime) || recordInstant.isAfter(toTime)) {
+ getLogger().warn("Record instant {} is not between {} and {}, skipping it.", recordInstant, fromTime, toTime);
+ continue;
+ }
+ TEvent event = record.value();
+ TimestampDelta delta = event.getDelta();
+ stats.addValue((double)delta.getDeltaMillis());
+ absStats.addValue((double)delta.getAbsDeltaMillis());
+ }
+
+ long numberOfEvents = stats.getN();
+ long minDeltaMillis = (long)stats.getMin();
+ long maxDeltaMillis = (long)stats.getMax();
+ double absMedianDelta = absStats.getPercentile(50.0);
+
+ if (numberOfEvents > 0) {
+ TNotification notification =
+ createNotification(key, fromTime, toTime, numberOfEvents, minDeltaMillis, maxDeltaMillis, absMedianDelta);
+ context().forward(new Record<>(key, notification, timestamp));
+ }
+ }
+
+ private TNotification createNotification(final RsuIntersectionKey key, final Instant fromTime, final Instant toTime,
+ final long numberOfEvents, final long minDeltaMillis, final long maxDeltaMillis,
+ final double absMedianDelta) {
+ final var notification = constructNotification();
+ final var timePeriod = new ProcessingTimePeriod();
+ timePeriod.setBeginTimestamp(fromTime.toEpochMilli());
+ timePeriod.setEndTimestamp(toTime.toEpochMilli());
+ notification.setTimePeriod(timePeriod);
+ notification.setIntersectionID(key.getIntersectionId());
+ notification.setRoadRegulatorID(key.getRegion());
+ notification.setNumberOfEvents(numberOfEvents);
+ notification.setMinDeltaMillis(minDeltaMillis);
+ notification.setMaxDeltaMillis(maxDeltaMillis);
+ notification.setAbsMedianDeltaMillis(absMedianDelta);
+ notification.setNotificationHeading(getNotificationHeading());
+ notification.setNotificationText(getNotificationText());
+ notification.setKey(notification.getUniqueId());
+ return notification;
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/processors/timestamp_deltas/MapTimestampDeltaNotificationProcessor.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/processors/timestamp_deltas/MapTimestampDeltaNotificationProcessor.java
new file mode 100644
index 00000000..84fa9fb6
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/processors/timestamp_deltas/MapTimestampDeltaNotificationProcessor.java
@@ -0,0 +1,37 @@
+package us.dot.its.jpo.conflictmonitor.monitor.processors.timestamp_deltas;
+
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.MapTimestampDeltaEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta.MapTimestampDeltaNotification;
+
+import java.time.Duration;
+
+@Slf4j
+public class MapTimestampDeltaNotificationProcessor
+ extends BaseTimestampDeltaNotificationProcessor {
+
+ public MapTimestampDeltaNotificationProcessor(Duration retentionTime, String eventStoreName, String keyStoreName) {
+ super(retentionTime, eventStoreName, keyStoreName);
+ }
+
+ @Override
+ protected Logger getLogger() {
+ return log;
+ }
+
+ @Override
+ protected MapTimestampDeltaNotification constructNotification() {
+ return new MapTimestampDeltaNotification();
+ }
+
+ @Override
+ protected String getNotificationHeading() {
+ return "MAP Timestamp Delta Notification";
+ }
+
+ @Override
+ protected String getNotificationText() {
+ return "There were differences between the ODE ingest time and message timestamp.";
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/processors/timestamp_deltas/SpatTimestampDeltaNotificationProcessor.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/processors/timestamp_deltas/SpatTimestampDeltaNotificationProcessor.java
new file mode 100644
index 00000000..48679f3a
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/processors/timestamp_deltas/SpatTimestampDeltaNotificationProcessor.java
@@ -0,0 +1,37 @@
+package us.dot.its.jpo.conflictmonitor.monitor.processors.timestamp_deltas;
+
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.SpatTimestampDeltaEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta.SpatTimestampDeltaNotification;
+
+import java.time.Duration;
+
+@Slf4j
+public class SpatTimestampDeltaNotificationProcessor
+ extends BaseTimestampDeltaNotificationProcessor {
+
+ public SpatTimestampDeltaNotificationProcessor(Duration retentionTime, String eventStoreName, String keyStoreName) {
+ super(retentionTime, eventStoreName, keyStoreName);
+ }
+
+ @Override
+ protected Logger getLogger() {
+ return log;
+ }
+
+ @Override
+ protected SpatTimestampDeltaNotification constructNotification() {
+ return new SpatTimestampDeltaNotification();
+ }
+
+ @Override
+ protected String getNotificationHeading() {
+ return "SPaT Timestamp Delta Notification";
+ }
+
+ @Override
+ protected String getNotificationText() {
+ return "There were differences between the ODE ingest time and message timestamp.";
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/JsonSerdes.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/JsonSerdes.java
index e9c7b839..25261404 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/JsonSerdes.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/JsonSerdes.java
@@ -21,7 +21,6 @@
import us.dot.its.jpo.conflictmonitor.monitor.models.config.DefaultConfig;
import us.dot.its.jpo.conflictmonitor.monitor.models.config.IntersectionConfig;
import us.dot.its.jpo.conflictmonitor.monitor.models.config.IntersectionConfigKey;
-import us.dot.its.jpo.conflictmonitor.monitor.models.config.RsuConfigKey;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.ConnectionOfTravelEvent;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.IntersectionReferenceAlignmentEvent;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.LaneDirectionOfTravelEvent;
@@ -34,7 +33,15 @@
import us.dot.its.jpo.conflictmonitor.monitor.models.events.broadcast_rate.SpatBroadcastRateEvent;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.minimum_data.MapMinimumDataEvent;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.minimum_data.SpatMinimumDataEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.MapRevisionCounterEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.SpatRevisionCounterEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.BsmRevisionCounterEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.BaseTimestampDeltaEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.MapTimestampDeltaEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.SpatTimestampDeltaEvent;
import us.dot.its.jpo.conflictmonitor.monitor.models.map.MapBoundingBox;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta.MapTimestampDeltaNotification;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta.SpatTimestampDeltaNotification;
import us.dot.its.jpo.conflictmonitor.monitor.models.spat.SpatTimeChangeDetailAggregator;
import us.dot.its.jpo.conflictmonitor.monitor.serialization.deserialization.GenericJsonDeserializer;
import us.dot.its.jpo.geojsonconverter.serialization.deserializers.JsonDeserializer;
@@ -140,6 +147,25 @@ public static Serde TimeChangeDetailsEvent() {
new JsonDeserializer<>(TimeChangeDetailsEvent.class));
}
+ public static Serde MapRevisionCounterEvent() {
+ return Serdes.serdeFrom(
+ new JsonSerializer(),
+ new JsonDeserializer<>(MapRevisionCounterEvent.class));
+ }
+
+ public static Serde SpatRevisionCounterEvent() {
+ return Serdes.serdeFrom(
+ new JsonSerializer(),
+ new JsonDeserializer<>(SpatRevisionCounterEvent.class));
+ }
+
+ public static Serde BsmRevisionCounterEvent() {
+ return Serdes.serdeFrom(
+ new JsonSerializer(),
+ new JsonDeserializer<>(BsmRevisionCounterEvent.class));
+ }
+
+
public static Serde SpatTimeChangeDetailAggregator() {
return Serdes.serdeFrom(
new JsonSerializer(),
@@ -336,12 +362,6 @@ public static Serde MapBroadcastRateNotification()
);
}
- public static Serde RsuConfigKey() {
- return Serdes.serdeFrom(
- new JsonSerializer(),
- new JsonDeserializer<>(RsuConfigKey.class)
- );
- }
public static Serde IntersectionConfigKey() {
return Serdes.serdeFrom(
@@ -356,4 +376,33 @@ public static Serde MapBoundingBox() {
new JsonDeserializer<>(MapBoundingBox.class)
);
}
+
+ public static Serde MapTimestampDeltaEvent() {
+ return Serdes.serdeFrom(
+ new JsonSerializer(),
+ new JsonDeserializer<>(MapTimestampDeltaEvent.class)
+ );
+ }
+
+ public static Serde SpatTimestampDeltaEvent() {
+ return Serdes.serdeFrom(
+ new JsonSerializer(),
+ new JsonDeserializer<>(SpatTimestampDeltaEvent.class)
+ );
+ }
+
+ public static Serde MapTimestampDeltaNotification() {
+ return Serdes.serdeFrom(
+ new JsonSerializer(),
+ new JsonDeserializer<>(MapTimestampDeltaNotification.class)
+ );
+ }
+
+ public static Serde SpatTimestampDeltaNotification() {
+ return Serdes.serdeFrom(
+ new JsonSerializer(),
+ new JsonDeserializer<>(SpatTimestampDeltaNotification.class)
+ );
+ }
+
}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/deserialization/GenericJsonDeserializer.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/deserialization/GenericJsonDeserializer.java
index 8ca9cc12..226ef8eb 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/deserialization/GenericJsonDeserializer.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/deserialization/GenericJsonDeserializer.java
@@ -12,13 +12,16 @@
import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
+/**
+ * Generic JSON deserializer for Kafka, deserializes from byte[] data.
+ */
public class GenericJsonDeserializer implements Deserializer {
private static Logger logger = LoggerFactory.getLogger(GenericJsonDeserializer.class);
protected final ObjectMapper mapper = DateJsonMapper.getInstance();
- private Class> genericClass;
+ final Class> genericClass;
public GenericJsonDeserializer(Class> genericClass) {
this.genericClass = genericClass;
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/deserialization/GenericJsonStringDeserializer.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/deserialization/GenericJsonStringDeserializer.java
new file mode 100644
index 00000000..69ff13e4
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/serialization/deserialization/GenericJsonStringDeserializer.java
@@ -0,0 +1,45 @@
+package us.dot.its.jpo.conflictmonitor.monitor.serialization.deserialization;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.*;
+import lombok.extern.slf4j.Slf4j;
+import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
+
+import java.io.IOException;
+
+/**
+ * Generic JSON deserializer for REST controllers, deserializes from string.
+ */
+@Slf4j
+public class GenericJsonStringDeserializer extends JsonDeserializer {
+
+ protected final ObjectMapper mapper = DateJsonMapper.getInstance();
+
+ final Class> genericClass;
+
+ public GenericJsonStringDeserializer(Class> genericClass) {
+ this.genericClass = genericClass;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JacksonException {
+
+
+ JsonNode node = mapper.readTree(jsonParser);
+ if (node.has("type")) {
+ String type = node.get("type").asText();
+ Class> nestedClass = null;
+ try {
+ nestedClass = Class.forName(type);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ JavaType javaType = mapper.getTypeFactory().constructParametricType(genericClass, nestedClass);
+ return (T)mapper.treeToValue(node, javaType);
+ }
+ return null;
+
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/BsmRevisionCounterTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/BsmRevisionCounterTopology.java
new file mode 100644
index 00000000..a3ccb84b
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/BsmRevisionCounterTopology.java
@@ -0,0 +1,131 @@
+package us.dot.its.jpo.conflictmonitor.monitor.topologies;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams.StateListener;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.BaseStreamsTopology;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.bsm_revision_counter.BsmRevisionCounterParameters;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.bsm_revision_counter.BsmRevisionCounterStreamsAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.BsmRevisionCounterEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes;
+import us.dot.its.jpo.ode.model.OdeBsmData;
+import us.dot.its.jpo.ode.plugin.j2735.J2735Bsm;
+
+import static us.dot.its.jpo.conflictmonitor.monitor.algorithms.bsm_revision_counter.BsmRevisionCounterConstants.DEFAULT_BSM_REVISION_COUNTER_ALGORITHM;
+
+import java.util.ArrayList;
+import java.util.Objects;
+
+@Component(DEFAULT_BSM_REVISION_COUNTER_ALGORITHM)
+public class BsmRevisionCounterTopology
+ extends BaseStreamsTopology
+ implements BsmRevisionCounterStreamsAlgorithm {
+
+ private static final Logger logger = LoggerFactory.getLogger(BsmRevisionCounterTopology.class);
+
+
+
+ @Override
+ protected Logger getLogger() {
+ return logger;
+ }
+
+ @Override
+ public Topology buildTopology() {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ KStream inputStream = builder.stream(parameters.getBsmInputTopicName(), Consumed.with(Serdes.String(), JsonSerdes.OdeBsm()));
+
+ KStream eventStream = inputStream
+ .groupByKey(Grouped.with(Serdes.String(), JsonSerdes.OdeBsm()))
+ .aggregate(() -> new BsmRevisionCounterEvent(),
+ (key, newValue, aggregate) -> {
+ aggregate.setMessage(null);
+ if (aggregate.getNewBsm() == null){
+ aggregate.setNewBsm(newValue);
+ return aggregate;
+ }
+
+ //update the aggregate
+ aggregate.setPreviousBsm(aggregate.getNewBsm());
+ aggregate.setNewBsm(newValue);
+
+ J2735Bsm previousBsmPayload = (J2735Bsm) aggregate.getPreviousBsm().getPayload().getData();
+ J2735Bsm newBsmPayload = (J2735Bsm) aggregate.getNewBsm().getPayload().getData();
+
+ int newSecMark = newBsmPayload.getCoreData().getSecMark();
+
+ newBsmPayload.getCoreData().setSecMark(previousBsmPayload.getCoreData().getSecMark());
+ aggregate.getNewBsm().getMetadata().setOdeReceivedAt(aggregate.getPreviousBsm().getMetadata().getOdeReceivedAt());
+
+ int oldMetadataHash = Objects.hash(aggregate.getPreviousBsm().getMetadata().toJson());
+ int newMetadataHash = Objects.hash(aggregate.getNewBsm().getMetadata().toJson());
+ int oldPayloadHash = Objects.hash(previousBsmPayload.toJson());
+ int newPayloadHash = Objects.hash(newBsmPayload.toJson());
+
+ if (oldPayloadHash != newPayloadHash || oldMetadataHash != newMetadataHash){ //Contents of bsm message have changed
+ newBsmPayload.getCoreData().setSecMark(newSecMark);
+ aggregate.getNewBsm().getMetadata().setOdeReceivedAt(newValue.getMetadata().getOdeReceivedAt());
+ if (previousBsmPayload.getCoreData().getMsgCnt() == newBsmPayload.getCoreData().getMsgCnt()) { //Revision has not changed
+ aggregate.setMessage("Bsm message changed without msgCount increment.");
+ aggregate.setRoadRegulatorID(-1);
+ return aggregate;
+ }
+ else { //Revision has changed
+ return aggregate;
+ }
+ }
+ else { //Bsm messages are the same
+ return aggregate;
+
+ }
+
+ }, Materialized.with(Serdes.String(), us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.BsmRevisionCounterEvent()))
+ .toStream()
+ .flatMap((key, value) ->{
+ ArrayList> outputList = new ArrayList<>();
+ if (value.getMessage() != null){
+ outputList.add(new KeyValue<>(key, value));
+ }
+ return outputList;
+ });
+ eventStream.to(parameters.getBsmRevisionEventOutputTopicName(), Produced.with(Serdes.String(), us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.BsmRevisionCounterEvent()));
+
+ return builder.build();
+ }
+
+ public void stop() {
+ logger.info("Stopping Bsm Revision Counter Socket Broadcast Topology.");
+ if (streams != null) {
+ streams.close();
+ streams.cleanUp();
+ streams = null;
+ }
+ logger.info("Stopped Bsm Revision Counter Socket Broadcast Topology.");
+ }
+
+ StateListener stateListener;
+ public void registerStateListener(StateListener stateListener) {
+ this.stateListener = stateListener;
+ }
+
+ StreamsUncaughtExceptionHandler exceptionHandler;
+ public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) {
+ this.exceptionHandler = exceptionHandler;
+ }
+
+
+
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/EventTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/EventTopology.java
index 50e0abb3..e6247f6f 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/EventTopology.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/EventTopology.java
@@ -28,6 +28,15 @@ public class EventTopology
private static final Logger logger = LoggerFactory.getLogger(EventTopology.class);
+ @Override
+ public void start() {
+ // Don't start the topology if it is disabled by configuration setting
+ if (parameters.isEnabled()) {
+ super.start();
+ } else {
+ logger.warn("Not starting the EventTopology because EventParameters.enabled = false");
+ }
+ }
@Override
@@ -51,9 +60,14 @@ public Topology buildTopology() {
.merge(builder.stream(parameters.getMapMinimumDataTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Event())))
.merge(builder.stream(parameters.getSpatMinimumDataTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Event())))
.merge(builder.stream(parameters.getMapBroadcastRateTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Event())))
- .merge(builder.stream(parameters.getSpatBroadcastRateTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Event())));
+ .merge(builder.stream(parameters.getSpatBroadcastRateTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Event())))
+ .merge(builder.stream(parameters.getSpatRevisionCounterEventTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Event())))
+ .merge(builder.stream(parameters.getBsmRevisionCounterEventTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Event())))
+ .merge(builder.stream(parameters.getMapRevisionCounterEventTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Event())))
+ .merge(builder.stream(parameters.getTimestampDeltaEventTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Event())));
allEvents.to(parameters.getEventOutputTopicName(), Produced.with(Serdes.String(), JsonSerdes.Event()));
+
if(parameters.isDebug()){
allEvents.print(Printed.toSysOut());
}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/IntersectionEventTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/IntersectionEventTopology.java
index a4ed1ef3..16c7ceec 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/IntersectionEventTopology.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/IntersectionEventTopology.java
@@ -349,9 +349,8 @@ public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
- if (messageIngestAlgorithm instanceof MessageIngestStreamsAlgorithm) {
- var messageIngestStreamsAlgorithm = (MessageIngestStreamsAlgorithm)messageIngestAlgorithm;
- builder = messageIngestStreamsAlgorithm.buildTopology(builder);
+ if (messageIngestAlgorithm instanceof MessageIngestStreamsAlgorithm messageIngestStreamsAlgorithm) {
+ messageIngestStreamsAlgorithm.buildTopology(builder);
}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MapRevisionCounterTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MapRevisionCounterTopology.java
new file mode 100644
index 00000000..86244bac
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MapRevisionCounterTopology.java
@@ -0,0 +1,107 @@
+package us.dot.its.jpo.conflictmonitor.monitor.topologies;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.BaseStreamsTopology;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.map_revision_counter.MapRevisionCounterParameters;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.map_revision_counter.MapRevisionCounterStreamsAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.MapRevisionCounterEvent;
+import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
+
+import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
+import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes;
+
+import static us.dot.its.jpo.conflictmonitor.monitor.algorithms.map_revision_counter.MapRevisionCounterConstants.DEFAULT_MAP_REVISION_COUNTER_ALGORITHM;
+
+import java.util.ArrayList;
+import java.util.Objects;
+
+@Component(DEFAULT_MAP_REVISION_COUNTER_ALGORITHM)
+public class MapRevisionCounterTopology
+ extends BaseStreamsTopology
+ implements MapRevisionCounterStreamsAlgorithm {
+
+ private static final Logger logger = LoggerFactory.getLogger(MapRevisionCounterTopology.class);
+
+
+
+ @Override
+ protected Logger getLogger() {
+ return logger;
+ }
+
+ @Override
+ public Topology buildTopology() {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ KStream> inputStream = builder.stream(parameters.getMapInputTopicName(), Consumed.with(Serdes.String(), JsonSerdes.ProcessedMapGeoJson()));
+
+ KStream eventStream = inputStream
+ .groupByKey(Grouped.with(Serdes.String(), JsonSerdes.ProcessedMapGeoJson()))
+ .aggregate(() -> new MapRevisionCounterEvent(),
+ (key, newValue, aggregate) -> {
+
+ aggregate.setMessage(null);
+ if (aggregate.getNewMap() == null){
+ aggregate.setNewMap(newValue);
+ return aggregate;
+ }
+
+ //update the aggregate
+ aggregate.setPreviousMap(aggregate.getNewMap());
+ aggregate.setNewMap(newValue);
+
+ aggregate.getNewMap().getProperties().setTimeStamp(aggregate.getPreviousMap().getProperties().getTimeStamp());
+ aggregate.getNewMap().getProperties().setOdeReceivedAt(aggregate.getPreviousMap().getProperties().getOdeReceivedAt());
+
+ int oldHash = Objects.hash(aggregate.getPreviousMap().toString());
+ int newHash = Objects.hash(aggregate.getNewMap().toString());
+
+ if (oldHash != newHash){ //Contents of map message have changed
+ aggregate.getNewMap().getProperties().setTimeStamp(newValue.getProperties().getTimeStamp());
+ aggregate.getNewMap().getProperties().setOdeReceivedAt(newValue.getProperties().getOdeReceivedAt());
+ if (aggregate.getNewMap().getProperties().getRevision() == aggregate.getPreviousMap().getProperties().getRevision()) { //Revision has not changed
+ aggregate.setIntersectionID(aggregate.getNewMap().getProperties().getIntersectionId());
+ aggregate.setRoadRegulatorID(-1);
+ aggregate.setMessage("Map message changed without revision increment.");
+ return aggregate;
+ }
+ else { //Revision has changed
+ return aggregate;
+ }
+ }
+ else { //Map messages are the same
+ return aggregate;
+
+ }
+
+ }, Materialized.with(Serdes.String(), us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.MapRevisionCounterEvent()))
+ .toStream()
+ .flatMap((key, value) ->{
+ ArrayList> outputList = new ArrayList<>();
+ if (value.getMessage() != null){
+ outputList.add(new KeyValue<>(key, value));
+ }
+ return outputList;
+ });
+ eventStream.to(parameters.getMapRevisionEventOutputTopicName(), Produced.with(Serdes.String(), us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.MapRevisionCounterEvent()));
+
+ return builder.build();
+
+
+ }
+
+
+
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MapSpatMessageAssessmentTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MapSpatMessageAssessmentTopology.java
index 91b188c1..6a16b018 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MapSpatMessageAssessmentTopology.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MapSpatMessageAssessmentTopology.java
@@ -5,7 +5,14 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Printed;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -13,11 +20,13 @@
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.BaseStreamsTopology;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.map_spat_message_assessment.MapSpatMessageAssessmentParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.map_spat_message_assessment.MapSpatMessageAssessmentStreamsAlgorithm;
-import us.dot.its.jpo.conflictmonitor.monitor.models.AllowedConcurrentPermissive;
import us.dot.its.jpo.conflictmonitor.monitor.models.Intersection.Intersection;
import us.dot.its.jpo.conflictmonitor.monitor.models.Intersection.LaneConnection;
import us.dot.its.jpo.conflictmonitor.monitor.models.RegulatorIntersectionId;
import us.dot.its.jpo.conflictmonitor.monitor.models.SpatMap;
+import us.dot.its.jpo.conflictmonitor.monitor.models.concurrent_permissive.ConnectedLanesPair;
+import us.dot.its.jpo.conflictmonitor.monitor.models.concurrent_permissive.ConnectedLanesPairList;
+import us.dot.its.jpo.conflictmonitor.monitor.models.config.ConfigMap;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.IntersectionReferenceAlignmentEvent;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.SignalGroupAlignmentEvent;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.SignalStateConflictEvent;
@@ -65,9 +74,9 @@ private J2735MovementPhaseState getSpatEventStateBySignalGroup(ProcessedSpat spa
return null;
}
- private String hashLaneConnection(Integer intersectionID, int ingressOne, int ingressTwo, int egressOne, int egressTwo){
- return intersectionID + "_" + ingressOne + "_" + ingressTwo + "_" + egressOne + "_" + egressTwo;
- }
+// private String hashLaneConnection(Integer intersectionID, int ingressOne, int ingressTwo, int egressOne, int egressTwo){
+// return intersectionID + "_" + ingressOne + "_" + ingressTwo + "_" + egressOne + "_" + egressTwo;
+// }
private boolean doStatesConflict(J2735MovementPhaseState a, J2735MovementPhaseState b) {
return a.equals(J2735MovementPhaseState.PROTECTED_CLEARANCE)
@@ -85,15 +94,13 @@ private boolean doStatesConflict(J2735MovementPhaseState a, J2735MovementPhaseSt
public Topology buildTopology() {
- // TODO: Populate concurrent permissive allowed from intersection-level config
- Map allowMap = new HashMap<>();
- List list = new ArrayList<>();
- for(AllowedConcurrentPermissive elem : list){
- String hash = hashLaneConnection(elem.getIntersectionID(), elem.getFirstIngressLane(), elem.getSecondIngressLane(), elem.getFirstEgressLane(), elem.getSecondEgressLane());
- allowMap.put(hash, elem);
+ // Populate concurrent permissive allowed from intersection-level config
+ ConfigMap concurrentPermissiveConfigMap = parameters.getConcurrentPermissiveListMap();
+ final Set allowConcurrentPermissiveSet = new HashSet<>();
+ for (ConnectedLanesPairList list : concurrentPermissiveConfigMap.values()) {
+ allowConcurrentPermissiveSet.addAll(list);
}
-
StreamsBuilder builder = new StreamsBuilder();
// SPaT Input Stream
@@ -343,11 +350,14 @@ public Topology buildTopology() {
for (int j = i + 1; j < connections.size(); j++) {
LaneConnection secondConnection = connections.get(j);
- String compareHash = hashLaneConnection(intersection.getIntersectionId(), firstConnection.getIngressLane().getId(), secondConnection.getIngressLane().getId(), firstConnection.getEgressLane().getId(), secondConnection.getEgressLane().getId());
+ ConnectedLanesPair theseConnectedLanes = new ConnectedLanesPair(
+ intersection.getIntersectionId(), intersection.getRoadRegulatorId(),
+ firstConnection.getIngressLane().getId(), firstConnection.getEgressLane().getId(),
+ secondConnection.getIngressLane().getId(), secondConnection.getEgressLane().getId());
// Skip if this connection is defined in the allowable map.
- if(allowMap.containsKey(compareHash)){
+ if(allowConcurrentPermissiveSet.contains(theseConnectedLanes)){
continue;
}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MessageIngestTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MessageIngestTopology.java
index 046ac884..e3e023b8 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MessageIngestTopology.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/MessageIngestTopology.java
@@ -4,8 +4,18 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.kstream.*;
-import org.apache.kafka.streams.state.*;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -13,7 +23,6 @@
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.message_ingest.MessageIngestParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.message_ingest.MessageIngestStreamsAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionIdKey;
-import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmRsuIdKey;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmTimestampExtractor;
import us.dot.its.jpo.conflictmonitor.monitor.models.map.MapBoundingBox;
import us.dot.its.jpo.conflictmonitor.monitor.models.map.MapIndex;
@@ -38,9 +47,10 @@ public class MessageIngestTopology
implements MessageIngestStreamsAlgorithm {
private static final Logger logger = LoggerFactory.getLogger(MessageIngestTopology.class);
- //private int count = 0;
-
- public StreamsBuilder buildTopology(StreamsBuilder builder) {
+
+
+ @Override
+ public void buildTopology(StreamsBuilder builder) {
@@ -190,10 +200,6 @@ public StreamsBuilder buildTopology(StreamsBuilder builder) {
);
-
-
-
- return builder;
}
@@ -226,7 +232,6 @@ public ReadOnlyKeyValueStore> getMa
-
private MapIndex mapIndex;
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/NotificationTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/NotificationTopology.java
index 78d48f73..b2bc7105 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/NotificationTopology.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/NotificationTopology.java
@@ -45,7 +45,8 @@ public Topology buildTopology() {
.merge(builder.stream(parameters.getIntersectionReferenceAlignmentNotificationTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Notification())))
.merge(builder.stream(parameters.getSignalGroupAlignmentNotificationTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Notification())))
.merge(builder.stream(parameters.getSignalStateConflictNotificationTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Notification())))
- .merge(builder.stream(parameters.getSpatTimeChangeDetailsNotificationTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Notification())));
+ .merge(builder.stream(parameters.getSpatTimeChangeDetailsNotificationTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Notification())))
+ .merge(builder.stream(parameters.getTimestampDeltaNotificationTopicName(), Consumed.with(Serdes.String(), JsonSerdes.Notification())));
allNotifications.to(parameters.getNotificationOutputTopicName(), Produced.with(Serdes.String(), JsonSerdes.Notification()));
if(parameters.isDebug()){
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/RepartitionTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/RepartitionTopology.java
index dd84625f..11cb7fd7 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/RepartitionTopology.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/RepartitionTopology.java
@@ -54,21 +54,11 @@ public Topology buildTopology() {
KStream bsmRekeyedStream = bsmRepartitionStream.selectKey((key, value)->{
String ip = BsmUtils.getRsuIp(value);
-// if (value.getMetadata() != null && value.getMetadata() instanceof OdeBsmMetadata) {
-// var metadata = (OdeBsmMetadata) value.getMetadata();
-// ip = metadata.getOriginIp();
-// }
String bsmId = BsmUtils.getVehicleId(value);
-// if (value.getPayload() != null
-// && value.getPayload().getData() instanceof J2735Bsm
-// && ((J2735Bsm) value.getPayload().getData()).getCoreData() != null) {
-// var coreData = ((J2735Bsm) value.getPayload().getData()).getCoreData();
-// bsmId = coreData.getId();
-// }
-
return new BsmRsuIdKey(ip, bsmId);
});
+
bsmRekeyedStream.to(
parameters.getBsmRepartitionOutputTopicName(),
Produced.with(
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/SpatRevisionCounterTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/SpatRevisionCounterTopology.java
new file mode 100644
index 00000000..6670b606
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/SpatRevisionCounterTopology.java
@@ -0,0 +1,126 @@
+package us.dot.its.jpo.conflictmonitor.monitor.topologies;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams.StateListener;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.BaseStreamsTopology;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.spat_revision_counter.SpatRevisionCounterParameters;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.spat_revision_counter.SpatRevisionCounterStreamsAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.SpatRevisionCounterEvent;
+import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
+
+import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes;
+
+import static us.dot.its.jpo.conflictmonitor.monitor.algorithms.spat_revision_counter.SpatRevisionCounterConstants.DEFAULT_SPAT_REVISION_COUNTER_ALGORITHM;
+
+import java.util.ArrayList;
+
+@Component(DEFAULT_SPAT_REVISION_COUNTER_ALGORITHM)
+public class SpatRevisionCounterTopology
+ extends BaseStreamsTopology
+ implements SpatRevisionCounterStreamsAlgorithm {
+
+ private static final Logger logger = LoggerFactory.getLogger(SpatRevisionCounterTopology.class);
+
+
+
+ @Override
+ protected Logger getLogger() {
+ return logger;
+ }
+
+ @Override
+ public Topology buildTopology() {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ KStream inputStream = builder.stream(parameters.getSpatInputTopicName(), Consumed.with(Serdes.String(), JsonSerdes.ProcessedSpat()));
+
+ KStream eventStream = inputStream
+ .groupByKey(Grouped.with(Serdes.String(), JsonSerdes.ProcessedSpat()))
+ .aggregate(() -> new SpatRevisionCounterEvent(),
+ (key, newValue, aggregate) -> {
+
+ aggregate.setMessage(null);
+ if (aggregate.getNewSpat() == null){
+ aggregate.setNewSpat(newValue);
+ return aggregate;
+ }
+
+ //update the aggregate
+ aggregate.setPreviousSpat(aggregate.getNewSpat());
+ aggregate.setNewSpat(newValue);
+
+ aggregate.getNewSpat().setUtcTimeStamp(aggregate.getPreviousSpat().getUtcTimeStamp());
+ aggregate.getNewSpat().setOdeReceivedAt(aggregate.getPreviousSpat().getOdeReceivedAt());
+
+ int oldHash = aggregate.getPreviousSpat().hashCode();
+ int newHash = aggregate.getNewSpat().hashCode();
+
+ if (oldHash != newHash){ //Contents of spat message have changed
+ aggregate.getNewSpat().setUtcTimeStamp(newValue.getUtcTimeStamp());
+ aggregate.getNewSpat().setOdeReceivedAt(newValue.getOdeReceivedAt());
+ if (aggregate.getNewSpat().getRevision() == aggregate.getPreviousSpat().getRevision()) { //Revision has not changed
+ aggregate.setIntersectionID(aggregate.getNewSpat().getIntersectionId());
+ aggregate.setRoadRegulatorID(-1);
+ aggregate.setMessage("Spat message changed without revision increment.");
+
+ return aggregate;
+ }
+ else { //Revision has changed
+ return aggregate;
+ }
+ }
+ else { //Spat messages are the same
+ return aggregate;
+
+ }
+
+ }, Materialized.with(Serdes.String(), us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.SpatRevisionCounterEvent()))
+ .toStream()
+ .flatMap((key, value) ->{
+ ArrayList> outputList = new ArrayList<>();
+ if (value.getMessage() != null){
+ outputList.add(new KeyValue<>(key, value));
+ }
+ return outputList;
+ });
+ eventStream.to(parameters.getSpatRevisionEventOutputTopicName(), Produced.with(Serdes.String(), us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.SpatRevisionCounterEvent()));
+
+ return builder.build();
+ }
+
+ public void stop() {
+ logger.info("Stopping Spat Revision Counter Socket Broadcast Topology.");
+ if (streams != null) {
+ streams.close();
+ streams.cleanUp();
+ streams = null;
+ }
+ logger.info("Stopped Spat Revision Counter Socket Broadcast Topology.");
+ }
+
+ StateListener stateListener;
+ public void registerStateListener(StateListener stateListener) {
+ this.stateListener = stateListener;
+ }
+
+ StreamsUncaughtExceptionHandler exceptionHandler;
+ public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) {
+ this.exceptionHandler = exceptionHandler;
+ }
+
+
+
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/assessments/StopLineStopAssessmentTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/assessments/StopLineStopAssessmentTopology.java
index 7c4a5587..d312b067 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/assessments/StopLineStopAssessmentTopology.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/assessments/StopLineStopAssessmentTopology.java
@@ -125,7 +125,7 @@ public Topology buildTopology() {
for(StopLineStopAssessmentGroup group: assessment.getStopLineStopAssessmentGroup()){
// Only Send Assessments that match the generating signal group.
if(group.getSignalGroup() == event.getSignalGroup() && group.getNumberOfEvents() >= parameters.getMinimumEventsToNotify()){
- double totalTime = group.getTimeStoppedOnDark() + group.getTimeStoppedOnGreen() + group.getTimeStoppedOnRed() + group.getTimeStoppedOnYellow();
+ double totalTime = group.getTimeStoppedOnDark() + group.getTimeStoppedOnGreen() + group.getTimeStoppedOnRed() + group.getTimeStoppedOnYellow() + group.getTimeStoppedOnDark();
if(group.getTimeStoppedOnGreen() > parameters.getGreenLightPercentToNotify() * totalTime){
StopLineStopNotification notification = new StopLineStopNotification();
notification.setSignalGroup(group.getSignalGroup());
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/config/ConfigInitializer.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/config/ConfigInitializer.java
index 521e5a0f..84d6e5c3 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/config/ConfigInitializer.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/config/ConfigInitializer.java
@@ -1,20 +1,18 @@
package us.dot.its.jpo.conflictmonitor.monitor.topologies.config;
-import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.PropertyAccessor;
import org.springframework.beans.PropertyAccessorFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
-import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.AlgorithmParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.config.ConfigParameters;
+import us.dot.its.jpo.conflictmonitor.monitor.models.concurrent_permissive.ConnectedLanesPairList;
import us.dot.its.jpo.conflictmonitor.monitor.models.config.*;
-import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
-import static us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.*;
+
import java.lang.reflect.Field;
@@ -77,34 +75,42 @@ private void writeDefaultConfigObject(Object paramObj) {
}
private DefaultConfig> createConfig(Class> type, Object propValue, ConfigData updatable) {
-
if (Integer.class.equals(type) || "int".equals(type.getName())) {
- var config = new DefaultIntConfig();
- config.setValue((Integer)propValue);
+ var config = new DefaultConfig();
+ config.setValue((Integer) propValue);
setConfigProps(config, updatable, Integer.class);
return config;
} else if (String.class.equals(type)) {
- var config = new DefaultStringConfig();
- config.setValue((String)propValue);
+ var config = new DefaultConfig();
+ config.setValue((String) propValue);
setConfigProps(config, updatable, String.class);
return config;
} else if (Boolean.class.equals(type) || "boolean".equals(type.getName())) {
- var config = new DefaultBooleanConfig();
- config.setValue((Boolean)propValue);
+ var config = new DefaultConfig();
+ config.setValue((Boolean) propValue);
setConfigProps(config, updatable, Boolean.class);
return config;
} else if (Double.class.equals(type) || "double".equals(type.getName())) {
- var config = new DefaultDoubleConfig();
- config.setValue((Double)propValue);
+ var config = new DefaultConfig();
+ config.setValue((Double) propValue);
setConfigProps(config, updatable, Double.class);
return config;
} else if (Long.class.equals(type) || "long".equals(type.getName())) {
- var config = new DefaultLongConfig();
- config.setValue((Long)propValue);
+ var config = new DefaultConfig();
+ config.setValue((Long) propValue);
setConfigProps(config, updatable, Long.class);
return config;
+ } else if (ConnectedLanesPairList.class.equals(type)) {
+ var config = new DefaultConfig();
+ if (propValue != null) {
+ config.setValue((ConnectedLanesPairList) propValue);
+ } else {
+ config.setValue(new ConnectedLanesPairList());
+ }
+ setConfigProps(config, updatable, ConnectedLanesPairList.class);
+ return config;
} else {
- var config = new DefaultIntConfig();
+ var config = new DefaultConfig();
config.setValue((Integer)propValue);
setConfigProps(config, updatable, Integer.class);
return config;
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/config/ConfigTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/config/ConfigTopology.java
index 04a542d6..bf61ebd7 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/config/ConfigTopology.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/config/ConfigTopology.java
@@ -7,7 +7,11 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -18,13 +22,27 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.BaseStreamsTopology;
-import us.dot.its.jpo.conflictmonitor.monitor.algorithms.config.*;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.config.ConfigParameters;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.config.ConfigStreamsAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.config.ConfigUpdateResult;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.config.DefaultConfigListener;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.config.IntersectionConfigListener;
+import us.dot.its.jpo.conflictmonitor.monitor.models.config.Config;
+import us.dot.its.jpo.conflictmonitor.monitor.models.config.ConfigException;
import us.dot.its.jpo.conflictmonitor.monitor.models.config.DefaultConfig;
+import us.dot.its.jpo.conflictmonitor.monitor.models.config.DefaultConfigMap;
import us.dot.its.jpo.conflictmonitor.monitor.models.config.IntersectionConfig;
-import us.dot.its.jpo.conflictmonitor.monitor.models.config.*;
+import us.dot.its.jpo.conflictmonitor.monitor.models.config.IntersectionConfigKey;
+import us.dot.its.jpo.conflictmonitor.monitor.models.config.IntersectionConfigMap;
+import us.dot.its.jpo.conflictmonitor.monitor.models.config.UpdateType;
import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TreeMap;
import static org.apache.kafka.common.serialization.Serdes.String;
import static us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes.*;
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/AlternateMapTimestampDeltaAlgorithm.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/AlternateMapTimestampDeltaAlgorithm.java
new file mode 100644
index 00000000..61a601d1
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/AlternateMapTimestampDeltaAlgorithm.java
@@ -0,0 +1,30 @@
+package us.dot.its.jpo.conflictmonitor.monitor.topologies.timestamp_delta;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.map.MapTimestampDeltaAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.map.MapTimestampDeltaParameters;
+
+import static us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.TimestampDeltaConstants.ALTERNATE_MAP_TIMESTAMP_DELTA_ALGORITHM;
+
+// Alternate algorithm that does nothing, for demonstrating the modular architecture
+@Component(ALTERNATE_MAP_TIMESTAMP_DELTA_ALGORITHM)
+@Slf4j
+public class AlternateMapTimestampDeltaAlgorithm implements MapTimestampDeltaAlgorithm {
+
+ MapTimestampDeltaParameters parameters;
+
+ @Override
+ public void setParameters(MapTimestampDeltaParameters mapTimestampDeltaParameters) {
+ this.parameters = mapTimestampDeltaParameters;
+ }
+
+ @Override
+ public MapTimestampDeltaParameters getParameters() {
+ return parameters;
+ }
+
+ public void doNothing() {
+ log.info("Doing nothing");
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/AlternateSpatTimestampDeltaAlgorithm.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/AlternateSpatTimestampDeltaAlgorithm.java
new file mode 100644
index 00000000..14db64f5
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/AlternateSpatTimestampDeltaAlgorithm.java
@@ -0,0 +1,30 @@
+package us.dot.its.jpo.conflictmonitor.monitor.topologies.timestamp_delta;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.spat.SpatTimestampDeltaAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.spat.SpatTimestampDeltaParameters;
+
+import static us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.TimestampDeltaConstants.ALTERNATE_SPAT_TIMESTAMP_DELTA_ALGORITHM;
+
+// Alternate algorithm that does nothing, for demonstrating the modular architecture
+@Component(ALTERNATE_SPAT_TIMESTAMP_DELTA_ALGORITHM)
+@Slf4j
+public class AlternateSpatTimestampDeltaAlgorithm implements SpatTimestampDeltaAlgorithm {
+
+ SpatTimestampDeltaParameters parameters;
+
+ @Override
+ public void setParameters(SpatTimestampDeltaParameters spatTimestampDeltaParameters) {
+ this.parameters = spatTimestampDeltaParameters;
+ }
+
+ @Override
+ public SpatTimestampDeltaParameters getParameters() {
+ return parameters;
+ }
+
+ public void doNothing() {
+ log.info("Doing nothing");
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/BaseTimestampDeltaTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/BaseTimestampDeltaTopology.java
new file mode 100644
index 00000000..21f8b827
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/BaseTimestampDeltaTopology.java
@@ -0,0 +1,114 @@
+package us.dot.its.jpo.conflictmonitor.monitor.topologies.timestamp_delta;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.Stores;
+import org.slf4j.Logger;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.BaseStreamsBuilder;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.BaseTimestampDeltaStreamsAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.ITimestampDeltaParameters;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.BaseTimestampDeltaEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.TimestampDelta;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta.BaseTimestampDeltaNotification;
+import us.dot.its.jpo.conflictmonitor.monitor.processors.timestamp_deltas.BaseTimestampDeltaNotificationProcessor;
+import us.dot.its.jpo.geojsonconverter.partitioner.IntersectionIdPartitioner;
+import us.dot.its.jpo.geojsonconverter.partitioner.RsuIntersectionKey;
+import java.time.Duration;
+
+public abstract class BaseTimestampDeltaTopology
+ extends BaseStreamsBuilder
+ implements BaseTimestampDeltaStreamsAlgorithm {
+
+ abstract protected Logger getLogger();
+ abstract protected long extractMessageTimestamp(TMessage message);
+ abstract protected long extractOdeReceivedAt(TMessage message);
+ abstract protected TEvent constructEvent();
+ abstract protected Serde eventSerde();
+ abstract protected Serde notificationSerde();
+
+ // Construct an instance of the notification processor
+ abstract protected BaseTimestampDeltaNotificationProcessor
+ constructProcessor(Duration retentionTime, String eventStoreName, String keyStoreName);
+
+ @Override
+ public void buildTopology(StreamsBuilder builder, KStream inputStream) {
+
+ final String keyStoreName = parameters.getKeyStoreName();
+ final String eventStoreName = parameters.getEventStoreName();
+ final Duration retentionTime = Duration.ofMinutes(parameters.getRetentionTimeMinutes());
+ final String outputTopicName = parameters.getOutputTopicName();
+ final String notificationTopicName = parameters.getNotificationTopicName();
+ final int maxDeltaMilliseconds = parameters.getMaxDeltaMilliseconds();
+ final boolean isDebug = parameters.isDebug();
+
+ final var eventStoreBuilder =
+ Stores.versionedKeyValueStoreBuilder(
+ Stores.persistentVersionedKeyValueStore(eventStoreName, retentionTime),
+ us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey(),
+ eventSerde()
+ );
+ final var keyStoreBuilder =
+ Stores.keyValueStoreBuilder(
+ Stores.persistentKeyValueStore(keyStoreName),
+ us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey(),
+ Serdes.Boolean()
+ );
+ builder.addStateStore(eventStoreBuilder);
+ builder.addStateStore(keyStoreBuilder);
+
+ KStream eventStream =
+ inputStream
+ // Ignore tombstones
+ .filter((rsuIntersectionKey, processedMap) -> processedMap != null)
+
+ // Calculate timestamp delta
+ .mapValues((rsuIntersectionKey, processedMap) -> {
+ TimestampDelta delta = new TimestampDelta();
+ delta.setMaxDeltaMillis(maxDeltaMilliseconds);
+ delta.setMessageTimestampMillis(extractMessageTimestamp(processedMap));
+ delta.setOdeIngestTimestampMillis(extractOdeReceivedAt(processedMap));
+ if (isDebug) {
+ getLogger().debug("RSU: {}, TimestampDelta: {}", rsuIntersectionKey.getRsuId(), delta);
+ }
+ return delta;
+ })
+
+ // Filter out deltas that shouldn't emit events
+ .filter((rsuIntersectionKey, timestampDelta) -> timestampDelta.emitEvent())
+
+ // Create Events
+ .mapValues((rsuIntersectionKey, timestampDelta) -> {
+ TEvent event = constructEvent();
+ event.setDelta(timestampDelta);
+ event.setSource(rsuIntersectionKey.getRsuId());
+ event.setIntersectionID(rsuIntersectionKey.getIntersectionId());
+ event.setRoadRegulatorID(rsuIntersectionKey.getRegion());
+ if (isDebug) {
+ getLogger().info("Producing TimestampDeltaEvent: {}", event);
+ }
+ return event;
+ });
+
+ // Output events
+ eventStream.to(outputTopicName, Produced.with(
+ us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey(),
+ eventSerde(),
+ new IntersectionIdPartitioner<>())); // Don't change partitioning of output
+
+
+
+ // Collect events to issue hourly summary notifications
+ eventStream
+ .process(() -> constructProcessor(retentionTime, eventStoreName, keyStoreName),
+ eventStoreName, keyStoreName)
+ .to(notificationTopicName,
+ Produced.with(
+ us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes.RsuIntersectionKey(),
+ notificationSerde(),
+ new IntersectionIdPartitioner<>()));
+ }
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/MapTimestampDeltaTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/MapTimestampDeltaTopology.java
new file mode 100644
index 00000000..bda0d3e3
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/MapTimestampDeltaTopology.java
@@ -0,0 +1,64 @@
+package us.dot.its.jpo.conflictmonitor.monitor.topologies.timestamp_delta;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serde;
+import org.slf4j.Logger;
+import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.map.MapTimestampDeltaParameters;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.map.MapTimestampDeltaStreamsAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.MapTimestampDeltaEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta.MapTimestampDeltaNotification;
+import us.dot.its.jpo.conflictmonitor.monitor.processors.timestamp_deltas.BaseTimestampDeltaNotificationProcessor;
+import us.dot.its.jpo.conflictmonitor.monitor.processors.timestamp_deltas.MapTimestampDeltaNotificationProcessor;
+import us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes;
+import us.dot.its.jpo.conflictmonitor.monitor.utils.ProcessedMapUtils;
+import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
+import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
+
+import java.time.Duration;
+
+import static us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.TimestampDeltaConstants.DEFAULT_MAP_TIMESTAMP_DELTA_ALGORITHM;
+
+@Component(DEFAULT_MAP_TIMESTAMP_DELTA_ALGORITHM)
+@Slf4j
+public class MapTimestampDeltaTopology
+ extends BaseTimestampDeltaTopology, MapTimestampDeltaParameters, MapTimestampDeltaEvent,
+ MapTimestampDeltaNotification>
+ implements MapTimestampDeltaStreamsAlgorithm {
+
+ @Override
+ protected Logger getLogger() {
+ return log;
+ }
+
+ @Override
+ protected long extractMessageTimestamp(ProcessedMap message) {
+ return ProcessedMapUtils.getTimestamp(message);
+ }
+
+ @Override
+ protected long extractOdeReceivedAt(ProcessedMap message) {
+ return ProcessedMapUtils.getOdeReceivedAt(message);
+ }
+
+ @Override
+ protected MapTimestampDeltaEvent constructEvent() {
+ return new MapTimestampDeltaEvent();
+ }
+
+ @Override
+ protected Serde eventSerde() {
+ return JsonSerdes.MapTimestampDeltaEvent();
+ }
+
+ @Override
+ protected Serde notificationSerde() {
+ return JsonSerdes.MapTimestampDeltaNotification();
+ }
+
+ @Override
+ protected BaseTimestampDeltaNotificationProcessor constructProcessor(Duration retentionTime, String eventStoreName, String keyStoreName) {
+ return new MapTimestampDeltaNotificationProcessor(retentionTime, eventStoreName, keyStoreName);
+ }
+
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/SpatTimestampDeltaTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/SpatTimestampDeltaTopology.java
new file mode 100644
index 00000000..1064759a
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/timestamp_delta/SpatTimestampDeltaTopology.java
@@ -0,0 +1,63 @@
+package us.dot.its.jpo.conflictmonitor.monitor.topologies.timestamp_delta;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serde;
+import org.slf4j.Logger;
+import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.spat.SpatTimestampDeltaParameters;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.spat.SpatTimestampDeltaStreamsAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.models.events.timestamp_delta.SpatTimestampDeltaEvent;
+import us.dot.its.jpo.conflictmonitor.monitor.models.notifications.timestamp_delta.SpatTimestampDeltaNotification;
+import us.dot.its.jpo.conflictmonitor.monitor.processors.timestamp_deltas.BaseTimestampDeltaNotificationProcessor;
+import us.dot.its.jpo.conflictmonitor.monitor.processors.timestamp_deltas.SpatTimestampDeltaNotificationProcessor;
+import us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes;
+import us.dot.its.jpo.conflictmonitor.monitor.utils.SpatUtils;
+import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
+
+import java.time.Duration;
+
+import static us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.TimestampDeltaConstants.DEFAULT_SPAT_TIMESTAMP_DELTA_ALGORITHM;
+
+@Component(DEFAULT_SPAT_TIMESTAMP_DELTA_ALGORITHM)
+@Slf4j
+public class SpatTimestampDeltaTopology
+ extends BaseTimestampDeltaTopology
+ implements SpatTimestampDeltaStreamsAlgorithm {
+
+ @Override
+ protected Logger getLogger() {
+ return log;
+ }
+
+ @Override
+ protected long extractMessageTimestamp(ProcessedSpat processedSpat) {
+ return SpatUtils.getTimestamp(processedSpat);
+ }
+
+ @Override
+ protected long extractOdeReceivedAt(ProcessedSpat processedSpat) {
+ return SpatUtils.getOdeReceivedAt(processedSpat);
+ }
+
+ @Override
+ protected SpatTimestampDeltaEvent constructEvent() {
+ return new SpatTimestampDeltaEvent();
+ }
+
+ @Override
+ protected Serde eventSerde() {
+ return JsonSerdes.SpatTimestampDeltaEvent();
+ }
+
+ @Override
+ protected Serde notificationSerde() {
+ return JsonSerdes.SpatTimestampDeltaNotification();
+ }
+
+ @Override
+ protected BaseTimestampDeltaNotificationProcessor constructProcessor(Duration retentionTime, String eventStoreName, String keyStoreName) {
+ return new SpatTimestampDeltaNotificationProcessor(retentionTime, eventStoreName, keyStoreName);
+ }
+
+}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/AlternateMapValidationAlgorithm.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/AlternateMapValidationAlgorithm.java
index 64d92776..3e92a905 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/AlternateMapValidationAlgorithm.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/AlternateMapValidationAlgorithm.java
@@ -9,8 +9,10 @@
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.map.MapTimestampDeltaAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.validation.map.MapValidationAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.validation.map.MapValidationParameters;
+import us.dot.its.jpo.conflictmonitor.monitor.topologies.timestamp_delta.AlternateMapTimestampDeltaAlgorithm;
/**
* Test algorithm just writes random numbers to the log
@@ -38,6 +40,9 @@ public void start() {
// Don't run if not debugging
if (!parameters.isDebug()) return;
+ // Plugin algorithm
+ timestampDeltaAlgorithm.doNothing();
+
timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@@ -63,5 +68,23 @@ public void stop() {
}
}
-
+
+
+ AlternateMapTimestampDeltaAlgorithm timestampDeltaAlgorithm;
+
+
+ @Override
+ public MapTimestampDeltaAlgorithm getTimestampDeltaAlgorithm() {
+ return timestampDeltaAlgorithm;
+ }
+
+ @Override
+ public void setTimestampDeltaAlgorithm(MapTimestampDeltaAlgorithm timestampDeltaAlgorithm) {
+ // Enforce a specific algorithm implementation
+ if (timestampDeltaAlgorithm instanceof AlternateMapTimestampDeltaAlgorithm altAlgorithm) {
+ this.timestampDeltaAlgorithm = altAlgorithm;
+ } else {
+ throw new IllegalArgumentException("Algorithm is not an instance of AlternateMapTimestampDeltaAlgorithm");
+ }
+ }
}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/AlternateSpatValidationAlgorithm.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/AlternateSpatValidationAlgorithm.java
index 3002c0f4..454f6d3c 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/AlternateSpatValidationAlgorithm.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/AlternateSpatValidationAlgorithm.java
@@ -9,8 +9,10 @@
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.spat.SpatTimestampDeltaAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.validation.spat.SpatValidationAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.validation.spat.SpatValidationParameters;
+import us.dot.its.jpo.conflictmonitor.monitor.topologies.timestamp_delta.AlternateSpatTimestampDeltaAlgorithm;
/**
* Test SPAT algorithm, writes random numbers to the log in debug mode.
@@ -64,4 +66,22 @@ public void stop() {
}
}
+
+ AlternateSpatTimestampDeltaAlgorithm timestampDeltaAlgorithm;
+
+
+ @Override
+ public SpatTimestampDeltaAlgorithm getTimestampDeltaAlgorithm() {
+ return timestampDeltaAlgorithm;
+ }
+
+ @Override
+ public void setTimestampDeltaAlgorithm(SpatTimestampDeltaAlgorithm timestampDeltaAlgorithm) {
+ // Enforce a specific algorithm implementation
+ if (timestampDeltaAlgorithm instanceof AlternateSpatTimestampDeltaAlgorithm altAlgorithm) {
+ this.timestampDeltaAlgorithm = altAlgorithm;
+ } else {
+ throw new IllegalArgumentException("Algorithm is not an instance of AlternateSpatTimestampDeltaAlgorithm");
+ }
+ }
}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/MapValidationTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/MapValidationTopology.java
index efdf7fc4..6142cd0f 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/MapValidationTopology.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/MapValidationTopology.java
@@ -7,13 +7,13 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
-import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
-import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.map.MapTimestampDeltaAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.map.MapTimestampDeltaStreamsAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.validation.map.MapValidationParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.validation.map.MapValidationStreamsAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.models.IntersectionRegion;
@@ -49,8 +49,34 @@ protected Logger getLogger() {
private static final String LATEST_TIMESTAMP_STORE = "latest-timestamp-store";
+ MapTimestampDeltaStreamsAlgorithm timestampDeltaAlgorithm;
+
+ @Override
+ public MapTimestampDeltaAlgorithm getTimestampDeltaAlgorithm() {
+ return timestampDeltaAlgorithm;
+ }
+
+ @Override
+ public void setTimestampDeltaAlgorithm(MapTimestampDeltaAlgorithm timestampDeltaAlgorithm) {
+ // Enforce the algorithm being a Streams algorithm
+ if (timestampDeltaAlgorithm instanceof MapTimestampDeltaStreamsAlgorithm timestampDeltaStreamsAlgorithm) {
+ this.timestampDeltaAlgorithm = timestampDeltaStreamsAlgorithm;
+ } else {
+ throw new IllegalArgumentException("algorithm is not an instance of MapTimestampDeltaStreamsAlgorithm");
+ }
+ }
+
+ @Override
+ protected void validate() {
+ super.validate();
+
+ if (timestampDeltaAlgorithm == null) {
+ throw new IllegalStateException("MapTimestampDeltaAlgorithm is not set.");
+ }
+ }
public Topology buildTopology() {
+
var builder = new StreamsBuilder();
// Create state store for zero count
@@ -69,6 +95,9 @@ public Topology buildTopology() {
.withTimestampExtractor(new TimestampExtractorForBroadcastRate())
);
+ // timestamp delta plugin after reading processed MAPs
+ timestampDeltaAlgorithm.buildTopology(builder, processedMapStream);
+
// Extract validation info for Minimum Data events
KStream minDataStream = processedMapStream
// Filter out messages that are valid
@@ -200,5 +229,5 @@ public Topology buildTopology() {
}
-
+
}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/SpatValidationTopology.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/SpatValidationTopology.java
index 48414010..4f1cb467 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/SpatValidationTopology.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/topologies/validation/SpatValidationTopology.java
@@ -12,6 +12,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.spat.SpatTimestampDeltaAlgorithm;
+import us.dot.its.jpo.conflictmonitor.monitor.algorithms.timestamp_delta.spat.SpatTimestampDeltaStreamsAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.validation.spat.SpatValidationParameters;
import us.dot.its.jpo.conflictmonitor.monitor.algorithms.validation.spat.SpatValidationStreamsAlgorithm;
import us.dot.its.jpo.conflictmonitor.monitor.models.events.ProcessingTimePeriod;
@@ -44,6 +46,31 @@ protected Logger getLogger() {
private static final String LATEST_TIMESTAMP_STORE = "latest-timestamp-store";
+ SpatTimestampDeltaStreamsAlgorithm timestampDeltaAlgorithm;
+
+ @Override
+ public SpatTimestampDeltaAlgorithm getTimestampDeltaAlgorithm() {
+ return timestampDeltaAlgorithm;
+ }
+
+ @Override
+ public void setTimestampDeltaAlgorithm(SpatTimestampDeltaAlgorithm timestampDeltaAlgorithm) {
+ // Enforce the algorithm being a Streams algorithm
+ if (timestampDeltaAlgorithm instanceof SpatTimestampDeltaStreamsAlgorithm timestampDeltaStreamsAlgorithm) {
+ this.timestampDeltaAlgorithm = timestampDeltaStreamsAlgorithm;
+ } else {
+ throw new IllegalArgumentException("Algorithm is not an instance of SpatTimestampDeltaStreamsAlgorithm");
+ }
+ }
+
+ @Override
+ protected void validate() {
+ super.validate();
+
+ if (timestampDeltaAlgorithm == null) {
+ throw new IllegalStateException("SpatTimestampDeltaAlgorithm is not set");
+ }
+ }
@Override
public Topology buildTopology() {
@@ -65,6 +92,9 @@ public Topology buildTopology() {
.withTimestampExtractor(new TimestampExtractorForBroadcastRate())
);
+ // timestamp delta plugin after reading processed SPATs
+ timestampDeltaAlgorithm.buildTopology(builder, processedSpatStream);
+
// Extract validation info for Minimum Data events
processedSpatStream
.filter((key, value) -> value != null && !value.getCti4501Conformant())
@@ -180,5 +210,4 @@ public Topology buildTopology() {
-
}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/utils/BsmUtils.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/utils/BsmUtils.java
index f809fd12..78a579d8 100644
--- a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/utils/BsmUtils.java
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/utils/BsmUtils.java
@@ -1,5 +1,6 @@
package us.dot.its.jpo.conflictmonitor.monitor.utils;
+import lombok.extern.slf4j.Slf4j;
import org.locationtech.jts.geom.CoordinateXY;
import us.dot.its.jpo.ode.model.OdeBsmData;
import us.dot.its.jpo.ode.model.OdeBsmMetadata;
@@ -8,8 +9,11 @@
import us.dot.its.jpo.ode.plugin.j2735.J2735BsmCoreData;
import us.dot.its.jpo.ode.plugin.j2735.OdePosition3D;
+import java.time.Instant;
+import java.time.format.DateTimeParseException;
import java.util.Optional;
+@Slf4j
public class BsmUtils {
public static CoordinateXY getPosition(OdeBsmData bsm) {
CoordinateXY position = new CoordinateXY();
@@ -74,4 +78,18 @@ public static String getRsuIp(OdeBsmData bsm) {
}
return ip;
}
+
+ public static long getOdeReceivedAt(OdeBsmData bsm) {
+ long odeReceivedAt = 0;
+ if (bsm != null && bsm.getMetadata() != null && bsm.getMetadata() instanceof OdeBsmMetadata metadata) {
+ String strOdeReceivedAt = metadata.getOdeReceivedAt();
+ assert(strOdeReceivedAt != null);
+ try {
+ odeReceivedAt = Instant.parse(strOdeReceivedAt).toEpochMilli();
+ } catch (DateTimeParseException ex) {
+ log.error(String.format("Error parsing odeReceivedAt: %s", strOdeReceivedAt), ex);
+ }
+ }
+ return odeReceivedAt;
+ }
}
diff --git a/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/utils/ProcessedMapUtils.java b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/utils/ProcessedMapUtils.java
new file mode 100644
index 00000000..bdbca8b2
--- /dev/null
+++ b/jpo-conflictmonitor/src/main/java/us/dot/its/jpo/conflictmonitor/monitor/utils/ProcessedMapUtils.java
@@ -0,0 +1,52 @@
+package us.dot.its.jpo.conflictmonitor.monitor.utils;
+
+import lombok.extern.slf4j.Slf4j;
+import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.MapSharedProperties;
+import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
+
+import java.time.ZonedDateTime;
+
+/**
+ * Methods to get properties from ProcessedMaps with null checks
+ */
+@Slf4j
+public class ProcessedMapUtils {
+
+ public static long getTimestamp(ProcessedMap processedMap) {
+ if (processedMap == null) {
+ log.error("ProcessedMap is null");
+ return 0L;
+ }
+ MapSharedProperties properties = processedMap.getProperties();
+ if (properties == null) {
+ log.error("ProcessedMap.properties are null");
+ return 0L;
+ }
+ ZonedDateTime zdt = properties.getTimeStamp();
+ if (zdt == null) {
+ log.error("ProcessedMap Timestamp is null");
+ return 0L;
+ }
+ return zdt.toInstant().toEpochMilli();
+ }
+
+ public static