Merge pull request #15 from shipyardapp/bugfix/parse_dates
merging to main
wrp801 authored Apr 4, 2023
2 parents 4740373 + 6f21f97 commit 12b91a9
Showing 2 changed files with 113 additions and 31 deletions.
.gitignore (3 changes: 2 additions & 1 deletion)
@@ -136,4 +136,5 @@ soccer.csv
tmp
modify_file.py
soccer_null.csv
*-artifacts/
*-artifacts/
test.csv
snowflake_blueprints/upload_file.py (141 changes: 111 additions & 30 deletions)
Expand Up @@ -12,6 +12,7 @@
import ast
import snowflake.sqlalchemy as sql
from sqlalchemy import Table, Column, Integer, String, MetaData
from copy import deepcopy
try:
import errors
except BaseException:
@@ -106,7 +107,11 @@ def map_snowflake_to_pandas(snowflake_data_types):
field = item[0]
dtype = item[1]
try:
converted = snowflake_to_pandas.get(str(dtype).upper())
converted = snowflake_to_pandas[str(dtype).upper()]
if converted is None:
print(
f"The datatype {field} is not a recognized snowflake datatype")
sys.exit(errors.EXIT_CODE_INVALID_DATA_TYPES)
pandas_dtypes[field] = converted
except KeyError as e:
print(
@@ -116,13 +121,26 @@ def map_snowflake_to_pandas(snowflake_data_types):
return pandas_dtypes
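
As context for the lookup change above, indexing snowflake_to_pandas directly (rather than using .get) makes an unrecognized Snowflake type raise KeyError, which the except branch then reports. A minimal sketch of that path, using an assumed two-entry subset of the mapping and a hypothetical column type:

# Assumed subset of the module-level snowflake_to_pandas mapping.
snowflake_to_pandas = {'VARCHAR': 'object', 'NUMBER': 'int64'}

field, dtype = 'LOCATION', 'geography'   # hypothetical column and type
try:
    converted = snowflake_to_pandas[str(dtype).upper()]   # raises KeyError for 'GEOGRAPHY'
except KeyError:
    # Mirrors the except KeyError branch above (its body is cut off in this hunk).
    print(f"The datatype {field} is not a recognized snowflake datatype")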


def get_pandas_dates(pandas_datatypes: dict) -> tuple:
dates = []
new_dict = deepcopy(pandas_datatypes)
for k, v in pandas_datatypes.items():
if v in ['datetime64[ns]', 'datetime64']:
dates.append(k)
del new_dict[k]
if len(dates) == 0:
return None, new_dict
return dates, new_dict
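
The split above exists because pandas does not accept datetime dtypes through read_csv's dtype argument; datetime columns have to be passed via parse_dates instead. A minimal sketch of how the helper's output is meant to be consumed (column names and the CSV path are hypothetical):

import pandas as pd

# Hypothetical mapping of the shape produced by map_snowflake_to_pandas().
pandas_datatypes = {'ID': 'int64', 'NAME': 'object', 'CREATED_AT': 'datetime64[ns]'}

dates, pandas_datatypes = get_pandas_dates(pandas_datatypes)
# dates -> ['CREATED_AT']; pandas_datatypes -> {'ID': 'int64', 'NAME': 'object'}

# Datetime columns go to parse_dates; leaving them in dtype would raise in read_csv.
df = pd.read_csv('example.csv', dtype=pandas_datatypes, parse_dates=dates)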


def create_table_with_types(table_name, db_connection, data_types):
"""
Creates a table with specific data types or replaces a table if it already exists.
Replacement will wipe the data in the existing table and then set the columns with the appropriate data types
"""
try:
query = f"create or replace table {table_name}" + "(\n"
temp_name = f"temp_{table_name}"
query = f"create or replace temporary table {temp_name}" + "(\n"
length = len(data_types)
index = 1
for datatype in data_types:
@@ -136,7 +154,8 @@ def create_table_with_types(table_name, db_connection, data_types):
else:
query = query + "\n );"
db_connection.execute(query)
print(f"Successfully created {table_name}")
print(f"Successfully created {temp_name}")
return temp_name

except Exception as e:
print(f"Error in creating {table_name}")
@@ -153,8 +172,12 @@ def create_table(source_full_path, table_name, insert_method, db_connection, sno
try:
chunksize = 10000
mapping = map_snowflake_to_pandas(snowflake_data_types)
dates = None
pandas_datatypes = None
if snowflake_data_types is not None:
dates, pandas_datatypes = get_pandas_dates(datatypes)
for index, chunk in enumerate(
pd.read_csv(source_full_path, chunksize=chunksize, dtype=datatypes)):
pd.read_csv(source_full_path, chunksize=chunksize, dtype=pandas_datatypes, parse_dates=dates)):
chunk.head(0).to_sql(
table_name,
con=db_connection,
@@ -183,7 +206,12 @@ def convert_to_parquet(source_full_path, table_name, snowflake_datatypes):
parquet_path = f'./tmp/{table_name}'
shipyard.files.create_folder_if_dne(parquet_path)
datatypes = map_snowflake_to_pandas(snowflake_datatypes)
df = dd.read_csv(source_full_path, dtype=datatypes)
dates = None
pandas_datatypes = None
if datatypes is not None:
dates, pandas_datatypes = get_pandas_dates(datatypes)
df = dd.read_csv(source_full_path, dtype=pandas_datatypes,
parse_dates=dates, date_parser=lambda x: pd.to_datetime(x).to_datetime64())
df.columns = map(lambda x: str(x).upper(), df.columns)
df.to_parquet(
parquet_path,
@@ -193,6 +221,46 @@ def convert_to_parquet(source_full_path, table_name, snowflake_datatypes):
return parquet_path


def compress_csv(source_full_path, table_name, snowflake_datatypes):
"""
Compresses a CSV using GZIP to be loaded to Snowflake
"""
csv_path = f'./tmp/{table_name}'
full_path = f"{csv_path}/data_*.csv.gz"
shipyard.files.create_folder_if_dne(csv_path)
datatypes = map_snowflake_to_pandas(snowflake_datatypes)
dates = None
pandas_datatypes = None
if datatypes is not None:
dates, pandas_datatypes = get_pandas_dates(datatypes)
df = dd.read_csv(source_full_path, dtype=pandas_datatypes, parse_dates=dates,
date_parser=lambda x: pd.to_datetime(x).to_datetime64())
df.columns = map(lambda x: str(x).upper(), df.columns)
df.to_csv(full_path, compression='gzip', index=False)
return full_path
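
A note on the wildcard path above: when Dask's to_csv receives a name containing '*', it writes one output file per partition and substitutes the partition index for the wildcard, so the returned glob covers every gzipped part. A small illustration with an assumed two-partition frame and directory:

import os
import dask.dataframe as dd
import pandas as pd

os.makedirs('./tmp/example', exist_ok=True)
ddf = dd.from_pandas(pd.DataFrame({'ID': range(4)}), npartitions=2)
ddf.to_csv('./tmp/example/data_*.csv.gz', compression='gzip', index=False)
# Writes ./tmp/example/data_0.csv.gz and ./tmp/example/data_1.csv.gz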


def put_csv(db_connection, file_path, table_name, results_dict):
"""
Execute the PUT command against Snowflake and store the results.
"""
put_statement = f"PUT file://{file_path} '@%\"{table_name}\"'"
put = db_connection.execute(put_statement)
for item in put:
# These are guesses. The documentation doesn't specify.
put_results = {
"input_file_name": item[0],
"uploaded_file_name": item[1],
"input_bytes": item[2],
"uploaded_bytes": item[3],
"input_file_type": item[4],
"uploaded_file_type": item[5],
"status": item[6]}
results_dict['put'].append(put_results)

return results_dict
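
For reference, the statement built above uploads the local files to the table's internal stage (the @% prefix), and PUT accepts wildcards in the local path, so all of the gzipped parts written by compress_csv are staged in one call. A sketch of the rendered SQL for a hypothetical ORDERS table:

# Hypothetical values of the kind produced by compress_csv().
file_path = './tmp/ORDERS/data_*.csv.gz'
table_name = 'ORDERS'
put_statement = f"PUT file://{file_path} '@%\"{table_name}\"'"
# -> PUT file://./tmp/ORDERS/data_*.csv.gz '@%"ORDERS"'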


def execute_put_command(db_connection, file_path, table_name, results_dict):
"""
Execute the PUT command against Snowflake and store the results.
Expand All @@ -210,6 +278,7 @@ def execute_put_command(db_connection, file_path, table_name, results_dict):
"uploaded_file_type": item[5],
"status": item[6]}
results_dict['put'].append(put_results)

return results_dict


@@ -224,11 +293,27 @@ def execute_drop_command(db_connection, table_name, results_dict):
return results_dict


def create_file_format(db_connection):
file_format = "t_csv"
sql = f""" CREATE OR REPLACE FILE FORMAT {file_format}
TYPE = "CSV"
COMPRESSION = "GZIP"
FILE_EXTENSION= 'csv.gz'
SKIP_HEADER = 1
EMPTY_FIELD_AS_NULL = TRUE
ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE;
"""
db_connection.execute(sql)
return file_format


def execute_copyinto_command(db_connection, table_name, results_dict):
"""
Execute the COPY INTO command against Snowflake and store the results.
"""
copy_into_statement = f'COPY INTO "{table_name}" FILE_FORMAT=(type=PARQUET) PURGE=TRUE MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE'
# copy_into_statement = f'COPY INTO "{table_name}" FILE_FORMAT=(type=PARQUET) PURGE=TRUE MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE'
format_name = create_file_format(db_connection)
copy_into_statement = f"""COPY INTO "{table_name}" FILE_FORMAT='{format_name}' PURGE=TRUE"""
copy = db_connection.execute(copy_into_statement)
for item in copy:
copy_results = {
Expand All @@ -254,40 +339,31 @@ def upload_data_with_put(source_full_path,
"""
Upload data by PUTing the file(s) in Snowflake temporary storage and using COPY INTO to get them into the table.
"""
parquet_path = convert_to_parquet(
source_full_path, table_name, snowflake_data_types)
csv_path = compress_csv(source_full_path, table_name, snowflake_data_types)
print('Attempting upload with put method')
snowflake_results = {"put": [], "copy": [], "drop": []}
if insert_method == 'replace':
snowflake_results = execute_drop_command(
db_connection, table_name, snowflake_results)
if snowflake_data_types is not None:
create_table_with_types(
table_name, db_connection, snowflake_data_types)
snowflake_results = execute_put_command(
db_connection, parquet_path, table_name, snowflake_results)
snowflake_results = execute_copyinto_command(
db_connection, table_name, snowflake_results)
else:
create_table(
source_full_path,
table_name,
insert_method,
db_connection,
snowflake_data_types)
snowflake_results = execute_put_command(
db_connection, parquet_path, table_name, snowflake_results)
snowflake_results = execute_copyinto_command(
db_connection, table_name, snowflake_results)
create_table(
source_full_path,
table_name,
insert_method,
db_connection,
snowflake_data_types)
snowflake_results = put_csv(
db_connection, csv_path, table_name, snowflake_results)
snowflake_results = execute_copyinto_command(
db_connection, table_name, snowflake_results)
elif insert_method == 'append':
create_table(
source_full_path,
table_name,
insert_method,
db_connection,
snowflake_data_types)
snowflake_results = execute_put_command(
db_connection, parquet_path, table_name, snowflake_results)
snowflake_results = put_csv(
db_connection, csv_path, table_name, snowflake_results)
snowflake_results = execute_copyinto_command(
db_connection, table_name, snowflake_results)
print(f'{source_full_path} successfully {insert_method}{"ed to " if insert_method == "append" else "d "}the table {table_name}.')
Expand All @@ -305,8 +381,12 @@ def upload_data_with_insert(source_full_path,
print('Attempting upload with insert method')
chunksize = 10000
datatypes = map_snowflake_to_pandas(snowflake_data_types)
dates = None
pandas_datatypes = None
if datatypes is not None:
dates, pandas_datatypes = get_pandas_dates(datatypes)
for index, chunk in enumerate(
pd.read_csv(source_full_path, chunksize=chunksize, dtype=datatypes)):
pd.read_csv(source_full_path, chunksize=chunksize, dtype=pandas_datatypes, parse_dates=dates)):

if insert_method == 'replace' and index > 0:
# First chunk replaces the table, the following chunks
@@ -384,7 +464,8 @@ def upload_data(
upload_data_with_insert(source_full_path,
table_name,
insert_method,
db_connection)
db_connection,
snowflake_data_types=snowflake_data_types)
except DatabaseError as db_e:
if 'No active warehouse' in str(db_e):
print(
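
Taken together, the 'replace' path of upload_data_with_put now compresses the source CSV with Dask, recreates the target table, PUTs the gzipped parts to the table stage, and runs COPY INTO with the named CSV file format. A condensed sketch of that sequence, assuming an open SQLAlchemy db_connection plus hypothetical source and table names:

source_full_path = './data/orders.csv'   # hypothetical source file
table_name = 'ORDERS'                    # hypothetical target table
results = {"put": [], "copy": [], "drop": []}

csv_path = compress_csv(source_full_path, table_name, None)
results = execute_drop_command(db_connection, table_name, results)
create_table(source_full_path, table_name, 'replace', db_connection, None)
results = put_csv(db_connection, csv_path, table_name, results)
# execute_copyinto_command() creates the t_csv file format before copying.
results = execute_copyinto_command(db_connection, table_name, results)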
