Skip to content

Commit

Permalink
Merge pull request #22 from thomaswmorris/test-funcs
Browse files Browse the repository at this point in the history
Test functions and botorch utilities
  • Loading branch information
mrakitin authored Apr 18, 2023
2 parents 435bf12 + c21580e commit df07474
Show file tree
Hide file tree
Showing 13 changed files with 365 additions and 200 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ exclude =
dist,
docs/source/conf.py
examples/benchmark.py,
examples/prepare_bluesky.py,
examples/prepare_tes_shadow.py,
bloptools/tests/test_boa.py,
versioneer.py,
max-line-length = 115
Expand Down
198 changes: 87 additions & 111 deletions bloptools/bo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging

import bluesky.plan_stubs as bps
import bluesky.plans as bp # noqa F401
import h5py
Expand All @@ -8,7 +10,7 @@
from matplotlib import pyplot as plt

from .. import utils
from . import models
from . import acquisition, models

mpl.rc("image", cmap="coolwarm")

Expand All @@ -29,7 +31,7 @@ def __init__(
self,
dofs,
dets,
dof_bounds,
bounds,
experiment,
db,
training_iter=256,
Expand All @@ -42,16 +44,22 @@ def __init__(
detector (Detector)
detector_type (str)
dofs (list of Devices)
dof_bounds (list of bounds)
bounds (list of bounds)
fitness_model (str)
training_iter (int)
"""

self.dofs, self.dof_bounds = dofs, dof_bounds
self.dofs = dofs
for dof in self.dofs:
dof.kind = "hinted"

self.n_dof = len(dofs)

self.bounds = bounds if bounds is not None else np.array([[-1.0, +1.0] for i in range(self.n_dof)])

self.dets = dets

self.experiment = experiment
Expand All @@ -64,23 +72,20 @@ def __init__(
MAX_TEST_POINTS = 2**10

n_bins_per_dim = int(np.power(MAX_TEST_POINTS, 1 / self.n_dof))
self.dim_bins = [np.linspace(*bounds, n_bins_per_dim + 1) for bounds in self.dof_bounds]
self.dim_bins = [np.linspace(*bounds, n_bins_per_dim + 1) for bounds in self.bounds]
self.dim_mids = [0.5 * (bins[1:] + bins[:-1]) for bins in self.dim_bins]
self.test_grid = np.swapaxes(np.r_[np.meshgrid(*self.dim_mids, indexing="ij")], 0, -1)

sampler = sp.stats.qmc.Halton(d=self.n_dof, scramble=True)
self.test_params = sampler.random(n=MAX_TEST_POINTS) * self.dof_bounds.ptp(axis=1) + self.dof_bounds.min(
axis=1
)
self.test_params = sampler.random(n=MAX_TEST_POINTS) * self.bounds.ptp(axis=1) + self.bounds.min(axis=1)

# convert params to x
self.params_trans_fun = lambda params: (params - self.dof_bounds.min(axis=1)) / self.dof_bounds.ptp(axis=1)
self.params_trans_fun = lambda params: (params - self.bounds.min(axis=1)) / self.bounds.ptp(axis=1)

# convert x to params
self.inv_params_trans_fun = lambda x: x * self.dof_bounds.ptp(axis=1) + self.dof_bounds.min(axis=1)
self.inv_params_trans_fun = lambda x: x * self.bounds.ptp(axis=1) + self.bounds.min(axis=1)

# for actual prediction and optimization

self.params = np.empty((0, self.n_dof))
self.data = pd.DataFrame()

Expand Down Expand Up @@ -172,7 +177,8 @@ def tell(self, new_params, new_data, reuse_hypers=True, verbose=False):
self.data = pd.concat([self.data, new_data])
self.data.index = np.arange(len(self.data))

self.images = np.array([im for im in self.data[self.experiment.IMAGE_NAME].values])
if hasattr(self.experiment, "IMAGE_NAME"):
self.images = np.array([im for im in self.data[self.experiment.IMAGE_NAME].values])

X = self.params_trans_fun(self.params)
Y = self.data.fitness.values[:, None]
Expand All @@ -199,12 +205,14 @@ def acquire_with_bluesky(self, params, routing=True, verbose=False):
if verbose:
print(f"sampling {ordered_params}")

# try:
uid = yield from bp.list_scan(
self.dets, *[_ for items in zip(self.dofs, np.atleast_2d(ordered_params).T) for _ in items]
)
_table = self.db[uid].table(fill=True)
_table.loc[:, "uid"] = uid
try:
uid = yield from bp.list_scan(
self.dets, *[_ for items in zip(self.dofs, np.atleast_2d(ordered_params).T) for _ in items]
)
_table = self.db[uid].table(fill=True)
_table.loc[:, "uid"] = uid
except Exception as err:
logging.warning(repr(err))

for i, entry in _table.iterrows():
keys, vals = self.experiment.parse_entry(entry)
Expand Down Expand Up @@ -263,23 +271,14 @@ def recommend(
if not all(cost > 0):
raise ValueError("Some estimated acquisition times are non-positive.")

if strategy.lower() == "exploit": # greedy expected reward maximization
objective = -self._negative_expected_improvement(evaluator, classifier, TEST_X).sum(axis=1)
if strategy.lower() == "ei": # maximize the expected improvement
objective = acquisition.expected_improvement(evaluator, classifier, TEST_X).sum(axis=1)

if strategy.lower() == "explore": # greedy expected reward maximization
objective = -self._negative_expected_variance(evaluator, classifier, TEST_X).sum(axis=1)
if strategy.lower() == "max_entropy": # maximize the total entropy
objective = (evaluator.normalized_entropy(TEST_X) + classifier.entropy(TEST_X)).sum(axis=1)

# if strategy.lower() == "eig":
# objective = -_negative_expected_information_gain(evaluator, classifier, evaluator.X, TEST_X)

if strategy.lower() == "ip":
objective = -self._negative_improvement_probability(evaluator, classifier, evaluator.X, TEST_X)

if strategy.lower() == "a-optimal":
objective = -self._negative_A_optimality(TEST_X)

if strategy.lower() == "d-optimal":
objective = -self._negative_D_optimality(TEST_X)
if strategy.lower() == "egibbon": # maximize the expected GIBBON
objective = acquisition.expected_gibbon(evaluator, classifier, TEST_X).sum(axis=1)

return TEST_X[np.argmax(objective / cost)]

Expand Down Expand Up @@ -401,19 +400,8 @@ def update_state_plots(self, gridded=False):
Create the axes onto which we plot/update the state of the GPO
"""

# self.state_fig.clear()

s = 16

if gridded:
P = self.validate(self.test_grid)
FE = self.fitness_entropy(self.test_grid)
else:
P = self.validate(self.test_params)
FE = self.fitness_entropy(self.test_params)

PE = 0.5 * np.log(2 * np.pi * np.e * P * (1 - P))

# so that the points and estimates have the same nice norm
fitness_norm = mpl.colors.Normalize(*np.nanpercentile(self.data.fitness.values, q=[1, 99]))

Expand Down Expand Up @@ -485,23 +473,23 @@ def update_state_plots(self, gridded=False):
ax.set_title("expected improvement")

if gridded:
expected_improvement = -self._negative_expected_improvement(
expected_improvement = acquisition.expected_improvement(
self.evaluator, self.classifier, self.params_trans_fun(self.test_grid)
)
expected_improvement[~(expected_improvement > 0)] = np.nan
# expected_improvement[~(expected_improvement > 0)] = np.nan
ref = ax.pcolormesh(
*self.dim_mids[:2], expected_improvement, norm=mpl.colors.Normalize(vmin=0), shading="nearest"
*self.dim_mids[:2], expected_improvement, norm=mpl.colors.Normalize(), shading="nearest"
)
else:
expected_improvement = -self._negative_expected_improvement(
expected_improvement = acquisition.expected_improvement(
self.evaluator, self.classifier, self.params_trans_fun(self.test_params)
)
expected_improvement[~(expected_improvement > 0)] = np.nan
# expected_improvement[~(expected_improvement > 0)] = np.nan
ref = ax.scatter(
*self.test_params.T[:2],
s=s,
c=-expected_improvement,
norm=mpl.colors.Normalize(vmin=0),
c=expected_improvement,
norm=mpl.colors.Normalize(),
)

# ax.plot(*ordered_max_improvement_params.T[:2], lw=1, color="k")
Expand All @@ -526,36 +514,71 @@ def update_state_plots(self, gridded=False):
ax.clear()
ax.set_title("validity estimate")
if gridded:
ref = ax.pcolormesh(*self.dim_mids[:2], P, vmin=0, vmax=1, shading="nearest")
ref = ax.pcolormesh(
*self.dim_mids[:2],
self.classifier.p(self.params_trans_fun(self.test_grid)),
vmin=0,
vmax=1,
shading="nearest",
)
else:
ref = ax.scatter(*self.test_params.T[:2], s=s, c=P, vmin=0, vmax=1)
ref = ax.scatter(
*self.test_params.T[:2],
c=self.classifier.p(self.params_trans_fun(self.test_params)),
s=s,
vmin=0,
vmax=1,
)
clb = self.state_fig.colorbar(ref, ax=ax, location="bottom", aspect=32, shrink=0.8)
clb.set_label("probability of validity")
ax.scatter(*self.params.T[:2], s=s, edgecolor="k", facecolor="none")

ax = self.state_axes[1, 2]
ax.set_title("validity entropy rate")
ax.clear()
ax.set_title("validity entropy")
if gridded:
ref = ax.pcolormesh(*self.dim_mids[:2], PE, shading="nearest")
ref = ax.pcolormesh(
*self.dim_mids[:2],
self.classifier.entropy(self.params_trans_fun(self.test_grid)),
shading="nearest",
)
else:
ref = ax.scatter(*self.test_params.T[:2], s=s, c=PE)
ref = ax.scatter(
*self.test_params.T[:2], c=self.classifier.entropy(self.params_trans_fun(self.test_params)), s=s
)
clb = self.state_fig.colorbar(ref, ax=ax, location="bottom", aspect=32, shrink=0.8)
clb.set_label("nepits per volume")
clb.set_label("nats")
ax.scatter(*self.params.T[:2], s=s, edgecolor="k", facecolor="none")

ax = self.state_axes[1, 3]
ax.set_title("expected entropy reduction")
ax.clear()
ax.set_title("expected GIBBON")
if gridded:
ref = ax.pcolormesh(*self.dim_mids[:2], P * FE, shading="nearest")
expected_improvement = acquisition.expected_gibbon(
self.evaluator, self.classifier, self.params_trans_fun(self.test_grid)
)
# expected_improvement[~(expected_improvement > 0)] = np.nan
ref = ax.pcolormesh(
*self.dim_mids[:2], expected_improvement, norm=mpl.colors.Normalize(), shading="nearest"
)
else:
ref = ax.scatter(*self.test_params.T[:2], s=s, c=P * FE)
expected_improvement = acquisition.expected_gibbon(
self.evaluator, self.classifier, self.params_trans_fun(self.test_params)
)
# expected_improvement[~(expected_improvement > 0)] = np.nan
ref = ax.scatter(
*self.test_params.T[:2],
s=s,
c=expected_improvement,
norm=mpl.colors.Normalize(),
)
clb = self.state_fig.colorbar(ref, ax=ax, location="bottom", aspect=32, shrink=0.8)
clb.set_label("nepits per volume")
clb.set_label("units")
ax.scatter(*self.params.T[:2], s=s, edgecolor="k", facecolor="none")

for ax in self.state_axes.ravel():
ax.set_xlim(*self.dof_bounds[0])
ax.set_ylim(*self.dof_bounds[1])
ax.set_xlim(*self.bounds[0])
ax.set_ylim(*self.bounds[1])

def show_state_plots(self, save_as=None):
self.state_fig.canvas.draw_idle()
Expand All @@ -580,61 +603,14 @@ def fitness_entropy(self, params):
return np.log(np.sqrt(2 * np.pi * np.e) * self.fitness_sigma(params) + 1e-12)

def validate(self, params):
return self.classifier.classify(self.params_trans_fun(params).reshape(-1, self.n_dof)).reshape(
params.shape[:-1]
)
return self.classifier.p(self.params_trans_fun(params).reshape(-1, self.n_dof)).reshape(params.shape[:-1])

def delay_estimate(self, params):
return self.timer.mean(self.params_trans_fun(params).reshape(-1, self.n_dof)).reshape(params.shape[:-1])

def delay_sigma(self, params):
return self.timer.sigma(self.params_trans_fun(params).reshape(-1, self.n_dof)).reshape(params.shape[:-1])

def _negative_improvement_probability(self, evaluator, classifier, test_X):
"""
Returns the negative expected improvement over the maximum, in GP units.
"""
x = test_X.reshape(-1, self.n_dof)

# using GPRC units here
mu = evaluator.mean(x)
sigma = evaluator.sigma(x)
nu = evaluator.nu
pv = classifier.classify(x)

P = pv * 0.5 * (1 + sp.special.erf((mu - nu) / (np.sqrt(2) * sigma)))

return -P.reshape(test_X.shape[:-1])

def _negative_expected_improvement(self, evaluator, classifier, test_X):
"""
Returns the negative expected improvement over the maximum, in GP units.
"""
x = test_X.reshape(-1, self.n_dof)

# using GPRC units here
mu = evaluator.mean(x)
sigma = evaluator.sigma(x)
nu = evaluator.nu
pv = classifier.classify(x)

A = np.exp(-0.5 * np.square((mu - nu) / sigma)) / (np.sqrt(2 * np.pi) * sigma)
B = 0.5 * (1 + sp.special.erf((mu - nu) / (np.sqrt(2) * sigma)))
E = -pv * (A * sigma**2 + B * (mu - nu))

return E.reshape(test_X.shape[:-1])

def _negative_expected_variance(self, evaluator, classifier, test_X):
"""
Returns the negative expected improvement over the maximum, in GP units.
"""
x = test_X.reshape(-1, self.n_dof)

sigma = evaluator.sigma(x)
pv = classifier.classify(x)

return -(pv * sigma**2).reshape(test_X.shape[:-1])

def _negative_A_optimality(self, params):
"""
The negative trace of the inverse Fisher information matrix contingent on sampling the passed params.
Expand Down
Loading

0 comments on commit df07474

Please sign in to comment.