diff --git a/bioptim/examples/moving_horizon_estimation/multi_cyclic_nmpc_with_parameters.py b/bioptim/examples/moving_horizon_estimation/multi_cyclic_nmpc_with_parameters.py new file mode 100644 index 000000000..e1aba197c --- /dev/null +++ b/bioptim/examples/moving_horizon_estimation/multi_cyclic_nmpc_with_parameters.py @@ -0,0 +1,242 @@ +""" +In this example, nmpc (Nonlinear model predictive control) is applied on a simple 2-dofs arm model. The goal is to +perform a rotation of the arm in a quasi-cyclic manner. The sliding window across iterations is advanced for a full +cycle at a time while optimizing three cycles at a time (main difference between cyclic and multi-cyclic is that +the latter has more cycle at a time giving the knowledge to the solver that 'something' is coming after) +""" + +import platform + +import numpy as np + +from casadi import MX, SX, vertcat +import biorbd +from bioptim import ( + Axis, + BiorbdModel, + BoundsList, + ConfigureProblem, + ConstraintFcn, + ConstraintList, + Dynamics, + DynamicsEvaluation, + DynamicsFunctions, + InitialGuessList, + InterpolationType, + MultiCyclicCycleSolutions, + MultiCyclicNonlinearModelPredictiveControl, + Node, + NonLinearProgram, + Objective, + ObjectiveFcn, + OptimalControlProgram, + ParameterList, + PhaseDynamics, + Solution, + SolutionMerge, + Solver, + VariableScaling, +) + +class MyCyclicNMPC(MultiCyclicNonlinearModelPredictiveControl): + def advance_window_bounds_states(self, sol, n_cycles_simultaneous=None): + # Reimplementation of the advance_window method so the rotation of the wheel restart at -pi + super(MyCyclicNMPC, self).advance_window_bounds_states(sol) + self.nlp[0].x_bounds["q"].min[0, :] = -2 * np.pi * n_cycles_simultaneous + self.nlp[0].x_bounds["q"].max[0, :] = 0 + return True + + def advance_window_initial_guess_states(self, sol, n_cycles_simultaneous=None): + # Reimplementation of the advance_window method so the rotation of the wheel restart at -pi + super(MyCyclicNMPC, self).advance_window_initial_guess_states(sol) + q = sol.decision_states(to_merge=SolutionMerge.NODES)["q"] + self.nlp[0].x_init["q"].init[0, :] = q[0, :] # Keep the previously found value for the wheel + return True + + +def add_mass_fun(bio_model: biorbd.Model, value: MX): + return + + +def parameter_dependent_dynamic( + time: MX | SX, + states: MX | SX, + controls: MX | SX, + parameters: MX | SX, + algebraic_states: MX | SX, + numerical_timeseries: MX | SX, + nlp: NonLinearProgram, +) -> DynamicsEvaluation: + """ + The custom dynamics function that provides the derivative of the states: dxdt = f(t, x, u, p, a, d) + + Parameters + ---------- + time: MX | SX + The time of the system + states: MX | SX + The state of the system + controls: MX | SX + The controls of the system + parameters: MX | SX + The parameters acting on the system + algebraic_states: MX | SX + The algebraic states variables of the system + numerical_timeseries: MX | SX + The numerical timeseries of the system + nlp: NonLinearProgram + A reference to the phase + + Returns + ------- + The derivative of the states in the tuple[MX | SX] format + """ + + q = DynamicsFunctions.get(nlp.states["q"], states) + qdot = DynamicsFunctions.get(nlp.states["qdot"], states) + tau = DynamicsFunctions.get(nlp.controls["tau"], controls) * (parameters) + + # You can directly call biorbd function (as for ddq) or call bioptim accessor (as for dq) + dq = DynamicsFunctions.compute_qdot(nlp, q, qdot) + ddq = nlp.model.forward_dynamics(q, qdot, tau) + + return DynamicsEvaluation(dxdt=vertcat(dq, ddq), defects=None) + + +def custom_configure(ocp: OptimalControlProgram, nlp: NonLinearProgram, numerical_data_timeseries: dict[str, np.ndarray] = None): + """ + Tell the program which variables are states and controls. + The user is expected to use the ConfigureProblem.configure_xxx functions. + + Parameters + ---------- + ocp: OptimalControlProgram + A reference to the ocp + nlp: NonLinearProgram + A reference to the phase + numerical_data_timeseries: dict[str, np.ndarray] + A list of values to pass to the dynamics at each node. Experimental external forces should be included here. + """ + + ConfigureProblem.configure_q(ocp, nlp, as_states=True, as_controls=False) + ConfigureProblem.configure_qdot(ocp, nlp, as_states=True, as_controls=False) + ConfigureProblem.configure_tau(ocp, nlp, as_states=False, as_controls=True) + + ConfigureProblem.configure_dynamics_function(ocp, nlp, parameter_dependent_dynamic) + + +def prepare_nmpc( + model_path, + cycle_len, + cycle_duration, + n_cycles_simultaneous, + n_cycles_to_advance, + max_torque, + phase_dynamics: PhaseDynamics = PhaseDynamics.SHARED_DURING_THE_PHASE, + expand_dynamics: bool = True, + use_sx: bool = False, +): + model = BiorbdModel(model_path) + + parameter = ParameterList(use_sx=use_sx) + parameter_bounds = BoundsList() + parameter_init = InitialGuessList() + + parameter.add( + name="tau_modifier", + function=add_mass_fun, + size=1, + scaling=VariableScaling("tau_modifier", [1]), + ) + + parameter_init["tau_modifier"] = np.array([2.25]) + + parameter_bounds.add( + "tau_modifier", + min_bound=[1.5], + max_bound=[3.5], + interpolation=InterpolationType.CONSTANT, + ) + + dynamics = Dynamics(custom_configure, expand_dynamics=expand_dynamics, phase_dynamics=phase_dynamics) + + x_bounds = BoundsList() + x_bounds["q"] = model.bounds_from_ranges("q") + x_bounds["q"].min[0, :] = -2 * np.pi * n_cycles_simultaneous # Allow the wheel to spin as much as needed + x_bounds["q"].max[0, :] = 0 + x_bounds["qdot"] = model.bounds_from_ranges("qdot") + + u_bounds = BoundsList() + u_bounds["tau"] = [-max_torque] * model.nb_q, [max_torque] * model.nb_q + + new_objectives = Objective(ObjectiveFcn.Lagrange.MINIMIZE_STATE, key="q") + + # Rotate the wheel and force the marker of the hand to follow the marker on the wheel + wheel_target = np.linspace(-2 * np.pi * n_cycles_simultaneous, 0, cycle_len * n_cycles_simultaneous + 1)[ + np.newaxis, : + ] + constraints = ConstraintList() + constraints.add(ConstraintFcn.TRACK_STATE, key="q", index=0, node=Node.ALL, target=wheel_target) + constraints.add( + ConstraintFcn.SUPERIMPOSE_MARKERS, + node=Node.ALL, + first_marker="wheel", + second_marker="COM_hand", + axes=[Axis.X, Axis.Y], + ) + + return MyCyclicNMPC( + model, + dynamics, + cycle_len=cycle_len, + cycle_duration=cycle_duration, + n_cycles_simultaneous=n_cycles_simultaneous, + n_cycles_to_advance=n_cycles_to_advance, + common_objective_functions=new_objectives, + constraints=constraints, + x_bounds=x_bounds, + u_bounds=u_bounds, + parameters=parameter, + parameter_bounds=parameter_bounds, + parameter_init=parameter_init, + use_sx=use_sx, + ) + + +def main(): + model_path = "models/arm2.bioMod" + torque_max = 50 + + cycle_duration = 1 + cycle_len = 20 + n_cycles_to_advance = 1 + n_cycles_simultaneous = 3 + n_cycles = 4 + + nmpc = prepare_nmpc( + model_path, + cycle_len=cycle_len, + cycle_duration=cycle_duration, + n_cycles_to_advance=n_cycles_to_advance, + n_cycles_simultaneous=n_cycles_simultaneous, + max_torque=torque_max, + ) + + def update_functions(_nmpc: MultiCyclicNonlinearModelPredictiveControl, cycle_idx: int, _sol: Solution): + return cycle_idx < n_cycles # True if there are still some cycle to perform + + # Solve the program + sol = nmpc.solve( + update_functions, + solver=Solver.IPOPT(show_online_optim=platform.system() == "Linux"), + n_cycles_simultaneous=n_cycles_simultaneous, + get_all_iterations=True, + cycle_solutions=MultiCyclicCycleSolutions.ALL_CYCLES, + ) + sol[0].print_cost() + sol[0].graphs() + sol[0].animate(n_frames=100) + + +if __name__ == "__main__": + main() diff --git a/bioptim/examples/torque_driven_ocp/example_pendulum_time_dependent.py b/bioptim/examples/torque_driven_ocp/example_pendulum_time_dependent.py index 1a9d9b93f..0177c0a70 100644 --- a/bioptim/examples/torque_driven_ocp/example_pendulum_time_dependent.py +++ b/bioptim/examples/torque_driven_ocp/example_pendulum_time_dependent.py @@ -11,6 +11,8 @@ import platform +import numpy as np + from casadi import MX, SX, sin, vertcat from bioptim import ( @@ -79,7 +81,7 @@ def time_dependent_dynamic( return DynamicsEvaluation(dxdt=vertcat(dq, ddq), defects=None) -def custom_configure(ocp: OptimalControlProgram, nlp: NonLinearProgram): +def custom_configure(ocp: OptimalControlProgram, nlp: NonLinearProgram, numerical_data_timeseries: dict[str, np.ndarray] = None): """ Tell the program which variables are states and controls. The user is expected to use the ConfigureProblem.configure_xxx functions. @@ -90,6 +92,8 @@ def custom_configure(ocp: OptimalControlProgram, nlp: NonLinearProgram): A reference to the ocp nlp: NonLinearProgram A reference to the phase + numerical_data_timeseries: dict[str, np.ndarray] + A list of values to pass to the dynamics at each node. Experimental external forces should be included here. """ ConfigureProblem.configure_q(ocp, nlp, as_states=True, as_controls=False) diff --git a/bioptim/optimization/receding_horizon_optimization.py b/bioptim/optimization/receding_horizon_optimization.py index bc2dd31de..56bb3b8a1 100644 --- a/bioptim/optimization/receding_horizon_optimization.py +++ b/bioptim/optimization/receding_horizon_optimization.py @@ -710,6 +710,16 @@ def solve( update_function=update_function, **extra_options ) + if self.parameters.shape != 0 and get_all_iterations: + final_solution_parameters_dict = [{key: None} for key in solution[0].parameters.keys()][0] + for key in solution[0].parameters.keys(): + key_val = [] + for sol in solution[1]: + key_val.append(sol.parameters[key]) + final_solution_parameters_dict[key] = key_val + + solution[0].cycle_parameters = final_solution_parameters_dict + final_solution = [solution[0]] if get_all_iterations: @@ -718,22 +728,22 @@ def solve( cycle_solutions_output = [] if cycle_solutions in (MultiCyclicCycleSolutions.FIRST_CYCLES, MultiCyclicCycleSolutions.ALL_CYCLES): for sol in solution[1]: - _states, _controls = self.export_cycles(sol) + _states, _controls, _parameters = self.export_cycles(sol) dt = float(sol.t_span()[0][-1]) - cycle_solutions_output.append(self._initialize_one_cycle(dt, _states, _controls)) + cycle_solutions_output.append(self._initialize_one_cycle(dt, _states, _controls, _parameters)) if cycle_solutions == MultiCyclicCycleSolutions.ALL_CYCLES: for cycle_number in range(1, self.n_cycles): - _states, _controls = self.export_cycles(solution[1][-1], cycle_number=cycle_number) + _states, _controls, _parameters = self.export_cycles(solution[1][-1], cycle_number=cycle_number) dt = float(sol.t_span()[0][-1]) - cycle_solutions_output.append(self._initialize_one_cycle(dt, _states, _controls)) + cycle_solutions_output.append(self._initialize_one_cycle(dt, _states, _controls, _parameters)) if cycle_solutions in (MultiCyclicCycleSolutions.FIRST_CYCLES, MultiCyclicCycleSolutions.ALL_CYCLES): final_solution.append(cycle_solutions_output) return tuple(final_solution) if len(final_solution) > 1 else final_solution[0] - def export_cycles(self, sol: Solution, cycle_number: int = 0) -> tuple[dict, dict]: + def export_cycles(self, sol: Solution, cycle_number: int = 0) -> tuple[dict, dict, dict]: """Exports the solution of the desired cycle from the full window solution""" decision_states = sol.decision_states(to_merge=SolutionMerge.NODES) @@ -741,6 +751,7 @@ def export_cycles(self, sol: Solution, cycle_number: int = 0) -> tuple[dict, dic states = {} controls = {} + parameters = {} window_slice = slice(cycle_number * self.cycle_len, (cycle_number + 1) * self.cycle_len + 1) for key in self.nlp[0].states.keys(): states[key] = decision_states[key][:, window_slice] @@ -750,7 +761,10 @@ def export_cycles(self, sol: Solution, cycle_number: int = 0) -> tuple[dict, dic for key in self.nlp[0].controls.keys(): controls[key] = decision_controls[key][:, window_slice] - return states, controls + for key in self.nlp[0].parameters.keys(): + parameters[key] = sol.parameters[key][0] + + return states, controls, parameters def _initialize_solution(self, dt: float, states: list, controls: list, parameters: list): x_init = InitialGuessList() @@ -799,7 +813,7 @@ def _initialize_solution(self, dt: float, states: list, controls: list, paramete a_init = InitialGuessList() return Solution.from_initial_guess(solution_ocp, [np.array([dt]), x_init, u_init, p_init, a_init]) - def _initialize_one_cycle(self, dt: float, states: np.ndarray, controls: np.ndarray): + def _initialize_one_cycle(self, dt: float, states: np.ndarray, controls: np.ndarray, parameters: np.ndarray): """return a solution for a single window kept of the MHE""" x_init = InitialGuessList() for key in self.nlp[0].states.keys(): @@ -823,14 +837,17 @@ def _initialize_one_cycle(self, dt: float, states: np.ndarray, controls: np.ndar model_class = model_serialized[0] model_initializer = model_serialized[1] - parameters = ParameterList(use_sx=self.cx == SX) + param_list = ParameterList(use_sx=self.cx == SX) + p_init = InitialGuessList() for key in self.nlp[0].parameters.keys(): - parameters.add( + parameters_tp = parameters[key] + param_list.add( name=key, function=self.nlp[0].parameters[key].function, size=self.nlp[0].parameters[key].shape, scaling=self.nlp[0].parameters[key].scaling, ) + p_init.add(key, parameters_tp, interpolation=InterpolationType.EACH_FRAME, phase=0,) solution_ocp = OptimalControlProgram( bio_model=model_class(**model_initializer), @@ -842,12 +859,11 @@ def _initialize_one_cycle(self, dt: float, states: np.ndarray, controls: np.ndar x_init=x_init, u_init=u_init, use_sx=self.cx == SX, - parameters=parameters, - parameter_init=self.parameter_init, + parameters=param_list, + parameter_init=p_init, parameter_bounds=self.parameter_bounds, ) a_init = InitialGuessList() - p_init = InitialGuessList() return Solution.from_initial_guess(solution_ocp, [np.array([dt]), x_init, u_init_for_solution, p_init, a_init]) diff --git a/tests/shard2/test_global_nmpc_final.py b/tests/shard2/test_global_nmpc_final.py index a455f83dd..8b6471d50 100644 --- a/tests/shard2/test_global_nmpc_final.py +++ b/tests/shard2/test_global_nmpc_final.py @@ -138,3 +138,91 @@ def update_functions(_nmpc, cycle_idx, _sol): npt.assert_almost_equal(sol[2][0].cost.toarray().squeeze(), 0.0002) npt.assert_almost_equal(sol[2][1].cost.toarray().squeeze(), 0.0002) npt.assert_almost_equal(sol[2][2].cost.toarray().squeeze(), 0.0002) + + +@pytest.mark.parametrize("phase_dynamics", [PhaseDynamics.SHARED_DURING_THE_PHASE, PhaseDynamics.ONE_PER_NODE]) +def test_multi_cyclic_nmpc_with_parameters(phase_dynamics): + def update_functions(_nmpc, cycle_idx, _sol): + return cycle_idx < n_cycles_total # True if there are still some cycle to perform + + from bioptim.examples.moving_horizon_estimation import multi_cyclic_nmpc_with_parameters as ocp_module + + bioptim_folder = os.path.dirname(ocp_module.__file__) + + n_cycles_simultaneous = 2 + n_cycles_to_advance = 1 + n_cycles_total = 3 + cycle_len = 20 + nmpc = ocp_module.prepare_nmpc( + model_path=bioptim_folder + "/models/arm2.bioMod", + cycle_len=cycle_len, + cycle_duration=1, + n_cycles_simultaneous=n_cycles_simultaneous, + n_cycles_to_advance=n_cycles_to_advance, + max_torque=50, + phase_dynamics=phase_dynamics, + ) + sol = nmpc.solve( + update_functions, + solver=Solver.IPOPT(), + n_cycles_simultaneous=n_cycles_simultaneous, + get_all_iterations=True, + cycle_solutions=MultiCyclicCycleSolutions.FIRST_CYCLES, + ) + + # Check some of the results + states = sol[0].decision_states(to_merge=SolutionMerge.NODES) + controls = sol[0].decision_controls(to_merge=SolutionMerge.NODES) + q, qdot, tau = states["q"], states["qdot"], controls["tau"] + parameters = sol[0].cycle_parameters + + # initial and final position + npt.assert_equal(q.shape, (3, n_cycles_total * cycle_len + 1)) + npt.assert_almost_equal(q[:, 0], np.array((-12.56637061, 1.04359174, 1.03625065))) + npt.assert_almost_equal(q[:, -1], np.array([1.37753244e-40, 1.04359174e00, 1.03625065e00])) + + # initial and final velocities + npt.assert_almost_equal(qdot[:, 0], np.array([6.28157623, 2.53488346, 0.08497867])) + npt.assert_almost_equal(qdot[:, -1], np.array([6.28157623, 2.42858387, -0.53856653]), decimal=5) + + # initial and final controls + npt.assert_almost_equal(tau[:, 0], np.array([0.0251475, 1.88932432, 0.8950255])) + npt.assert_almost_equal(tau[:, -1], np.array([-0.02512069, 2.17400305, 1.00178721]), decimal=4) + + # initial and final parameters + for key in parameters.keys(): + npt.assert_almost_equal(parameters[key], [np.array([2.5594204]), np.array([2.55932934]), np.array([2.56215198])]) + + # check time + n_steps = nmpc.nlp[0].ode_solver.n_integration_steps + time = sol[0].stepwise_time(to_merge=SolutionMerge.NODES) + assert time.shape == (n_cycles_total * cycle_len * (n_steps + 1) + 1, 1) + assert time[0] == 0 + npt.assert_almost_equal(time[-1], 3.0) + + # check some results of the second structure + for s in sol[1]: + states = s.stepwise_states(to_merge=SolutionMerge.NODES) + q = states["q"] + + # initial and final position + npt.assert_equal(q.shape, (3, 241)) + + # check time + time = s.stepwise_time(to_merge=SolutionMerge.NODES) + assert time.shape == (241, 1) + assert time[0] == 0 + npt.assert_almost_equal(time[-1], 2.0, decimal=4) + + for s in sol[2]: + states = s.stepwise_states(to_merge=SolutionMerge.NODES) + q = states["q"] + + # initial and final position + npt.assert_equal(q.shape, (3, 121)) + + # check time + time = s.stepwise_time(to_merge=SolutionMerge.NODES) + assert time.shape == (121, 1) + assert time[0] == 0 + npt.assert_almost_equal(time[-1], 1.0, decimal=4)