Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge Release/2024 q3 into master #111

Merged
merged 124 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
124 commits
Select commit Hold shift + click to select a range
02902dc
Created revision counter topology, algorithm, and events for SPAT and…
linda-baker May 8, 2024
7bd9318
Revision Counter bugfixes to enable build
linda-baker May 8, 2024
42f27eb
Add revision counter to MonitorServiceController
linda-baker May 8, 2024
f0014bd
Data objects for connected lanes
iyourshaw May 29, 2024
3b83aff
Remove unused classes
iyourshaw May 29, 2024
89bb063
Add concurrent permissive to intersection-level config map
iyourshaw May 29, 2024
143f1c7
Fix deserializing ConnectedLanes
iyourshaw Jun 5, 2024
c2552a3
Add validation for concurrent permissive intersection/region
iyourshaw Jun 5, 2024
3940f84
Clean up code
iyourshaw Jun 5, 2024
82eebda
Fixing deserialization for IntersectionConfig
iyourshaw Jun 6, 2024
27e940c
Merge branch 'develop' into Add-revision-counter
linda-baker Jun 7, 2024
c50847b
Add unit tests for config deserialization
iyourshaw Jun 7, 2024
3aae8e1
Modified Dockerfile to create copy of keyfile to prevent permissions …
John-Wiens Jun 7, 2024
8d0d0df
fixed revision counter naming typo
linda-baker Jun 12, 2024
1ae3191
Unit tests for deserializing concurrent permissive config
iyourshaw Jun 12, 2024
f08cab7
Making Topic Names Configurable
John-Wiens Jun 14, 2024
b2dd475
Updating Keying Scheme for TIM Messages
John-Wiens Jun 14, 2024
ffe34bf
Adding in Kafka Environment Variable
John-Wiens Jun 17, 2024
936e7be
Merge pull request #93 from usdot-jpo-ode/keyfile-permissions-fix
John-Wiens Jun 17, 2024
9dfa167
Merge pull request #97 from usdot-jpo-ode/deduplicator-paramaterization
John-Wiens Jun 17, 2024
bc74b8d
Merge pull request #98 from usdot-jpo-ode/master
dan-du-car Jun 18, 2024
b55760b
Add revision count unit tests
linda-baker Jun 24, 2024
fc21e2e
Bugfix improve revision counter topology logic and unit tests
linda-baker Jun 26, 2024
ddd1dcb
Fixing Tim Deduplicator not receiving messages
John-Wiens Jun 26, 2024
26d4124
Merge pull request #99 from usdot-jpo-ode/deduplicator-tim-fixes
John-Wiens Jun 26, 2024
38acfeb
Merge branch 'develop' into Add-revision-counter
linda-baker Jun 27, 2024
eb37a04
Additional revision counter bugfixes
linda-baker Jun 27, 2024
71d073f
Adding Bsm Deduplicator
John-Wiens Jun 28, 2024
4e2a0b5
Remove unused classes
iyourshaw Jun 30, 2024
b36d348
Unit test
iyourshaw Jun 30, 2024
8c04e99
Fix test
iyourshaw Jun 30, 2024
effe892
Merge branch 'develop' into permissive-movement-config
iyourshaw Jun 30, 2024
9c456bc
Clean up imports
iyourshaw Jul 1, 2024
d94c23d
Remove unused classes
iyourshaw Jul 1, 2024
8afdd5e
Fix imports
iyourshaw Jul 1, 2024
089a1e8
Fix health endpoints to work with updated json config
iyourshaw Jul 1, 2024
85e9ffb
Fix map property name
iyourshaw Jul 1, 2024
e17224b
Added Test Coverage for Deduplicator
John-Wiens Jul 1, 2024
f4efac5
Merge branch 'develop' of github.com:usdot-jpo-ode/jpo-conflictmonito…
John-Wiens Jul 1, 2024
e24c567
Added Dark Events to Stop Line Stop Assessment
John-Wiens Jul 1, 2024
8a58480
Updated Test Case to Match
John-Wiens Jul 1, 2024
3e86603
Merge pull request #101 from usdot-jpo-ode/bsm-deduplicator
John-Wiens Jul 1, 2024
b0e91e7
Merge pull request #100 from usdot-jpo-ode/permissive-movement-config
John-Wiens Jul 2, 2024
0513991
Switching BSM to avoid deduplication by default
John-Wiens Jul 2, 2024
6cfe9fc
Merge pull request #102 from usdot-jpo-ode/dark-stop-line-fix
John-Wiens Jul 2, 2024
bcadb16
resolve revision counter topology build issues
linda-baker Jul 8, 2024
45f704f
Refactor base classes for algorithms: simplify using Lombok getters/s…
iyourshaw Jul 10, 2024
4d6a874
Visualizing topologies
iyourshaw Jul 11, 2024
8b49fb4
Visualize topologies
iyourshaw Jul 11, 2024
da99a63
Create interfaces and paramters classes for timestamp deltas algorithm
iyourshaw Jul 15, 2024
a72ed2c
Timestamp delta parameters and event
iyourshaw Jul 15, 2024
f2adace
Create BSM Timestamp delta processor
iyourshaw Jul 17, 2024
075d6cc
Update RevisionCounter integration test scripts
linda-baker Jul 17, 2024
8e20a31
add bsmRevisionCounter
linda-baker Jul 17, 2024
7ac2ccd
Create MapTimestampDeltaTopology
iyourshaw Jul 18, 2024
f3fdcd7
Remove unused classes. Create SpatTimestampDeltaTopology.
iyourshaw Jul 18, 2024
f639cc5
Reconfigured Tim Deduplicator to Match ODE decoded format
John-Wiens Jul 23, 2024
e57cc1d
Implement ingegrate map and spat timestamp topologies with validation…
iyourshaw Jul 24, 2024
41d9329
Merge branch 'develop' into Add-revision-counter
linda-baker Jul 24, 2024
e67f9d1
Update geojsonconverter submodule reference
linda-baker Jul 24, 2024
5c0881b
Add ability to set offset milliseconds in script templates, with unit…
iyourshaw Jul 25, 2024
e2f4ffe
Test script for MAP timestamp delta events
iyourshaw Jul 25, 2024
e48101d
Update script-runner-cli
iyourshaw Jul 25, 2024
50e88e8
Change mongo health check to a simple ping
iyourshaw Jul 25, 2024
dd68656
Fixes
iyourshaw Jul 25, 2024
77f8472
show timestamp difference in serialized events
iyourshaw Jul 26, 2024
e7c604b
Test script for SPAT timestamp-deltas
iyourshaw Jul 26, 2024
7af5350
Fix unit tests
iyourshaw Jul 26, 2024
45e356e
Unit test for map timestamp delta topologies
iyourshaw Jul 26, 2024
a555511
Unit test for Spat timestamp topology
iyourshaw Jul 26, 2024
5b9a90b
Documentation and fixes for topology visualization
iyourshaw Jul 26, 2024
e51f7cf
Modular Architecture documentation
iyourshaw Jul 27, 2024
793c610
Fix typos
iyourshaw Jul 27, 2024
400143f
Switched TIM deduplication to work on a per intersection basis
John-Wiens Jul 30, 2024
6f6f7c2
Added deduplication tology for OdeRawEncoded tim messages
John-Wiens Jul 30, 2024
240b30b
Fixing bugs in Raw Tim Deduplication
John-Wiens Jul 30, 2024
a1ce4ab
Updated TIM Deduplicator to remove commit rate errors
John-Wiens Aug 1, 2024
8b1f080
Refactored to use shared classes
John-Wiens Aug 1, 2024
3c2583b
Converting TIM and BSM deduplicators to new processor format.
John-Wiens Aug 2, 2024
1653cc9
Switched OdeRawEncoded Deduplication To use a processor
John-Wiens Aug 2, 2024
3c06d82
Converted OdeMapJson to use processor
John-Wiens Aug 2, 2024
4e4f96a
Converted ProcessedMap to Processor
John-Wiens Aug 2, 2024
a326809
Updated Processed Map WKT to use processor
John-Wiens Aug 2, 2024
ded091d
Switched Tim Topology to directly use properties object
John-Wiens Aug 2, 2024
8d75d46
Fixed incorrect input topic in Tim Deduplicator
John-Wiens Aug 2, 2024
ff1dea1
Setting Commit interval back to longer value
John-Wiens Aug 5, 2024
5a049e7
Bugfix: populate intersection ID in SpatRevisionCounterEvent and MapR…
linda-baker Aug 6, 2024
f5c775d
Merge pull request #103 from usdot-jpo-ode/Add-revision-counter
John-Wiens Aug 7, 2024
c58c6b1
Merge branch 'develop' of github.com:usdot-jpo-ode/jpo-conflictmonito…
John-Wiens Aug 7, 2024
7ad43b5
Merge from develop, resolve conflicts
iyourshaw Aug 7, 2024
ee5a331
Add Timestamp Delta Events to combined Event topology
iyourshaw Aug 7, 2024
eae771f
Fix unit test for Event Topo
iyourshaw Aug 7, 2024
e6fca29
Add event topology to service controller, add parameter to default to…
iyourshaw Aug 7, 2024
0a464a6
Create classes for timestamp delta notifications
iyourshaw Aug 12, 2024
1d0fd32
Update streams version to allow using versioned state store feature
iyourshaw Aug 12, 2024
90c6b05
All tests pass with streams v3.7.1
iyourshaw Aug 12, 2024
cbb9da3
Delta notification processor
iyourshaw Aug 12, 2024
383f56e
Always emit events if OdeReceivedAt is earlier than message timestamp…
iyourshaw Aug 13, 2024
25df724
Notification processor
iyourshaw Aug 14, 2024
ebe3f7f
Calculate statistics and generate notifications
iyourshaw Aug 14, 2024
3f750c0
Update unit tests
iyourshaw Aug 14, 2024
74ded33
Create common base classes for map and spat timestamp delta algorithm…
iyourshaw Aug 14, 2024
07aa9a2
Create common base classes for map and spat timestamp delta algorithm…
iyourshaw Aug 15, 2024
a7b0dbe
Add to combined notification topic
iyourshaw Aug 15, 2024
fea3b9c
Update notification topo test
iyourshaw Aug 15, 2024
3b43198
Added Repartition Calls to Key Selctors
John-Wiens Aug 16, 2024
f16e2af
Move processors to the 'processors' namespace to be consistent
iyourshaw Aug 17, 2024
4d91ca8
Add new topics to connect script
iyourshaw Aug 17, 2024
f73e0ec
use median in notifications
iyourshaw Aug 17, 2024
9264384
Fix min/max assessment
iyourshaw Aug 18, 2024
5ff6109
Fix statistics calc. Fix tests so timestamps aren't identical
iyourshaw Aug 18, 2024
3dc6517
default configs
iyourshaw Aug 18, 2024
451e6aa
update graphs
iyourshaw Aug 18, 2024
f6a6e2e
update graphs
iyourshaw Aug 18, 2024
4ca677a
Merge pull request #105 from usdot-jpo-ode/tim-dedup-update
John-Wiens Aug 21, 2024
4ac2b5a
Merge branch 'develop' of github.com:usdot-jpo-ode/jpo-conflictmonito…
John-Wiens Aug 21, 2024
05616e6
Merge pull request #104 from usdot-jpo-ode/timestamp-deltas
John-Wiens Aug 21, 2024
70bdd08
updating geojson converter commit reference. Changed the deduplicator…
Michael7371 Aug 27, 2024
85aa7ca
Merge pull request #106 from usdot-jpo-ode/allow-unknown-props
Michael7371 Aug 28, 2024
905d5d1
Merge branch 'develop' of github.com:usdot-jpo-ode/jpo-conflictmonito…
John-Wiens Sep 4, 2024
a193d52
Added support for Processed BSM's to mongo and Kafka Connect
John-Wiens Sep 6, 2024
e497cc4
Merge pull request #107 from usdot-jpo-ode/Processed_BSM_Connect
John-Wiens Sep 6, 2024
ea0eaea
update submodule ref and pom.xml
SaikrishnaBairamoni Sep 30, 2024
9bc07ae
update submoduels and pom.xml
SaikrishnaBairamoni Sep 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose-multi-instance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ services:
- ./docker/mongo/b_create_indexes.js:/docker-entrypoint-initdb.d/b_create_indexes.js
healthcheck:
test: |
test $$(mongosh --username ${MONGO_INITDB_ROOT_USERNAME} --password ${MONGO_INITDB_ROOT_PASSWORD} --quiet --eval "try { rs.initiate({ _id: 'rs0', members: [{ _id: 0, host: '${DB_HOST_IP}' }] }).ok } catch (_) { rs.status().ok }") -eq 1
test $$(mongosh --username ${MONGO_INITDB_ROOT_USERNAME} --password ${MONGO_INITDB_ROOT_PASSWORD} --quiet --eval "db.runCommand({ping:1}).ok") -eq 1
interval: 10s
start_period: 30s
entrypoint:
Expand Down
5 changes: 3 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ services:
volumes:
- mongodb_data_container:/data/db
- ./docker/mongo/manage-volume-cron:/docker-entrypoint-initdb.d/manage-volume-cron
- ./docker/mongo/keyfile.txt:/data/keyfile.txt
- ./docker/mongo/keyfile.txt:/data/keyfile-import.txt
- ./docker/mongo/a_init_replicas.js:/docker-entrypoint-initdb.d/a_init_replicas.js
- ./docker/mongo/b_create_indexes.js:/docker-entrypoint-initdb.d/b_create_indexes.js
- ./docker/mongo/manage_volume.js:/docker-entrypoint-initdb.d/manage_volume.js
healthcheck:
test: |
test $$(mongosh --username ${MONGO_INITDB_ROOT_USERNAME} --password ${MONGO_INITDB_ROOT_PASSWORD} --quiet --eval "try { rs.initiate({ _id: 'rs0', members: [{ _id: 0, host: '${DB_HOST_IP}' }] }).ok } catch (_) { rs.status().ok }") -eq 1
test $$(mongosh --username ${MONGO_INITDB_ROOT_USERNAME} --password ${MONGO_INITDB_ROOT_PASSWORD} --quiet --eval "db.runCommand({ping:1}).ok") -eq 1
interval: 10s
start_period: 60s
entrypoint:
Expand All @@ -98,6 +98,7 @@ services:
dos2unix /etc/cron.d/manage-volume-cron
chmod 644 /etc/cron.d/manage-volume-cron
systemctl restart cron
cp /data/keyfile-import.txt /data/keyfile.txt
chmod 400 /data/keyfile.txt
chown 999:999 /data/keyfile.txt

Expand Down
10 changes: 10 additions & 0 deletions docker/connect/connect_start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ declare -A OdeRawEncodedBSMJson=([name]="topic.OdeRawEncodedBSMJson" [collection
[convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
declare -A OdeBsmJson=([name]="topic.OdeBsmJson" [collection]="OdeBsmJson"
[convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)
declare -A ProcessedBsm=([name]="topic.ProcessedBsm" [collection]="ProcessedBsm"
[convert_timestamp]=false [timefield]="" [use_key]=false [key]="" [add_timestamp]=true)

# Record Map Data
declare -A OdeMapJson=([name]="topic.DeduplicatedOdeMapJson" [collection]="OdeMapJson"
Expand Down Expand Up @@ -89,6 +91,9 @@ declare -A CmMapMinimumDataEvents=([name]="topic.CmMapMinimumDataEvents" [collec
[convert_timestamp]=true [timefield]="eventGeneratedAt" [use_key]=false [key]="" [add_timestamp]=false)
declare -A CmSpatBroadcastRateEvents=([name]="topic.CmSpatBroadcastRateEvents" [collection]="CmSpatBroadcastRateEvents"
[convert_timestamp]=true [timefield]="eventGeneratedAt" [use_key]=false [key]="" [add_timestamp]=false)
declare -A CmTimestampDeltaEvent=([name]="topic.CmTimestampDeltaEvent" [collection]="CmTimestampDeltaEvent"
[convert_timestamp]=true [timefield]="eventGeneratedAt" [use_key]=false [key]="" [add_timestamp]=false)


# Record BSM Events
declare -A CmBsmEvents=([name]="topic.CmBsmEvents" [collection]="CmBsmEvents"
Expand Down Expand Up @@ -125,6 +130,8 @@ declare -A CmStopLineStopNotification=([name]="topic.CmStopLineStopNotification"
[convert_timestamp]=true [timefield]="notificationGeneratedAt" [use_key]=true [key]="key" [add_timestamp]=false)
declare -A CmStopLinePassageNotification=([name]="topic.CmStopLinePassageNotification" [collection]="CmStopLinePassageNotification"
[convert_timestamp]=true [timefield]="notificationGeneratedAt" [use_key]=true [key]="key" [add_timestamp]=false)
declare -A CmTimestampDeltaNotification=([name]="topic.CmTimestampDeltaNotification" [collection]="CmTimestampDeltaNotification"
[convert_timestamp]=true [timefield]="notificationGeneratedAt" [use_key]=true [key]="key" [add_timestamp]=false)

function createSink() {
local -n topic=$1
Expand Down Expand Up @@ -209,6 +216,7 @@ function createSink() {

createSink OdeRawEncodedBSMJson
createSink OdeBsmJson
createSink ProcessedBsm

createSink OdeMapJson
createSink ProcessedMap
Expand Down Expand Up @@ -245,6 +253,7 @@ createSink CmSpatMinimumDataEvents
createSink CmMapBroadcastRateEvents
createSink CmMapMinimumDataEvents
createSink CmSpatBroadcastRateEvents
createSink CmTimestampDeltaEvent

createSink CmBsmEvents

Expand All @@ -262,6 +271,7 @@ createSink CmSignalGroupAlignmentNotification
createSink CmNotification
createSink CmStopLineStopNotification
createSink CmStopLinePassageNotification
createSink CmTimestampDeltaNotification


echo "----------------------------------"
Expand Down
38 changes: 38 additions & 0 deletions docker/mongo/b_create_indexes.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const collections = [
// GeoJson Converter Data
{name: "ProcessedMap", ttlField: "recordGeneratedAt", timeField: "properties.timeStamp", intersectionField: "properties.intersectionId"},
{name: "ProcessedSpat", ttlField: "recordGeneratedAt", timeField: "utcTimeStamp", intersectionField: "intersectionId"},
{name: "ProcessedBsm", ttlField: "recordGeneratedAt", timeField: "timeStamp", geoSpatialField: "features.geometry.coordinates"},

// Conflict Monitor Events
{ name: "CmStopLineStopEvent", ttlField: "eventGeneratedAt", timeField: "eventGeneratedAt", intersectionField: "intersectionID" },
Expand Down Expand Up @@ -145,6 +146,7 @@ do {
createTimeIntersectionIndex(collection);
createTimeRsuIpIndex(collection);
createTimeIndex(collection);
createGeoSpatialIndex(collection);
}else{
missing_collection_count++;
console.log("Collection " + collection.name + " does not exist yet");
Expand Down Expand Up @@ -318,6 +320,38 @@ function createTimeIntersectionIndex(collection){
}
}

function createGeoSpatialIndex(collection){
if(geoSpatialIndexExists(collection)){
return;
}

if(collection.hasOwnProperty("timeField") && collection.timeField != null && collection.hasOwnProperty("geoSpatialField") && collection.geoSpatialField != null){
const collectionName = collection.name;
const timeField = collection.timeField;
const geoSpatialField = collection.geoSpatialField;
console.log("Creating GeoSpatial index for " + collectionName);

var indexJson = {};
indexJson[geoSpatialField] = "2dsphere";
indexJson[timeField] = -1;


try {
db[collectionName].createIndex(indexJson);
console.log("Created time geospatial index for " + collectionName + " using the field: " + timeField + " as the timestamp and : " + geoSpatialField + " as the GeoSpatial Field");
} catch (err) {
db.runCommand({
"collMod": collectionName,
"index": {
keyPattern: indexJson
}
});
console.log("Updated time geospatial index for " + collectionName + " using the field: " + timeField + " as the timestamp and : " + geoSpatialField + " as the GeoSpatial Field");
}
}

}

function ttlIndexExists(collection) {
return db[collection.name].getIndexes().find((idx) => idx.hasOwnProperty("expireAfterSeconds")) !== undefined;
}
Expand All @@ -333,3 +367,7 @@ function timeRsuIpIndexExists(collection){
function timeIndexExists(collection){
return db[collection.name].getIndexes().find((idx) => idx.name == collection.timeField + "_-1") !== undefined;
}

function geoSpatialIndexExists(collection){
return db[collection.name].getIndexes().find((idx) => idx.name == collection.geoSpatialField + "_2dsphere_timeStamp_-1") !== undefined;
}
75 changes: 75 additions & 0 deletions docs/KafkaStreamsVisualizations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Kafka Streams Visualization Tools

Endpoints for describing Kafka Streams internals are available at:

http://localhost:8082/health/topologies

with the Conflict Monitor running in Docker.

## Topology Overviews

The `topologies/simple` endpoints produce GraphViz `dot` files that represent the input and output topics of topologies, connections between topologies, and associated state stores.

To visualize the current state of all Kafka Streams download the `all.dot` file from:

http://localhost:8082/health/topologies/simple/all

To see a simple view of the inputs and outpus of a single topology select one of the `topologies/simple` endpoints, for example:

http://localhost:8082/health/topologies/simple/mapValidation

To create images from the dot files, install the GraphViz command line tool from:

https://graphviz.org/download/

and run the `dot` command, for example:

Create SVG:
```bash
dot -Tsvg all.dot > all.svg
```

Create PNG:
```bash
dot -Tpng all.dot > all.png
```

### Examples
#### MapValidation Topology Overview
![MAP Validation Topology Overview](./mapValidation.svg)

#### Spat Validation Topology Overview
![SPaT Validation Topology Overview](./spatValidation.svg)

#### All Topologies Overview

Open the image in a separate tab to zoom in

![All Topologies Overview](./all.svg)


## Topology Details

To view detailed internals for one topology select one of the `topologies/detail` endpoints, for example:

http://localhost:8082/health/topologies/detail/mapValidation

Save the response to a text file. This file is the result of calling the Topology.describe() method. This format can be visualized using the online Kafka Streams Topology Visualizer here:

https://zz85.github.io/kafka-streams-viz/

#### Example: MAP Validation Topology Details
![MAP Validation detailed topology](mapValidation_detail.jpg)













Loading
Loading