
Commit

update content: add simple ETL project
behnamyazdan committed Aug 12, 2024
1 parent f7a4613 commit 23cc933
Showing 17 changed files with 566 additions and 12 deletions.
2 changes: 1 addition & 1 deletion 01-PythonBasics/05-04-DataStructure(dictionaries).md
@@ -222,7 +222,7 @@ Dictionaries are versatile data structures in Python, offering efficient lookup
# Storing application configuration settings
config = {
"database": {"host": "localhost", "port": 3306, "username": "admin", "password": "password"},
"logging": {"level": "DEBUG", "file": "app.log"}
"logging": {"level": "DEBUG", "file": "app.logs"}
}

# Accessing database connection details
2 changes: 1 addition & 1 deletion 01-PythonBasics/08-ErrorHandling.md
@@ -282,7 +282,7 @@ Effective error handling involves not only using the right techniques but also f
```python
import logging

logging.basicConfig(filename='app.log', level=logging.ERROR)

try:
result = 10 / 0
2 changes: 1 addition & 1 deletion 03-GitAndGitHub/05-GitBasics.md
@@ -87,7 +87,7 @@ Finally, let's view the commit history to see the changes we've made.
1. **Viewing Commit History**:
- Use the `git log` command to see a log of all the commits made to the repository.
```sh
git log
```
- This command shows a list of commits, including their commit hashes, author information, date, and commit messages. This history is crucial for tracking the development of your project over time.

2 changes: 1 addition & 1 deletion 05-WorkingWithDataSourcesAndSerialization/07-CsvFiles.md
@@ -489,7 +489,7 @@ Implement comprehensive error handling to log and manage exceptions, providing m
```python
import logging

logging.basicConfig(filename='data_processing.log', level=logging.ERROR)

def process_csv(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
@@ -2503,8 +2503,8 @@ import csv
import random
import datetime

# Configuration for the log file
num_records = 10000 # Number of log records to generate
start_date = datetime.datetime(2024, 1, 1) # Start date for the logs
end_date = datetime.datetime(2024, 1, 10) # End date for the logs

@@ -2521,14 +2521,14 @@ def generate_random_timestamp(start, end):
seconds=random.randint(0, int((end - start).total_seconds()))
)

# Function to generate a single log record
def generate_log_record():
timestamp = generate_random_timestamp(start_date, end_date).strftime("%Y-%m-%d %H:%M:%S")
user_id = random.choice(user_ids)
page_url = random.choice(page_urls)
return [timestamp, user_id, page_url]

# Generate the log file
with open("large_log_file.csv", "w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["timestamp", "user_id", "page_url"]) # Write header
@@ -2545,14 +2545,14 @@ import pyarrow as pa
import pandas as pd
import pyarrow.csv as csv

# Define the schema for the log file
schema = pa.schema([
('timestamp', pa.timestamp('ms')),
('user_id', pa.string()),
('page_url', pa.string())
])

# Function to read log file in batches
def read_log_file_in_batches(file_path, batch_size):
read_options = csv.ReadOptions(block_size=batch_size)
convert_options = csv.ConvertOptions(column_types=schema)
295 changes: 293 additions & 2 deletions 08-BuildingDataPipelines/02-ETL.md
@@ -469,6 +469,8 @@ In this project, we aim to implement an ETL (Extract, Transform, Load) process f

**Objective:** Build an ETL pipeline to manage and analyze data from an e-commerce platform. The data includes sales transactions, customer interactions, and inventory information. This pipeline will help in generating aggregated insights and reports for business intelligence.

![](/lab/learning_projects/PythonForDataEngineering/_assets/etl_project_overview.webp)

**Data Sources:**

- **Sales Data:** Stored in a PostgreSQL database, including transaction details such as sales amount, customer ID, product ID, and transaction date.
@@ -586,7 +588,7 @@ docker-compose up -d

**1. Connect to PostgreSQL**

You can connect to PostgreSQL using a tool like `psql`, or any SQL client that supports PostgreSQL.
You can connect to PostgreSQL using a tool like `psql`, `dbeaver`, or any SQL client that supports PostgreSQL.

**2. Create Database Schema**

@@ -626,5 +628,294 @@ CREATE TABLE sales (



![](./Examples/simple_ecommerce_etl/images/sql_schema.png)
![](Examples/images/sql_schema.png)

#### Creating the Data Warehouse Schema

**1. Connect to MySQL**

You can connect to MySQL using a tool like `mysql workbench`, `dbeaver`, or any SQL client that supports MySQL.

**2. Create Database Schema**

Use the following SQL statements to create the data warehouse schema:

``` sql
CREATE DATABASE IF NOT EXISTS data_warehouse;
USE data_warehouse;

CREATE TABLE IF NOT EXISTS dim_customer (
customer_id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
join_date DATE NOT NULL
);


CREATE TABLE IF NOT EXISTS dim_inventory (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(255) NOT NULL,
quantity INT NOT NULL CHECK (quantity >= 0),
price DECIMAL(10, 2) NOT NULL CHECK (price >= 0)
);


CREATE TABLE IF NOT EXISTS dim_date (
date_id SERIAL PRIMARY KEY,
date DATE NOT NULL,
year INT NOT NULL,
quarter INT NOT NULL CHECK (quarter BETWEEN 1 AND 4),
month INT NOT NULL CHECK (month BETWEEN 1 AND 12),
day_of_week INT NOT NULL CHECK (day_of_week BETWEEN 1 AND 7),
week_of_year INT NOT NULL CHECK (week_of_year BETWEEN 1 AND 53),
UNIQUE (date)
);

-- Example insert statement for dim_date
-- You may want to generate these rows programmatically to cover a range of dates (see the Python sketch after this schema)
INSERT INTO dim_date (date, year, quarter, month, day_of_week, week_of_year)
VALUES ('2024-01-01', 2024, 1, 1, 1, 1); -- Add more rows as needed


CREATE TABLE IF NOT EXISTS fact_sales (
sales_id SERIAL PRIMARY KEY,
customer_id INT NOT NULL,
product_id INT NOT NULL,
amount DECIMAL(10, 2) NOT NULL CHECK (amount > 0),
date_id INT NOT NULL,

-- You can add indexes for performance
INDEX (customer_id),
INDEX (product_id),
INDEX (date_id)
);
```
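
The comment on `dim_date` above suggests generating the date rows programmatically rather than by hand. Below is a minimal sketch of one way to do that with pandas and SQLAlchemy; the connection URL, file name, and date range are assumptions, and the column values follow the `dim_date` definition.

```python
# populate_dim_date.py -- illustrative sketch; adjust the connection URL and date range
import pandas as pd
from sqlalchemy import create_engine

def build_dim_date(start: str, end: str) -> pd.DataFrame:
    """Build one dim_date row per calendar day between start and end (inclusive)."""
    dates = pd.date_range(start=start, end=end, freq="D")
    return pd.DataFrame({
        "date": dates.date,
        "year": dates.year,
        "quarter": dates.quarter,
        "month": dates.month,
        "day_of_week": dates.dayofweek + 1,  # 1 = Monday ... 7 = Sunday
        "week_of_year": dates.isocalendar().week.astype(int).to_numpy(),
    })

if __name__ == "__main__":
    # Connection details are placeholders -- adapt them to your environment
    engine = create_engine("mysql+mysqlconnector://user:password@localhost:3306/data_warehouse")
    dim_date = build_dim_date("2024-01-01", "2024-12-31")
    dim_date.to_sql("dim_date", engine, if_exists="append", index=False)
```

Since `date_id` is auto-generated and `date` is unique, this is intended as a one-off initialization; re-running it against an already populated table will fail on the unique constraint.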

![](../_assets/etl_project_star_schema.webp)

##### Why Foreign Key Constraints Are Often Omitted in Star Schema Designs

In a star schema design, explicit relationships like foreign key constraints are often omitted to optimize performance and enhance flexibility. The absence of these constraints reduces overhead during bulk data loads and improves query performance by simplifying the schema. This denormalized approach is tailored for read-heavy environments, allowing for faster query response times and reduced complexity. Additionally, without foreign key constraints, the ETL (Extract, Transform, Load) process gains flexibility in handling data transformations and discrepancies, enabling smoother and more efficient data integration.

Managing data quality and consistency is typically handled at the ETL level rather than through database constraints. This approach provides greater control over data integrity and allows for easier schema evolution, accommodating changes in business requirements and data sources. By focusing on performance and flexibility, the star schema facilitates efficient data handling and reporting, aligning with the needs of analytical environments where rapid and reliable query access is essential.

#### Install Dependencies

Ensure you have the necessary Python packages installed. You can install them using `pip`:

```bash
pip install pandas sqlalchemy psycopg2-binary mysql-connector-python
```

#### Extract

We'll encapsulate the extraction logic in a Python class that handles connecting to the PostgreSQL database, executing queries, and returning the results as pandas DataFrames. This approach promotes code reusability and maintainability.

```python
import pandas as pd
from sqlalchemy import create_engine

class DataExtractor:
def __init__(self, db_url):
"""
Initializes the DataExtractor with the database connection URL.
:param db_url: str, The connection URL for the PostgreSQL database.
"""
self.engine = create_engine(db_url)

def extract(self, query):
"""
Executes a SQL query and returns the results as a pandas DataFrame.
:param query: str, The SQL query to be executed.
:return: pd.DataFrame, The results of the query.
"""
try:
df = pd.read_sql(query, self.engine)
return df
except Exception as e:
print(f"An error occurred: {e}")
return None
```

**Example Usage:**

```python
# Create an instance of DataExtractor
extractor = DataExtractor('postgresql://user:password@localhost:5432/ecommerce_db')

# Extract sales data
sales_df = extractor.extract("SELECT * FROM sales")

# Check the results
if sales_df is not None:
print(sales_df.head())
else:
print("Failed to extract sales data.")
```

The `DataExtractor` class simplifies the process of connecting to a PostgreSQL database, executing queries, and retrieving data as pandas DataFrames. This encapsulated approach helps in maintaining cleaner code and managing database interactions effectively.

#### Transform

The following Python class, `DataTransformer`, encapsulates the core functionalities required for transforming data. This class will provide methods to perform various data transformation tasks, such as cleaning and aggregating data using pandas DataFrames.

```python
import pandas as pd

class DataTransformer:
def __init__(self, dataframe: pd.DataFrame):
"""
Initializes the DataTransformer with a pandas DataFrame.
:param dataframe: pd.DataFrame, The data to be transformed.
"""
self.data = dataframe

def clean_data(self):
"""
Cleans the data by handling missing values and removing duplicates.
:return: pd.DataFrame, The cleaned data.
"""
self.data.drop_duplicates(inplace=True)
self.data.fillna(0, inplace=True) # Fill missing values with 0
return self.data

def normalize_data(self):
"""
Normalizes the data by standardizing formats and units.
:return: pd.DataFrame, The normalized data.
"""
if 'date' in self.data.columns:
self.data['date'] = pd.to_datetime(self.data['date'], errors='coerce')

if 'amount' in self.data.columns:
self.data['amount'] = self.data['amount'].replace(r'[\$,]', '', regex=True).astype(float)

return self.data

def aggregate_data(self, group_by_column: str, aggregation_column: str, aggregation_func: str):
"""
Aggregates the data based on specified columns and function.
:param group_by_column: str, The column to group by.
:param aggregation_column: str, The column to aggregate.
:param aggregation_func: str, The aggregation function (e.g., 'sum', 'mean').
:return: pd.DataFrame, The aggregated data.
"""
if group_by_column not in self.data.columns or aggregation_column not in self.data.columns:
raise ValueError("The specified columns are not present in the DataFrame.")

aggregated_data = self.data.groupby(group_by_column).agg({aggregation_column: aggregation_func})
return aggregated_data.reset_index()

```
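
A brief usage sketch, assuming `sales_df` is the DataFrame extracted earlier and contains `customer_id` and `amount` columns as in the sales schema:

```python
# Transform the extracted sales data (sales_df comes from the extraction step above)
transformer = DataTransformer(sales_df)

transformer.clean_data()        # remove duplicates and fill missing values
transformer.normalize_data()    # parse dates, strip currency symbols from amounts

# Aggregate total sales amount per customer
aggregated_sales_df = transformer.aggregate_data(
    group_by_column="customer_id",
    aggregation_column="amount",
    aggregation_func="sum",
)
print(aggregated_sales_df.head())
```

The resulting `aggregated_sales_df` is the DataFrame loaded into the warehouse in the next step.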

#### Load

To complete the ETL process, the Load phase involves loading the transformed data into a data warehouse. In this example, we will use MySQL as the data warehouse and perform the loading process using Python. We will use the `pandas` library along with `SQLAlchemy` to accomplish this.

```python
import pandas as pd
from sqlalchemy import create_engine

class DataLoader:
def __init__(self, db_url: str):
"""
Initializes the DataLoader with the data warehouse connection URL.
:param db_url: str, The connection URL for the MySQL database.
"""
self.engine = create_engine(db_url)

def load(self, dataframe: pd.DataFrame, table_name: str, if_exists: str = 'replace') -> None:
"""
Loads a DataFrame into a MySQL table.
:param dataframe: pd.DataFrame, The data to be loaded.
:param table_name: str, The name of the target table in the MySQL database.
:param if_exists: str, What to do if the table already exists ('fail', 'replace', 'append').
"""
try:
dataframe.to_sql(table_name, self.engine, index=False, if_exists=if_exists)
print(f"Data successfully loaded into table '{table_name}'.")
except Exception as e:
print(f"An error occurred while loading data: {e}")
```

**Load Data Using the `DataLoader` Class:**

- **Define MySQL Connection URL:**

```python
mysql_db_url = 'mysql+mysqlconnector://user:password@localhost:3306/data_warehouse'
```

- **Create an Instance of `DataLoader`:**

```python
# Create an instance of DataLoader
loader = DataLoader(mysql_db_url)
```

- **Load Transformed Data:**

```python
# Assuming `aggregated_sales_df` is the transformed DataFrame to be loaded
loader.load(aggregated_sales_df, 'aggregated_sales')
```
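
Putting the three classes together, a minimal end-to-end run could look like the sketch below; the module names, connection URLs, and query are placeholders rather than part of the project code.

```python
# etl_pipeline.py -- minimal end-to-end sketch; adapt module names and URLs to your setup
from extract import DataExtractor      # hypothetical module names for the classes above
from transform import DataTransformer
from load import DataLoader

POSTGRES_URL = "postgresql://user:password@localhost:5432/ecommerce_db"
MYSQL_URL = "mysql+mysqlconnector://user:password@localhost:3306/data_warehouse"


def run_pipeline():
    # Extract: pull raw sales data from the operational PostgreSQL database
    extractor = DataExtractor(POSTGRES_URL)
    sales_df = extractor.extract("SELECT * FROM sales")
    if sales_df is None:
        raise RuntimeError("Extraction returned no data")

    # Transform: clean, normalize, and aggregate with pandas
    transformer = DataTransformer(sales_df)
    transformer.clean_data()
    transformer.normalize_data()
    aggregated_sales_df = transformer.aggregate_data("customer_id", "amount", "sum")

    # Load: write the aggregated result into the MySQL data warehouse
    loader = DataLoader(MYSQL_URL)
    loader.load(aggregated_sales_df, "aggregated_sales")


if __name__ == "__main__":
    run_pipeline()
```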

#### Logging and Monitoring

**Monitoring** and **logging** are critical aspects of managing ETL (Extract, Transform, Load) pipelines. They help track the pipeline’s performance, detect issues, and ensure data integrity throughout the ETL process. Proper monitoring and logging provide valuable insights into the pipeline's operations and assist in troubleshooting and maintaining the system.

##### Overview of Monitoring and Logging

- **Monitoring:** Refers to continuously checking the health and performance of the ETL pipeline. It involves tracking metrics like data volume processed, pipeline execution time, error rates, and resource usage. Monitoring helps identify bottlenecks and potential failures early.

- **Logging:** Involves recording detailed information about the ETL pipeline’s operations, such as data extraction, transformation, and loading steps. Logs capture both informational and error messages, which are essential for debugging and auditing purposes.

##### Implementing Logging in the ETL Pipeline

In the provided ETL pipeline code, logging is implemented using Python's built-in `logging` module. This implementation ensures that relevant information is captured both on the console and in a log file for review and analysis.

**Logging Configuration**

- **File and Console Handlers:** The logging configuration includes a `StreamHandler` for console output and a `FileHandler` for logging to a file named `etl_pipeline.log`.
- **Log Levels and Format:** Both handlers are set to log messages at the `INFO` level and above. A consistent format (`%(asctime)s - %(levelname)s - %(message)s`) is used for log entries to include the timestamp, log level, and message.

```python
# logging_config.py
import logging

class LoggingConfig:
@staticmethod
def setup_logging(path):
# Create handlers
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler(path)

# Set log levels
console_handler.setLevel(logging.INFO)
file_handler.setLevel(logging.INFO)

# Create a formatter and set it for handlers
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# Configure the root logger
logging.basicConfig(level=logging.INFO, handlers=[console_handler, file_handler])
```
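
As a rough illustration, the configuration above could be wired into a pipeline run as follows; the module names `logging_config` and `etl_pipeline` are assumptions carried over from the earlier sketches.

```python
# run_etl.py -- illustrative wiring; assumes the LoggingConfig class above is saved as
# logging_config.py and run_pipeline() from the earlier sketch as etl_pipeline.py
import logging

from logging_config import LoggingConfig
from etl_pipeline import run_pipeline

if __name__ == "__main__":
    LoggingConfig.setup_logging("etl_pipeline.log")
    logging.info("ETL run started")
    try:
        run_pipeline()
        logging.info("ETL run completed successfully")
    except Exception:
        # logging.exception records the error with a full traceback at ERROR level
        logging.exception("ETL run failed")
        raise
```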

By integrating logging into each phase of the ETL pipeline—extraction, transformation, and loading—this implementation provides visibility into the pipeline’s execution and helps diagnose issues when they arise. The logging setup ensures that detailed information is available for both real-time monitoring on the console and historical review in log files. This approach not only facilitates debugging and troubleshooting but also helps in maintaining and improving the ETL pipeline over time.

### Summary

This ETL process extracts data from a PostgreSQL database, transforms it to ensure quality and consistency, and loads it into a MySQL data warehouse. The entire process is managed with Python and SQL, and Docker Compose is used to deploy and manage the database services.

---


