Skip to content

Commit

Permalink
refactoring for pylint score.
Browse files Browse the repository at this point in the history
  • Loading branch information
PhillipsOwen committed Oct 9, 2023
1 parent edfc9ad commit f3adc10
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 39 deletions.
82 changes: 46 additions & 36 deletions src/common/pg_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,43 +129,14 @@ def get_terria_map_catalog_data(self, **kwargs):
# init the return
ret_val: dict = {}

# init the new workbench dict
new_workbench_data: dict = {}

# are we using the new catalog workbench retrieval
if kwargs['use_new_wb']:
# create the param list
params: list = ['insertion_date', 'met_class', 'physical_location', 'ensemble_name', 'project_code']

# loop through the params for the SP
for param in params:
# if the param is already in the kwargs use it, otherwise null it out
if param not in kwargs:
# add this parm to the list
kwargs.update({param: 'null'})

# add in the max age int
kwargs.update({'max_age': 2})

# try to make the call for records
ret_val = self.get_terria_map_workbench_data(**kwargs)

# check the return
if ret_val == -1:
ret_val = {'Error': 'Database error getting catalog workbench data.'}
# check the return, no data gets a 404 return
elif len(ret_val) == 0 or ret_val is None:
# set a warning message
ret_val = {'Warning': 'No workbench data found using the filter criteria selected.'}
else:
# save the workbench data
new_workbench_data = ret_val
# get the new workbench data
workbench_data: dict = self.get_workbench_data(**kwargs)

# should we continue
if not ('Error' in new_workbench_data or 'Warning' in ret_val):
if not ('Error' in workbench_data or 'Warning' in workbench_data):
# if there was workbench data use it in the data query
if len(new_workbench_data) > 0:
wb_sql: str = f",_run_id:='{'-'.join(new_workbench_data['workbench'][0].split('-')[:-1])}%'"
if len(workbench_data) > 0:
wb_sql: str = f",_run_id:='{'-'.join(workbench_data['workbench'][0].split('-')[:-1])}%'"
else:
wb_sql: str = ""

Expand Down Expand Up @@ -202,13 +173,52 @@ def get_terria_map_catalog_data(self, **kwargs):
ret_val.update({'pulldown_data': pulldown_data})

# if there is new workbench data add it in now
if len(new_workbench_data) > 0:
if len(workbench_data) > 0:
# merge the workbench data to the catalog list
ret_val.update({'workbench': new_workbench_data['workbench']})
ret_val.update({'workbench': workbench_data['workbench']})

# return the data
return ret_val

def get_workbench_data(self, **kwargs):
"""
gets the workbench data from the DB using the filter params specified.
:param kwargs:
:return:
"""
# init the return
ret_val: dict = {}

# are we using the new catalog workbench retrieval
if kwargs['use_new_wb']:
# create the param list
params: list = ['insertion_date', 'met_class', 'physical_location', 'ensemble_name', 'project_code']

# loop through the params for the SP
for param in params:
# if the param is already in the kwargs use it, otherwise null it out
if param not in kwargs:
# add this parm to the list
kwargs.update({param: 'null'})

# add in the max age int
kwargs.update({'max_age': 2})

# try to make the call for records
ret_val = self.get_terria_map_workbench_data(**kwargs)

# check the return
if ret_val == -1:
ret_val = {'Error': 'Database error getting catalog workbench data.'}
# check the return, no data gets a 404 return
elif len(ret_val) == 0 or ret_val is None:
# set a warning message
ret_val = {'Warning': 'No workbench data found using the filter criteria selected.'}

# return the data to the caller
return ret_val

def get_pull_down_data(self, **kwargs) -> dict:
"""
gets the pulldown data given the list of filtering mechanisms passed.
Expand Down
6 changes: 3 additions & 3 deletions src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,6 @@ async def get_terria_map_catalog_data_file(file_name: Union[str, None] = Query(d
# append a unique path to avoid collisions
temp_file_path: str = os.path.join(os.getenv('TEMP_FILE_PATH', os.path.dirname(__file__)), str(uuid.uuid4()))

# make the directory
os.makedirs(temp_file_path)

# append the file name
file_path: str = os.path.join(temp_file_path, file_name)

Expand Down Expand Up @@ -378,6 +375,9 @@ async def get_terria_map_catalog_data_file(file_name: Union[str, None] = Query(d
# set the status to a not found
status_code = 404
else:
# make the directory
os.makedirs(temp_file_path)

# write out the data to a file
with open(file_path, 'w', encoding='utf-8') as f_h:
json.dump(ret_val, f_h)
Expand Down

0 comments on commit f3adc10

Please sign in to comment.