Ods-replication-pg2pg #1

Merged: 12 commits, Apr 18, 2024
65 changes: 65 additions & 0 deletions .github/workflows/docker-pg2pg.yaml
@@ -0,0 +1,65 @@
name: Push to GHCR

on:
push:
branches: [ "ods-replication-pg2pg" ]

env:
# DF-NOTE: pull ghcr.io/bcgov/nr-dap-ods-trino:main
REGISTRY: ghcr.io
DOCKERFILE_PATH: shared/ods_replication_pg2pg
IMAGE_NAME: ${{ github.repository }}-pg2pg

jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
id-token: write

steps:
- name: Checkout repository
uses: actions/checkout@v3

- name: Install cosign
if: github.event_name != 'pull_request'
uses: sigstore/cosign-installer@v3.3.0

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@f95db51fddba0c2d1ec667646a06c2ce06100226 # v3.0.0

- name: Log into registry ${{ env.REGISTRY }}
if: github.event_name != 'pull_request'
uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

- name: Build and push Docker image
id: build-and-push
uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5.0.0
with:
# DF-NOTE: to help the action find the Dockerfile to build from
context: ${{ env.DOCKERFILE_PATH }}/
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max

- name: Sign the published Docker image
if: ${{ github.event_name != 'pull_request' }}
env:
# https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable
TAGS: ${{ steps.meta.outputs.tags }}
DIGEST: ${{ steps.build-and-push.outputs.digest }}

run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST}
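      # DF-NOTE (reviewer sketch): the signature on the pushed image can be checked
      # locally with cosign's keyless verification, e.g.:
      #
      #   cosign verify ghcr.io/<owner>/<repo>-pg2pg:<tag> \
      #     --certificate-identity-regexp 'https://github.com/.*' \
      #     --certificate-oidc-issuer https://token.actions.githubusercontent.com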
17 changes: 17 additions & 0 deletions shared/ods_replication_pg2pg/Dockerfile
@@ -0,0 +1,17 @@
FROM python:3.11.4-slim-buster

WORKDIR /app

# psycopg2 builds against libpq and needs a C compiler at install time

RUN apt-get update \
&& apt-get -y install libpq-dev gcc \
&& pip install psycopg2

COPY *.py .

COPY requirements.txt requirements.txt

RUN pip3 install -r requirements.txt

CMD ["python3", "./data_replication_pg2pg.py"]
295 changes: 295 additions & 0 deletions shared/ods_replication_pg2pg/data_replication_pg2pg.py
@@ -0,0 +1,295 @@
#!/usr/bin/env python
# coding: utf-8

# In[1]: Imports
import psycopg2
import psycopg2.pool
import psycopg2.extras
from psycopg2.extras import execute_batch
import time
import concurrent.futures
from datetime import datetime
import sys
import os


start = time.time()

# In[3]: Retrieve source Postgres database configuration
src_postgres_username = os.environ['DB_USERNAME']
src_postgres_password = os.environ['DB_PASSWORD']
src_postgres_host = os.environ['DB_HOST']
src_postgres_port = os.environ['DB_PORT']
src_postgres_database = os.environ['DATABASE']
# In[4]: Retrieve target Postgres database configuration
postgres_username = os.environ['ODS_USERNAME']
postgres_password = os.environ['ODS_PASSWORD']
postgres_host = os.environ['ODS_HOST']
postgres_port = os.environ['ODS_PORT']
postgres_database = os.environ['ODS_DATABASE']
# In[5]: Script parameters
mstr_schema = os.environ['MSTR_SCHEMA']
app_name = os.environ['APP_NAME']
concurrent_tasks = int(os.environ['CONCUR_TASKS'])  # number of tables replicated in parallel
audit_table = 'audit_batch_status'
current_date = datetime.now().strftime('%Y-%m-%d')

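# DF-NOTE (reviewer sketch): every setting above comes straight from os.environ,
# so a missing variable surfaces as a bare KeyError at startup. An up-front check
# (run before the lookups above; variable names taken from this script) could
# fail fast with a readable message:
#
#     REQUIRED_VARS = [
#         'DB_USERNAME', 'DB_PASSWORD', 'DB_HOST', 'DB_PORT', 'DATABASE',
#         'ODS_USERNAME', 'ODS_PASSWORD', 'ODS_HOST', 'ODS_PORT', 'ODS_DATABASE',
#         'MSTR_SCHEMA', 'APP_NAME', 'CONCUR_TASKS',
#     ]
#     missing = [name for name in REQUIRED_VARS if name not in os.environ]
#     if missing:
#         sys.exit(f"Missing required environment variables: {', '.join(missing)}")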

# In[6]: Set up source Postgres connection pool

try:
SrcPgresPool = psycopg2.pool.ThreadedConnectionPool(
minconn=concurrent_tasks,
maxconn=concurrent_tasks,
host=src_postgres_host,
port=src_postgres_port,
dbname=src_postgres_database,
user=src_postgres_username,
password=src_postgres_password
)
    print('Source Postgres Connection Successful')
except Exception as e:
print(f'Error connecting to Source PostgreSQL: {e}')
# Exit with code 1 (failure) if connection unsuccessful
sys.exit(1)


# In[7]: Set up target Postgres connection pool
try:
PgresPool = psycopg2.pool.ThreadedConnectionPool(
minconn=concurrent_tasks,
maxconn=concurrent_tasks,
host=postgres_host,
port=postgres_port,
dbname=postgres_database,
user=postgres_username,
password=postgres_password
)
print('Target Postgres Connection Successful')

except psycopg2.OperationalError as e:
print(f'Error connecting to Target PostgreSQL: {e}')
# Exit with code 1 (failure) if connection unsuccessful
sys.exit(2)
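# DF-NOTE (reviewer sketch): the functions below all repeat the same
# getconn()/putconn() bookkeeping, and some error paths exit without returning
# the connection. A small context manager could centralize that, e.g.:
#
#     from contextlib import contextmanager
#
#     @contextmanager
#     def pooled_cursor(pool):
#         conn = pool.getconn()
#         try:
#             with conn.cursor() as cur:
#                 yield conn, cur
#         finally:
#             pool.putconn(conn)
#
# Usage: with pooled_cursor(PgresPool) as (conn, cur): ...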

def del_audit_entries_rerun(current_date):
postgres_connection = PgresPool.getconn()
postgres_cursor = postgres_connection.cursor()
try:
del_sql = f"""
DELETE FROM {mstr_schema}.{audit_table} c
where application_name='{app_name}' and batch_run_date='{current_date}'
"""
postgres_cursor.execute(del_sql)
postgres_connection.commit()
postgres_cursor.close()
PgresPool.putconn(postgres_connection)
print(del_sql)
return None
except Exception as e:
print(f"Failed to delete audit entries for application name {app_name} and date '{current_date}': {str(e)}")
sys.exit(3)

# Function to insert the audit batch status entry
def audit_batch_status_insert(table_name,status):
postgres_connection = PgresPool.getconn()
postgres_cursor = postgres_connection.cursor()
try:
audit_batch_status_query = f"""INSERT INTO {mstr_schema}.{audit_table} VALUES ('{table_name}','{app_name}','replication','{status}',current_date)"""
print(audit_batch_status_query)
postgres_cursor.execute(audit_batch_status_query)
postgres_connection.commit()
print(f"Record inserted into audit batch status table")
return None
except Exception as e:
print(f"Error inserting record into to audit batch status table: {str(e)}")
sys.exit(4)
return None
finally:
# Return the connection to the pool
if postgres_connection:
postgres_cursor.close()
PgresPool.putconn(postgres_connection)
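# DF-NOTE (reviewer sketch): the audit INSERT above interpolates values straight
# into the SQL string. psycopg2 can bind the values server-side, with
# psycopg2.sql handling the schema/table identifiers; a minimal equivalent
# (same table layout assumed) would look like:
#
#     from psycopg2 import sql
#
#     insert_stmt = sql.SQL(
#         "INSERT INTO {}.{} VALUES (%s, %s, 'replication', %s, current_date)"
#     ).format(sql.Identifier(mstr_schema), sql.Identifier(audit_table))
#     postgres_cursor.execute(insert_stmt, (table_name, app_name, status))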

# In[8]: Function to get active rows from master table
def get_active_tables(mstr_schema, app_name):
    postgres_connection = PgresPool.getconn()
    list_sql = f"""
    SELECT application_name, source_schema_name, source_table_name, target_schema_name, target_table_name,
           truncate_flag, cdc_flag, full_inc_flag, cdc_column, replication_order, customsql_ind, customsql_query
    from {mstr_schema}.cdc_master_table_list c
    where active_ind = 'Y' and application_name = '{app_name}'
    order by replication_order, source_table_name
    """
    try:
        with postgres_connection.cursor() as curs:
            curs.execute(list_sql)
            rows = curs.fetchall()
        postgres_connection.commit()
        PgresPool.putconn(postgres_connection)
        return rows
    except Exception as e:
        print(f"Error selecting records from cdc_master_table_list table: {str(e)}")
        sys.exit(5)
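# DF-NOTE (reviewer sketch): the main block below picks fields out of these rows
# by position (row[2], row[1], ...). A namedtuple mirroring the SELECT column
# order above would make that indexing self-documenting, e.g.:
#
#     from collections import namedtuple
#
#     MasterRow = namedtuple("MasterRow", [
#         "application_name", "source_schema_name", "source_table_name",
#         "target_schema_name", "target_table_name", "truncate_flag",
#         "cdc_flag", "full_inc_flag", "cdc_column", "replication_order",
#         "customsql_ind", "customsql_query",
#     ])
#
#     rows = [MasterRow(*r) for r in get_active_tables(mstr_schema, app_name)]
#     tables_to_extract = [(r.source_table_name, r.source_schema_name,
#                           r.target_schema_name, r.customsql_ind, r.customsql_query)
#                          for r in rows]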


# In[9]: Function to extract data from source Postgres
def extract_from_srcpg(table_name,source_schema,customsql_ind,customsql_query):
# Acquire a connection from the pool
srcpostgres_connection = SrcPgresPool.getconn()
srcpostgres_cursor = srcpostgres_connection.cursor()
    try:
        if customsql_ind == "Y":
            # Run the custom SQL defined in the master table list
            sql_query = customsql_query
        else:
            sql_query = f'SELECT * FROM {source_schema}.{table_name}'
        print(sql_query)
        srcpostgres_cursor.execute(sql_query)
        rows = srcpostgres_cursor.fetchall()
        return rows

    except Exception as e:
        audit_batch_status_insert(table_name, 'failed')
        print(f"Error extracting data from source Postgres: {str(e)}")
        return []

finally:
# Return the connection to the pool
if srcpostgres_connection:
srcpostgres_cursor.close()
SrcPgresPool.putconn(srcpostgres_connection)
# In[10]: Function to load data into target PostgreSQL using data from source Postgres
def load_into_postgres(table_name, data,target_schema):
postgres_connection = PgresPool.getconn()
postgres_cursor = postgres_connection.cursor()
    try:
        # Clear existing rows in the target table
        truncate_query = f'TRUNCATE TABLE {target_schema}.{table_name}'
        postgres_cursor.execute(truncate_query)

        # Build the INSERT query with one %s placeholder per column
        insert_query = f'INSERT INTO {target_schema}.{table_name} VALUES ({", ".join(["%s"] * len(data[0]))})'

        # Use execute_batch for efficient batched inserts
        with postgres_connection.cursor() as cursor:
            data_to_insert = [tuple(row) for row in data]
            execute_batch(cursor, insert_query, data_to_insert)
        postgres_connection.commit()
# Insert record to audit batch table
audit_batch_status_insert(table_name,'success')


except Exception as e:
print(f"Error loading data into PostgreSQL: {str(e)}")
audit_batch_status_insert(table_name,'failed')
finally:
# Return the connection to the pool
if postgres_connection:
postgres_cursor.close()
PgresPool.putconn(postgres_connection)
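# DF-NOTE (reviewer sketch): for very large tables, psycopg2's execute_values()
# typically outperforms execute_batch() because it folds many rows into a single
# INSERT ... VALUES statement. A minimal equivalent of the insert above (same
# data_to_insert list assumed):
#
#     from psycopg2.extras import execute_values
#
#     insert_query = f'INSERT INTO {target_schema}.{table_name} VALUES %s'
#     execute_values(cursor, insert_query, data_to_insert, page_size=1000)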

# In[11]: Function to call both extract and load functions
def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind,customsql_query):
    # Extract data from the source Postgres database
    print(f'Source: Thread {table_name} started at ' + datetime.now().strftime("%H:%M:%S"))
    srcpg_data = extract_from_srcpg(table_name, source_schema, customsql_ind, customsql_query)
    print(f'Source: Extraction for {table_name} completed at ' + datetime.now().strftime("%H:%M:%S"))

if srcpg_data:
# Load data into PostgreSQL
load_into_postgres(table_name, srcpg_data, target_schema)
print(f"Target: Data loaded into table: {table_name}")
print(f'Target: Thread {table_name} ended at ' + datetime.now().strftime("%H:%M:%S"))

def check_failed_tables(mstr_schema, app_name, current_date):
    postgres_connection = PgresPool.getconn()
    list_sql = f"""
    SELECT object_name
    from {mstr_schema}.audit_batch_status c
    where application_name = '{app_name}' and batch_run_date = '{current_date}' and object_execution_status = 'failed'
    """
    try:
        with postgres_connection.cursor() as curs:
            curs.execute(list_sql)
            rows = curs.fetchall()
        postgres_connection.commit()
        PgresPool.putconn(postgres_connection)
        return rows
    except Exception as e:
        print(f"Error selecting records from audit_batch_status table: {str(e)}")
        sys.exit(6)



# In[12]: Initializing concurrency
if __name__ == '__main__':
# Main ETL process
    active_tables_rows = get_active_tables(mstr_schema, app_name)
    # (source_table_name, source_schema_name, target_schema_name, customsql_ind, customsql_query)
    tables_to_extract = [(row[2], row[1], row[3], row[10], row[11]) for row in active_tables_rows]

print(f"tables to extract are {tables_to_extract}")
print(f'No of concurrent tasks:{concurrent_tasks}')
#Delete audit entries for rerun on same day
del_audit_entries_rerun(current_date)
# Using ThreadPoolExecutor to run tasks concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_tasks) as executor:
# Submit tasks to the executor
        future_to_table = {executor.submit(load_data_from_src_tgt, *table): table for table in tables_to_extract}

# Wait for all tasks to complete
concurrent.futures.wait(future_to_table)

# Print results
for future in future_to_table:
table_name = future_to_table[future]
try:
# Get the result of the task, if any
future.result()
except Exception as e:
# Handle exceptions that occurred during the task
print(f"Error replicating {table_name}: {e}")
audit_batch_status_insert(table_name[0],'failed')

    get_failed_tables_rows = check_failed_tables(mstr_schema, app_name, current_date)

    if len(get_failed_tables_rows) > 0:
        print(f"Some tables failed to replicate for application: {app_name}")
        sys.exit(7)
    else:
        print(f"All tables have been successfully replicated for application: {app_name}")


# record end time
end = time.time()
SrcPgresPool.closeall()
PgresPool.closeall()

print("ETL process completed successfully.")
print("The time of execution of the program is:", (end - start) , "secs")

sys.exit(0)
12 changes: 12 additions & 0 deletions shared/ods_replication_pg2pg/requirements.txt
@@ -0,0 +1,12 @@
oracledb==1.3.1
psycopg2==2.9.6
pandas==2.0.2
openpyxl==3.1.2
configparser==6.0.0
dbt-postgres==1.6.4
PyYAML==6.0
pyodbc==4.0.39
python-dotenv==1.0.0
boto3==1.28.10
requests==2.31.0
XlsxWriter==3.1.2
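# DF-NOTE (reviewer question): data_replication_pg2pg.py only imports psycopg2,
# but this file also pins oracledb, pandas, openpyxl, dbt-postgres, pyodbc,
# boto3, requests and XlsxWriter. Worth confirming whether those are needed in
# this image or are carried over from the shared Oracle replication setup.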