DATA STREAM PROCESSING

Overview

  • Persist data to PostgreSQL.
  • Monitor changes to data using the Debezium Connector.
  • Stream data from a Kafka topic using PySpark (Spark Streaming).
  • Convert the streaming data to Delta Lake format.
  • Write the Delta Lake data to MinIO (S3-compatible object storage).
  • Query the data with Trino.
  • Display the results in DBeaver.

System Architecture

[System architecture diagram: workflow]

Prerequisites

Before running this project, ensure you have the following installed.
Note: the project was set up on Ubuntu 22.04.

  • Ubuntu 22.04 (preferred, but Ubuntu 20.04 also works)
  • Python 3.10
  • Apache Spark (installed locally)
  • Apache Airflow
  • Confluent Containers (Zookeeper, Kafka, Schema Registry, Connect, Control Center)
  • Docker
  • Minio
  • Trino, DBeaver CE
  • Delta Lake
  • Debezium, Debezium UI

Start

  1. Clone the repository
$ git clone https://github.com/VuBacktracking/stream-data-processing.git
$ cd stream-data-processing
  2. Start the data streaming infrastructure
$ sudo service docker start
$ docker compose -f storage-docker-compose.yaml -f stream-docker-compose.yaml up -d
  3. Set up the environment
$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install -r requirements.txt

Create a .env file and add your MinIO keys and SPARK_HOME to it (a short sketch of loading these values from Python follows this list).

# MinIO
MINIO_ACCESS_KEY='minio_access_key'
MINIO_SECRET_KEY='minio_secret_key'
MINIO_ENDPOINT='http://localhost:9000'
BUCKET_NAME='datalake'

# PostgreSQL
POSTGRES_DB='v9'
POSTGRES_USER='v9'
POSTGRES_PASSWORD='v9'

# Spark
SPARK_HOME=""
  4. Services
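
The .env values above are read by the Python scripts at runtime. Below is a minimal, illustrative sketch of loading them and checking the MinIO bucket, assuming the python-dotenv and minio packages are installed; the repo's own scripts may do this differently.

# sketch: load the .env values and verify the MinIO bucket exists
# assumes python-dotenv and minio are installed (pip install python-dotenv minio)
import os

from dotenv import load_dotenv
from minio import Minio

load_dotenv()  # reads MINIO_* and BUCKET_NAME from the .env file

client = Minio(
    os.getenv("MINIO_ENDPOINT", "http://localhost:9000").replace("http://", ""),
    access_key=os.getenv("MINIO_ACCESS_KEY"),
    secret_key=os.getenv("MINIO_SECRET_KEY"),
    secure=False,  # the local MinIO endpoint is plain HTTP
)

bucket = os.getenv("BUCKET_NAME", "datalake")
if not client.bucket_exists(bucket):
    client.make_bucket(bucket)
    print(f"Created bucket '{bucket}'")
else:
    print(f"Bucket '{bucket}' already exists")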

How to use?

  • Step 1. Start the Debezium connector
cd debezium
bash run-cdc.sh register_connector conf/products-cdc-config.json

You should see the connector running at http://localhost:8085 (Debezium UI), as in the image below.
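
For reference, run-cdc.sh essentially posts the connector configuration to the Kafka Connect REST API. The sketch below shows the equivalent call from Python; the Connect port (8083) is the Confluent default and an assumption here.

# sketch: register the Debezium connector via the Kafka Connect REST API
# (mirrors what run-cdc.sh does; port 8083 is an assumption)
import json

import requests

with open("conf/products-cdc-config.json") as f:
    connector_config = json.load(f)

resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector_config),
)
resp.raise_for_status()
print(resp.json())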

  • Step 2. Create the table and insert data into the database
python3 database-operations/create_table.py
python3 database-operations/insert_table.py

In the PostgreSQL connection, you should see the v9 database and the products table, as in the image below.
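
The sketch below shows roughly what the create/insert scripts do with psycopg2. The column list is taken from the Trino table definition further down, and the connection details come from the .env file (the Postgres port 5432 is an assumption); the actual scripts in database-operations/ may differ.

# sketch: create the products table and insert a sample row with psycopg2
import psycopg2

conn = psycopg2.connect(
    host="localhost",  # assumption: Postgres exposed on the default port
    port=5432,
    dbname="v9",
    user="v9",
    password="v9",
)
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id TEXT PRIMARY KEY,
            name TEXT,
            original_price DOUBLE PRECISION,
            price DOUBLE PRECISION,
            fulfillment_type TEXT,
            brand TEXT,
            review_count INTEGER,
            rating_average DOUBLE PRECISION,
            favourite_count INTEGER,
            current_seller TEXT,
            number_of_images INTEGER,
            category TEXT,
            quantity_sold INTEGER,
            discount DOUBLE PRECISION
        )
    """)
    cur.execute(
        "INSERT INTO products (id, name, price) VALUES (%s, %s, %s)",
        ("p-001", "Sample product", 9.99),
    )

conn.close()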

  • Step 3. Start Streaming Data to MinIO
python3 stream_processing/delta-to-minio.py

Once data has been written to MinIO storage, open the MinIO console at http://localhost:9001 and you should see a result like the image below.
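
For orientation, the sketch below shows the general shape of such a Kafka-to-Delta streaming job with PySpark. The Kafka topic name, ports, and S3A settings are assumptions; the real job in stream_processing/delta-to-minio.py parses the Debezium payload into proper columns rather than storing raw JSON strings.

# sketch: a simplified Kafka -> Delta-on-MinIO streaming job
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (
    SparkSession.builder.appName("delta-to-minio")
    # Delta Lake support
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # point s3a:// at the local MinIO endpoint
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minio_access_key")
    .config("spark.hadoop.fs.s3a.secret.key", "minio_secret_key")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

# read the Debezium change events for the products table
# (the topic name depends on the connector config and is assumed here)
raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "postgres.public.products")
    .option("startingOffsets", "earliest")
    .load()
)

events = raw.select(col("value").cast("string").alias("json_value"))

# write the stream to MinIO in Delta format
query = (
    events.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3a://datalake/checkpoints/products")
    .start("s3a://datalake/products")
)
query.awaitTermination()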

Read streaming data with Trino and DBeaver

Connect Trino in DBeaver

Query with DBeaver

Create your Trino schema and table in DBeaver

-- Create the schema if it doesn't exist
CREATE SCHEMA IF NOT EXISTS lakehouse.products
WITH (location = 's3://datalake/');

-- Create the products table
CREATE TABLE IF NOT EXISTS lakehouse.products.products (
    id VARCHAR,
    name VARCHAR,
    original_price DOUBLE,
    price DOUBLE,
    fulfillment_type VARCHAR,
    brand VARCHAR,
    review_count INTEGER,
    rating_average DOUBLE,
    favourite_count INTEGER,
    current_seller VARCHAR,
    number_of_images INTEGER,
    category VARCHAR,
    quantity_sold INTEGER,
    discount DOUBLE
) WITH (
    location = 's3://datalake/products/'
);
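
Besides DBeaver, you can also query the table programmatically. Here is a minimal sketch using the trino Python client (pip install trino); the host, port, and user are assumptions for a local setup.

# sketch: querying the Delta table from Python with the trino client
import trino

conn = trino.dbapi.connect(
    host="localhost",
    port=8080,  # assumption: Trino's default HTTP port
    user="trino",
    catalog="lakehouse",
    schema="products",
)

cur = conn.cursor()
cur.execute("SELECT id, name, price FROM products LIMIT 10")
for row in cur.fetchall():
    print(row)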

About

Streaming data processing pipeline using Spark, PostgreSQL, Debezium, Kafka, Minio, Delta Lake, Trino and DBeaver
