Skip to content

Commit

Permalink
Remove old pipeline functions, create results attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
tobin-ford committed Feb 29, 2024
1 parent 68e6b1d commit 8aa2840
Showing 1 changed file with 86 additions and 70 deletions.
156 changes: 86 additions & 70 deletions pvdeg/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from inspect import signature
from typing import Callable
import warnings
import pandas as pd

# TODO: add functions...

Expand Down Expand Up @@ -43,6 +44,7 @@ def __init__(
pipeline=[],
hpc=False,
file=None,
results : pd.Series = None,
) -> None:
"""
Initialize the degradation scenario object.
Expand Down Expand Up @@ -212,10 +214,16 @@ def viewScenario(self):
pp = pprint.PrettyPrinter(indent=4, sort_dicts=False)
print(f"Name : {self.name}")
print(f"pipeline: {self.pipeline}")
try:
print(f"pipeline results : {self.results}")
except:
print(f"pipeline has not been run")

print(f"gid file : {self.gids}")
print("test modules :")
for mod in self.modules:
pp.pprint(mod)

return

def updatePipeline(
Expand All @@ -238,11 +246,11 @@ def updatePipeline(
print("Function has not been added to pipeline.")
return None

# check if necessary parameters given
params_all = dict(signature(func).parameters)

# this is a bad way of doing it
# some values with NONE are still optional
# causing if statement below to work improperly,
reqs = {name: param for name, param in params_all.items() if param.default is None}
optional = {name: param for name, param in params_all.items() if name not in reqs}

Expand All @@ -258,58 +266,13 @@ def updatePipeline(
print("Function has not been added to pipeline.")
return None

# implicit
# if we get here we have the required inputs
job_dict = {"job": func, "params": func_params}
self.pipeline.append(job_dict)

if see_added_function:
message = f"{func.__name__} added to pipeline as \n {job_dict}"
warnings.warn(message, UserWarning)

# # I hate that this reuturns the function name
# # the other add methods do not return the corresponding added values
def addFunction(self, func_name=None, func_params=None):
"""
Add a pvdeg function to the scenario pipeline
TODO: list public functions if no func_name given or bad func_name given
Parameters:
-----------
func_name : (str)
The name of the requested pvdeg function. Do not include the class.
func_params : (dict)
The required parameters to run the requested pvdeg function
Returns:
--------
func_name : (str)
the name of the pvdeg function requested
"""

_func, reqs = pvdeg.Scenario._verify_function(func_name)

if _func == None:
print(f'FAILED: Requested function "{func_name}" not found')
print("Function has not been added to pipeline.")
return None

if not all(x in func_params for x in reqs):
print(
f"FAILED: Requestion function {func_name} did not receive enough parameters"
)
print(f"Requestion function: \n {_func} \n ---")
print(f"Required Parameters: \n {reqs} \n ---")
print("Function has not been added to pipeline.")
return None

# add the function and arguments to pipeline
job_dict = {"job": func_name, "params": func_params}

self.pipeline.append(job_dict)
return func_name

# new #
def runPipeline(self):
"""
Expand All @@ -326,32 +289,18 @@ def runPipeline(self):

results_dict[job['job'].__name__] = result

# this is not attached to the object in a meaningful way?
# how can we store data in a useful way to be used later or for later calculations in the pipeline
return results_dict

def runJob(self, job=None):
"""
Run a named function on the scenario object
TODO: overhaul with futures/slurm
capture results
standardize result format for all of pvdeg
# this may be a roundabout approach but is probably acceptable
# seperate dictionary items into keys and inner dicts
# inner dict -> dataframe
# keys will be indecies of the series
series_index = results_dict.keys()

Parameters:
-----------
job : (str, default=None)
"""
# this seems to be breaking the method every time #
# even if i manually set the attribute to false
if self.hpc:
# do something else
pass
# redo with list comprehension?
for entry in series_index:
to_add_df = pd.DataFrame([results_dict[entry]])
results_series = pd.Series({entry, to_add_df})

for job in self.pipeline:
args = job["parameters"]
_func = pvdeg.Scenario._verify_function(job["job"], args)[0]
result = _func(**args)
self.results = results_series


def exportScenario(self, file_path=None):
Expand Down Expand Up @@ -441,3 +390,70 @@ def _verify_function(func_name):
reqs.append(param)

return (_func, reqs)


# def runJob(self, job=None):
# """
# Run a named function on the scenario object

# TODO: overhaul with futures/slurm
# capture results
# standardize result format for all of pvdeg

# Parameters:
# -----------
# job : (str, default=None)
# """
# # this seems to be breaking the method every time #
# # even if i manually set the attribute to false
# if self.hpc:
# # do something else
# pass

# for job in self.pipeline:
# args = job["parameters"]
# _func = pvdeg.Scenario._verify_function(job["job"], args)[0]

# result = _func(**args)

# def addFunction(self, func_name=None, func_params=None):
# """
# Add a pvdeg function to the scenario pipeline

# TODO: list public functions if no func_name given or bad func_name given

# Parameters:
# -----------
# func_name : (str)
# The name of the requested pvdeg function. Do not include the class.
# func_params : (dict)
# The required parameters to run the requested pvdeg function

# Returns:
# --------
# func_name : (str)
# the name of the pvdeg function requested
# """

# _func, reqs = pvdeg.Scenario._verify_function(func_name)

# if _func == None:
# print(f'FAILED: Requested function "{func_name}" not found')
# print("Function has not been added to pipeline.")
# return None

# if not all(x in func_params for x in reqs):
# print(
# f"FAILED: Requestion function {func_name} did not receive enough parameters"
# )
# print(f"Requestion function: \n {_func} \n ---")
# print(f"Required Parameters: \n {reqs} \n ---")
# print("Function has not been added to pipeline.")
# return None

# # add the function and arguments to pipeline
# job_dict = {"job": func_name, "params": func_params}

# self.pipeline.append(job_dict)
# return func_name

0 comments on commit 8aa2840

Please sign in to comment.