- Prep
- Demo
- Building a Telegram bot in Go
- Stream data from REST endpoint into Kafka
- Apply a schema to the data with ksqlDB
- Transform the data
- Create a materialised view of the current state
- Event-driven alert: Tell me when there’s a space available
- ksqlDB-powered Telegram bot
- Analytics: Stream the data to Elasticsearch.
- Extended demo
-
Three terminal windows
# Run the Carpark telegram bot for the first demo.
# (Extraction had fused the commands onto the comment line, so nothing ran.)
cd ~/git/demo-scene/telegram-bot-carparks/go
go run .
# To show the Telegram bot basic code.
# (Extraction had fused the commands onto the comment line, so nothing ran.)
cd ~/git/golang-telegram-bot
ls -l
# For running the interactive session to
# walk through all the cool stuff.
# (Extraction had fused the commands onto the comment line, so nothing ran.)
cd ~/git/demo-scene/telegram-bot-carparks/
# Bring up the full stack (Kafka, ksqlDB, Connect, Elasticsearch, ...)
docker-compose up -d
# Block until the ksqlDB server answers /info with HTTP 200, then launch the CLI
docker exec -it ksqldb bash -c 'echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
Source data
# Fetch the current car park feed (CSV, first line is a header row)
curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkCurrent
Source data with header removed
# Fetch the feed and drop the first line (the CSV header)
curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkCurrent | \
tail -n +2
Source data with header removed streamed into a Kafka topic
# Fetch the feed, strip the header, and produce each line as a message to
# the `carparks` topic. kafkacat: -P produce mode, -T also echo the
# produced messages to stdout for visibility.
curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkCurrent | \
tail -n +2 | \
docker exec -i kafkacat kafkacat -b broker:29092 -t carparks -P -T
Stream the current data from the REST endpoint into Kafka, piping the output from curl into kafkacat, polling every three minutes:
# Poll the current-status endpoint every three minutes: fetch the CSV,
# strip the header row, and produce each line to the `carparks` topic.
# `while true` replaces the unidiomatic `while [ 1 -eq 1 ]`.
while true
do
    curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkCurrent | \
        tail -n +2 | \
        docker exec -i kafkacat kafkacat -b broker:29092 -t carparks -P -T
    sleep 180
done
-- Apply a schema to the raw CSV messages already in the `carparks` topic.
-- VALUE_FORMAT='DELIMITED' maps each comma-separated field to a column by
-- position, so the column order here must match the feed's field order.
CREATE STREAM CARPARK_SRC (date VARCHAR ,
time VARCHAR ,
name VARCHAR ,
capacity INT ,
empty_places INT ,
status VARCHAR ,
latitude DOUBLE ,
longitude DOUBLE ,
directionsURL VARCHAR)
WITH (KAFKA_TOPIC='carparks',
VALUE_FORMAT='DELIMITED');
Use the schema to project columns:
-- Read the topic from the beginning so existing records are returned too
SET 'auto.offset.reset' = 'earliest';
-- Push query projecting columns through the new schema; LIMIT 5 ends the
-- query after five rows have been emitted.
SELECT DATE, TIME, NAME, EMPTY_PLACES FROM CARPARK_SRC EMIT CHANGES LIMIT 5;
+-------------+-------+-------------+-------------+
|DATE |TIME |NAME |EMPTY_PLACES |
+-------------+-------+-------------+-------------+
|2020-07-28 |14:55 |Westgate |73 |
|2020-07-28 |14:55 |Burnett St |108 |
|2020-07-28 |14:55 |Crown Court |94 |
|2020-07-28 |14:55 |NCP Hall Ings|505 |
Create a new stream:

- Add a source field for lineage
- Set the timestamp from the two source fields
- Make the location (lat/lon) a struct
- Create a calculated field (PCT_FULL)
- Serialise to Protobuf so that the schema is available for use downstream
  - Could also use Avro or JSON Schema here
-- Process from the beginning of the source topic
SET 'auto.offset.reset' = 'earliest';
-- Derived stream, serialised as Protobuf so the schema travels downstream.
-- Columns, in order: TS (epoch-ms built from the DATE + TIME strings), the
-- expansion of * (date..directionsURL from CARPARK_SRC), PCT_FULL (computed
-- percentage occupied), "location" (lat/lon struct; quoted lower-case names
-- are preserved exactly — NOTE(review): presumably to suit an Elasticsearch
-- geo_point mapping downstream, confirm), and SOURCE for lineage.
CREATE STREAM CARPARK_EVENTS WITH (VALUE_FORMAT='PROTOBUF') AS
SELECT STRINGTOTIMESTAMP(DATE + ' ' + TIME ,'yyyy-MM-dd HH:mm','Europe/London' ) AS TS,
*,
(CAST((CAPACITY - EMPTY_PLACES) AS DOUBLE) /
CAST(CAPACITY AS DOUBLE)) * 100 AS PCT_FULL,
STRUCT("lat" := LATITUDE, "lon":= LONGITUDE) AS "location",
'v2/Council/CarParkCurrent' as SOURCE
FROM CARPARK_SRC
EMIT CHANGES;
Show that there are multiple results for a given car park:
-- Push query showing that the same car park reports repeatedly over time;
-- LIMIT 3 ends the query after three rows.
SELECT TIMESTAMPTOSTRING( TS,'yyyy-MM-dd HH:mm:ss','Europe/London'),
NAME,
EMPTY_PLACES
FROM CARPARK_EVENTS
WHERE NAME='Westgate'
EMIT CHANGES
LIMIT 3;
-- Aggregate over the full history of the stream
SET 'auto.offset.reset' = 'earliest';
-- Materialised view of current state: one row per car park name, holding the
-- most recently consumed value of each field (LATEST_BY_OFFSET) plus a
-- running average of free spaces across all events.
CREATE TABLE CARPARK AS
SELECT NAME,
TIMESTAMPTOSTRING(LATEST_BY_OFFSET(TS),
'yyyy-MM-dd HH:mm:ss','Europe/London') AS LATEST_TS,
LATEST_BY_OFFSET(CAPACITY) AS CAPACITY,
LATEST_BY_OFFSET(EMPTY_PLACES) AS CURRENT_EMPTY_PLACES,
LATEST_BY_OFFSET(PCT_FULL) AS CURRENT_PCT_FULL,
AVG(EMPTY_PLACES) AS AVG_EMPTY_PLACES,
LATEST_BY_OFFSET(STATUS) AS CURRENT_STATUS,
LATEST_BY_OFFSET(LATITUDE) AS LATITUDE,
LATEST_BY_OFFSET(LONGITUDE) AS LONGITUDE,
LATEST_BY_OFFSET(DIRECTIONSURL) AS DIRECTIONSURL
FROM CARPARK_EVENTS
GROUP BY NAME;
-- Pull query: look up the current state of one car park from the table
SELECT LATEST_TS, CURRENT_EMPTY_PLACES, AVG_EMPTY_PLACES
  FROM CARPARK
 WHERE NAME = 'Westgate';
+----------------------+----------------------+------------------+
|LATEST_TS |CURRENT_EMPTY_PLACES |AVG_EMPTY_PLACES |
+----------------------+----------------------+------------------+
|2020-10-22 14:59:00 |81 |80.25 |
-- Pull query filtering on a non-key column: any car park currently
-- reporting more than 80 free spaces.
SELECT NAME,
LATEST_TS,
CURRENT_EMPTY_PLACES,
AVG_EMPTY_PLACES
FROM CARPARK
WHERE CURRENT_EMPTY_PLACES > 80;
# Same pull query via the ksqlDB REST API (/query-stream endpoint, HTTP/2),
# pretty-printed with jq. The '\'' sequences embed literal single quotes
# inside the single-quoted JSON payload.
curl --silent --http2 --location --request POST 'http://localhost:8088/query-stream' \
--header 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' --header 'Accept: application/json' \
--data-raw '{"sql":"SELECT LATEST_TS, CURRENT_EMPTY_PLACES, AVG_EMPTY_PLACES FROM CARPARK WHERE NAME='\''Westgate'\'';"}' | jq '.'
[
{
"queryId": null,
"columnNames": [
"LATEST_TS",
"CURRENT_EMPTY_PLACES"
],
"columnTypes": [
"STRING",
"INTEGER"
]
},
[
"2020-07-29 15:01:00",
73
]
]
-- Only consider events arriving from now on
SET 'auto.offset.reset' = 'latest';
-- Event-driven alert: emit a row whenever Westgate reports more than 80
-- free spaces. Runs until cancelled (no LIMIT).
SELECT NAME AS CARPARK,
TIMESTAMPTOSTRING(TS,'yyyy-MM-dd HH:mm:ss','Europe/London') AS DATA_TS,
CAPACITY,
EMPTY_PLACES
FROM CARPARK_EVENTS
WHERE NAME = 'Westgate'
AND EMPTY_PLACES > 80
EMIT CHANGES;
# Push query via the ksqlDB REST API: streams each new matching event as it
# arrives; auto.offset.reset=latest means only events from now on.
# Fixed the doubled slash in the endpoint path (was 8088//query-stream).
curl --http2 --location --request POST 'http://localhost:8088/query-stream' \
--header 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
--data-raw '{"properties":{"ksql.streams.auto.offset.reset": "latest"},
"sql": "SELECT NAME AS CARPARK, TIMESTAMPTOSTRING(TS,'\''yyyy-MM-dd HH:mm:ss'\'','\''Europe/London'\'') AS DATA_TS, CAPACITY , EMPTY_PLACES FROM CARPARK_EVENTS WHERE NAME = '\''Westgate'\'' AND EMPTY_PLACES > 0 EMIT CHANGES;"
}'
{"queryId":"20a9c981-12d7-494e-a632-e6602b95ef96","columnNames":["CARPARK","DATA_TS","CAPACITY","EMPTY_PLACES"],"columnTypes":["STRING","STRING","INTEGER","INTEGER"]}
["Kirkgate Centre","2020-07-28 16:58:00",611,510]
Uses the community ksqlDB Go client.
cd ~/git/demo-scene/telegram-bot-carparks/go; go run .
Create a sink connector from ksqlDB:
-- Stream CARPARK_EVENTS to Elasticsearch via Kafka Connect.
CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'topics' = 'CARPARK_EVENTS',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
-- Values are Protobuf; the converter fetches the schema from Schema Registry
'value.converter' = 'io.confluent.connect.protobuf.ProtobufConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'connection.url' = 'http://elasticsearch:9200',
'type.name' = '_doc',
-- key.ignore=true: document IDs derived from topic/partition/offset
'key.ignore' = 'true',
-- schema.ignore=true: let Elasticsearch infer/use its own index mapping
'schema.ignore' = 'true');
Check the status of the connector in ksqlDB
-- List connectors and their status as reported by Kafka Connect
SHOW CONNECTORS;
Connector Name | Type | Class | Status
----------------------------------------------------------------------------------------------------------------------
SINK_ELASTIC_01 | SINK | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector | RUNNING (1/1 tasks RUNNING)
----------------------------------------------------------------------------------------------------------------------
Check that data is arriving:
# Show document counts per Elasticsearch index; carpark_events should be growing
docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount"
.kibana_task_manager_1 2
.apm-agent-configuration 0
.kibana_1 1
carpark_events 265793
Open Kibana locally
(Open Kibana (Elastic Cloud))
Visualise it in Kibana:
Preview the data
# Preview the historic feed (head: first ten lines only)
curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkHistoric | head
Load the data
# Produce the full historic feed to its own topic.
# NOTE(review): unlike the current feed there is no `tail -n +2` here —
# confirm the historic endpoint really has no CSV header row.
curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkHistoric | \
docker exec -i kafkacat kafkacat -b broker:29092 -t carparks_historic -P
Apply a schema to the historic data
-- Schema for the historic CSV feed: same positional columns as CARPARK_SRC
-- except this feed has no directionsURL field.
CREATE STREAM CARPARK_HISTORIC (date VARCHAR ,
time VARCHAR ,
name VARCHAR ,
capacity INT ,
empty_places INT ,
status VARCHAR ,
latitude DOUBLE ,
longitude DOUBLE )
WITH (KAFKA_TOPIC='carparks_historic',
VALUE_FORMAT='DELIMITED');
Merge the data into the existing carpark stream
-- Process the historic topic from the beginning
SET 'auto.offset.reset' = 'earliest';
-- Merge historic events into CARPARK_EVENTS. INSERT INTO matches columns by
-- position: TS, then * (the historic stream's DATE..LONGITUDE), then ''
-- standing in for the missing DIRECTIONSURL, then PCT_FULL, "location" and
-- SOURCE — the same order CREATE STREAM CARPARK_EVENTS AS produced.
INSERT INTO CARPARK_EVENTS
SELECT STRINGTOTIMESTAMP(DATE + ' ' + TIME ,'yyyy-MM-dd HH:mm','Europe/London' ) AS TS,
*,
'' AS DIRECTIONSURL,
(CAST((CAPACITY - EMPTY_PLACES) AS DOUBLE) /
CAST(CAPACITY AS DOUBLE)) * 100 AS PCT_FULL,
STRUCT("lat" := LATITUDE, "lon":= LONGITUDE) AS "location",
-- Lineage marker distinguishing historic rows from the live feed
'v2/Council/CarParkHistoric' as SOURCE
FROM CARPARK_HISTORIC
EMIT CHANGES;
Check the data:
-- Sanity-check the merge: event count and timestamp range per source feed.
-- LIMIT 2 ends the push query after two result rows have been emitted.
SELECT SOURCE,
COUNT(*) AS EVENT_CT,
TIMESTAMPTOSTRING( MIN(TS),'yyyy-MM-dd HH:mm:ss','Europe/London') AS EARLIEST_TS,
TIMESTAMPTOSTRING( MAX(TS),'yyyy-MM-dd HH:mm:ss','Europe/London') AS LATEST_TS
FROM CARPARK_EVENTS
GROUP BY SOURCE
EMIT CHANGES
LIMIT 2;
# Fetch the Leeds Datex2 car park feed (httpie with HTTP basic auth from env
# vars), convert the XML to JSON with xq, and pull out each car park's
# situationRecord.
http -a $LEEDS_USER:$LEEDS_PW get http://www.leedstravel.info/datex2/carparks/content.xml | \
xq '."d2lm:d2LogicalModel"."d2lm:payloadPublication"."d2lm:situation"[]."d2lm:situationRecord"'
{
"@id": "LEEDSCP0001_1",
"@xsi:type": "d2lm:CarParks",
"d2lm:situationRecordCreationTime": "2021-10-05T19:54:47",
"d2lm:situationRecordVersion": "1",
"d2lm:situationRecordVersionTime": "2021-10-05T19:54:47",
"d2lm:situationRecordFirstSupplierVersionTime": "2021-10-05T19:54:47",
"d2lm:probabilityOfOccurrence": "certain",
"d2lm:validity": {
"d2lm:validityStatus": "active",
"d2lm:validityTimeSpecification": {
"d2lm:overallStartTime": "2021-10-05T19:54:47"
}
},
"d2lm:groupOfLocations": {
"d2lm:locationContainedInGroup": {
"@xsi:type": "d2lm:Point",
"d2lm:pointByCoordinates": {
"d2lm:pointCoordinates": {
"d2lm:latitude": "53.79798394752138",
"d2lm:longitude": "-1.546011529543302"
}
}
}
},
"d2lm:carParkIdentity": "Albion Street:LEEDSCP0001",
"d2lm:carParkOccupancy": "12",
"d2lm:carParkStatus": "enoughSpacesAvailable",
"d2lm:occupiedSpaces": "47",
"d2lm:totalCapacity": "389"
}
# Load the LEEDS_USER / LEEDS_PW credentials
source .env
# Fetch the feed, emit each situationRecord as one compact JSON line (-c),
# and produce the lines to the carparks_leeds topic.
http -a $LEEDS_USER:$LEEDS_PW get http://www.leedstravel.info/datex2/carparks/content.xml | \
xq -c '."d2lm:d2LogicalModel"."d2lm:payloadPublication"."d2lm:situation"[]."d2lm:situationRecord"' | \
docker exec -i kafkacat kafkacat -b broker:29092 -t carparks_leeds -P
-- Schema over the Leeds feed (XML converted to JSON by xq). The quoted
-- identifiers preserve the "d2lm:"-prefixed element names exactly as they
-- appear in the JSON, including case and colons; the nested STRUCTs mirror
-- the groupOfLocations element hierarchy down to the lat/lon coordinates.
CREATE STREAM CARPARK_LEEDS_SRC01 (
"d2lm:situationRecordCreationTime" VARCHAR,
"d2lm:groupOfLocations" STRUCT <
"d2lm:locationContainedInGroup" STRUCT <
"d2lm:pointByCoordinates" STRUCT <
"d2lm:pointCoordinates" STRUCT <
"d2lm:latitude" DOUBLE,
"d2lm:longitude" DOUBLE > > > >,
"d2lm:carParkIdentity" VARCHAR,
"d2lm:carParkOccupancy" DOUBLE,
"d2lm:carParkStatus" VARCHAR,
"d2lm:occupiedSpaces" INT,
"d2lm:totalCapacity" INT
)
WITH (KAFKA_TOPIC='carparks_leeds',
VALUE_FORMAT='JSON');
-- Merge the Leeds feed into the combined CARPARK_EVENTS stream. INSERT INTO
-- matches columns by position, so the select list below must follow the
-- CARPARK_EVENTS schema order exactly: TS, DATE, TIME, NAME, CAPACITY,
-- EMPTY_PLACES, STATUS, LATITUDE, LONGITUDE, DIRECTIONSURL, PCT_FULL,
-- "location", SOURCE.
INSERT INTO CARPARK_EVENTS
-- Epoch-ms timestamp parsed from the record's ISO-8601 creation time
SELECT STRINGTOTIMESTAMP("d2lm:situationRecordCreationTime",
'yyyy-MM-dd''T''HH:mm:ss',
'Europe/London')
AS TS,
-- Re-derive the DATE and TIME string columns from the same source field
TIMESTAMPTOSTRING(STRINGTOTIMESTAMP("d2lm:situationRecordCreationTime",
'yyyy-MM-dd''T''HH:mm:ss',
'Europe/London'),
'yyyy-MM-dd')
AS DATE,
TIMESTAMPTOSTRING(STRINGTOTIMESTAMP("d2lm:situationRecordCreationTime",
'yyyy-MM-dd''T''HH:mm:ss',
'Europe/London'),
'HH:mm')
AS TIME,
"d2lm:carParkIdentity" AS NAME,
"d2lm:totalCapacity" AS CAPACITY,
-- Leeds reports occupied spaces, not free ones, so derive EMPTY_PLACES
"d2lm:totalCapacity" - "d2lm:occupiedSpaces"
AS EMPTY_PLACES,
"d2lm:carParkStatus" AS STATUS,
"d2lm:groupOfLocations" -> "d2lm:locationContainedInGroup" -> "d2lm:pointByCoordinates" -> "d2lm:pointCoordinates" -> "d2lm:latitude"
AS LATITUDE,
"d2lm:groupOfLocations" -> "d2lm:locationContainedInGroup" -> "d2lm:pointByCoordinates" -> "d2lm:pointCoordinates" -> "d2lm:longitude"
AS LONGITUDE,
-- This feed has no directions URL; insert an empty placeholder
'' AS DIRECTIONSURL,
-- carParkOccupancy is already a percentage in the source data
"d2lm:carParkOccupancy" AS PCT_FULL ,
STRUCT("lat" := "d2lm:groupOfLocations" -> "d2lm:locationContainedInGroup" -> "d2lm:pointByCoordinates" -> "d2lm:pointCoordinates" -> "d2lm:latitude",
"lon" := "d2lm:groupOfLocations" -> "d2lm:locationContainedInGroup" -> "d2lm:pointByCoordinates" -> "d2lm:pointCoordinates" -> "d2lm:longitude")
AS "location",
-- Lineage marker pointing at the upstream dataset
'https://datamillnorth.org/dataset/live-car-park-spaces-api'
AS SOURCE
FROM CARPARK_LEEDS_SRC01
EMIT CHANGES;
-- Per-carpark alert thresholds, keyed on car park name and backed by a new
-- alert_config topic (4 partitions, Protobuf values).
CREATE TABLE ALERT_CONFIG (CARPARK VARCHAR PRIMARY KEY, SPACES_ALERT INT) WITH (KAFKA_TOPIC='alert_config', VALUE_FORMAT='PROTOBUF', PARTITIONS=4);
-- Example config: alert when Kirkgate Centre has at least 470 free spaces
INSERT INTO ALERT_CONFIG (CARPARK, SPACES_ALERT) VALUES ('Kirkgate Centre',470);
-- Event-driven alerts: enrich each car park event with that car park's
-- configured threshold, keeping only events that meet or exceed it.
CREATE STREAM CARPARK_ALERTS AS
SELECT C.NAME AS CARPARK,
TIMESTAMPTOSTRING(C.TS,'yyyy-MM-dd HH:mm:ss','Europe/London') AS DATA_TS,
CAPACITY ,
EMPTY_PLACES,
A.SPACES_ALERT AS ALERT_THRESHOLD,
STATUS ,
LATITUDE ,
LONGITUDE ,
DIRECTIONSURL
FROM CARPARK_EVENTS C
-- Stream-table join: INNER means events for car parks with no config
-- row are dropped rather than emitted with NULL thresholds
INNER JOIN
ALERT_CONFIG A
ON C.NAME=A.CARPARK
WHERE C.EMPTY_PLACES >= A.SPACES_ALERT ;
-- Watch the alert stream: one row per qualifying event as it arrives
SELECT CARPARK,
       ALERT_THRESHOLD,
       DATA_TS,
       EMPTY_PLACES
  FROM CARPARK_ALERTS
  EMIT CHANGES;
+-----------------+-----------------+--------------------+-------------+
|CARPARK |ALERT_THRESHOLD |DATA_TS |EMPTY_PLACES |
+-----------------+-----------------+--------------------+-------------+
|Kirkgate Centre |470 |2020-07-21 10:55:00 |505 |