Skip to content

Commit

Permalink
feat(nmpc): example and tests for parameters in nmpc
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin CO committed Oct 8, 2024
1 parent 426f76b commit 0ae4997
Show file tree
Hide file tree
Showing 4 changed files with 363 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
"""
In this example, nmpc (Nonlinear model predictive control) is applied on a simple 2-dofs arm model. The goal is to
perform a rotation of the arm in a quasi-cyclic manner. The sliding window across iterations is advanced for a full
cycle at a time while optimizing three cycles at a time (main difference between cyclic and multi-cyclic is that
the latter has more cycle at a time giving the knowledge to the solver that 'something' is coming after)
"""

import platform

import numpy as np

from casadi import MX, SX, vertcat
import biorbd
from bioptim import (
Axis,
BiorbdModel,
BoundsList,
ConfigureProblem,
ConstraintFcn,
ConstraintList,
Dynamics,
DynamicsEvaluation,
DynamicsFunctions,
InitialGuessList,
InterpolationType,
MultiCyclicCycleSolutions,
MultiCyclicNonlinearModelPredictiveControl,
Node,
NonLinearProgram,
Objective,
ObjectiveFcn,
OptimalControlProgram,
ParameterList,
PhaseDynamics,
Solution,
SolutionMerge,
Solver,
VariableScaling,
)

class MyCyclicNMPC(MultiCyclicNonlinearModelPredictiveControl):
def advance_window_bounds_states(self, sol, n_cycles_simultaneous=None):
# Reimplementation of the advance_window method so the rotation of the wheel restart at -pi
super(MyCyclicNMPC, self).advance_window_bounds_states(sol)
self.nlp[0].x_bounds["q"].min[0, :] = -2 * np.pi * n_cycles_simultaneous
self.nlp[0].x_bounds["q"].max[0, :] = 0
return True

def advance_window_initial_guess_states(self, sol, n_cycles_simultaneous=None):
# Reimplementation of the advance_window method so the rotation of the wheel restart at -pi
super(MyCyclicNMPC, self).advance_window_initial_guess_states(sol)
q = sol.decision_states(to_merge=SolutionMerge.NODES)["q"]
self.nlp[0].x_init["q"].init[0, :] = q[0, :] # Keep the previously found value for the wheel
return True


def add_mass_fun(bio_model: biorbd.Model, value: MX):
return


def parameter_dependent_dynamic(
time: MX | SX,
states: MX | SX,
controls: MX | SX,
parameters: MX | SX,
algebraic_states: MX | SX,
numerical_timeseries: MX | SX,
nlp: NonLinearProgram,
) -> DynamicsEvaluation:
"""
The custom dynamics function that provides the derivative of the states: dxdt = f(t, x, u, p, a, d)
Parameters
----------
time: MX | SX
The time of the system
states: MX | SX
The state of the system
controls: MX | SX
The controls of the system
parameters: MX | SX
The parameters acting on the system
algebraic_states: MX | SX
The algebraic states variables of the system
numerical_timeseries: MX | SX
The numerical timeseries of the system
nlp: NonLinearProgram
A reference to the phase
Returns
-------
The derivative of the states in the tuple[MX | SX] format
"""

q = DynamicsFunctions.get(nlp.states["q"], states)
qdot = DynamicsFunctions.get(nlp.states["qdot"], states)
tau = DynamicsFunctions.get(nlp.controls["tau"], controls) * (parameters)

# You can directly call biorbd function (as for ddq) or call bioptim accessor (as for dq)
dq = DynamicsFunctions.compute_qdot(nlp, q, qdot)
ddq = nlp.model.forward_dynamics(q, qdot, tau)

return DynamicsEvaluation(dxdt=vertcat(dq, ddq), defects=None)


def custom_configure(ocp: OptimalControlProgram, nlp: NonLinearProgram, numerical_data_timeseries: dict[str, np.ndarray] = None):
"""
Tell the program which variables are states and controls.
The user is expected to use the ConfigureProblem.configure_xxx functions.
Parameters
----------
ocp: OptimalControlProgram
A reference to the ocp
nlp: NonLinearProgram
A reference to the phase
numerical_data_timeseries: dict[str, np.ndarray]
A list of values to pass to the dynamics at each node. Experimental external forces should be included here.
"""

ConfigureProblem.configure_q(ocp, nlp, as_states=True, as_controls=False)
ConfigureProblem.configure_qdot(ocp, nlp, as_states=True, as_controls=False)
ConfigureProblem.configure_tau(ocp, nlp, as_states=False, as_controls=True)

ConfigureProblem.configure_dynamics_function(ocp, nlp, parameter_dependent_dynamic)


def prepare_nmpc(
model_path,
cycle_len,
cycle_duration,
n_cycles_simultaneous,
n_cycles_to_advance,
max_torque,
phase_dynamics: PhaseDynamics = PhaseDynamics.SHARED_DURING_THE_PHASE,
expand_dynamics: bool = True,
use_sx: bool = False,
):
model = BiorbdModel(model_path)

parameter = ParameterList(use_sx=use_sx)
parameter_bounds = BoundsList()
parameter_init = InitialGuessList()

parameter.add(
name="tau_modifier",
function=add_mass_fun,
size=1,
scaling=VariableScaling("tau_modifier", [1]),
)

parameter_init["tau_modifier"] = np.array([2.25])

parameter_bounds.add(
"tau_modifier",
min_bound=[1.5],
max_bound=[3.5],
interpolation=InterpolationType.CONSTANT,
)

dynamics = Dynamics(custom_configure, expand_dynamics=expand_dynamics, phase_dynamics=phase_dynamics)

x_bounds = BoundsList()
x_bounds["q"] = model.bounds_from_ranges("q")
x_bounds["q"].min[0, :] = -2 * np.pi * n_cycles_simultaneous # Allow the wheel to spin as much as needed
x_bounds["q"].max[0, :] = 0
x_bounds["qdot"] = model.bounds_from_ranges("qdot")

u_bounds = BoundsList()
u_bounds["tau"] = [-max_torque] * model.nb_q, [max_torque] * model.nb_q

new_objectives = Objective(ObjectiveFcn.Lagrange.MINIMIZE_STATE, key="q")

# Rotate the wheel and force the marker of the hand to follow the marker on the wheel
wheel_target = np.linspace(-2 * np.pi * n_cycles_simultaneous, 0, cycle_len * n_cycles_simultaneous + 1)[
np.newaxis, :
]
constraints = ConstraintList()
constraints.add(ConstraintFcn.TRACK_STATE, key="q", index=0, node=Node.ALL, target=wheel_target)
constraints.add(
ConstraintFcn.SUPERIMPOSE_MARKERS,
node=Node.ALL,
first_marker="wheel",
second_marker="COM_hand",
axes=[Axis.X, Axis.Y],
)

return MyCyclicNMPC(
model,
dynamics,
cycle_len=cycle_len,
cycle_duration=cycle_duration,
n_cycles_simultaneous=n_cycles_simultaneous,
n_cycles_to_advance=n_cycles_to_advance,
common_objective_functions=new_objectives,
constraints=constraints,
x_bounds=x_bounds,
u_bounds=u_bounds,
parameters=parameter,
parameter_bounds=parameter_bounds,
parameter_init=parameter_init,
use_sx=use_sx,
)


def main():
model_path = "models/arm2.bioMod"
torque_max = 50

cycle_duration = 1
cycle_len = 20
n_cycles_to_advance = 1
n_cycles_simultaneous = 3
n_cycles = 4

nmpc = prepare_nmpc(
model_path,
cycle_len=cycle_len,
cycle_duration=cycle_duration,
n_cycles_to_advance=n_cycles_to_advance,
n_cycles_simultaneous=n_cycles_simultaneous,
max_torque=torque_max,
)

def update_functions(_nmpc: MultiCyclicNonlinearModelPredictiveControl, cycle_idx: int, _sol: Solution):
return cycle_idx < n_cycles # True if there are still some cycle to perform

# Solve the program
sol = nmpc.solve(
update_functions,
solver=Solver.IPOPT(show_online_optim=platform.system() == "Linux"),
n_cycles_simultaneous=n_cycles_simultaneous,
get_all_iterations=True,
cycle_solutions=MultiCyclicCycleSolutions.ALL_CYCLES,
)
sol[0].print_cost()
sol[0].graphs()
sol[0].animate(n_frames=100)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import platform

import numpy as np

from casadi import MX, SX, sin, vertcat

from bioptim import (
Expand Down Expand Up @@ -79,7 +81,7 @@ def time_dependent_dynamic(
return DynamicsEvaluation(dxdt=vertcat(dq, ddq), defects=None)


def custom_configure(ocp: OptimalControlProgram, nlp: NonLinearProgram):
def custom_configure(ocp: OptimalControlProgram, nlp: NonLinearProgram, numerical_data_timeseries: dict[str, np.ndarray] = None):
"""
Tell the program which variables are states and controls.
The user is expected to use the ConfigureProblem.configure_xxx functions.
Expand All @@ -90,6 +92,8 @@ def custom_configure(ocp: OptimalControlProgram, nlp: NonLinearProgram):
A reference to the ocp
nlp: NonLinearProgram
A reference to the phase
numerical_data_timeseries: dict[str, np.ndarray]
A list of values to pass to the dynamics at each node. Experimental external forces should be included here.
"""

ConfigureProblem.configure_q(ocp, nlp, as_states=True, as_controls=False)
Expand Down
40 changes: 28 additions & 12 deletions bioptim/optimization/receding_horizon_optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,16 @@ def solve(
update_function=update_function, **extra_options
)

if self.parameters.shape != 0 and get_all_iterations:
final_solution_parameters_dict = [{key: None} for key in solution[0].parameters.keys()][0]
for key in solution[0].parameters.keys():
key_val = []
for sol in solution[1]:
key_val.append(sol.parameters[key])
final_solution_parameters_dict[key] = key_val

solution[0].cycle_parameters = final_solution_parameters_dict

final_solution = [solution[0]]

if get_all_iterations:
Expand All @@ -718,29 +728,30 @@ def solve(
cycle_solutions_output = []
if cycle_solutions in (MultiCyclicCycleSolutions.FIRST_CYCLES, MultiCyclicCycleSolutions.ALL_CYCLES):
for sol in solution[1]:
_states, _controls = self.export_cycles(sol)
_states, _controls, _parameters = self.export_cycles(sol)
dt = float(sol.t_span()[0][-1])
cycle_solutions_output.append(self._initialize_one_cycle(dt, _states, _controls))
cycle_solutions_output.append(self._initialize_one_cycle(dt, _states, _controls, _parameters))

if cycle_solutions == MultiCyclicCycleSolutions.ALL_CYCLES:
for cycle_number in range(1, self.n_cycles):
_states, _controls = self.export_cycles(solution[1][-1], cycle_number=cycle_number)
_states, _controls, _parameters = self.export_cycles(solution[1][-1], cycle_number=cycle_number)
dt = float(sol.t_span()[0][-1])
cycle_solutions_output.append(self._initialize_one_cycle(dt, _states, _controls))
cycle_solutions_output.append(self._initialize_one_cycle(dt, _states, _controls, _parameters))

if cycle_solutions in (MultiCyclicCycleSolutions.FIRST_CYCLES, MultiCyclicCycleSolutions.ALL_CYCLES):
final_solution.append(cycle_solutions_output)

return tuple(final_solution) if len(final_solution) > 1 else final_solution[0]

def export_cycles(self, sol: Solution, cycle_number: int = 0) -> tuple[dict, dict]:
def export_cycles(self, sol: Solution, cycle_number: int = 0) -> tuple[dict, dict, dict]:
"""Exports the solution of the desired cycle from the full window solution"""

decision_states = sol.decision_states(to_merge=SolutionMerge.NODES)
decision_controls = sol.decision_controls(to_merge=SolutionMerge.NODES)

states = {}
controls = {}
parameters = {}
window_slice = slice(cycle_number * self.cycle_len, (cycle_number + 1) * self.cycle_len + 1)
for key in self.nlp[0].states.keys():
states[key] = decision_states[key][:, window_slice]
Expand All @@ -750,7 +761,10 @@ def export_cycles(self, sol: Solution, cycle_number: int = 0) -> tuple[dict, dic
for key in self.nlp[0].controls.keys():
controls[key] = decision_controls[key][:, window_slice]

return states, controls
for key in self.nlp[0].parameters.keys():
parameters[key] = sol.parameters[key][0]

return states, controls, parameters

def _initialize_solution(self, dt: float, states: list, controls: list, parameters: list):
x_init = InitialGuessList()
Expand Down Expand Up @@ -799,7 +813,7 @@ def _initialize_solution(self, dt: float, states: list, controls: list, paramete
a_init = InitialGuessList()
return Solution.from_initial_guess(solution_ocp, [np.array([dt]), x_init, u_init, p_init, a_init])

def _initialize_one_cycle(self, dt: float, states: np.ndarray, controls: np.ndarray):
def _initialize_one_cycle(self, dt: float, states: np.ndarray, controls: np.ndarray, parameters: np.ndarray):
"""return a solution for a single window kept of the MHE"""
x_init = InitialGuessList()
for key in self.nlp[0].states.keys():
Expand All @@ -823,14 +837,17 @@ def _initialize_one_cycle(self, dt: float, states: np.ndarray, controls: np.ndar
model_class = model_serialized[0]
model_initializer = model_serialized[1]

parameters = ParameterList(use_sx=self.cx == SX)
param_list = ParameterList(use_sx=self.cx == SX)
p_init = InitialGuessList()
for key in self.nlp[0].parameters.keys():
parameters.add(
parameters_tp = parameters[key]
param_list.add(
name=key,
function=self.nlp[0].parameters[key].function,
size=self.nlp[0].parameters[key].shape,
scaling=self.nlp[0].parameters[key].scaling,
)
p_init.add(key, parameters_tp, interpolation=InterpolationType.EACH_FRAME, phase=0,)

solution_ocp = OptimalControlProgram(
bio_model=model_class(**model_initializer),
Expand All @@ -842,12 +859,11 @@ def _initialize_one_cycle(self, dt: float, states: np.ndarray, controls: np.ndar
x_init=x_init,
u_init=u_init,
use_sx=self.cx == SX,
parameters=parameters,
parameter_init=self.parameter_init,
parameters=param_list,
parameter_init=p_init,
parameter_bounds=self.parameter_bounds,
)
a_init = InitialGuessList()
p_init = InitialGuessList()
return Solution.from_initial_guess(solution_ocp, [np.array([dt]), x_init, u_init_for_solution, p_init, a_init])


Expand Down
Loading

0 comments on commit 0ae4997

Please sign in to comment.