diff --git a/snowflake_blueprints/upload_file.py b/snowflake_blueprints/upload_file.py
index 57c7fdd..12f6336 100644
--- a/snowflake_blueprints/upload_file.py
+++ b/snowflake_blueprints/upload_file.py
@@ -97,7 +97,11 @@ def convert_to_parquet(source_full_path, table_name):
     shipyard.files.create_folder_if_dne(parquet_path)
     df = dd.read_csv(source_full_path)
     df.columns = map(lambda x: str(x).upper(), df.columns)
-    df.to_parquet(parquet_path, compression='gzip', write_index=False)
+    df.to_parquet(
+        parquet_path,
+        compression='gzip',
+        write_index=False,
+        write_metadata_file=False)
     return parquet_path


@@ -105,7 +109,8 @@ def execute_put_command(db_connection, file_path, table_name, results_dict):
     """
     Execute the PUT command against Snowflake and store the results.
     """
-    put = db_connection.execute(f'PUT file://{file_path}/* @%"{table_name}"')
+    put_statement = f"PUT file://{file_path}/part.*.parquet '@%\"{table_name}\"'"
+    put = db_connection.execute(put_statement)
     for item in put:
         # These are guesses. The documentation doesn't specify.
         put_results = {
@@ -135,8 +140,8 @@ def execute_copyinto_command(db_connection, table_name, results_dict):
     """
     Execute the COPY INTO command against Snowflake and store the results.
     """
-    copy = db_connection.execute(
-        f'COPY INTO "{table_name}" FILE_FORMAT=(type=PARQUET) PURGE=TRUE MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE')
+    copy_into_statement = f'COPY INTO "{table_name}" FILE_FORMAT=(type=PARQUET) PURGE=TRUE MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE'
+    copy = db_connection.execute(copy_into_statement)
     for item in copy:
         copy_results = {
             "file": item[0],
@@ -231,6 +236,21 @@ def upload_data(
             table_name,
             insert_method,
             db_connection)
+
+        # create artifacts folder to save responses
+        # TODO Both errors and successes need to be returned and printed out afterwards.
+        # Didn't build this in initially for simplicity of getting the backup old method working.
+        base_folder_name = shipyard.logs.determine_base_artifact_folder(
+            'snowflake')
+        artifact_subfolder_paths = shipyard.logs.determine_artifact_subfolders(
+            base_folder_name)
+        shipyard.logs.create_artifacts_folders(artifact_subfolder_paths)
+        snowflake_upload_response_path = shipyard.files.combine_folder_and_file_name(
+            artifact_subfolder_paths['responses'], f'upload_{table_name}_response.json')
+        shipyard.files.write_json_to_file(
+            snowflake_results,
+            snowflake_upload_response_path)
+
         # Needed to prevent other try from running if this is successful.
         return snowflake_results
     except ProgrammingError as pg_e:
@@ -308,7 +328,7 @@ def main():
     source_folder_name = args.source_folder_name
     source_full_path = shipyard.files.combine_folder_and_file_name(
         folder_name=source_folder_name, file_name=source_file_name)
-    table_name = args.table_name
+    table_name = args.table_name.upper()
     insert_method = args.insert_method

     try:
@@ -369,18 +389,6 @@ def main():
             insert_method=insert_method,
             db_connection=db_connection)

-        # create artifacts folder to save responses
-        base_folder_name = shipyard.logs.determine_base_artifact_folder(
-            'snowflake')
-        artifact_subfolder_paths = shipyard.logs.determine_artifact_subfolders(
-            base_folder_name)
-        shipyard.logs.create_artifacts_folders(artifact_subfolder_paths)
-        snowflake_upload_response_path = shipyard.files.combine_folder_and_file_name(
-            artifact_subfolder_paths['responses'], f'upload_{table_name}_response.json')
-        shipyard.files.write_json_to_file(
-            snowflake_results,
-            snowflake_upload_response_path)
-
         db_connection.dispose()
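
Reviewer note: taken together, the hunks above change the load path so that (1) Dask writes gzip Parquet parts without a _metadata sidecar file, (2) PUT stages only the part.*.parquet data files on the table's internal stage, and (3) COPY INTO loads them by case-insensitive column name and purges the staged files afterwards. The following is a minimal standalone sketch of that flow, assuming snowflake-sqlalchemy and the SQLAlchemy 1.x raw-string execute() style this module already uses; the credentials, paths, and table name below are placeholders, not values from this repo.

import dask.dataframe as dd
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

# Placeholder connection details -- substitute real credentials.
engine = create_engine(URL(
    account='my_account',
    user='my_user',
    password='my_password',
    database='my_database',
    schema='my_schema',
    warehouse='my_warehouse'))

table_name = 'MY_TABLE'  # upper-cased, matching args.table_name.upper() in main()

# Write gzip Parquet parts (part.0.parquet, part.1.parquet, ...) and skip the
# _metadata file so the PUT wildcard below matches only data files.
df = dd.read_csv('source.csv')
df.columns = map(lambda x: str(x).upper(), df.columns)
df.to_parquet('/tmp/parquet_parts', compression='gzip', write_index=False,
              write_metadata_file=False)

db_connection = engine.connect()
try:
    # Stage only the Parquet part files on the table's internal stage.
    db_connection.execute(
        f"PUT file:///tmp/parquet_parts/part.*.parquet '@%\"{table_name}\"'")
    # Load by (case-insensitive) column name, then purge the staged files.
    db_connection.execute(
        f'COPY INTO "{table_name}" FILE_FORMAT=(type=PARQUET) '
        'PURGE=TRUE MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE')
finally:
    db_connection.close()
    engine.dispose()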