Skip to content

Commit

Permalink
Merge pull request #9 from shipyardapp/explore-alternative-upload-method
Browse files Browse the repository at this point in the history
Explore alternative upload method
  • Loading branch information
Blake Burch authored Aug 16, 2022
2 parents f9c7b32 + ef292b3 commit 0f99037
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions snowflake_blueprints/upload_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,20 @@ def convert_to_parquet(source_full_path, table_name):
shipyard.files.create_folder_if_dne(parquet_path)
df = dd.read_csv(source_full_path)
df.columns = map(lambda x: str(x).upper(), df.columns)
df.to_parquet(parquet_path, compression='gzip', write_index=False)
df.to_parquet(
parquet_path,
compression='gzip',
write_index=False,
write_metadata_file=False)
return parquet_path


def execute_put_command(db_connection, file_path, table_name, results_dict):
"""
Execute the PUT command against Snowflake and store the results.
"""
put = db_connection.execute(f'PUT file://{file_path}/* @%"{table_name}"')
put_statement = f"PUT file://{file_path}/part.*.parquet '@%\"{table_name}\"'"
put = db_connection.execute(put_statement)
for item in put:
# These are guesses. The documentation doesn't specify.
put_results = {
Expand Down Expand Up @@ -135,8 +140,8 @@ def execute_copyinto_command(db_connection, table_name, results_dict):
"""
Execute the COPY INTO command against Snowflake and store the results.
"""
copy = db_connection.execute(
f'COPY INTO "{table_name}" FILE_FORMAT=(type=PARQUET) PURGE=TRUE MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE')
copy_into_statement = f'COPY INTO "{table_name}" FILE_FORMAT=(type=PARQUET) PURGE=TRUE MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE'
copy = db_connection.execute(copy_into_statement)
for item in copy:
copy_results = {
"file": item[0],
Expand Down Expand Up @@ -231,6 +236,21 @@ def upload_data(
table_name,
insert_method,
db_connection)

# create artifacts folder to save responses
# TODO: Both errors and successes need to be returned and printed out afterwards.
# This wasn't built in initially, to keep things simple while getting the old backup method working.
base_folder_name = shipyard.logs.determine_base_artifact_folder(
'snowflake')
artifact_subfolder_paths = shipyard.logs.determine_artifact_subfolders(
base_folder_name)
shipyard.logs.create_artifacts_folders(artifact_subfolder_paths)
snowflake_upload_response_path = shipyard.files.combine_folder_and_file_name(
artifact_subfolder_paths['responses'], f'upload_{table_name}_response.json')
shipyard.files.write_json_to_file(
snowflake_results,
snowflake_upload_response_path)

# Needed to prevent other try from running if this is successful.
return snowflake_results
except ProgrammingError as pg_e:
Expand Down Expand Up @@ -308,7 +328,7 @@ def main():
source_folder_name = args.source_folder_name
source_full_path = shipyard.files.combine_folder_and_file_name(
folder_name=source_folder_name, file_name=source_file_name)
table_name = args.table_name
table_name = args.table_name.upper()
insert_method = args.insert_method

try:
Expand Down Expand Up @@ -369,18 +389,6 @@ def main():
insert_method=insert_method,
db_connection=db_connection)

# create artifacts folder to save responses
base_folder_name = shipyard.logs.determine_base_artifact_folder(
'snowflake')
artifact_subfolder_paths = shipyard.logs.determine_artifact_subfolders(
base_folder_name)
shipyard.logs.create_artifacts_folders(artifact_subfolder_paths)
snowflake_upload_response_path = shipyard.files.combine_folder_and_file_name(
artifact_subfolder_paths['responses'], f'upload_{table_name}_response.json')
shipyard.files.write_json_to_file(
snowflake_results,
snowflake_upload_response_path)

db_connection.dispose()


Expand Down

0 comments on commit 0f99037

Please sign in to comment.