Skip to content

Commit

Permalink
Add scripts to generate notebook html
Browse files Browse the repository at this point in the history
  • Loading branch information
jessegrabowski committed Jun 29, 2024
1 parent 4e7d9e0 commit 9e3388c
Show file tree
Hide file tree
Showing 6 changed files with 1,552 additions and 165 deletions.
1 change: 0 additions & 1 deletion REQUIREMENTS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ numpy
sympy
scipy
pandas
ipython
joblib
pytensor
latextable
Expand Down
1 change: 0 additions & 1 deletion cge_modeling/base/cge.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,6 @@ def __pytensor_euler_helper(*args, n_steps=100, **kwargs):
system, jac_inv, B, variables, parameters
)
# inputs = get_required_inputs(outputs)
print(inputs, outputs)
f_step = pytensor.function(inputs, outputs, mode=mode)
f_step.trust_inputs = True
self.f_step = f_step
Expand Down
104 changes: 88 additions & 16 deletions cge_modeling/plotting.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,34 @@
from cge_modeling import CGEModel


def _recover_shock_sizes(res):
if not any(group in res for group in ["euler", "optimizer"]):
raise ValueError("No simulation results found")
params = res["euler"].parameters if "euler" in res else res["optimizer"].parameters
shocks = {
x: (params[x].isel(step=0).values, params[x].isel(step=-1).values)
for x in params
}

return shocks


def _get_single_shock(shocks):
init = {key: value[0] for key, value in shocks.items()}
final = {key: value[1] for key, value in shocks.items()}
delta = {k: init[k] - final[k] for k in shocks.keys()}

has_shocks = np.array([np.all(x != 0) for x in delta.values()])
if has_shocks.sum() > 1:
return False, None, None, None
shock_var = list(shocks.keys())[has_shocks.tolist().index(True)]
coord_shock = delta[shock_var] != 0
if coord_shock.sum() > 1:
return False, None, None, None

return True, shock_var, init[shock_var][coord_shock], final[shock_var][coord_shock]


def prepare_gridspec_figure(
n_cols: int, n_plots: int
) -> tuple[GridSpec, list[tuple[slice, slice], ...]]:
Expand Down Expand Up @@ -54,7 +82,7 @@ def prepare_gridspec_figure(


def plot_lines(
idata: az.InferenceData,
result: dict[str, az.InferenceData],
mod: CGEModel,
n_cols: int = 5,
var_names: list[str] | None = None,
Expand All @@ -64,6 +92,7 @@ def plot_lines(
rename_dict: dict[str, str] | None = None,
legends: bool | list | None = None,
cmap: str | None = None,
x_unit: Literal["step", "pct", "value"] = "step",
**figure_kwargs,
) -> plt.Figure:
"""
Expand Down Expand Up @@ -103,6 +132,8 @@ def plot_lines(
fig: plt.Figure
The figure object containing the plot.
"""
shock_var = None

if var_names is None:
var_names = mod.variable_names

Expand All @@ -118,11 +149,11 @@ def plot_lines(
legends = dict.fromkeys(legends, True)
legends.update(dict.fromkeys(no_legend, False))

if "euler" not in idata and plot_euler:
if "euler" not in result and plot_euler:
raise ValueError(
'Cannot plot results of euler approximation, provided results has no "euler" group'
)
if "optimizer" not in idata and plot_optimizer:
if "optimizer" not in result and plot_optimizer:
raise ValueError(
'Cannot plot results of optimizer, provided results has no "optimizer" group'
)
Expand All @@ -141,6 +172,7 @@ def plot_lines(

def f_cmap(n):
return plt.colormaps[cmap](np.linspace(0, 1, n))

elif cmap in plt.color_sequences:

def f_cmap(*args):
Expand All @@ -150,31 +182,51 @@ def f_cmap(*args):

for idx, var in enumerate(var_names):
axis = fig.add_subplot(gs[plot_locs[idx]])
axis.set(title=rename_dict.get(var, var), xlabel=None)
x_value = (
result["euler"].parameters.coords["step"].values
if "euler" in result
else result["optimizer"].parameters.coords["step"].values
)
t0 = x_value[0]
T = x_value[-1]

if plot_euler:
data = idata["euler"].variables[var]
data = result["euler"].variables[var]

n_lines = int(np.prod(data.values.shape[1:]))
cycler = plt.cycler(color=f_cmap(n_lines))
axis.set_prop_cycle(cycler)

if data.ndim > 2:
data = data.stack(pair=data.dims[1:])
data.plot.line(x="step", ax=axis, add_legend=legends[var])

if plot_optimizer:
initial_value = idata["optimizer"].variables[var].isel(step=0).data.ravel()
final_value = idata["optimizer"].variables[var].isel(step=-1).data.ravel()
axis.plot(x_value, data.values, lw=0.5, label=legends[var])

if x_unit == "pct":
axis.xaxis.set_major_formatter(PercentFormatter(x_value.max()))
elif x_unit == "value":
shocks = _recover_shock_sizes(result)
has_single_shock, shock_var, t0, T = _get_single_shock(shocks)
if not has_single_shock:
raise ValueError(
'Found more than one shock in the provided results. Cannot use x_unit="value", '
"because there is no single scalar range over which to plot the change in the "
"model variables."
)
n_steps = x_value.shape[0]
tick_values = np.linspace(t0, T, n_steps).squeeze()
axis.set_xlim(0, n_steps)
axis.xaxis.set_major_formatter(
lambda x, pos: f"{int(tick_values[max(0, int(x) - 1)]) + 1:d}"
)

t0 = np.zeros_like(initial_value)
T = np.ones_like(final_value)
if plot_euler:
T = T * idata["euler"].variables.step.max().item()
if plot_optimizer:
initial_value = result["optimizer"].variables[var].isel(step=0).data.ravel()
final_value = result["optimizer"].variables[var].isel(step=-1).data.ravel()

axis.scatter(
t0,
final_value,
initial_value,
marker="*",
color="tab:green",
zorder=10,
Expand All @@ -192,6 +244,13 @@ def f_cmap(*args):
[spine.set_visible(False) for spine in axis.spines.values()]
axis.grid(ls="--", lw=0.5)

xlabel_dict = {
"step": "Step",
"pct": "Percent of Shock Implemented",
"value": rename_dict.get(shock_var, shock_var),
}
axis.set(title=rename_dict.get(var, var), xlabel=xlabel_dict[x_unit], ylabel="")

fig.tight_layout()
return fig

Expand Down Expand Up @@ -348,16 +407,29 @@ def _plot_one_bar(
legend=True,
threshhold=1e-6,
):
if not data.dims:
try:
data = data.to_array()
except AttributeError:
pass
if not initial_data.dims:
try:
initial_data = initial_data.to_array()
except AttributeError:
pass
try:
data = data.to_dataframe().droplevel(level=drop_vars, axis=0)
initial_data = initial_data.to_dataframe().droplevel(level=drop_vars, axis=0)
data = data.to_dataframe(name="data").droplevel(level=drop_vars, axis=0)
initial_data = initial_data.to_dataframe(name="data").droplevel(
level=drop_vars, axis=0
)
except ValueError:
data = pd.DataFrame([data.to_dict()]).loc[:, ["data", "name"]].set_index("name")
initial_data = (
pd.DataFrame([initial_data.to_dict()])
.loc[:, ["data", "name"]]
.set_index("name")
)

if "step" in data.columns:
data = data.drop(columns=["step"])
if "step" in initial_data.columns:
Expand Down
Loading

0 comments on commit 9e3388c

Please sign in to comment.