Building a Telegram bot with Go, Apache Kafka, and ksqlDB

Prep

  • Launch kafka_alert_bot bot, clear history

  • Launch rmoff_golang_bot bot, clear history

  • Three terminal windows

    # Run the Carpark telegram bot for first demo
    cd ~/git/demo-scene/telegram-bot-carparks/go
    go run .
    # To show the Telegram bot basic code
    cd ~/git/golang-telegram-bot
    ls -l
    # For running interactive session to
    # walk through all the cool stuff
    cd ~/git/demo-scene/telegram-bot-carparks/
    docker-compose up -d
    
    docker exec -it ksqldb bash -c 'echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'

Demo

  1. Show working bot

  2. Show the Telegram/Go basics

  3. Show ksqlDB and build the actual carpark bot


(Diagrams: demo overview; stream lineage)

Building a Telegram bot in Go

cd ~/git/golang-telegram-bot
ls -l
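
For orientation, the core of a Telegram bot in Go is a long-poll loop over incoming updates. A minimal echo-bot sketch, assuming the go-telegram-bot-api library and a TELEGRAM_BOT_TOKEN environment variable (both assumptions, not taken from the repo):

package main

import (
	"log"
	"os"

	tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
)

func main() {
	// The bot token comes from @BotFather; read it from the environment.
	bot, err := tgbotapi.NewBotAPI(os.Getenv("TELEGRAM_BOT_TOKEN"))
	if err != nil {
		log.Fatal(err)
	}

	// Long-poll Telegram for incoming updates.
	u := tgbotapi.NewUpdate(0)
	u.Timeout = 60

	for update := range bot.GetUpdatesChan(u) {
		if update.Message == nil {
			continue
		}
		// Echo the text back to the chat it came from.
		reply := tgbotapi.NewMessage(update.Message.Chat.ID, "You said: "+update.Message.Text)
		if _, err := bot.Send(reply); err != nil {
			log.Printf("send failed: %v", err)
		}
	}
}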

Stream data from REST endpoint into Kafka

Source data

curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkCurrent

Source data with header removed

curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkCurrent | \
        tail -n +2

Source data with header removed streamed into a Kafka topic

curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkCurrent | \
        tail -n +2 | \
        docker exec -i kafkacat kafkacat -b broker:29092 -t carparks -P -T
(Diagram: curl piped into kafkacat)

Stream the current data from the REST endpoint into Kafka, piping the output from curl into kafkacat, polling every three minutes:

while true;
do
    curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkCurrent | \
        tail -n +2 | \
        docker exec -i kafkacat kafkacat -b broker:29092 -t carparks -P -T
    sleep 180
done

Apply a schema to the data with ksqlDB

CREATE STREAM CARPARK_SRC (date          VARCHAR ,
                           time          VARCHAR ,
                           name          VARCHAR ,
                           capacity      INT ,
                           empty_places  INT ,
                           status        VARCHAR ,
                           latitude      DOUBLE ,
                           longitude     DOUBLE ,
                           directionsURL VARCHAR)
                WITH (KAFKA_TOPIC='carparks',
                      VALUE_FORMAT='DELIMITED');

Use the schema to project columns:

SET 'auto.offset.reset' = 'earliest';
SELECT DATE, TIME, NAME, EMPTY_PLACES FROM CARPARK_SRC EMIT CHANGES LIMIT 5;
+-------------+-------+-------------+-------------+
|DATE         |TIME   |NAME         |EMPTY_PLACES |
+-------------+-------+-------------+-------------+
|2020-07-28   |14:55  |Westgate     |73           |
|2020-07-28   |14:55  |Burnett St   |108          |
|2020-07-28   |14:55  |Crown Court  |94           |
|2020-07-28   |14:55  |NCP Hall Ings|505          |

Transform the data

Create a new stream:

  • Add a source field for lineage

  • Set the timestamp from the two source fields

  • Make the location (lat/lon) a struct

  • Create a calculated field (PCT_FULL)

  • Serialise to Protobuf so that the schema is available for use downstream

    • Could also use Avro or JSON Schema here

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM CARPARK_EVENTS WITH (VALUE_FORMAT='PROTOBUF') AS
SELECT STRINGTOTIMESTAMP(DATE + ' ' + TIME ,'yyyy-MM-dd HH:mm','Europe/London' ) AS TS,
       *,
       (CAST((CAPACITY - EMPTY_PLACES) AS DOUBLE) /
        CAST(CAPACITY AS DOUBLE)) * 100 AS PCT_FULL,
       STRUCT("lat" := LATITUDE, "lon":= LONGITUDE) AS "location",
       'v2/Council/CarParkCurrent' as SOURCE
  FROM CARPARK_SRC
  EMIT CHANGES;

Create a materialised view of the current state

Show that there are multiple results for a given car park:

SELECT TIMESTAMPTOSTRING( TS,'yyyy-MM-dd HH:mm:ss','Europe/London'),
       NAME,
       EMPTY_PLACES
  FROM CARPARK_EVENTS
  WHERE NAME='Westgate'
  EMIT CHANGES
  LIMIT 3;
Now collapse those events into the latest state per car park:

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE CARPARK AS
SELECT NAME,
       TIMESTAMPTOSTRING(LATEST_BY_OFFSET(TS),
                         'yyyy-MM-dd HH:mm:ss','Europe/London') AS LATEST_TS,
       LATEST_BY_OFFSET(CAPACITY)      AS CAPACITY,
       LATEST_BY_OFFSET(EMPTY_PLACES)  AS CURRENT_EMPTY_PLACES,
       LATEST_BY_OFFSET(PCT_FULL)      AS CURRENT_PCT_FULL,
       AVG(EMPTY_PLACES)               AS AVG_EMPTY_PLACES,
       LATEST_BY_OFFSET(STATUS)        AS CURRENT_STATUS,
       LATEST_BY_OFFSET(LATITUDE)      AS LATITUDE,
       LATEST_BY_OFFSET(LONGITUDE)     AS LONGITUDE,
       LATEST_BY_OFFSET(DIRECTIONSURL) AS DIRECTIONSURL
    FROM CARPARK_EVENTS
    GROUP BY NAME;

K/V lookup: How many spaces are currently free?

SELECT LATEST_TS,
       CURRENT_EMPTY_PLACES,
       AVG_EMPTY_PLACES
  FROM CARPARK
  WHERE NAME='Westgate';
+----------------------+----------------------+------------------+
|LATEST_TS             |CURRENT_EMPTY_PLACES  |AVG_EMPTY_PLACES  |
+----------------------+----------------------+------------------+
|2020-10-22 14:59:00   |81                    |80.25             |

ksqlDB table scan

SELECT NAME,
       LATEST_TS,
       CURRENT_EMPTY_PLACES,
       AVG_EMPTY_PLACES
  FROM CARPARK
  WHERE CURRENT_EMPTY_PLACES > 80;

Use the REST API

curl --silent --http2 --location --request POST 'http://localhost:8088/query-stream' \
--header 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' --header 'Accept: application/json' \
--data-raw '{"sql":"SELECT LATEST_TS, CURRENT_EMPTY_PLACES FROM CARPARK WHERE NAME='\''Westgate'\'';"}' | jq '.'
[
  {
    "queryId": null,
    "columnNames": [
      "LATEST_TS",
      "CURRENT_EMPTY_PLACES"
    ],
    "columnTypes": [
      "STRING",
      "INTEGER"
    ]
  },
  [
    "2020-07-29 15:01:00",
    73
  ]
]
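
The same pull query can be issued from Go with just the standard library. A sketch, assuming the endpoint accepts plain HTTP/1.1 (the curl above negotiates HTTP/2; if your server insists on it, reuse the h2c transport from the push-query sketch further down):

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// The same pull query as the curl call above, in the /query-stream envelope.
	body := []byte(`{"sql":"SELECT LATEST_TS, CURRENT_EMPTY_PLACES FROM CARPARK WHERE NAME='Westgate';"}`)

	req, err := http.NewRequest("POST", "http://localhost:8088/query-stream", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
	req.Header.Set("Accept", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// With Accept: application/json the whole response is one JSON array:
	// element 0 is the header (column names and types), the rest are rows.
	var result []json.RawMessage
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		log.Fatal(err)
	}
	for _, raw := range result[1:] {
		var row []interface{}
		if err := json.Unmarshal(raw, &row); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("as of %v there are %v spaces free\n", row[0], row[1])
	}
}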

Event-driven alert: Tell me when there’s a space available

SET 'auto.offset.reset' = 'latest';

SELECT NAME AS CARPARK,
      TIMESTAMPTOSTRING(TS,'yyyy-MM-dd HH:mm:ss','Europe/London') AS DATA_TS,
      CAPACITY,
      EMPTY_PLACES
 FROM CARPARK_EVENTS
 WHERE NAME = 'Westgate'
   AND EMPTY_PLACES > 80
 EMIT CHANGES;

Use the REST API

curl --http2 --location --request POST 'http://localhost:8088/query-stream' \
--header 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
--data-raw '{"properties":{"ksql.streams.auto.offset.reset": "latest"},
    "sql": "SELECT NAME AS CARPARK, TIMESTAMPTOSTRING(TS,'\''yyyy-MM-dd HH:mm:ss'\'','\''Europe/London'\'') AS DATA_TS, CAPACITY, EMPTY_PLACES FROM CARPARK_EVENTS WHERE EMPTY_PLACES > 0 EMIT CHANGES;"
}'
{"queryId":"20a9c981-12d7-494e-a632-e6602b95ef96","columnNames":["CARPARK","DATA_TS","CAPACITY","EMPTY_PLACES"],"columnTypes":["STRING","STRING","INTEGER","INTEGER"]}
["Kirkgate Centre","2020-07-28 16:58:00",611,510]

ksqlDB-powered Telegram bot

Uses the community ksqlDB Go client.

cd ~/git/demo-scene/telegram-bot-carparks/go; go run .
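
The repo's code isn't reproduced here, and to avoid guessing at the community client's API, this sketch wires the earlier pieces together directly: each incoming message is treated as a car park name and answered with a pull query against the CARPARK table (helper name and reply wording are illustrative):

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"

	tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
)

// emptyPlaces runs a ksqlDB pull query against the CARPARK table
// for a single car park and returns its current free spaces.
func emptyPlaces(name string) (float64, error) {
	// (A real bot should sanitise the name before interpolating it.)
	q, err := json.Marshal(map[string]string{"sql": fmt.Sprintf(
		"SELECT CURRENT_EMPTY_PLACES FROM CARPARK WHERE NAME='%s';", name)})
	if err != nil {
		return 0, err
	}
	req, err := http.NewRequest("POST", "http://localhost:8088/query-stream", bytes.NewReader(q))
	if err != nil {
		return 0, err
	}
	req.Header.Set("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
	req.Header.Set("Accept", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	var result []json.RawMessage // header, then rows
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return 0, err
	}
	if len(result) < 2 {
		return 0, fmt.Errorf("no data for %q", name)
	}
	var row []float64
	if err := json.Unmarshal(result[1], &row); err != nil {
		return 0, err
	}
	return row[0], nil
}

func main() {
	bot, err := tgbotapi.NewBotAPI(os.Getenv("TELEGRAM_BOT_TOKEN"))
	if err != nil {
		log.Fatal(err)
	}
	u := tgbotapi.NewUpdate(0)
	u.Timeout = 60
	for update := range bot.GetUpdatesChan(u) {
		if update.Message == nil {
			continue
		}
		// Treat the message text as a car park name, e.g. "Westgate".
		text := "Sorry, I don't know that car park."
		if n, err := emptyPlaces(update.Message.Text); err == nil {
			text = fmt.Sprintf("🚗 %s has %.0f empty spaces right now", update.Message.Text, n)
		}
		bot.Send(tgbotapi.NewMessage(update.Message.Chat.ID, text))
	}
}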

Analytics: Stream the data to Elasticsearch.

Create a sink connector from ksqlDB:

CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
  'connector.class'                     = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'topics'                              = 'CARPARK_EVENTS',
  'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'                     = 'io.confluent.connect.protobuf.ProtobufConverter',
  'value.converter.schema.registry.url' = 'http://schema-registry:8081',
  'connection.url'                      = 'http://elasticsearch:9200',
  'type.name'                           = '_doc',
  'key.ignore'                          = 'true',
  'schema.ignore'                       = 'true');

Check the status of the connector in ksqlDB

SHOW CONNECTORS;
 Connector Name  | Type | Class                                                         | Status
----------------------------------------------------------------------------------------------------------------------
 SINK_ELASTIC_01 | SINK | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector | RUNNING (1/1 tasks RUNNING)
----------------------------------------------------------------------------------------------------------------------

Check that data is arriving:

docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount"
.kibana_task_manager_1        2
.apm-agent-configuration      0
.kibana_1                     1
carpark_events           265793

(Open Kibana (Elastic Cloud))

Visualise it in Kibana:

(Screenshot: Kibana dashboard of the carpark_events data)

Extended demo

Load historic data

Preview the data

curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkHistoric | head

Load the data

curl --show-error --silent https://datahub.bradford.gov.uk/ebase/api/getData/v2/Council/CarParkHistoric | \
  docker exec -i kafkacat kafkacat -b broker:29092 -t carparks_historic -P

Apply schema to historic

CREATE STREAM CARPARK_HISTORIC (date          VARCHAR ,
                                time          VARCHAR ,
                                name          VARCHAR ,
                                capacity      INT ,
                                empty_places  INT ,
                                status        VARCHAR ,
                                latitude      DOUBLE ,
                                longitude     DOUBLE )
                WITH (KAFKA_TOPIC='carparks_historic',
                VALUE_FORMAT='DELIMITED');

Merge the data into the existing carpark stream

SET 'auto.offset.reset' = 'earliest';

INSERT INTO CARPARK_EVENTS
SELECT STRINGTOTIMESTAMP(DATE + ' ' + TIME ,'yyyy-MM-dd HH:mm','Europe/London' ) AS TS,
       *,
       '' AS DIRECTIONSURL,
       (CAST((CAPACITY - EMPTY_PLACES) AS DOUBLE) /
        CAST(CAPACITY AS DOUBLE)) * 100 AS PCT_FULL,
       STRUCT("lat" := LATITUDE, "lon":= LONGITUDE) AS "location",
       'v2/Council/CarParkHistoric' as SOURCE
  FROM CARPARK_HISTORIC
  EMIT CHANGES;

Check the data:

SELECT SOURCE,
       COUNT(*) AS EVENT_CT,
       TIMESTAMPTOSTRING( MIN(TS),'yyyy-MM-dd HH:mm:ss','Europe/London') AS EARLIEST_TS,
       TIMESTAMPTOSTRING( MAX(TS),'yyyy-MM-dd HH:mm:ss','Europe/London') AS LATEST_TS
  FROM CARPARK_EVENTS
GROUP BY SOURCE
EMIT CHANGES
LIMIT 2;

Load same logical data with different physical schema

http -a $LEEDS_USER:$LEEDS_PW get http://www.leedstravel.info/datex2/carparks/content.xml | \
        xq '."d2lm:d2LogicalModel"."d2lm:payloadPublication"."d2lm:situation"[]."d2lm:situationRecord"'
{
  "@id": "LEEDSCP0001_1",
  "@xsi:type": "d2lm:CarParks",
  "d2lm:situationRecordCreationTime": "2021-10-05T19:54:47",
  "d2lm:situationRecordVersion": "1",
  "d2lm:situationRecordVersionTime": "2021-10-05T19:54:47",
  "d2lm:situationRecordFirstSupplierVersionTime": "2021-10-05T19:54:47",
  "d2lm:probabilityOfOccurrence": "certain",
  "d2lm:validity": {
    "d2lm:validityStatus": "active",
    "d2lm:validityTimeSpecification": {
      "d2lm:overallStartTime": "2021-10-05T19:54:47"
    }
  },
  "d2lm:groupOfLocations": {
    "d2lm:locationContainedInGroup": {
      "@xsi:type": "d2lm:Point",
      "d2lm:pointByCoordinates": {
        "d2lm:pointCoordinates": {
          "d2lm:latitude": "53.79798394752138",
          "d2lm:longitude": "-1.546011529543302"
        }
      }
    }
  },
  "d2lm:carParkIdentity": "Albion Street:LEEDSCP0001",
  "d2lm:carParkOccupancy": "12",
  "d2lm:carParkStatus": "enoughSpacesAvailable",
  "d2lm:occupiedSpaces": "47",
  "d2lm:totalCapacity": "389"
}
Load the API credentials and stream the records into a Kafka topic:

source .env

http -a $LEEDS_USER:$LEEDS_PW get http://www.leedstravel.info/datex2/carparks/content.xml | \
        xq -c '."d2lm:d2LogicalModel"."d2lm:payloadPublication"."d2lm:situation"[]."d2lm:situationRecord"' | \
        docker exec -i kafkacat kafkacat -b broker:29092 -t carparks_leeds -P
Apply a schema to the Leeds data, quoting the namespaced field names:

CREATE STREAM CARPARK_LEEDS_SRC01 (
    "d2lm:situationRecordCreationTime" VARCHAR,
    "d2lm:groupOfLocations" STRUCT <
        "d2lm:locationContainedInGroup" STRUCT <
            "d2lm:pointByCoordinates" STRUCT <
                "d2lm:pointCoordinates" STRUCT <
                    "d2lm:latitude" DOUBLE,
                    "d2lm:longitude" DOUBLE > > > >,
    "d2lm:carParkIdentity" VARCHAR,
    "d2lm:carParkOccupancy" DOUBLE,
    "d2lm:carParkStatus" VARCHAR,
    "d2lm:occupiedSpaces" INT,
    "d2lm:totalCapacity"  INT
    )
WITH (KAFKA_TOPIC='carparks_leeds',
      VALUE_FORMAT='JSON');
Merge the Leeds events into the existing CARPARK_EVENTS stream, mapping its fields onto the target schema:

INSERT INTO CARPARK_EVENTS
    SELECT  STRINGTOTIMESTAMP("d2lm:situationRecordCreationTime",
                            'yyyy-MM-dd''T''HH:mm:ss',
                            'Europe/London')
                                    AS TS,
            TIMESTAMPTOSTRING(STRINGTOTIMESTAMP("d2lm:situationRecordCreationTime",
                            'yyyy-MM-dd''T''HH:mm:ss',
                            'Europe/London'),
                            'yyyy-MM-dd')
                                    AS DATE,
            TIMESTAMPTOSTRING(STRINGTOTIMESTAMP("d2lm:situationRecordCreationTime",
                                    'yyyy-MM-dd''T''HH:mm:ss',
                                    'Europe/London'),
                                    'HH:mm')
                                    AS TIME,
            "d2lm:carParkIdentity"  AS NAME,
            "d2lm:totalCapacity"    AS CAPACITY,
            "d2lm:totalCapacity" - "d2lm:occupiedSpaces"
                                    AS EMPTY_PLACES,
            "d2lm:carParkStatus"    AS STATUS,
            "d2lm:groupOfLocations" -> "d2lm:locationContainedInGroup" -> "d2lm:pointByCoordinates" -> "d2lm:pointCoordinates" -> "d2lm:latitude"
                                    AS LATITUDE,
            "d2lm:groupOfLocations" -> "d2lm:locationContainedInGroup" -> "d2lm:pointByCoordinates" -> "d2lm:pointCoordinates" -> "d2lm:longitude"
                                    AS LONGITUDE,
            ''                      AS DIRECTIONSURL,
            "d2lm:carParkOccupancy" AS PCT_FULL ,
            STRUCT("lat" := "d2lm:groupOfLocations" -> "d2lm:locationContainedInGroup" -> "d2lm:pointByCoordinates" -> "d2lm:pointCoordinates" -> "d2lm:latitude",
                   "lon" := "d2lm:groupOfLocations" -> "d2lm:locationContainedInGroup" -> "d2lm:pointByCoordinates" -> "d2lm:pointCoordinates" -> "d2lm:longitude")
                                    AS "location",
            'https://datamillnorth.org/dataset/live-car-park-spaces-api'
                                    AS SOURCE
            FROM  CARPARK_LEEDS_SRC01
        EMIT CHANGES;

Making the alert variable

CREATE TABLE ALERT_CONFIG (CARPARK VARCHAR PRIMARY KEY, SPACES_ALERT INT) WITH (KAFKA_TOPIC='alert_config', VALUE_FORMAT='PROTOBUF', PARTITIONS=4);

INSERT INTO ALERT_CONFIG (CARPARK, SPACES_ALERT) VALUES ('Kirkgate Centre',470);

CREATE STREAM CARPARK_ALERTS AS
    SELECT C.NAME AS CARPARK,
           TIMESTAMPTOSTRING(C.TS,'yyyy-MM-dd HH:mm:ss','Europe/London') AS DATA_TS,
           CAPACITY     ,
           EMPTY_PLACES,
           A.SPACES_ALERT AS ALERT_THRESHOLD,
           STATUS      ,
           LATITUDE    ,
           LONGITUDE   ,
           DIRECTIONSURL
      FROM CARPARK_EVENTS C
            INNER JOIN
           ALERT_CONFIG A
            ON C.NAME=A.CARPARK
      WHERE C.EMPTY_PLACES >= A.SPACES_ALERT ;
Check the alerts:

SELECT CARPARK, ALERT_THRESHOLD, DATA_TS, EMPTY_PLACES FROM CARPARK_ALERTS EMIT CHANGES;
+-----------------+-----------------+--------------------+-------------+
|CARPARK          |ALERT_THRESHOLD  |DATA_TS             |EMPTY_PLACES |
+-----------------+-----------------+--------------------+-------------+
|Kirkgate Centre  |470              |2020-07-21 10:55:00 |505          |
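
To close the loop, the bot could let each user set their own threshold by issuing the same INSERT INTO ALERT_CONFIG statement through ksqlDB's /ksql endpoint. A sketch with an illustrative function name (not code from the repo):

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// setAlertThreshold upserts one row in the ALERT_CONFIG table via /ksql.
func setAlertThreshold(carpark string, spaces int) error {
	stmt := map[string]interface{}{
		"ksql": fmt.Sprintf(
			"INSERT INTO ALERT_CONFIG (CARPARK, SPACES_ALERT) VALUES ('%s', %d);",
			carpark, spaces),
	}
	body, err := json.Marshal(stmt)
	if err != nil {
		return err
	}
	resp, err := http.Post("http://localhost:8088/ksql",
		"application/vnd.ksql.v1+json; charset=utf-8", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("ksqlDB returned %s", resp.Status)
	}
	return nil
}

func main() {
	// e.g. in response to a Telegram command like "/alert Kirkgate Centre 470"
	if err := setAlertThreshold("Kirkgate Centre", 470); err != nil {
		log.Fatal(err)
	}
}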