diff --git a/.github/workflows/makefile.yml b/.github/workflows/makefile.yml
index c9f50e2..a62750a 100644
--- a/.github/workflows/makefile.yml
+++ b/.github/workflows/makefile.yml
@@ -40,12 +40,12 @@ jobs:
       - name: Create Pinot Tables
         run: make tables
 
-      - name: Import Data
-        run: make import
-
       - name: Validate that cluster is Up and Schemas are deployed
         run: make validate
 
+      - name: Import Data
+        run: make import
+
       - name: Teardown
         run: make destroy
diff --git a/.gitignore b/.gitignore
index 85c7bdb..4bc7e7b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,7 @@
 logs
-.venv
+# Python virtual environments
+venv/
+myenv/
+*.venv
+env/
+ENV/
\ No newline at end of file
diff --git a/Makefile b/Makefile
index f6606d2..03894e7 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-base: create tables import
+base: create tables import info
 
 schema:
 	docker run \
@@ -90,6 +90,9 @@ validate:
 		exit 1; \
 	fi
 
+info:
+	@printf "🍷 Pinot Query UI - \033[4mhttp://localhost:9000\033[0m\n"
+
 destroy:
 	docker compose down -v
diff --git a/README.md b/README.md
index bea5940..05fad45 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,25 @@
-# Pinot Getting Started
-
-This repository gets you started with Apache Pinot. It loads two sources of data: a real-time stream of movie ratings and a batch source of movies. The two data sets can be joined together in Apache Pinot.
+# Pinot Getting Started Guide
+
+Welcome to the Apache Pinot Getting Started guide.
+This repository will help you set up and run a demonstration that involves streaming and batch data sources.
+The demonstration includes a real-time stream of movie ratings and a batch data source of movies, which can be joined in Apache Pinot for querying.
+
+
+* [Pinot Getting Started Guide](#pinot-getting-started-guide)
+  * [Architecture Diagram](#architecture-diagram)
+  * [A Quick Shortcut](#a-quick-shortcut)
+  * [Step-by-Step Details](#step-by-step-details)
+    * [Step 1: Build and Launch with Docker](#step-1-build-and-launch-with-docker)
+    * [Step 2: Create a Kafka Topic](#step-2-create-a-kafka-topic)
+    * [Step 3: Configure Pinot Tables](#step-3-configure-pinot-tables)
+    * [Step 4: Load Data into the Movies Table](#step-4-load-data-into-the-movies-table)
+    * [Step 5: Apache Pinot Advanced Usage](#step-5-apache-pinot-advanced-usage)
+  * [Clean Up](#clean-up)
+  * [Troubleshooting](#troubleshooting)
+  * [Further Reading](#further-reading)
+
+
+## Architecture Diagram
 
 ```mermaid
 flowchart LR
@@ -14,27 +33,28 @@
 k(Kafka)-->p(Pinot)
 p-->mrp[Movie Ratings]
 p-->Movies
 ```
 
-## Just Run It
+## A Quick Shortcut
 
-Use `make` to just see the demonstration run. Run the command below. To delve into the setup, go to [step by step](#step-by-step) section.
+To quickly see the demonstration in action, you can use the following command:
 
 ```bash
-make base
+make
 ```
 
-Skip to the [Apache Pinot](#apache-pinot) section to run the `multi-stage` join between the ratings and movies table.
+For a detailed step-by-step setup, please refer to the [Step-by-Step Details](#step-by-step-details) section.
 
-## Step-By-Step Details
+If you're ready to explore the advanced features, jump directly to the [Apache Pinot Advanced Usage](#step-5-apache-pinot-advanced-usage) section to run a multi-stage join between the ratings and movies tables.
 
-This section is a step-by-step outline of the process to get this demonstration running. It describes the steps in more detail.
+## Step-by-Step Details
 
-### Step 1 - Build and Compose Up with Docker
+This section provides detailed instructions to get the demonstration up and running from scratch.
 
-Apache Pinot's can query real-time streaming data flowing through streaming platforms like Apache Kafka.
+### Step 1: Build and Launch with Docker
 
-To mock streaming data, this quick start has a built-in stream producer that writes to Kafka using Python. All Python-related details for this producer can be found in its [Dockerfile](docker/producer/Dockerfile).
+Apache Pinot queries real-time data through streaming platforms like Apache Kafka.
+This setup includes a mock stream producer using Python to write data into Kafka.
 
-Build the producer image and start all the services by running these commands.
+First, build the producer image and start all services using the following commands:
 
 ```bash
 docker compose build --no-cache
@@ -42,19 +62,16 @@
 docker compose up -d
 ```
 
-The [docker-compose](./docker-compose.yml) file starts up these containers:
-
-- Dedicated Zookeeper for Pinot
-- Pinot Controller
-- Pinot Broker
-- Pinot Server
-- Kraft "Zookeeperless" Kafka
-- The python producer
+The `docker-compose.yml` file configures the following services:
+- Zookeeper (dedicated to Pinot)
+- Pinot Controller, Broker, and Server
+- KRaft (ZooKeeper-less Kafka)
+- Python producer
 
-### Step 2 - Create a Kafka Topic
+### Step 2: Create a Kafka Topic
 
-Create the Kafka topic for the producer to write into and for the Pinot table to read from.
+Next, create a Kafka topic for the producer to send data to, which Pinot will then read from:
 
 ```bash
 docker exec -it kafka kafka-topics.sh \
@@ -63,7 +80,7 @@
   --bootstrap-server localhost:9092 \
   --create \
   --topic movie_ratings
 ```
 
-At this point, the producer should be sending data to a topic in Kafka called `movie_ratings`. You can test this by running the command below.
+To verify the stream, check the data flowing into the Kafka topic:
 
 ```bash
 docker exec -it kafka \
@@ -72,14 +89,15 @@
   kafka-console-consumer.sh \
   --bootstrap-server localhost:9092 \
   --topic movie_ratings
 ```
 
-### Step 3 - Create the Pinot Tables
+### Step 3: Configure Pinot Tables
 
-There are two tables we need to create in Pinot:
+In Pinot, create two tables:
 
-- A REALTIME table called `movie_ratings`.
-- An OFFLINE table called `movies`.
+1. A REALTIME table for streaming data (`movie_ratings`).
+2. An OFFLINE table for batch data (`movies`).
 
-To query the Kafka topic in Pinot, we add the real-time table using the `pinot-admin` CLI, providing it with a [schema](./table/ratings.schema.json) and a [table configuration](./table/ratings.table.json). The table configuration contains the connection information to Kafka.
+To query the Kafka topic in Pinot, we add the real-time table using the `pinot-admin` CLI, providing it with a [schema](./table/ratings.schema.json) and a [table configuration](./table/ratings.table.json).
+The table configuration contains the connection information to Kafka.
 
 ```bash
 docker exec -it pinot-controller ./bin/pinot-admin.sh \
@@ -101,11 +119,13 @@ docker exec -it pinot-controller ./bin/pinot-admin.sh \
 -exec
 ```
 
-Once added, the OFFLINE table will not have any data. Let's add data in the next step.
+Once added, the OFFLINE table will not have any data.
+Let's add data in the next step.
 
-### Step 4 - Load the Movies Table
-We again leverage the `pinot-admin.sh` CLI to load data into an OFFLINE table.
+### Step 4: Load Data into the Movies Table
+
+Use the following command to load data into the OFFLINE movies table:
 
 ```bash
 docker exec -it pinot-controller ./bin/pinot-admin.sh \
@@ -113,49 +133,46 @@ docker exec -it pinot-controller ./bin/pinot-admin.sh \
 -jobSpecFile /tmp/pinot/table/jobspec.yaml
 ```
 
-In this command, we use a YAML [file](table/jobspec.yaml) that provides the specification for loading the [movies data](data/movies.json). Once this job is completed, you can query the movies table [here](http://localhost:9000/#/query?query=select+*+from+movies+limit+10&tracing=false&useMSE=false).
-
-Now that you can query both the REALTIME and OFFLINE tables, you can perform a JOIN query in the next section.
+This command uses a YAML [job spec](table/jobspec.yaml) that describes how to load the [movies data](data/movies.json).
+Now, both the REALTIME and OFFLINE tables are queryable.
 
-## Apache Pinot
+### Step 5: Apache Pinot Advanced Usage
 
-Click to open the Pinot console [here](http://localhost:9000/#/query). To perform a join, you'll need to select the `Use Multi-Stage Engine` before clicking on `RUN QUERY`.
+To perform complex queries such as joins, open the Pinot console [here](http://localhost:9000/#/query) and enable `Use Multi-Stage Engine` before clicking `RUN QUERY`.
 
 Example query:
 
 ```sql
-select 
-    r.rating latest_rating, 
-    m.rating initial_rating, 
-    m.title, 
-    m.genres, 
-    m.releaseYear 
+select
+  r.rating latest_rating,
+  m.rating initial_rating,
+  m.title,
+  m.genres,
+  m.releaseYear
 from movies m
-left join movie_ratings r on m.movieId = r.movieId
+  left join movie_ratings r on m.movieId = r.movieId
 where r.rating > .9
 order by r.rating desc
-limit 10
-
+  limit 10
 ```
 
-You should see a similar result: ![alt](./images/results.png)
 
 ## Clean Up
 
-To destroy the demo, run the command below.
+To stop and remove all services related to the demonstration, run:
 
 ```bash
 docker compose down
 ```
 
-## Trouble Shooting
-
-If you get "No space left on device" when executing docker build.
+## Troubleshooting -```docker system prune -f``` +If you encounter "No space left on device" during the Docker build process, you can free up space with: +```bash +docker system prune -f +``` -## Getting Started +## Further Reading -Get started for yourself by visiting StarTree developer page [here](https://dev.startree.ai/docs/pinot/getting-started/quick-start) +For more detailed tutorials and documentation, visit the StarTree developer page [here](https://dev.startree.ai/) diff --git a/docker-compose.yml b/docker-compose.yml index 44c3467..832879a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,13 +1,11 @@ -version: "3.7" +version: "3.8" services: - producer: build: context: docker/producer container_name: producer environment: - - SCHEMA=/code/schema.json - BOOTSTRAPSERVER=kafka:9092 - TOPIC=movie_ratings - DATA=/tmp/movies.json @@ -16,7 +14,7 @@ services: - kafka volumes: - ./data/:/tmp/ - + pinot-zookeeper: image: zookeeper:latest container_name: pinot-zookeeper @@ -25,7 +23,7 @@ services: environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 - + pinot-controller: image: apachepinot/pinot:1.1.0-21-openjdk command: "StartController -zkAddress pinot-zookeeper:2181" @@ -39,10 +37,11 @@ services: depends_on: - pinot-zookeeper healthcheck: - test: [ "CMD", "curl", "-f", "http://localhost:9000/health" ] + test: [ "CMD-SHELL", "curl -f http://localhost:9000/health || exit 1" ] interval: 30s timeout: 10s - retries: 10 + retries: 3 + start_period: 10s volumes: - ./table/:/tmp/pinot/table/ - ./data/:/tmp/pinot/data/ @@ -57,12 +56,14 @@ services: environment: JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-broker.log" depends_on: - - pinot-controller + pinot-controller: + condition: service_healthy healthcheck: - test: [ "CMD", "curl", "-f", "http://localhost:8099/health" ] + test: [ "CMD-SHELL", "curl -f http://localhost:8099/health || exit 1" ] interval: 30s timeout: 10s - retries: 10 + retries: 3 + start_period: 10s pinot-server: image: apachepinot/pinot:1.1.0-21-openjdk @@ -74,7 +75,8 @@ services: environment: JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx16G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-server.log" depends_on: - - pinot-broker + pinot-broker: + condition: service_healthy kafka: image: docker.io/bitnami/kafka:3.6 @@ -82,6 +84,7 @@ services: container_name: kafka ports: - "9092:9092" + - "29092:29092" healthcheck: test: [ "CMD", "nc", "-z", "localhost", "9092" ] interval: 5s @@ -93,8 +96,9 @@ services: - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 # Listeners - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,PLAINTEXT_HOST://:29092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://localhost:29092 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + \ No newline at end of file diff --git a/docker/producer/Dockerfile b/docker/producer/Dockerfile index c58b9ad..e9d698b 100644 --- a/docker/producer/Dockerfile +++ b/docker/producer/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.11.1 +FROM python:3.12.3-slim RUN apt-get update -y && \ apt-get 
    apt-get install -y librdkafka-dev
diff --git a/docker/producer/producer.py b/docker/producer/producer.py
index 01cd702..575e571 100644
--- a/docker/producer/producer.py
+++ b/docker/producer/producer.py
@@ -1,43 +1,40 @@
 #!/bin/local/python
 
-import time
-import random
 import json
-import os
 import logging
-from faker import Faker
-import pandas as pd
+import os
+import random
+import time
 
+import pandas as pd
 from confluent_kafka import Producer
 
+
 class Generator:
-
-    def next() -> (str, object):
-        pass
+    def next(self) -> tuple[str, bytes]:
+        pass
+
 
 class RatingGenerator(Generator):
 
     def __init__(self):
-        Faker.seed(0)
-        self.faker = Faker()
         path = os.getenv('DATA')
-        if(path is None):
+        if path is None:
             raise Exception("need to movies.json file")
         self.df = pd.read_json(path, lines=True)
-    
+
     def next(self):
         key = random.randint(self.df['movieId'].min(), self.df['movieId'].max())
         data = json.dumps({
-            "movieId": key, 
-            "rating": round(random.uniform(0.0, 10.0), 2), 
-            "ratingTime": round(time.time() * 1000) 
+            "movieId": key,
+            "rating": round(random.uniform(0.0, 10.0), 2),
+            "ratingTime": round(time.time() * 1000)
         })
         return str(key), data.encode('utf-8')
 
-
 def delivery_report(err, msg):
-    """ Called once for each message produced to indicate delivery result.
+    """ Called once for each message produced to indicate a delivery result.
         Triggered by poll() or flush(). """
     if err is not None:
         print('Message delivery failed: {}'.format(err))
@@ -45,29 +42,29 @@ def delivery_report(err, msg):
         print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
 
 
-def send(p, topic, gen:Generator, limit:int=100000):
+def send(p, topic, gen: Generator, limit: int = 100000):
     p.poll(0)
-    
-    for i in range(limit):    
+
+    for i in range(limit):
         key, data = gen.next()
         p.produce(
             key=key,
-            topic=topic, 
-            value=data, 
+            topic=topic,
+            value=data,
             on_delivery=delivery_report)
-    
+
     # Wait for any outstanding messages to be delivered and delivery report
     # callbacks to be triggered.
     p.flush()
 
 
-if __name__== "__main__":
-    bootstrap = os.getenv('BOOTSTRAPSERVER','kafka:9092')
-    topic = os.getenv('TOPIC','data')
-    limit = int(os.getenv('LIMIT', 100000))
-    
-    logging.basicConfig(level=logging.INFO)
-    p = Producer({'bootstrap.servers': bootstrap})
-    gen = RatingGenerator()
-    send(p, topic=topic, gen=gen, limit=limit)
+if __name__ == "__main__":
+    bootstrap = os.getenv('BOOTSTRAPSERVER', 'kafka:9092')
+    topic = os.getenv('TOPIC', 'data')
+    limit = int(os.getenv('LIMIT', 100000))
+
+    logging.basicConfig(level=logging.INFO)
+    producer = Producer({'bootstrap.servers': bootstrap})
+    generator = RatingGenerator()
+    send(producer, topic=topic, gen=generator, limit=limit)
diff --git a/docker/producer/requirements.txt b/docker/producer/requirements.txt
index 3f69cbf..fa7243e 100644
--- a/docker/producer/requirements.txt
+++ b/docker/producer/requirements.txt
@@ -1,4 +1,4 @@
 confluent-kafka
-Faker
 numpy
 pandas
+pyarrow
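Note on the new `29092` listener in `docker-compose.yml`: the `PLAINTEXT_HOST` listener is advertised as `localhost:29092`, which lets clients running on the host machine (outside the compose network) read the same `movie_ratings` stream that the containerized producer writes via `kafka:9092`. A minimal host-side smoke test is sketched below; it is illustrative only, assuming the stack is up, `confluent-kafka` is installed on the host, and using an arbitrary `host-smoke-test` consumer group id:

```python
# Host-side smoke test for the PLAINTEXT_HOST listener added in this change.
# Assumes `pip install confluent-kafka` on the host and `docker compose up -d`.
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:29092',  # host-facing listener from this diff
    'group.id': 'host-smoke-test',           # arbitrary group id for the check
    'auto.offset.reset': 'earliest',         # start from the beginning of the topic
})
consumer.subscribe(['movie_ratings'])

try:
    msg = consumer.poll(10.0)  # wait up to 10 seconds for a single record
    if msg is None:
        print('no message received; is the producer running?')
    elif msg.error():
        print('consumer error: {}'.format(msg.error()))
    else:
        print('{} -> {}'.format(msg.key(), msg.value().decode('utf-8')))
finally:
    consumer.close()
```

Python matches the producer's own language and client library, so this check exercises the same stack the demo already depends on; without the extra advertised listener, a host client pointed at `9092` would be redirected to the in-network `kafka` hostname and fail to connect.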