From 1d37a3b5065bb10039934ae38ed7fec265ec6d0b Mon Sep 17 00:00:00 2001 From: vishreddy01 Date: Wed, 14 Feb 2024 10:54:38 -0800 Subject: [PATCH] Custom sql Change --- data_replication_parametrized_audit_os.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/data_replication_parametrized_audit_os.py b/data_replication_parametrized_audit_os.py index c2f1bf0..855ef1a 100644 --- a/data_replication_parametrized_audit_os.py +++ b/data_replication_parametrized_audit_os.py @@ -95,7 +95,7 @@ def get_active_tables(mstr_schema,app_name): postgres_connection = PgresPool.getconn() postgres_cursor = postgres_connection.cursor() list_sql = f""" - SELECT application_name,source_schema_name,source_table_name,target_schema_name,target_table_name,truncate_flag,cdc_flag,full_inc_flag,cdc_column,replication_order,where_clause,customsql_ind,customsql_query + SELECT application_name,source_schema_name,source_table_name,target_schema_name,target_table_name,truncate_flag,cdc_flag,full_inc_flag,cdc_column,replication_order,customsql_ind,customsql_query from {mstr_schema}.cdc_master_table_list c where active_ind = 'Y' and application_name='{app_name}' order by replication_order, source_table_name @@ -121,14 +121,15 @@ def extract_from_oracle(table_name,source_schema,customsql_ind,customsql_query): oracle_cursor.execute(sql_query) rows = oracle_cursor.fetchall() OrcPool.release(oracle_connection) - return rows + return rows else: sql_query = f'SELECT * FROM {source_schema}.{table_name}' print(sql_query) oracle_cursor.execute(sql_query) rows = oracle_cursor.fetchall() OrcPool.release(oracle_connection) - return rows + return rows + except Exception as e: audit_batch_status_insert(table_name,'failed') print(f"Error extracting data from Oracle: {str(e)}") @@ -171,7 +172,7 @@ def load_into_postgres(table_name, data,target_schema): def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind,customsql_query): # Extract data from Oracle print(f'Source: Thread {table_name} started at ' + datetime.now().strftime("%H:%M:%S")) - oracle_data = extract_from_oracle(table_name,customsql_ind,customsql_query) # Ensure table name is in uppercase + oracle_data = extract_from_oracle(table_name,source_schema,customsql_ind,customsql_query) # Ensure table name is in uppercase print(f'Source: Extraction for {table_name} completed at ' + datetime.now().strftime("%H:%M:%S")) if oracle_data: @@ -184,8 +185,8 @@ def load_data_from_src_tgt(table_name,source_schema,target_schema,customsql_ind, if __name__ == '__main__': # Main ETL process active_tables_rows =get_active_tables(mstr_schema,app_name) - print(active_tables_rows) - tables_to_extract = [(row[2],row[1],row[3],row[11],row[12]) for row in active_tables_rows] + #print(active_tables_rows) + tables_to_extract = [(row[2],row[1],row[3],row[10],row[11]) for row in active_tables_rows] print(f"tables to extract are {tables_to_extract}") print(f'No of concurrent tasks:{concurrent_tasks}')