diff --git a/compiler_opt/es/blackbox_optimizers.py b/compiler_opt/es/blackbox_optimizers.py
index 240c9e1d..be695ae0 100644
--- a/compiler_opt/es/blackbox_optimizers.py
+++ b/compiler_opt/es/blackbox_optimizers.py
@@ -54,6 +54,7 @@
 
 import abc
 import enum
+import gin
 import math
 
 import numpy as np
@@ -64,7 +65,13 @@
 
 from compiler_opt.es import gradient_ascent_optimization_algorithms
 
-SequenceOfFloats = Union[Sequence[float], npt.NDArray[np.float32]]
+FloatArray = npt.NDArray[np.float32]
+
+# should specifically be a 2d numpy array of floats
+# but numpy.typing does not allow for that indication
+FloatArray2D = Sequence[FloatArray]
+
+SequenceOfFloats = Union[Sequence[float], FloatArray]
 
 LinearModel = Union[linear_model.Ridge, linear_model.Lasso,
                     linear_model.LinearRegression]
@@ -75,22 +82,33 @@ class CurrentPointEstimate(enum.Enum):
   AVERAGE = 2
 
 
+@gin.constants_from_enum(module='blackbox_optimizers')
+class Algorithm(enum.Enum):
+  MONTE_CARLO = 1
+  TRUST_REGION = 2
+  SKLEARN_REGRESSION = 3
+
+
+@gin.constants_from_enum(module='blackbox_optimizers')
 class EstimatorType(enum.Enum):
   FORWARD_FD = 1
   ANTITHETIC = 2
 
 
+@gin.constants_from_enum(module='blackbox_optimizers')
 class GradientType(enum.Enum):
   MC = 1
   REGRESSION = 2
 
 
+@gin.constants_from_enum(module='blackbox_optimizers')
 class RegressionType(enum.Enum):
   LASSO = 1
   RIDGE = 2
   LINEAR = 3
 
 
+@gin.constants_from_enum(module='blackbox_optimizers')
 class UpdateMethod(enum.Enum):
   STATE_NORMALIZATION = 1
   NO_METHOD = 2
@@ -100,10 +118,9 @@
 
 
 def filter_top_directions(
-    perturbations: npt.NDArray[np.float32],
-    function_values: npt.NDArray[np.float32], est_type: EstimatorType,
-    num_top_directions: int
-) -> Tuple[npt.NDArray[np.float32], npt.NDArray[np.float32]]:
+    perturbations: FloatArray2D, function_values: FloatArray,
+    est_type: EstimatorType,
+    num_top_directions: int) -> Tuple[FloatArray, FloatArray]:
   """Select the subset of top-performing perturbations.
 
   Args:
@@ -151,10 +168,8 @@ class BlackboxOptimizer(metaclass=abc.ABCMeta):
   """
 
   @abc.abstractmethod
-  def run_step(self, perturbations: npt.NDArray[np.float32],
-               function_values: npt.NDArray[np.float32],
-               current_input: npt.NDArray[np.float32],
-               current_value: float) -> npt.NDArray[np.float32]:
+  def run_step(self, perturbations: FloatArray2D, function_values: FloatArray,
+               current_input: FloatArray, current_value: float) -> FloatArray:
     """Conducts a single step of blackbox optimization procedure.
 
     Conducts a single step of blackbox optimization procedure, given values of
@@ -332,10 +347,9 @@ def __init__(self,
     super().__init__(est_type, normalize_fvalues, hyperparameters_update_method,
                      extra_params)
 
-  def run_step(self, perturbations: npt.NDArray[np.float32],
-               function_values: npt.NDArray[np.float32],
-               current_input: npt.NDArray[np.float32],
-               current_value: float) -> npt.NDArray[np.float32]:
+  # TODO: Issue #285
+  def run_step(self, perturbations: FloatArray2D, function_values: FloatArray,
+               current_input: FloatArray, current_value: float) -> FloatArray:
     dim = len(current_input)
     if self.normalize_fvalues:
       values = function_values.tolist()
@@ -413,10 +427,8 @@ def __init__(self,
     super().__init__(est_type, normalize_fvalues, hyperparameters_update_method,
                      extra_params)
 
-  def run_step(self, perturbations: npt.NDArray[np.float32],
-               function_values: npt.NDArray[np.float32],
-               current_input: npt.NDArray[np.float32],
-               current_value: float) -> npt.NDArray[np.float32]:
+  def run_step(self, perturbations: FloatArray2D, function_values: FloatArray,
+               current_input: FloatArray, current_value: float) -> FloatArray:
     dim = len(current_input)
     if self.normalize_fvalues:
       values = function_values.tolist()
@@ -474,8 +486,8 @@ def set_state(self, state: SequenceOfFloats) -> None:
 
 
 def normalize_function_values(
-    function_values: npt.NDArray[np.float32],
-    current_value: float) -> Tuple[npt.NDArray[np.float32], List[float]]:
+    function_values: FloatArray,
+    current_value: float) -> Tuple[FloatArray, List[float]]:
   values = function_values.tolist()
   values.append(current_value)
   mean = sum(values) / float(len(values))
@@ -484,13 +496,12 @@ def normalize_function_values(
   return (np.array(normalized_values[:-1]), normalized_values[-1])
 
 
-def monte_carlo_gradient(
-    precision_parameter: float,
-    est_type: EstimatorType,
-    perturbations: npt.NDArray[np.float32],
-    function_values: npt.NDArray[np.float32],
-    current_value: float,
-    energy: Optional[float] = 0) -> npt.NDArray[np.float32]:
+def monte_carlo_gradient(precision_parameter: float,
+                         est_type: EstimatorType,
+                         perturbations: FloatArray2D,
+                         function_values: FloatArray,
+                         current_value: float,
+                         energy: Optional[float] = 0) -> FloatArray:
   """Calculates Monte Carlo gradient.
 
   There are several ways of estimating the gradient. This is specified by the
@@ -530,11 +541,10 @@ def monte_carlo_gradient(
   return gradient
 
 
-def sklearn_regression_gradient(
-    clf: LinearModel, est_type: EstimatorType,
-    perturbations: npt.NDArray[np.float32],
-    function_values: npt.NDArray[np.float32],
-    current_value: float) -> npt.NDArray[np.float32]:
+def sklearn_regression_gradient(clf: LinearModel, est_type: EstimatorType,
+                                perturbations: FloatArray2D,
+                                function_values: FloatArray,
+                                current_value: float) -> FloatArray:
   """Calculates gradient by function difference regression.
 
   Args:
@@ -603,8 +613,8 @@ class QuadraticModel(object):
   # pylint: disable=invalid-name
   # argument Av should be capitalized as such for mathematical convention
   def __init__(self,
-               Av: Callable[[npt.NDArray[np.float32]], npt.NDArray[np.float32]],
-               b: npt.NDArray[np.float32],
+               Av: Callable[[FloatArray], FloatArray],
+               b: FloatArray,
                c: Optional[float] = 0):
     """Initialize quadratic function.
 
@@ -619,7 +629,7 @@ def __init__(self,
     self.c = c
   # pylint: enable=invalid-name
 
-  def f(self, x: npt.NDArray[np.float32]) -> float:
+  def f(self, x: FloatArray) -> float:
     """Evaluate the quadratic function.
 
     Args:
@@ -629,7 +639,7 @@ def f(self, x: npt.NDArray[np.float32]) -> float:
     """
     return 0.5 * np.dot(x, self.quad_v(x)) + np.dot(x, self.b) + self.c
 
-  def grad(self, x: npt.NDArray[np.float32]) -> npt.NDArray[np.float32]:
+  def grad(self, x: FloatArray) -> FloatArray:
     """Evaluate the gradient of the quadratic, Ax + b.
 
     Args:
@@ -653,9 +663,8 @@ class ProjectedGradientOptimizer(object):
   """
 
   def __init__(self, function_object: QuadraticModel,
-               projection_operator: Callable[[npt.NDArray[np.float32]],
-                                             npt.NDArray[np.float32]],
-               pgd_params: Mapping[str, Any], x_init: npt.NDArray[np.float32]):
+               projection_operator: Callable[[FloatArray], FloatArray],
+               pgd_params: Mapping[str, Any], x_init: FloatArray):
     self.f = function_object
     self.proj = projection_operator
     if pgd_params is not None:
@@ -698,7 +707,7 @@ def run_step(self) -> None:
     self.x = x_next
     self.k += 1
 
-  def get_solution(self) -> npt.NDArray[np.float32]:
+  def get_solution(self) -> FloatArray:
     return self.x
 
   def get_x_diff_norm(self) -> float:
@@ -708,9 +717,7 @@ def get_iterations(self) -> int:
     return self.k
 
 
-def make_projector(
-    radius: float
-) -> Callable[[npt.NDArray[np.float32]], npt.NDArray[np.float32]]:
+def make_projector(radius: float) -> Callable[[FloatArray], FloatArray]:
   """Makes an L2 projector function centered at origin.
 
   Args:
@@ -719,7 +726,7 @@ def make_projector(
     A function of one argument that projects onto L2 ball.
   """
 
-  def projector(w: npt.NDArray[np.float32]) -> npt.NDArray[np.float32]:
+  def projector(w: FloatArray) -> FloatArray:
     w_norm = np.linalg.norm(w, 2)
     if w_norm > radius:
       return radius / w_norm * w
@@ -748,7 +755,7 @@ class TrustRegionSubproblemOptimizer(object):
   def __init__(self,
                model_function: QuadraticModel,
                trust_region_params: Dict[str, Any],
-               x_init: Optional[npt.NDArray[np.float32]] = None):
+               x_init: Optional[FloatArray] = None):
     self.mf = model_function
     self.params = trust_region_params
     self.center = x_init
@@ -783,7 +790,7 @@ def solve_trust_region_subproblem(self) -> None:
 
       self.x = pgd_solver.get_solution()
 
-  def get_solution(self) -> npt.NDArray[np.float32]:
+  def get_solution(self) -> FloatArray:
     return self.x
 
 
@@ -913,7 +920,7 @@ def __init__(self, precision_parameter: float, est_type: EstimatorType,
       self.clf = linear_model.Lasso(alpha=self.params['grad_reg_alpha'])
     self.is_returned_step = False
 
-  def trust_region_test(self, current_input: npt.NDArray[np.float32],
+  def trust_region_test(self, current_input: FloatArray,
                         current_value: float) -> bool:
     """Test the next step to determine how to update the trust region.
 
@@ -981,9 +988,9 @@ def trust_region_test(self, current_input: npt.NDArray[np.float32],
       print('Unchanged: ' + str(self.radius) + log_message)
     return True
 
-  def update_hessian_part(self, perturbations: npt.NDArray[np.float32],
-                          function_values: npt.NDArray[np.float32],
-                          current_value: float, is_update: bool) -> None:
+  def update_hessian_part(self, perturbations: FloatArray2D,
+                          function_values: FloatArray, current_value: float,
+                          is_update: bool) -> None:
     """Updates the internal state which stores Hessian information.
 
     Recall that the Hessian is given by
@@ -1046,13 +1053,12 @@ def update_hessian_part(self, perturbations: npt.NDArray[np.float32],
       self.saved_function_values = np.append(self.saved_function_values,
                                              function_values)
 
-  def create_hessv_function(
-      self) -> Callable[[npt.NDArray[np.float32]], npt.NDArray[np.float32]]:
+  def create_hessv_function(self) -> Callable[[FloatArray], FloatArray]:
     """Returns a function of one argument that evaluates Hessian-vector product.
     """
     if self.params['dense_hessian']:
 
-      def hessv_func(x: npt.NDArray[np.float32]) -> npt.NDArray[np.float32]:
+      def hessv_func(x: FloatArray) -> FloatArray:
         """Calculates Hessian-vector product from dense Hessian.
 
         Args:
@@ -1068,7 +1074,7 @@ def hessv_func(x: npt.NDArray[np.float32]) -> npt.NDArray[np.float32]:
         return hessv
     else:
 
-      def hessv_func(x: npt.NDArray[np.float32]) -> npt.NDArray[np.float32]:
+      def hessv_func(x: FloatArray) -> FloatArray:
         """Calculates Hessian-vector product from perturbation/value pairs.
 
         Args:
@@ -1095,9 +1101,8 @@ def hessv_func(x: npt.NDArray[np.float32]) -> npt.NDArray[np.float32]:
 
     return hessv_func
 
-  def update_quadratic_model(self, perturbations: npt.NDArray[np.float32],
-                             function_values: npt.NDArray[np.float32],
-                             current_value: float,
+  def update_quadratic_model(self, perturbations: FloatArray2D,
+                             function_values: FloatArray, current_value: float,
                              is_update: bool) -> QuadraticModel:
     """Updates the internal state of the optimizer with new perturbations.
 
@@ -1145,10 +1150,8 @@ def update_quadratic_model(self, perturbations: npt.NDArray[np.float32],
                              is_update)
     return QuadraticModel(self.create_hessv_function(), self.saved_gradient)
 
-  def run_step(self, perturbations: npt.NDArray[np.float32],
-               function_values: npt.NDArray[np.float32],
-               current_input: npt.NDArray[np.float32],
-               current_value: float) -> npt.NDArray[np.float32]:
+  def run_step(self, perturbations: FloatArray2D, function_values: FloatArray,
+               current_input: FloatArray, current_value: float) -> FloatArray:
     """Run a single step of trust region optimizer.
 
     Args:
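For reference: the `@gin.constants_from_enum(module='blackbox_optimizers')` decorators added above register every member of these enums as a named gin constant. Below is a minimal, self-contained sketch of how such a constant is typically consumed from a gin binding; the `configure_optimizer` function and its parameter are hypothetical, used only for illustration and are not part of this change.

```python
import enum

import gin


@gin.constants_from_enum(module='blackbox_optimizers')
class Algorithm(enum.Enum):
  MONTE_CARLO = 1
  TRUST_REGION = 2
  SKLEARN_REGRESSION = 3


# Hypothetical gin-configurable entry point, for illustration only.
@gin.configurable
def configure_optimizer(algorithm: Algorithm = Algorithm.MONTE_CARLO):
  # A real entry point would construct the corresponding optimizer here;
  # this sketch simply returns the selected enum member.
  return algorithm


# Enum members registered via constants_from_enum are referenced with the
# %module.EnumName.MEMBER syntax in .gin files or inline bindings.
gin.parse_config(
    'configure_optimizer.algorithm = %blackbox_optimizers.Algorithm.TRUST_REGION'
)

assert configure_optimizer() is Algorithm.TRUST_REGION
```

Keeping the algorithm choice in the gin config rather than in code is the point of registering the enums: swapping `TRUST_REGION` for `MONTE_CARLO` or `SKLEARN_REGRESSION` then requires only a config edit.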