diff --git a/01-PythonBasics/05-04-DataStructure(dictionaries).md b/01-PythonBasics/05-04-DataStructure(dictionaries).md index 53d443f..bd778ef 100644 --- a/01-PythonBasics/05-04-DataStructure(dictionaries).md +++ b/01-PythonBasics/05-04-DataStructure(dictionaries).md @@ -222,7 +222,7 @@ Dictionaries are versatile data structures in Python, offering efficient lookup # Storing application configuration settings config = { "database": {"host": "localhost", "port": 3306, "username": "admin", "password": "password"}, - "logging": {"level": "DEBUG", "file": "app.log"} + "logging": {"level": "DEBUG", "file": "app.log"} } # Accessing database connection details diff --git a/01-PythonBasics/08-ErrorHandling.md b/01-PythonBasics/08-ErrorHandling.md index caf5cd9..5c00b04 100644 --- a/01-PythonBasics/08-ErrorHandling.md +++ b/01-PythonBasics/08-ErrorHandling.md @@ -282,7 +282,7 @@ Effective error handling involves not only using the right techniques but also f ```python import logging - logging.basicConfig(filename='app.log', level=logging.ERROR) + logging.basicConfig(filename='app.log', level=logging.ERROR) try: result = 10 / 0 diff --git a/03-GitAndGitHub/05-GitBasics.md b/03-GitAndGitHub/05-GitBasics.md index 7700751..451d663 100644 --- a/03-GitAndGitHub/05-GitBasics.md +++ b/03-GitAndGitHub/05-GitBasics.md @@ -87,7 +87,7 @@ Finally, let's view the commit history to see the changes we've made. 1. **Viewing Commit History**: - Use the `git log` command to see a log of all the commits made to the repository. ```sh - git log + git log ``` - This command shows a list of commits, including their commit hashes, author information, date, and commit messages. This history is crucial for tracking the development of your project over time. diff --git a/05-WorkingWithDataSourcesAndSerialization/07-CsvFiles.md b/05-WorkingWithDataSourcesAndSerialization/07-CsvFiles.md index fb97b0e..436c58f 100644 --- a/05-WorkingWithDataSourcesAndSerialization/07-CsvFiles.md +++ b/05-WorkingWithDataSourcesAndSerialization/07-CsvFiles.md @@ -489,7 +489,7 @@ Implement comprehensive error handling to log and manage exceptions, providing m ```python import logging -logging.basicConfig(filename='data_processing.log', level=logging.ERROR) +logging.basicConfig(filename='data_processing.log', level=logging.ERROR) def process_csv(file_path): with open(file_path, 'r', encoding='utf-8') as file: diff --git a/05-WorkingWithDataSourcesAndSerialization/13-ParquetAndPyArrow.md b/05-WorkingWithDataSourcesAndSerialization/13-ParquetAndPyArrow.md index 0474923..01d5aa9 100644 --- a/05-WorkingWithDataSourcesAndSerialization/13-ParquetAndPyArrow.md +++ b/05-WorkingWithDataSourcesAndSerialization/13-ParquetAndPyArrow.md @@ -2503,8 +2503,8 @@ import csv import random import datetime -# Configuration for the log file -num_records = 10000 # Number of log records to generate +# Configuration for the log file +num_records = 10000 # Number of log records to generate start_date = datetime.datetime(2024, 1, 1) # Start date for the logs end_date = datetime.datetime(2024, 1, 10) # End date for the logs @@ -2521,14 +2521,14 @@ def generate_random_timestamp(start, end): seconds=random.randint(0, int((end - start).total_seconds())) ) -# Function to generate a single log record +# Function to generate a single log record def generate_log_record(): timestamp = generate_random_timestamp(start_date, end_date).strftime("%Y-%m-%d %H:%M:%S") user_id = random.choice(user_ids) page_url = random.choice(page_urls) return [timestamp, user_id, page_url] -# 
Generate the log file +# Generate the log file with open("large_log_file.csv", "w", newline="") as csvfile: writer = csv.writer(csvfile) writer.writerow(["timestamp", "user_id", "page_url"]) # Write header @@ -2545,14 +2545,14 @@ import pyarrow as pa import pandas as pd import pyarrow.csv as csv -# Define the schema for the log file +# Define the schema for the log file schema = pa.schema([ ('timestamp', pa.timestamp('ms')), ('user_id', pa.string()), ('page_url', pa.string()) ]) -# Function to read log file in batches +# Function to read log file in batches def read_log_file_in_batches(file_path, batch_size): read_options = csv.ReadOptions(block_size=batch_size) convert_options = csv.ConvertOptions(column_types=schema) diff --git a/08-BuildingDataPipelines/02-ETL.md b/08-BuildingDataPipelines/02-ETL.md index 139ed5f..716d932 100644 --- a/08-BuildingDataPipelines/02-ETL.md +++ b/08-BuildingDataPipelines/02-ETL.md @@ -469,6 +469,8 @@ In this project, we aim to implement an ETL (Extract, Transform, Load) process f **Objective:** Build an ETL pipeline to manage and analyze data from an e-commerce platform. The data includes sales transactions, customer interactions, and inventory information. This pipeline will help in generating aggregated insights and reports for business intelligence. +![](../_assets/etl_project_overview.webp) + **Data Sources:** - **Sales Data:** Stored in a PostgreSQL database, including transaction details such as sales amount, customer ID, product ID, and transaction date. @@ -586,7 +588,7 @@ docker-compose up -d **1. Connect to PostgreSQL** -You can connect to PostgreSQL using a tool like `psql`, or any SQL client that supports PostgreSQL. +You can connect to PostgreSQL using a tool like `psql`, `dbeaver`, or any SQL client that supports PostgreSQL. **2. Create Database Schema** @@ -626,5 +628,294 @@ CREATE TABLE sales ( -![](./Examples/simple_ecommerce_etl/images/sql_schema.png) +![](Examples/images/sql_schema.png) + +#### Creating the Data Warehouse Schema + +**1. Connect to MySQL** + +You can connect to MySQL using a tool like `mysql workbench`, `dbeaver`, or any SQL client that supports MySQL. + +**2. 
Create Database Schema** + +Use the following SQL statements to create the data warehouse schema: + +```sql +CREATE DATABASE IF NOT EXISTS data_warehouse; + +CREATE TABLE IF NOT EXISTS dim_customer ( + customer_id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + email VARCHAR(255) UNIQUE NOT NULL, + join_date DATE NOT NULL +); + + +CREATE TABLE IF NOT EXISTS dim_inventory ( + product_id SERIAL PRIMARY KEY, + product_name VARCHAR(255) NOT NULL, + quantity INT NOT NULL CHECK (quantity >= 0), + price DECIMAL(10, 2) NOT NULL CHECK (price >= 0) +); + + +CREATE TABLE IF NOT EXISTS dim_date ( + date_id SERIAL PRIMARY KEY, + date DATE NOT NULL, + year INT NOT NULL, + quarter INT NOT NULL CHECK (quarter BETWEEN 1 AND 4), + month INT NOT NULL CHECK (month BETWEEN 1 AND 12), + day_of_week INT NOT NULL CHECK (day_of_week BETWEEN 1 AND 7), + week_of_year INT NOT NULL CHECK (week_of_year BETWEEN 1 AND 53), + UNIQUE (date) +); + +-- Example insert statement for dim_date +-- You may want to generate these rows programmatically to cover a range of dates +INSERT INTO dim_date (date, year, quarter, month, day_of_week, week_of_year) +VALUES ('2024-01-01', 2024, 1, 1, 1, 1); -- Add more rows as needed + + +CREATE TABLE IF NOT EXISTS fact_sales ( + sales_id SERIAL PRIMARY KEY, + customer_id INT NOT NULL, + product_id INT NOT NULL, + amount DECIMAL(10, 2) NOT NULL CHECK (amount > 0), + date_id INT NOT NULL, + + -- You can add indexes for performance + INDEX (customer_id), + INDEX (product_id), + INDEX (date_id) +); +``` + +![](../_assets/etl_project_star_schema.webp) + +##### Why Foreign Key Constraints Are Often Omitted in Star Schema Designs + +In a star schema design, explicit relationships like foreign key constraints are often omitted to optimize performance and enhance flexibility. The absence of these constraints reduces overhead during bulk data loads and improves query performance by simplifying the schema. This denormalized approach is tailored for read-heavy environments, allowing for faster query response times and reduced complexity. Additionally, without foreign key constraints, the ETL (Extract, Transform, Load) process gains flexibility in handling data transformations and discrepancies, enabling smoother and more efficient data integration. + +Managing data quality and consistency is typically handled at the ETL level rather than through database constraints. This approach provides greater control over data integrity and allows for easier schema evolution, accommodating changes in business requirements and data sources. By focusing on performance and flexibility, the star schema facilitates efficient data handling and reporting, aligning with the needs of analytical environments where rapid and reliable query access is essential. + +#### Install Dependencies + +Ensure you have the necessary Python packages installed. You can install them using `pip`: + +```bash +pip install pandas sqlalchemy psycopg2-binary mysql-connector-python +``` + +#### Extract + +To encapsulate the extraction logic into a Python class, we'll create a class that handles connecting to the PostgreSQL database, executing queries, and storing the results. This approach promotes code reusability and maintainability. + +```python +import pandas as pd +from sqlalchemy import create_engine + +class DataExtractor: + def __init__(self, db_url): + """ + Initializes the DataExtractor with the database connection URL. + + :param db_url: str, The connection URL for the PostgreSQL database. 
+ """ + self.engine = create_engine(db_url) + + def extract(self, query): + """ + Executes a SQL query and returns the results as a pandas DataFrame. + + :param query: str, The SQL query to be executed. + :return: pd.DataFrame, The results of the query. + """ + try: + df = pd.read_sql(query, self.engine) + return df + except Exception as e: + print(f"An error occurred: {e}") + return None +``` + +**Example Usage:** + +```python +# Create an instance of DataExtractor +extractor = DataExtractor('postgresql://user:password@localhost:5432/ecommerce_db') + +# Extract sales data +sales_df = extractor.extract("SELECT * FROM sales") + +# Check the results +if sales_df is not None: + print(sales_df.head()) +else: + print("Failed to extract sales data.") +``` + +The `DataExtractor` class simplifies the process of connecting to a PostgreSQL database, executing queries, and retrieving data as pandas DataFrames. This encapsulated approach helps in maintaining cleaner code and managing database interactions effectively. + +#### Transform + +The following Python class, `DataTransformer`, encapsulates the core functionalities required for transforming data. This class will provide methods to perform various data transformation tasks, such as cleaning and aggregating data using pandas DataFrames. + +```python +import pandas as pd + +class DataTransformer: + def __init__(self, dataframe: pd.DataFrame): + """ + Initializes the DataTransformer with a pandas DataFrame. + + :param dataframe: pd.DataFrame, The data to be transformed. + """ + self.data = dataframe + + def clean_data(self): + """ + Cleans the data by handling missing values and removing duplicates. + + :return: pd.DataFrame, The cleaned data. + """ + self.data.drop_duplicates(inplace=True) + self.data.fillna(0, inplace=True) # Fill missing values with 0 + return self.data + + def normalize_data(self): + """ + Normalizes the data by standardizing formats and units. + + :return: pd.DataFrame, The normalized data. + """ + if 'date' in self.data.columns: + self.data['date'] = pd.to_datetime(self.data['date'], errors='coerce') + + if 'amount' in self.data.columns: + self.data['amount'] = self.data['amount'].replace('[\$,]', '', regex=True).astype(float) + + return self.data + + def aggregate_data(self, group_by_column: str, aggregation_column: str, aggregation_func: str): + """ + Aggregates the data based on specified columns and function. + + :param group_by_column: str, The column to group by. + :param aggregation_column: str, The column to aggregate. + :param aggregation_func: str, The aggregation function (e.g., 'sum', 'mean'). + :return: pd.DataFrame, The aggregated data. + """ + if group_by_column not in self.data.columns or aggregation_column not in self.data.columns: + raise ValueError("The specified columns are not present in the DataFrame.") + + aggregated_data = self.data.groupby(group_by_column).agg({aggregation_column: aggregation_func}) + return aggregated_data.reset_index() + +``` + +#### Load + +To complete the ETL process, the Load phase involves loading the transformed data into a data warehouse. In this example, we will use MySQL as the data warehouse and perform the loading process using Python. We will use the `pandas` library along with `SQLAlchemy` to accomplish this. + +```python +import pandas as pd +from sqlalchemy import create_engine + +class DataLoader: + def __init__(self, db_url: str): + """ + Initializes the DataLoader with the data warehouse connection URL. 
+ + :param db_url: str, The connection URL for the MySQL database. + """ + self.engine = create_engine(db_url) + + def load(self, dataframe: pd.DataFrame, table_name: str, if_exists: str = 'replace') -> None: + """ + Loads a DataFrame into a MySQL table. + + :param dataframe: pd.DataFrame, The data to be loaded. + :param table_name: str, The name of the target table in the MySQL database. + :param if_exists: str, What to do if the table already exists ('fail', 'replace', 'append'). + """ + try: + dataframe.to_sql(table_name, self.engine, index=False, if_exists=if_exists) + print(f"Data successfully loaded into table '{table_name}'.") + except Exception as e: + print(f"An error occurred while loading data: {e}") +``` + +**Load Data Using the `DataLoader` Class:** + +- **Define MySQL Connection URL:** + +```python +mysql_db_url = 'mysql+mysqlconnector://user:password@localhost:3306/data_warehouse' +``` + +- **Create an Instance of `DataLoader`:** + +```python +# Create an instance of DataLoader +loader = DataLoader(mysql_db_url) +``` + +- **Load Transformed Data:** + +```python +# Assuming `aggregated_sales_df` is the transformed DataFrame to be loaded +loader.load(aggregated_sales_df, 'aggregated_sales') +``` + +#### Logging and Monitoring + +**Monitoring** and **logging** are critical aspects of managing ETL (Extract, Transform, Load) pipelines. They help track the pipeline’s performance, detect issues, and ensure data integrity throughout the ETL process. Proper monitoring and logging provide valuable insights into the pipeline's operations and assist in troubleshooting and maintaining the system. + +##### Overview of Monitoring and Logging + +- **Monitoring:** Refers to continuously checking the health and performance of the ETL pipeline. It involves tracking metrics like data volume processed, pipeline execution time, error rates, and resource usage. Monitoring helps identify bottlenecks and potential failures early. + +- **Logging:** Involves recording detailed information about the ETL pipeline’s operations, such as data extraction, transformation, and loading steps. Logs capture both informational and error messages, which are essential for debugging and auditing purposes. + +##### Implementing Logging in the ETL Pipeline + +In the provided ETL pipeline code, logging is implemented using Python's built-in `logging` module. This implementation ensures that relevant information is captured both on the console and in a log file for review and analysis. + +**Logging Configuration** + +- **File and Console Handlers:** The logging configuration includes a `StreamHandler` for console output and a `FileHandler` for logging to a file named `etl_pipeline.log`. +- **Log Levels and Format:** Both handlers are set to log messages at the `INFO` level and above. A consistent format (`%(asctime)s - %(levelname)s - %(message)s`) is used for log entries to include the timestamp, log level, and message. 
+ +```python +# logging_config.py +import logging + +class LoggingConfig: + @staticmethod + def setup_logging(path): + # Create handlers + console_handler = logging.StreamHandler() + file_handler = logging.FileHandler(path) + + # Set log levels + console_handler.setLevel(logging.INFO) + file_handler.setLevel(logging.INFO) + + # Create a formatter and set it for handlers + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + console_handler.setFormatter(formatter) + file_handler.setFormatter(formatter) + + # Configure the root logger + logging.basicConfig(level=logging.INFO, handlers=[console_handler, file_handler]) +``` + +By integrating logging into each phase of the ETL pipeline—extraction, transformation, and loading—this implementation provides visibility into the pipeline’s execution and helps diagnose issues when they arise. The logging setup ensures that detailed information is available for both real-time monitoring on the console and historical review in log files. This approach not only facilitates debugging and troubleshooting but also helps in maintaining and improving the ETL pipeline over time. + +### Summary + +This ETL process extracts data from a PostgreSQL database, transforms it to ensure quality and consistency, and loads it into a MySQL data warehouse. The entire process is managed with Python and SQL, and Docker Compose is used to deploy and manage the database services. diff --git a/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/images/sql_schema.png b/08-BuildingDataPipelines/Examples/images/sql_schema.png similarity index 100% rename from 08-BuildingDataPipelines/Examples/simple_ecommerce_etl/images/sql_schema.png rename to 08-BuildingDataPipelines/Examples/images/sql_schema.png diff --git a/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/scripts/mysql_dw.sql b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/scripts/mysql_dw.sql new file mode 100644 index 0000000..5853fc1 --- /dev/null +++ b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/scripts/mysql_dw.sql @@ -0,0 +1,46 @@ +CREATE DATABASE IF NOT EXISTS data_warehouse; + +CREATE TABLE IF NOT EXISTS dim_customer ( + customer_id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + email VARCHAR(255) UNIQUE NOT NULL, + join_date DATE NOT NULL +); + + +CREATE TABLE IF NOT EXISTS dim_inventory ( + product_id SERIAL PRIMARY KEY, + product_name VARCHAR(255) NOT NULL, + quantity INT NOT NULL CHECK (quantity >= 0), + price DECIMAL(10, 2) NOT NULL CHECK (price >= 0) +); + + +CREATE TABLE IF NOT EXISTS dim_date ( + date_id SERIAL PRIMARY KEY, + date DATE NOT NULL, + year INT NOT NULL, + quarter INT NOT NULL CHECK (quarter BETWEEN 1 AND 4), + month INT NOT NULL CHECK (month BETWEEN 1 AND 12), + day_of_week INT NOT NULL CHECK (day_of_week BETWEEN 1 AND 7), + week_of_year INT NOT NULL CHECK (week_of_year BETWEEN 1 AND 53), + UNIQUE (date) +); + +-- Example insert statement for dim_date +-- You may want to generate these rows programmatically to cover a range of dates +INSERT INTO dim_date (date, year, quarter, month, day_of_week, week_of_year) +VALUES ('2024-01-01', 2024, 1, 1, 1, 1); -- Add more rows as needed + + +CREATE TABLE IF NOT EXISTS fact_sales ( + sales_id SERIAL PRIMARY KEY, + customer_id INT NOT NULL, + product_id INT NOT NULL, + amount DECIMAL(10, 2) NOT NULL CHECK (amount > 0), + amount_usd DECIMAL(10, 2) NOT NULL CHECK (amount_usd > 0), + date DATETIME NOT NULL, + -- You can add indexes for 
performance + INDEX (customer_id), + INDEX (product_id) +); diff --git a/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/scripts/postgres_db.sql b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/scripts/postgres_db.sql new file mode 100644 index 0000000..2570532 --- /dev/null +++ b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/scripts/postgres_db.sql @@ -0,0 +1,58 @@ +-- CREATE DATABASE ecommerce_db; + +-- Create the 'customers' table +CREATE TABLE IF NOT EXISTS customers ( + customer_id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + email VARCHAR(255) UNIQUE NOT NULL, + join_date DATE NOT NULL, + CHECK (join_date <= CURRENT_DATE) +); + +CREATE TABLE IF NOT EXISTS inventory ( + product_id SERIAL PRIMARY KEY, + product_name VARCHAR(255) NOT NULL, + quantity INT NOT NULL CHECK (quantity >= 0), + price DECIMAL(10, 2) NOT NULL CHECK (price >= 0) +); + +-- Create the 'sales' table +CREATE TABLE IF NOT EXISTS sales ( + sales_id SERIAL PRIMARY KEY, + customer_id INT NOT NULL, + product_id INT NOT NULL, + amount DECIMAL(10, 2) NOT NULL CHECK (amount > 0), + date DATE NOT NULL, + FOREIGN KEY (customer_id) REFERENCES customers(customer_id) ON DELETE CASCADE, + FOREIGN KEY (product_id) REFERENCES inventory(product_id) ON DELETE CASCADE +); + + +-- Insert sample data into the 'customers' table +INSERT INTO customers (name, email, join_date) VALUES +('Alice Johnson', 'alice.johnson@example.com', '2024-01-15'), +('Bob Smith', 'bob.smith@example.com', '2024-02-20'), +('Charlie Brown', 'charlie.brown@example.com', '2024-03-10'), +('Diana Prince', 'diana.prince@example.com', '2024-04-05'), +('Eve Adams', 'eve.adams@example.com', '2024-05-25'); + +-- Insert sample data into the 'inventory' table +INSERT INTO inventory (product_name, quantity, price) VALUES +('Widget A', 100, 19.99), +('Widget B', 150, 29.99), +('Widget C', 200, 39.99), +('Widget D', 120, 49.99), +('Widget E', 180, 59.99); + +-- Insert sample data into the 'sales' table +INSERT INTO sales (customer_id, product_id, amount, date) VALUES +(1, 1, 19.99, '2024-01-16'), +(2, 2, 29.99, '2024-02-22'), +(3, 3, 39.99, '2024-03-12'), +(4, 4, 49.99, '2024-04-07'), +(5, 5, 59.99, '2024-05-30'), +(1, 2, 29.99, '2024-01-18'), +(2, 3, 39.99, '2024-02-25'), +(3, 4, 49.99, '2024-03-15'), +(4, 5, 59.99, '2024-04-10'), +(5, 1, 19.99, '2024-05-28'); diff --git a/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/__pycache__/config.cpython-311.pyc b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/__pycache__/config.cpython-311.pyc new file mode 100644 index 0000000..f133c9f Binary files /dev/null and b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/__pycache__/config.cpython-311.pyc differ diff --git a/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/__pycache__/logging_config.cpython-311.pyc b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/__pycache__/logging_config.cpython-311.pyc new file mode 100644 index 0000000..a418c1a Binary files /dev/null and b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/__pycache__/logging_config.cpython-311.pyc differ diff --git a/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/config.py b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/config.py new file mode 100644 index 0000000..3f44c67 --- /dev/null +++ b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/config.py @@ -0,0 +1,15 @@ +POSTGRESQL_CONFIG = { + 'host': 'localhost', + 'port': '5433', + 'dbname': 'ecommerce_db', + 'user': 'postgres', + 'password': 
'postgres' +} + +MYSQL_CONFIG = { + 'host': 'localhost', + 'port': '3306', + 'dbname': 'data_warehouse', + 'user': 'mysql', + 'password': 'mysql' +} diff --git a/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/logging_config.py b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/logging_config.py new file mode 100644 index 0000000..6aef331 --- /dev/null +++ b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/logging_config.py @@ -0,0 +1,22 @@ +# logging_config.py +import logging + + +class LoggingConfig: + @staticmethod + def setup_logging(path): + # Create handlers + console_handler = logging.StreamHandler() + file_handler = logging.FileHandler(path) + + # Set log levels + console_handler.setLevel(logging.INFO) + file_handler.setLevel(logging.INFO) + + # Create a formatter and set it for handlers + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + console_handler.setFormatter(formatter) + file_handler.setFormatter(formatter) + + # Configure the root logger + logging.basicConfig(level=logging.INFO, handlers=[console_handler, file_handler]) diff --git a/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/pipeline.py b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/pipeline.py new file mode 100644 index 0000000..88e9d5c --- /dev/null +++ b/08-BuildingDataPipelines/Examples/simple_ecommerce_etl/src/pipeline.py @@ -0,0 +1,122 @@ +import logging +import pandas as pd +from sqlalchemy import create_engine +from config import POSTGRESQL_CONFIG, MYSQL_CONFIG +from logging_config import LoggingConfig + +# Set up logging +LoggingConfig.setup_logging('../logs/etl_pipeline.log') + + +class Extractor: + def __init__(self, config): + self.engine = create_engine( + f"postgresql://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['dbname']}") + + def extract_data(self, query): + try: + logging.info("Starting data extraction") + data = pd.read_sql_query(query, self.engine) + logging.info("Data extraction completed successfully") + return data + except Exception as e: + logging.error(f"Data extraction failed: {str(e)}") + print(f"Error during extraction: {str(e)}") + raise + + +class Transformer: + def transform_data(self, data): + try: + logging.info("Starting data transformation") + + # Convert 'date' column to datetime if it's not already + if not pd.api.types.is_datetime64_any_dtype(data['date']): + data['date'] = pd.to_datetime(data['date']) + + # Example transformation: Convert amount to USD + data['amount_usd'] = data['amount'] * 1.1 + + # Create the 'dim_date' dataframe + data['year'] = data['date'].dt.year + data['month'] = data['date'].dt.month + data['day'] = data['date'].dt.day + data['quarter'] = data['date'].dt.quarter + data['day_of_week'] = data['date'].dt.dayofweek + 1 + data['week_of_year'] = data['date'].dt.isocalendar().week + dim_dates = data[['date', 'year', 'quarter', 'month', 'day_of_week', 'week_of_year']].drop_duplicates() + dim_dates = dim_dates.rename(columns={'date': 'date'}) + + # Create 'dim_customers' dataframe + dim_customers = data[['customer_id', 'name', 'email', 'join_date']].drop_duplicates() + + # Create 'dim_inventory' dataframe + dim_inventory = data[['product_id', 'product_name', 'quantity', 'price']].drop_duplicates() + + # Create 'fact_sales' dataframe + fact_sales = data[['sales_id', 'customer_id', 'product_id', 'amount', 'date', 'amount_usd']] + fact_sales = fact_sales.rename(columns={'date': 'date'}) + + logging.info("Data transformation completed 
successfully") + return fact_sales, dim_customers, dim_inventory, dim_dates + except Exception as e: + logging.error(f"Data transformation failed: {str(e)}") + print(f"Error during transformation: {str(e)}") + raise + + +class Loader: + def __init__(self, config): + self.engine = create_engine( + f"mysql+mysqlconnector://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['dbname']}") + + def load_data(self, data, table_name): + try: + logging.info(f"Starting data load into {table_name}") + data.to_sql(table_name, self.engine, if_exists='append', index=False) + logging.info(f"Data load into {table_name} completed successfully") + except Exception as e: + logging.error(f"Data load into {table_name} failed: {str(e)}") + print(f"Error during loading into {table_name}: {str(e)}") + raise + + +class ETL: + def __init__(self, extract_query): + self.extractor = Extractor(POSTGRESQL_CONFIG) + self.transformer = Transformer() + self.loader = Loader(MYSQL_CONFIG) + self.extract_query = extract_query + + def run(self): + try: + data = self.extractor.extract_data(self.extract_query) + fact_sales, dim_customers, dim_inventory, dim_dates = self.transformer.transform_data(data) + + # Load dimension tables + self.loader.load_data(dim_customers, 'dim_customer') + self.loader.load_data(dim_inventory, 'dim_inventory') + self.loader.load_data(dim_dates, 'dim_date') + + # Load fact table + self.loader.load_data(fact_sales, 'fact_sales') + + logging.info("ETL pipeline executed successfully") + print("ETL pipeline executed successfully") + except Exception as e: + logging.error(f"ETL pipeline failed: {str(e)}") + print(f"ETL pipeline failed: {str(e)}") + + +if __name__ == "__main__": + # Define your ETL process + extract_query = """ + SELECT s.sales_id, s.customer_id, s.product_id, s.amount, s.date, c.name, c.email, c.join_date, p.product_name, p.quantity, p.price + FROM sales s + JOIN customers c ON s.customer_id = c.customer_id + JOIN inventory p ON s.product_id = p.product_id; + """ + + # Create and run the ETL process + etl_pipeline = ETL(extract_query) + etl_pipeline.run() diff --git a/_assets/etl_diagrams.webp b/_assets/etl_diagrams.webp index 1889ce3..683619b 100644 Binary files a/_assets/etl_diagrams.webp and b/_assets/etl_diagrams.webp differ diff --git a/_assets/etl_project_overview.webp b/_assets/etl_project_overview.webp new file mode 100644 index 0000000..7800535 Binary files /dev/null and b/_assets/etl_project_overview.webp differ diff --git a/_assets/etl_project_star_schema.webp b/_assets/etl_project_star_schema.webp new file mode 100644 index 0000000..4bd0587 Binary files /dev/null and b/_assets/etl_project_star_schema.webp differ