forked from confluentinc/demo-scene
-
Notifications
You must be signed in to change notification settings - Fork 7
/
neo notes.txt
16 lines (15 loc) · 1.23 KB
/
neo notes.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
The above connector builds up a unique set of device -> SSID relationships, but does not capture each and every probe request. To do that we need to introduce a unique element into the data model; in this case I'm using the timestamp of the probe (extracted from the Kafka message timestamp into a new field `PROBE_TS` with a Single Message Transform) as an attribute of the edge `LOOKED_FOR_SSID`.
[source,sql]
----
-- Sink connector: sends every record on topic `pcap_probe_enriched_00` to Neo4j
-- and applies the Cypher below, which MERGEs a `source` node, an `ssid` node and
-- a (source)-[:LOOKED_FOR_SSID {when}]->(ssid) edge between them.
-- The InsertField SMT adds the Kafka message timestamp to the record value as
-- PROBE_TS; the Cypher stores it as the `when` property of the edge.
CREATE SINK CONNECTOR SINK_NEO4J_PROBES_02 WITH (
    'connector.class' = 'streams.kafka.connect.sink.Neo4jSinkConnector',
    'topics' = 'pcap_probe_enriched_00',
    'neo4j.server.uri' = 'bolt://neo4j:7687',
    'neo4j.authentication.basic.username' = 'neo4j',
    'neo4j.authentication.basic.password' = 'connect',
    'transforms' = 'ExtractTimestamp',
    'transforms.ExtractTimestamp.type' = 'org.apache.kafka.connect.transforms.InsertField$Value',
    'transforms.ExtractTimestamp.timestamp.field' = 'PROBE_TS',
    'neo4j.topic.cypher.pcap_probe_enriched_00' = 'MERGE (source:source{mac: event.SOURCE_ADDRESS, mac_resolved: event.SOURCE_ADDRESS_RESOLVED, device_name: coalesce(event.SOURCE_DEVICE_NAME,""), is_known: event.IS_KNOWN_DEVICE}) MERGE (ssid:ssid{name: coalesce(event.SSID, "")}) MERGE (ssid)<-[:LOOKED_FOR_SSID{when: event.PROBE_TS}]-(source)'
);
----