This repository provides a code base for the information integration course in the summer semester of 2022. Below you can find the documentation for setting up the project.
- Install Python 3.10.5
- Install Poetry
- Install Docker and docker-compose
- Install the Protobuf compiler (protoc). If you are using Windows, you can use this guide
- Install jq
The Registerbekanntmachung website contains announcements concerning entries made into the companies, cooperatives, and partnerships registers within the electronic information and communication system. You can search for the announcements.
Each announcement can be requested through the link below. You only need to pass the query parameters rb_id and land_abk. For instance, we chose the state Rheinland-Pfalz (rp) with the announcement id 56267, the new entry of the company BioNTech:
export STATE="rp"
export RB_ID="56267"
curl -X GET "https://www.handelsregisterbekanntmachungen.de/skripte/hrb.php?rb_id=$RB_ID&land_abk=$STATE"
The Registerbekanntmachung crawler (rb_crawler) sends a GET request to the link above with the parameters (rb_id and land_abk) passed to it and extracts the information from the response.
We use Protocol Buffers to define our schema. The crawler uses the generated model class (i.e., the Announcement class) from the protobuf schema. We will explain further below how you can generate this class using the protobuf compiler. The compiler generates an Announcement class with the fields defined in the schema, and the crawler fills an Announcement object's fields with the data extracted from the website. It then serializes the Announcement object to bytes so that Kafka can read it and produces it to the rb-announcements topic. After that, it increments the rb_id value and sends another GET request. This process continues until the end of the announcements is reached, at which point the crawler stops automatically.
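The following is a minimal sketch of that loop, assuming the confluent-kafka client and a generated Announcement class under the bakdata folder; the module path, field names, broker address, and end-of-data check are assumptions rather than the project's actual code:

import requests
from confluent_kafka import Producer

# Assumed import path of the protoc-generated class (see the protobuf setup section below).
from bakdata.announcement_pb2 import Announcement

producer = Producer({"bootstrap.servers": "localhost:9092"})  # assumed broker address


def crawl(state: str, rb_id: int) -> None:
    while True:
        response = requests.get(
            "https://www.handelsregisterbekanntmachungen.de/skripte/hrb.php",
            params={"rb_id": rb_id, "land_abk": state},
        )
        if not response.text.strip():  # placeholder end-of-data check (assumption)
            break

        announcement = Announcement()    # object with the fields defined in the schema
        announcement.event_name = "..."  # filled from the parsed HTML (parsing omitted here)
        announcement.event_date = "..."

        producer.produce(
            "rb-announcements",
            key=f"{state}_{rb_id}",                  # e.g. rp_56267, see below
            value=announcement.SerializeToString(),  # protobuf object serialized to bytes
        )
        rb_id += 1  # move on to the next announcement

    producer.flush()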
The rb-announcements topic holds all the announcements produced by the rb_crawler. Each message in a Kafka topic consists of a key and a value. The key type of this topic is String, and the rb_crawler generates the key as a combination of the land_abk and the rb_id. If we consider the rb_id and land_abk from the example above, the key will look like this: rp_56267. The value of the message contains more information, such as event_name and event_date. Therefore, the value type is complex and needs a schema definition.
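To inspect such a message, a consumer can read the key as a plain string and parse the value with the generated class. This is only a sketch: it assumes the value bytes are the raw protobuf serialization (if the Schema Registry protobuf serializer is used, its matching deserializer is required instead), and the broker address, group id, module path, and field names are assumptions:

from confluent_kafka import Consumer
from bakdata.announcement_pb2 import Announcement  # assumed module path

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "group.id": "rb-debug",                 # hypothetical consumer group
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["rb-announcements"])

msg = consumer.poll(10.0)
if msg is not None and msg.error() is None:
    key = msg.key().decode("utf-8")               # e.g. "rp_56267"
    value = Announcement.FromString(msg.value())  # parse the protobuf bytes back into an object
    print(key, value.event_name, value.event_date)
consumer.close()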
This topic contains the extracted information about the corporates of the Registerbekanntmachung. The key is a self-generated hash from the corporate name, and the value is a complex schema type.
This topic contains the extracted information about the persons of the Registerbekanntmachung. The key is a self-generated hash from the source (i.e., rb), first name, last name, and corporate name. The value is a complex schema type.
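The exact hash function behind these keys is not specified here; purely as an illustration, they could be derived like this (SHA-1 and the separator are assumptions):

import hashlib


def corporate_key(corporate_name: str) -> str:
    # key for the corporates topic: hash of the corporate name
    return hashlib.sha1(corporate_name.encode("utf-8")).hexdigest()


def person_key(source: str, first_name: str, last_name: str, corporate_name: str) -> str:
    # key for the persons topic: hash over source (e.g. "rb"), first name, last name, and corporate name
    raw = "_".join([source, first_name, last_name, corporate_name])
    return hashlib.sha1(raw.encode("utf-8")).hexdigest()


print(corporate_key("BioNTech"))
print(person_key("rb", "Jane", "Doe", "BioNTech"))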
The Federal Financial Supervisory Authority (BaFin) brings
together under one roof the supervision of banks and
financial services providers, insurance undertakings and securities trading.
The website also contains registered announcements of managers’ transactions pursuant to Article 19 of the MAR. These announcements describe the transactions that an executive director (manager) of a company made in its shares. The website holds the transaction information in a one-year time window. At the time of this writing, the first announcement has a message ID of 18794. You can find the message ID on this page.
This crawler extracts the information from the BaFin portal and fills the model objects with the extracted data. Moreover, it serializes the objects and produces each of them to the desired topic.
The crawler is initialized with a message_id at the beginning of the crawl and sends a request to the portal URL of BaFin. This process is demonstrated in the script below:
export MESSAGE_ID="18794"
curl -X GET "https://portal.mvp.bafin.de/database/DealingsInfo/ergebnisListe.do?cmd=loadEmittentenAction&meldepflichtigerId=$MESSAGE_ID"
After retrieving the HTML of the page, the crawler extracts the BaFin-ID from the table and sends another request to retrieve the detailed transaction information. This is demonstrated with a shell script:
export MESSAGE_ID="18794"
export BAFIN_ID=40002082
curl -X GET "https://portal.mvp.bafin.de/database/DealingsInfo/transaktionListe.do?cmd=loadTransaktionenAction&emittentBafinId=$BAFIN_ID&meldungId=$MESSAGE_ID&KeepThis=true&TB_iframe=true&modal=true"
The crawler extracts the information from the HTML response and produces it to each topic. It then increments the message_id and repeats this process.
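A hedged sketch of this two-step flow, using requests and BeautifulSoup; the table structure (and therefore the selector) is an assumption, and the protobuf filling and Kafka producing are omitted:

import requests
from bs4 import BeautifulSoup

BASE = "https://portal.mvp.bafin.de/database/DealingsInfo"


def crawl_message(message_id: int) -> None:
    # Step 1: load the result list for the given message id
    listing = requests.get(
        f"{BASE}/ergebnisListe.do",
        params={"cmd": "loadEmittentenAction", "meldepflichtigerId": message_id},
    )

    # Step 2: pull the BaFin-ID out of the result table (selector is an assumption)
    soup = BeautifulSoup(listing.text, "html.parser")
    cell = soup.select_one("table td")
    if cell is None:
        return
    bafin_id = cell.get_text(strip=True)

    # Step 3: request the detailed transaction information
    requests.get(
        f"{BASE}/transaktionListe.do",
        params={
            "cmd": "loadTransaktionenAction",
            "emittentBafinId": bafin_id,
            "meldungId": message_id,
        },
    )
    # ... extract the transaction fields, fill the protobuf objects,
    # and produce them to the BaFin topics, as with the rb_crawler.


crawl_message(18794)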
This topic contains all the information about the transactions a person made. The key is a self-generated string from the message ID and the BaFin-ID. The value is a complex schema type.
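With the values from the scripts above, such a key could look like this (the exact separator and order are assumptions):

message_id = 18794
bafin_id = 40002082
key = f"{message_id}_{bafin_id}"  # e.g. "18794_40002082"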
The bafin_corporates topic contains all the information about a corporate. The key is a self-generated hash from the corporate name. The value is a complex schema type.
This topic contains the extracted information about the persons who made a transaction. The key is a self-generated hash from the source (i.e., BaFin), first name, last name, and corporate name. The value is a complex schema type.
Kafka Connect is a tool to move large data sets into (source) and out of (sink) Kafka. Here we only use a sink connector, which consumes data from a Kafka topic and writes it into a secondary index such as Elasticsearch. We use the Elasticsearch Sink Connector to move the data from the corporate-events topic into Elasticsearch.
This project uses Poetry as a build tool. To install all the dependencies, just run poetry install.
This project uses Protobuf for serializing and deserializing objects. You can find these schemas under the proto folder. Furthermore, you must generate the Python code for the model class from the proto file. To do so, run the generate-proto.sh script. This script uses the Protobuf compiler (protoc) to generate the model class under the bakdata folder.
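generate-proto.sh wraps this protoc call for you. If you want to trigger the generation from Python instead, the grpcio-tools package bundles the compiler; this is only an illustrative sketch (grpcio-tools is not necessarily a project dependency, and the schema file name is assumed):

# Illustrative only: assumes grpcio-tools is installed and the schema is proto/announcement.proto.
from grpc_tools import protoc

exit_code = protoc.main([
    "grpc_tools.protoc",         # dummy argv[0]
    "--proto_path=proto",        # where the .proto schemas live
    "--python_out=bakdata",      # output folder for the generated model class (must exist)
    "proto/announcement.proto",  # assumed schema file name
])
assert exit_code == 0, "protoc failed"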
Use docker-compose up -d to start all the services: Zookeeper, Kafka, Schema Registry, Redpanda Console, Kafka Connect, and Elasticsearch. Depending on your system, it takes a couple of minutes before the services are up and running. You can use a tool like lazydocker to check the status of the services.
NOTE: The Kafka Connect start time on Apple silicon is more than 5 minutes! You can start using Kafka Connect whenever the status of the container is running (healthy).
After all the services are up and running, you need to configure Kafka Connect to use the Elasticsearch or the Neo4j sink connector. The config file is a JSON-formatted file. We provide the sink configurations for the different topics under the connect folder.
You can find more information about the configuration properties for the Elasticsearch sink on the official documentation page. Details on configuring the Neo4j sink connector are available on the official documentation page.
To start a connector, you must push its JSON config file to Kafka Connect. You can use the UI dashboard in Redpanda Console or the provided bash script. It is possible to remove a connector again by deleting it through Redpanda Console or by calling the deletion API in the provided bash script.
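The same step can be scripted against Kafka Connect's REST API; a minimal Python sketch, assuming the API is exposed on port 8083 and using a hypothetical file name from the connect folder:

import json

import requests

CONNECT_URL = "http://localhost:8083"  # assumed Kafka Connect REST port

# Hypothetical file name; pick one of the configs under the connect folder.
with open("connect/elastic-sink.json") as f:
    config = json.load(f)

# POST /connectors creates a connector from a JSON config.
response = requests.post(
    f"{CONNECT_URL}/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(config),
)
response.raise_for_status()

# A connector can be removed again with DELETE /connectors/<name>.
# requests.delete(f"{CONNECT_URL}/connectors/<name>")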
You can start the crawler with the command below:
poetry run python -m rb_crawler.main --id $RB_ID --state $STATE
The --id option is an integer, which determines the initial event in the Handelsregisterbekanntmachungen to be crawled. The --state option takes a string (only the ones listed above). This string defines the state the crawler should start from. You can use the --help option to see the usage:
Usage: main.py [OPTIONS]
Options:
-i, --id INTEGER The rb_id to initialize the crawl from
-s, --state [bw|by|be|br|hb|hh|he|mv|ni|nw|rp|sl|sn|st|sh|th]
The state ISO code
--help Show this message and exit.
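The usage output above resembles what a click-style command line prints. Purely as an illustration (this README does not state which CLI library is used), the options could be declared like this:

import click

STATES = ["bw", "by", "be", "br", "hb", "hh", "he", "mv", "ni", "nw", "rp", "sl", "sn", "st", "sh", "th"]


@click.command()
@click.option("-i", "--id", "rb_id", type=int, help="The rb_id to initialize the crawl from")
@click.option("-s", "--state", type=click.Choice(STATES), help="The state ISO code")
def main(rb_id: int, state: str) -> None:
    # hand the start parameters to the crawl loop here
    click.echo(f"Starting crawl at {state}_{rb_id}")


if __name__ == "__main__":
    main()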
You can start the crawler with the command below:
poetry run python -m bafin_crawler.main --id $MESSAGE_ID
The --id option is an integer, which determines the initial event in the BaFin portal to be crawled. You can use the --help option to see the usage:
Usage: main.py [OPTIONS]
Options:
-i, --id INTEGER The message_id to initialize the crawl from
--help Show this message and exit.
Redpanda Console is a web application that helps you manage and debug your Kafka workloads effortlessly. You can create, update, and delete Kafka resources like Topics and Kafka Connect configs. You can open Redpanda Console in your browser under http://localhost:8080.
To query the data from Elasticsearch, you can use the Elasticsearch query DSL. For example:
curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"match": {
<field>
}
}
}
'
<field> is the field you wish to search, together with the value to match. For example:
"first_name": "Susanne"
You can stop and remove all the resources by running:
docker-compose down