diff --git a/.gitignore b/.gitignore index b300af1..eb84765 100644 --- a/.gitignore +++ b/.gitignore @@ -16,5 +16,5 @@ __pycache__/ casadi_codegen.obj invariants_py/data/sine_wave_invariants.csv venv/ -debug_fatrop_* +debug_fatrop* nlp_hess_l.casadi \ No newline at end of file diff --git a/examples/calculate_screw_invariants_pose.py b/examples/calculate_screw_invariants_pose.py index fc13611..205e80b 100644 --- a/examples/calculate_screw_invariants_pose.py +++ b/examples/calculate_screw_invariants_pose.py @@ -4,7 +4,7 @@ from invariants_py import data_handler as dh import numpy as np from invariants_py.reparameterization import interpT -from invariants_py.calculate_invariants.opti_calculate_screw_invariants_pose import OCP_calc_pose +from invariants_py.calculate_invariants.opti_calculate_screw_invariants_pose_fatrop import OCP_calc_pose import matplotlib.pyplot as plt from scipy.spatial.transform import Rotation as R @@ -147,33 +147,28 @@ def main(): # Compute the number of new samples N = int(1 + np.floor(timestamps[-1] / dt)) + print(f'Number of samples: {N}') # Generate new equidistant time vector time_new = np.linspace(0, timestamps[-1], N) # Interpolate pose matrices to new time vector - T = interpT(timestamps, T, time_new) + T = interpT(time_new, timestamps, T) # Plot the input trajectory plot_trajectory_kettle(T, 'Input Trajectory') # Initialize OCP object and calculate pose - OCP = OCP_calc_pose(T, rms_error_traj=5 * 10**-2) + OCP = OCP_calc_pose(N, rms_error_traj_pos = 1e-3, rms_error_traj_rot= 1e-2, solver='fatrop') # Calculate screw invariants and other outputs - U, T_sol_, T_isa_ = OCP.calculate_invariants(T, dt) + U, T_sol, T_isa = OCP.calculate_invariants(T, dt) - # Initialize an array for the solution trajectory - T_sol = np.zeros((N, 4, 4)) - for k in range(N): - T_sol[k, :3, :] = T_sol_[k] - T_sol[k, 3, 3] = 1 - # Plot the reconstructed trajectory plot_trajectory_kettle(T_sol, 'Reconstructed Trajectory') # Plot the screw invariants - plot_screw_invariants(time_new[:-1], U.T) + plot_screw_invariants(time_new, U) # Display the plots if not running in a non-interactive backend if plt.get_backend() != 'agg': diff --git a/invariants_py/calculate_invariants/opti_calculate_screw_invariants_pose.py b/invariants_py/calculate_invariants/opti_calculate_screw_invariants_pose.py index 37e3be6..eac29f5 100644 --- a/invariants_py/calculate_invariants/opti_calculate_screw_invariants_pose.py +++ b/invariants_py/calculate_invariants/opti_calculate_screw_invariants_pose.py @@ -2,13 +2,21 @@ import casadi as cas from invariants_py.dynamics_screw_invariants import define_integrator_invariants_pose from invariants_py import ocp_helper +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D + +def form_homogeneous_matrix(T): + T_hom = np.eye(4) + T_hom[:3, :] = T + return T_hom class OCP_calc_pose: - def __init__(self, T_input, bool_unsigned_invariants = False, rms_error_traj = 10**-2): - - N = T_input.shape[0] - + def __init__(self, N, + rms_error_traj_pos = 10**-3, + rms_error_traj_rot = 10**-2, + bool_unsigned_invariants = False): + opti = cas.Opti() # use OptiStack package from Casadi for easy bookkeeping of variables (no cumbersome indexing) # Define system states X (unknown object pose + moving frame pose at every time step) @@ -28,13 +36,13 @@ def __init__(self, T_input, bool_unsigned_invariants = False, rms_error_traj = 1 opti.subject_to(ocp_helper.tril_vec(T_obj[0][0:3,0:3].T @ T_obj[0][0:3,0:3] - np.eye(3)) == 0) # Dynamics constraints (Multiple shooting) - integrator = define_integrator_invariants_pose(h) + geometric_integrator = define_integrator_invariants_pose(h) for k in range(N-1): # Integrate current state to obtain next state (next rotation and position) - Xk_end = integrator(X[k],U[:,k],h) + Xk_end = geometric_integrator(X[k],U[:,k],h) # Continuity constraint (closing the gap) - opti.subject_to(Xk_end==X[k+1]) + opti.subject_to(X[k+1] == Xk_end) # Lower bounds on controls if bool_unsigned_invariants: @@ -46,11 +54,13 @@ def __init__(self, T_input, bool_unsigned_invariants = False, rms_error_traj = 1 trajectory_error_rot = 0 for k in range(N): err_pos = T_obj[k][0:3,3] - T_obj_m[k][0:3,3] # position error - err_rot = T_obj_m[k][0:3,0:3].T @ T_obj[k][0:3,0:3] - np.eye(3) # orientation error + err_rot = (T_obj[k][0:3,0:3].T @ T_obj_m[k][0:3,0:3]) - np.eye(3) # orientation error trajectory_error_pos = trajectory_error_pos + cas.dot(err_pos,err_pos) trajectory_error_rot = trajectory_error_rot + cas.dot(err_rot,err_rot) - opti.subject_to(trajectory_error_pos/N/rms_error_traj**2 < 1) - opti.subject_to(trajectory_error_rot/N/rms_error_traj**2 < 1) + opti.subject_to(trajectory_error_pos < N*rms_error_traj_pos**2) + opti.subject_to(trajectory_error_rot < N*rms_error_traj_rot**2) + + #objective_fit = trajectory_error_pos + trajectory_error_rot # Minimize moving frame invariants to deal with singularities and noise objective_reg = 0 @@ -59,9 +69,11 @@ def __init__(self, T_input, bool_unsigned_invariants = False, rms_error_traj = 1 objective_reg = objective_reg + cas.dot(err_abs,err_abs) # cost term objective = objective_reg/(N-1) # normalize with window length + #objective = 1e-8*objective + objective_fit + # Solver opti.minimize(objective) - opti.solver('ipopt',{"print_time":True,"expand":True},{'gamma_theta':1e-12,'max_iter':200,'tol':1e-4,'print_level':5,'ma57_automatic_scaling':'no','linear_solver':'mumps','print_info_string':'yes'}) + opti.solver('ipopt',{"print_time":True,"expand":True},{'max_iter':300,'tol':1e-6,'print_level':5,'ma57_automatic_scaling':'no','linear_solver':'mumps','print_info_string':'yes'}) # 'gamma_theta':1e-12 # Store variables self.opti = opti @@ -80,7 +92,6 @@ def calculate_invariants(self, T_obj_m, h): T_obj_init = T_obj_m T_isa_init0 = np.vstack([[0, 0, 1, 0], [0, 1, 0, 0], [-1, 0, 0, 0], [0, 0, 0, 1]]).T T_isa_init = np.tile(T_isa_init0, (self.N, 1, 1)) # initial guess for moving frame poses - # Set initial guess for i in range(self.N): @@ -90,29 +101,85 @@ def calculate_invariants(self, T_obj_m, h): self.opti.set_value(self.h, h) for i in range(self.N-1): - self.opti.set_initial(self.U[:,i], np.zeros(6)+0.001*np.random.randn(6)) + self.opti.set_initial(self.U[:,i], np.zeros(6)+0.1) # Solve sol = self.opti.solve() # Return solution - T_isa = [sol.value(self.T_isa[k]) for k in range(self.N)] - T_obj = [sol.value(self.T_obj[k]) for k in range(self.N)] - U = sol.value(self.U) + T_isa = np.array([form_homogeneous_matrix(sol.value(i)) for i in self.T_isa]) + T_obj = np.array([form_homogeneous_matrix(sol.value(i)) for i in self.T_obj]) + U = np.array(sol.value(self.U)) + U = np.append(U, [U[-1, :]], axis=0) return U, T_obj, T_isa - - + if __name__ == "__main__": - N=100 - T_obj_m = np.tile(np.eye(4), (100, 1, 1)) # example: measured object poses + from invariants_py.reparameterization import interpT + import invariants_py.kinematics.orientation_kinematics as SE3 + + # Test data + N = 100 + T_start = np.eye(4) # Rotation matrix 1 + T_mid = np.eye(4) + T_mid[:3, :3] = SE3.rotate_z(np.pi) # Rotation matrix 3 + T_end = np.eye(4) + T_end[:3, :3] = SE3.RPY(np.pi/2, 0, np.pi/2) # Rotation matrix 2 + + # Interpolate between R_start and R_end + T_obj_m = interpT(np.linspace(0,1,N), np.array([0,0.5,1]), np.stack([T_start, T_mid, T_end],0)) + + print(T_obj_m) + # check orthonormality of rotation matrix part of T_obj_m + # for i in range(N): + # print(np.linalg.norm(T_obj_m[i,0:3,0:3].T @ T_obj_m[i,0:3,0:3] - np.eye(3))) + - OCP = OCP_calc_pose(T_obj_m, rms_error_traj=10**-3) + OCP = OCP_calc_pose(N, rms_error_traj_pos = 10e-3, rms_error_traj_rot = 10e-3, bool_unsigned_invariants=True) # Example: calculate invariants for a given trajectory h = 0.01 # step size for integration of dynamic equations - U, T_obj, T_isa = OCP.calculate_invariants(T_obj_m, h) print("Invariants U: ", U) + print("T_obj: ", T_obj) + print("T_isa: ", T_isa) + + # Assuming T_isa is already defined and is an Nx4x4 matrix + N = T_isa.shape[0] + + # Extract points and directions + points = T_isa[:, :3, 3] # First three elements of the fourth column + directions = T_isa[:, :3, 0] # First three elements of the first column + + # Create a 3D plot + fig = plt.figure() + ax = fig.add_subplot(111, projection='3d') + + # Plot the points + ax.scatter(points[:, 0], points[:, 1], points[:, 2], color='r', label='Points on the line') + + # Plot the directions as lines + for i in range(N): + start_point = points[i] - directions[i] * 0.1 + end_point = points[i] + directions[i] * 0.1 # Scale the direction for better visualization + ax.plot([start_point[0], end_point[0]], [start_point[1], end_point[1]], [start_point[2], end_point[2]], color='b') + + # Set axis limits + ax.set_xlim([-0.1, +0.1]) + ax.set_ylim([-0.1, +0.1]) + ax.set_zlim([-0.1, +0.1]) + + # Set labels + ax.set_xlabel('X') + ax.set_ylabel('Y') + ax.set_zlabel('Z') + ax.set_title('Plotting the instantaneous screw axis') + + # Add legend + ax.legend() + + # Show plot + plt.show() + diff --git a/invariants_py/calculate_invariants/opti_calculate_screw_invariants_pose_fatrop.py b/invariants_py/calculate_invariants/opti_calculate_screw_invariants_pose_fatrop.py new file mode 100644 index 0000000..9cfc45a --- /dev/null +++ b/invariants_py/calculate_invariants/opti_calculate_screw_invariants_pose_fatrop.py @@ -0,0 +1,199 @@ +import numpy as np +import casadi as cas +from invariants_py.dynamics_screw_invariants import define_integrator_invariants_pose +from invariants_py import ocp_helper +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d import Axes3D + +def form_homogeneous_matrix(T): + T_hom = np.eye(4) + T_hom[:3, :] = T + return T_hom + +class OCP_calc_pose: + + def __init__(self, N, + rms_error_traj_pos = 10**-3, + rms_error_traj_rot = 10**-2, + bool_unsigned_invariants = False, + solver = 'fatrop'): + + opti = cas.Opti() # use OptiStack package from Casadi for easy bookkeeping of variables (no cumbersome indexing) + + # Define system states X (unknown object pose + moving frame pose at every time step) + T_isa = [] + T_obj = [] + cost_position = [] + cost_orientation = [] + X = [] + U = [] + for k in range(N-1): + T_isa.append(opti.variable(3, 4)) # Instantaneous Screw Axis frame + T_obj.append(opti.variable(3, 4)) # Object frame + cost_position.append(opti.variable(1)) + cost_orientation.append(opti.variable(1)) + U.append(opti.variable(6)) + X.append(cas.vertcat(cas.vec(T_isa[k]), cas.vec(T_obj[k]))) + + # Add the last state N + T_isa.append(opti.variable(3, 4)) # Instantaneous Screw Axis frame + T_obj.append(opti.variable(3, 4)) # Object frame + cost_position.append(opti.variable(1)) + cost_orientation.append(opti.variable(1)) + X.append(cas.vertcat(cas.vec(T_isa[-1]), cas.vec(T_obj[-1]))) + + # Define system parameters P (known values in optimization that need to be set right before solving) + T_obj_m = [opti.parameter(3,4) for _ in range(N)] # measured object poses + h = opti.parameter() # step size for integration of dynamic equations + + # # Dynamics constraints (Multiple shooting) + geometric_integrator = define_integrator_invariants_pose(h) + for k in range(N-1): + # Integrate current state to obtain next state (next rotation and position) + Xk_end = geometric_integrator(X[k],U[k],h) + + # Continuity constraint (closing the gap in multiple shooting) + opti.subject_to(X[k+1] == Xk_end) + + # Measurement fitting constraint + err_pos = T_obj[k][0:3,3] - T_obj_m[k][0:3,3] # position error + err_rot = (T_obj[k][0:3,0:3].T @ T_obj_m[k][0:3,0:3]) - np.eye(3) # orientation error + opti.subject_to(cost_position[k+1] == cost_position[k] + cas.dot(err_pos,err_pos)) + opti.subject_to(cost_orientation[k+1] == cost_orientation[k] + cas.dot(err_rot,err_rot)) + + # Constrain rotation matrices to be orthogonal (only needed for one timestep, property is propagated by integrator) + if k == 0: + opti.subject_to(ocp_helper.tril_vec(T_isa[0][0:3,0:3].T @ T_isa[0][0:3,0:3] - np.eye(3)) == 0) + opti.subject_to(ocp_helper.tril_vec(T_obj[0][0:3,0:3].T @ T_obj[0][0:3,0:3] - np.eye(3)) == 0) + opti.subject_to(cost_position[0] == 0) + opti.subject_to(cost_orientation[0] == 0) + + # Lower bounds on controls + if bool_unsigned_invariants: + opti.subject_to(U[k][0] >= 0) # lower bounds on control + opti.subject_to(U[k][1] >= 0) # lower bounds on control + + # Final state constraints + err_pos = T_obj[-1][0:3,3] - T_obj_m[-1][0:3,3] # position error + err_rot = (T_obj[-1][0:3,0:3].T @ T_obj_m[-1][0:3,0:3]) - np.eye(3) # orientation error + opti.subject_to(cost_position[-1] + cas.dot(err_pos,err_pos) < N*rms_error_traj_pos**2) + opti.subject_to(cost_orientation[-1] + cas.dot(err_rot,err_rot) < N*rms_error_traj_rot**2) + #opti.subject_to(cost_position[-1] < N*rms_error_traj_pos**2) + #opti.subject_to(cost_orientation[-1] < N*rms_error_traj_rot**2) + + # Minimize moving frame invariants to deal with singularities and noise + objective_reg = 0 + for k in range(N-1): + err_abs = U[k][1,2,4,5] # value of moving frame invariants + objective_reg = objective_reg + cas.dot(err_abs,err_abs) # cost term + objective = objective_reg/(N-1) # normalize with window length + + # Solver + opti.minimize(objective) + if solver == 'ipopt': + opti.solver('ipopt',{"print_time":True,"expand":True},{'max_iter':300,'tol':1e-6,'print_level':5,'ma57_automatic_scaling':'no','linear_solver':'mumps','print_info_string':'yes'}) #'gamma_theta':1e-12 + elif solver == 'fatrop': + opti.solver('fatrop',{"expand":True,'fatrop.max_iter':300,'fatrop.tol':1e-6,'fatrop.print_level':5, "structure_detection":"auto","debug":True,"fatrop.mu_init":0.1}) + + # Store variables + self.opti = opti + self.N = N + self.T_obj_m = T_obj_m + self.h = h + self.T_obj = T_obj + self.T_isa = T_isa + self.U = U + + def calculate_invariants(self, T_obj_m, h): + + assert(self.N == T_obj_m.shape[0]) + + # Initial guess + T_obj_init = T_obj_m + T_isa_init0 = np.vstack([[0, 0, 1, 0], [0, 1, 0, 0], [-1, 0, 0, 0], [0, 0, 0, 1]]).T + T_isa_init = np.tile(T_isa_init0, (self.N, 1, 1)) # initial guess for moving frame poses + + # Set initial guess + for i in range(self.N): + self.opti.set_value(self.T_obj_m[i], T_obj_m[i,:3]) + self.opti.set_initial(self.T_isa[i], T_isa_init[i,:3]) + self.opti.set_initial(self.T_obj[i], T_obj_init[i,:3]) + self.opti.set_value(self.h, h) + + for i in range(self.N-1): + self.opti.set_initial(self.U[i], np.zeros(6)+0.1) + + # Solve + sol = self.opti.solve() + + # Return solution + T_isa = np.array([form_homogeneous_matrix(sol.value(i)) for i in self.T_isa]) + T_obj = np.array([form_homogeneous_matrix(sol.value(i)) for i in self.T_obj]) + U = np.array([sol.value(i) for i in self.U]) + U = np.vstack((U,[U[-1,:]])) + + return U, T_obj, T_isa + +if __name__ == "__main__": + + from invariants_py.reparameterization import interpT + import invariants_py.kinematics.orientation_kinematics as SE3 + + # Test data + N = 100 + T_start = np.eye(4) # Rotation matrix 1 + T_mid = np.eye(4) + T_mid[:3, :3] = SE3.rotate_z(np.pi) # Rotation matrix 3 + T_end = np.eye(4) + T_end[:3, :3] = SE3.RPY(np.pi/2, 0, np.pi/2) # Rotation matrix 2 + + # Interpolate between R_start and R_end + T_obj_m = interpT(np.linspace(0,1,N), np.array([0,0.5,1]), np.stack([T_start, T_mid, T_end],0)) + + OCP = OCP_calc_pose(N, rms_error_traj_pos = 10e-3, rms_error_traj_rot = 10e-3, bool_unsigned_invariants=True, solver='fatrop') + + # Example: calculate invariants for a given trajectory + h = 0.01 # step size for integration of dynamic equations + U, T_obj, T_isa = OCP.calculate_invariants(T_obj_m, h) + + #print("Invariants U: ", U) + print("T_obj: ", T_obj) + print("T_isa: ", T_isa) + + # Assuming T_isa is already defined and is an Nx4x4 matrix + N = T_isa.shape[0] + + # Extract points and directions + points = T_isa[:, :3, 3] # First three elements of the fourth column + directions = T_isa[:, :3, 0] # First three elements of the first column + + # Create a 3D plot + fig = plt.figure() + ax = fig.add_subplot(111, projection='3d') + + # Plot the points + ax.scatter(points[:, 0], points[:, 1], points[:, 2], color='r', label='Points on the line') + + # Plot the directions as lines + for i in range(N): + start_point = points[i] - directions[i] * 0.1 + end_point = points[i] + directions[i] * 0.1 # Scale the direction for better visualization + ax.plot([start_point[0], end_point[0]], [start_point[1], end_point[1]], [start_point[2], end_point[2]], color='b') + + # Set axis limits + ax.set_xlim([-0.1, +0.1]) + ax.set_ylim([-0.1, +0.1]) + ax.set_zlim([-0.1, +0.1]) + + # Set labels + ax.set_xlabel('X') + ax.set_ylabel('Y') + ax.set_zlabel('Z') + ax.set_title('Plotting the instantaneous screw axis') + + # Add legend + ax.legend() + + # Show plot + plt.show() + diff --git a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position.py b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position.py index 55dfa96..7d10551 100644 --- a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position.py +++ b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position.py @@ -5,7 +5,12 @@ class OCP_calc_pos: - def __init__(self, window_len = 100, bool_unsigned_invariants = False, rms_error_traj = 10**-2, geometric = False, planar_task = False, solver_options = {}): + def __init__(self, window_len = 100, + rms_error_traj = 10**-2, + geometric = False, + planar_task = False, + bool_unsigned_invariants = False, + solver_options = {}): # Set solver options tolerance = solver_options.get('tol',1e-4) # tolerance for the solver diff --git a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position_mf.py b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position_mf.py index 03cc28f..13e5cc9 100644 --- a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position_mf.py +++ b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position_mf.py @@ -5,7 +5,13 @@ class OCP_calc_pos: - def __init__(self, window_len = 100, bool_unsigned_invariants = False, w_pos = 1, w_rot = 1, w_deriv = (10**-6)*np.array([1.0, 1.0, 1.0]), w_abs = (10**-10)*np.array([1.0, 1.0]), planar_task = False, geometric = False): + def __init__(self, window_len = 100, + w_pos = 1, w_rot = 1, + w_deriv = (10**-6)*np.array([1.0, 1.0, 1.0]), + w_abs = (10**-10)*np.array([1.0, 1.0]), + bool_unsigned_invariants = False, + planar_task = False, + geometric = False): ''' Create decision variables and parameters for the optimization problem ''' @@ -71,7 +77,7 @@ def __init__(self, window_len = 100, bool_unsigned_invariants = False, w_pos = 1 err_deriv = U[:,k] - U[:,k-1] # first-order finite backwards derivative (noise smoothing effect) else: err_deriv = 0 - err_abs = U[[1,2],k] # absolute value invariants (force arbitrary invariants in singularities to zero) + err_abs = U[1:3,k] # absolute value invariants (force arbitrary invariants in singularities to zero) ##Check that obj function is correctly typed in !!! objective_reg = objective_reg \ @@ -82,7 +88,7 @@ def __init__(self, window_len = 100, bool_unsigned_invariants = False, w_pos = 1 ''' Define solver and save variables ''' opti.minimize(objective) - opti.solver('ipopt',{"print_time":True,"expand":True},{'max_iter':1000,'tol':1e-4,'print_level':5,'ma57_automatic_scaling':'no','linear_solver':'mumps','print_info_string':'yes'}) + opti.solver('ipopt',{"print_time":True,"expand":True},{'max_iter':300,'tol':1e-6,'print_level':5,'ma57_automatic_scaling':'no','linear_solver':'mumps','print_info_string':'yes'}) # Save variables self.R_t = R_t @@ -118,7 +124,7 @@ def calculate_invariants(self,trajectory_meas,stepsize): # Initialize controls for k in range(N-1): - self.opti.set_initial(self.U[:,k], np.array([1,1e-12,1e-12])) + self.opti.set_initial(self.U[:,k], np.array([1,1e-1,1e-5])) # Set values parameters self.opti.set_value(self.R_t_0, np.eye(3,3)) @@ -226,7 +232,7 @@ def calculate_invariants_online(self,trajectory_meas,stepsize,sample_jump): stepsize = t[1]-t[0] # Test the functionalities of the class - OCP = OCP_calc_pos(window_len=np.size(measured_positions,0)) + OCP = OCP_calc_pos(window_len=N) # Call the calculate_invariants function and measure the elapsed time #start_time = time.time() @@ -237,11 +243,10 @@ def calculate_invariants_online(self,trajectory_meas,stepsize,sample_jump): ax = fig.add_subplot(111, projection='3d') ax.plot(measured_positions[:, 0], measured_positions[:, 1], measured_positions[:, 2],'b.-') ax.plot(calc_trajectory[:, 0], calc_trajectory[:, 1], calc_trajectory[:, 2],'r--') - plt.show() - + plt.show(block=False) + # # Print the results and elapsed time - # print("Calculated invariants:") - #print(calc_invariants) + #print("Calculated invariants:", calc_invariants) # print("Calculated Moving Frame:") # print(calc_movingframes) # print("Calculated Trajectory:") diff --git a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position_mf_fatrop.py b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position_mf_fatrop.py new file mode 100644 index 0000000..c4141fe --- /dev/null +++ b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position_mf_fatrop.py @@ -0,0 +1,282 @@ +import numpy as np +import casadi as cas +import invariants_py.dynamics_vector_invariants as dynamics +import invariants_py.ocp_helper as ocp_helper + +class OCP_calc_pos: + + def __init__(self, window_len = 100, + w_pos = 1, w_rot = 1, + w_deriv = (10**-6)*np.array([1.0, 1.0, 1.0]), + w_abs = (10**-10)*np.array([1.0, 1.0]), + solver = 'ipopt', + planar_task = False, + bool_unsigned_invariants = False, + geometric = False): + + ''' Create decision variables and parameters for the optimization problem ''' + + opti = cas.Opti() # use OptiStack package from Casadi for easy bookkeeping of variables (no cumbersome indexing) + + # Define system states X (unknown object pose + moving frame pose at every time step) + p_obj = [] + R_t = [] + X = [] + U = [] + for k in range(window_len-1): + R_t.append(opti.variable(3,3)) # translational Frenet-Serret frame + p_obj.append(opti.variable(3,1)) # object position + U.append(opti.variable(3,1)) # invariants + X.append(cas.vertcat(cas.vec(R_t[k]), cas.vec(p_obj[k]))) + + # Add last state + R_t.append(opti.variable(3,3)) # translational Frenet-Serret frame + p_obj.append(opti.variable(3,1)) # object position + X.append(cas.vertcat(cas.vec(R_t[-1]), cas.vec(p_obj[-1]))) + + # Define system controls (invariants at every time step) + #U = opti.variable(3,window_len-1) + + # Define system parameters P (known values in optimization that need to be set right before solving) + p_obj_m = [] # measured object positions + for k in range(window_len): + p_obj_m.append(opti.parameter(3,1)) # object position + R_t_0 = opti.parameter(3,3) # initial translational Frenet-Serret frame at first sample of window + h = opti.parameter(1,1) + + ''' Specifying the constraints ''' + + # Dynamic constraints + integrator = dynamics.define_integrator_invariants_position(h) + for k in range(window_len-1): + # Integrate current state to obtain next state (next rotation and position) + Xk_end = integrator(X[k],U[k],h) + + # Gap closing constraint + opti.subject_to(X[k+1] - Xk_end == 0) + + # (Optional) Geometric invariant constraint where first invariant is constant throughout the window + if geometric and k!=window_len-2: + opti.subject_to(U[k+1][0] - U[k][0] == 0) + + # Constrain rotation matrices to be orthogonal (only needed for one timestep, property is propagated by integrator) + if k==0: + opti.subject_to(ocp_helper.tril_vec(R_t[0].T @ R_t[0] - np.eye(3)) == 0) + + # Lower bounds on controls + if bool_unsigned_invariants: + opti.subject_to(U[k][0] >= 0) # positive velocity + #opti.subject_to(U[k][1] >= 0) # positive curvature + + # 2D contour + if planar_task: + opti.subject_to( cas.dot(R_t[k][:,2],np.array([0,0,1])) > 0) + + # Last stage constraints + if planar_task: + opti.subject_to( cas.dot(R_t[-1][:,2],np.array([0,0,1])) > 0) + + ''' Specifying the objective ''' + + # Fitting constraint to remain close to measurements + objective_fit = 0 + for k in range(window_len): + err_pos = p_obj[k] - p_obj_m[k] # position error + objective_fit = objective_fit + w_pos*cas.dot(err_pos,err_pos) + + # Regularization constraints to deal with singularities and noise + objective_reg = 0 + for k in range(window_len-1): + if k!=0: + err_deriv = U[k] - U[k-1] # first-order finite backwards derivative (noise smoothing effect) + else: + err_deriv = 0 + err_abs = U[k][1:3] # absolute value invariants (force arbitrary invariants in singularities to zero) + + ##Check that obj function is correctly typed in !!! + objective_reg = objective_reg \ + + cas.dot(w_deriv**(0.5)*err_deriv,w_deriv**(0.5)*err_deriv) \ + + cas.dot(w_abs**(0.5)*err_abs, w_abs**(0.5)*err_abs) + + objective = objective_fit + objective_reg + + ''' Define solver and save variables ''' + opti.minimize(objective) + if solver == 'ipopt': + opti.solver('ipopt',{"print_time":True,"expand":True},{'max_iter':300,'tol':1e-6,'print_level':5,'ma57_automatic_scaling':'no','linear_solver':'mumps','print_info_string':'yes'}) + elif solver == 'fatrop': + opti.solver('fatrop',{"expand":True,'fatrop.max_iter':300,'fatrop.tol':1e-6,'fatrop.print_level':5, "structure_detection":"auto","debug":True,"fatrop.mu_init":0.1}) + + # Construct a CasADi function out of the opti object. This function can be called with the initial guess to solve the NLP. Faster than doing opti.set_initial + opti.solve + opti.value + solution = [*R_t, *p_obj, *U] + self.opti_function = opti.to_function('opti_function', [*p_obj_m,h,*solution], [*solution]) + #print(self.opti_function(*[0 for _ in range(400)])) + + # Save variables + self.R_t = R_t + self.p_obj = p_obj + self.U = U + self.p_obj_m = p_obj_m + self.R_t_0 = R_t_0 + self.window_len = window_len + self.opti = opti + self.first_window = True + self.h = h + + def calculate_invariants(self,trajectory_meas,stepsize): + + + if trajectory_meas.shape[1] == 3: + measured_positions = trajectory_meas + else: + measured_positions = trajectory_meas[:,:3,3] + N = self.window_len + + # Initialize states + #TODO this is not correct yet, ex not perpendicular to ey + Pdiff = np.diff(measured_positions,axis=0) + ex = Pdiff / np.linalg.norm(Pdiff,axis=1).reshape(N-1,1) + ex = np.vstack((ex,[ex[-1,:]])) + ez = np.tile( np.array((0,0,1)), (N,1) ) + ey = np.array([np.cross(ez[i,:],ex[i,:]) for i in range(N)]) + + for k in range(N): + self.opti.set_initial(self.R_t[k], np.array([ex[k,:], ey[k,:], ez[k,:]]).T ) #construct_init_FS_from_traj(meas_traj.Obj_location) + self.opti.set_initial(self.p_obj[k], measured_positions[k]) + + # Initialize controls + for k in range(N-1): + self.opti.set_initial(self.U[k], np.array([1,1e-1,1e-5])) + + # Set values parameters + self.opti.set_value(self.R_t_0, np.eye(3,3)) + for k in range(N): + self.opti.set_value(self.p_obj_m[k], measured_positions[k]) + + self.opti.set_value(self.h,stepsize) + + # ###################### + # ## DEBUGGING: check integrator in initial values, time step 0 to 1 + # x0 = cas.vertcat(cas.vec(np.eye(3,3)), cas.vec(measured_positions[0])) + # u0 = 1e-8*np.ones((3,1)) + # integrator = dynamics.define_integrator_invariants_position(self.stepsize) + # x1 = integrator(x0,u0) + # print(x1) + # ###################### + + # Solve the NLP + # try: + sol = self.opti.solve_limited() + self.sol = sol + + # Extract the solved variables + invariants = np.array([sol.value(i) for i in self.U]) + invariants = np.vstack((invariants,[invariants[-1,:]])) + calculated_trajectory = np.array([sol.value(i) for i in self.p_obj]) + calculated_movingframe = np.array([sol.value(i) for i in self.R_t]) + # except RuntimeError: + # # Extract the solved variables + # invariants = np.array([self.opti.debug.value(i) for i in self.U]) + # invariants = np.vstack((invariants,[invariants[-1,:]])) + # calculated_trajectory = np.array([self.opti.debug.value(i) for i in self.p_obj]) + # calculated_movingframe = np.array([self.opti.debug.value(i) for i in self.R_t]) + + return invariants, calculated_trajectory, calculated_movingframe + + def calculate_invariants_online(self,trajectory_meas,stepsize,sample_jump): + + if self.first_window: + # Calculate invariants in first window + invariants, calculated_trajectory, calculated_movingframe = self.calculate_invariants(trajectory_meas,stepsize) + self.first_window = False + + # Add continuity constraints on first sample + #self.opti.subject_to( self.R_t[0] == self.R_t_0 ) + #self.opti.subject_to( self.p_obj[0] == self.p_obj_m[0]) + + return invariants, calculated_trajectory, calculated_movingframe + else: + + if trajectory_meas.shape[1] == 3: + measured_positions = trajectory_meas + else: + measured_positions = trajectory_meas[:,:3,3] + N = self.window_len + + ''' Set values parameters ''' + #for k in range(1,N): + # self.opti.set_value(self.p_obj_m[k], measured_positions[k-1]) + + for k in range(0,N): + self.opti.set_value(self.p_obj_m[k], measured_positions[k]) + + # Set other parameters equal to the measurements in that window + self.opti.set_value(self.R_t_0, self.sol.value(self.R_t[sample_jump])) + #self.opti.set_value(self.p_obj_m[0], self.sol.value(self.p_obj[sample_jump])); + + self.opti.set_value(self.h,stepsize) + + ''' First part of window initialized using results from earlier solution ''' + # Initialize states + for k in range(N-sample_jump-1): + self.opti.set_initial(self.R_t[k], self.sol.value(self.R_t[sample_jump+k])) + self.opti.set_initial(self.p_obj[k], self.sol.value(self.p_obj[sample_jump+k])) + + # Initialize controls + for k in range(N-sample_jump-1): + self.opti.set_initial(self.U[:,k], self.sol.value(self.U[:,sample_jump+k])) + + ''' Second part of window initialized uses default initialization ''' + # Initialize states + for k in range(N-sample_jump,N): + self.opti.set_initial(self.R_t[k], self.sol.value(self.R_t[-1])) + self.opti.set_initial(self.p_obj[k], measured_positions[k-1]) + + # Initialize controls + for k in range(N-sample_jump-1,N-1): + self.opti.set_initial(self.U[:,k], 1e-3*np.ones((3,1))) + + #print(self.sol.value(self.R_t[-1])) + + ''' Solve the NLP ''' + sol = self.opti.solve_limited() + self.sol = sol + + # Extract the solved variables + invariants = sol.value(self.U).T + invariants = np.vstack((invariants,[invariants[-1,:]])) + calculated_trajectory = np.array([sol.value(i) for i in self.p_obj]) + calculated_movingframe = np.array([sol.value(i) for i in self.R_t]) + + return invariants, calculated_trajectory, calculated_movingframe + +if __name__ == "__main__": + import matplotlib.pyplot as plt + + # Example data for measured positions and the stepsize + N = 100 + t = np.linspace(0, 4, N) + measured_positions = np.column_stack((1 * np.cos(t), 1 * np.sin(t), 0.1 * t)) + stepsize = t[1]-t[0] + + # Test the functionalities of the class + OCP = OCP_calc_pos(window_len=N, solver='ipopt', planar_task=False, bool_unsigned_invariants=False, geometric=False) + + # Call the calculate_invariants function and measure the elapsed time + #start_time = time.time() + calc_invariants, calc_trajectory, calc_movingframes = OCP.calculate_invariants(measured_positions, stepsize) + #elapsed_time = time.time() - start_time + + fig = plt.figure() + ax = fig.add_subplot(111, projection='3d') + ax.plot(measured_positions[:, 0], measured_positions[:, 1], measured_positions[:, 2],'b.-') + ax.plot(calc_trajectory[:, 0], calc_trajectory[:, 1], calc_trajectory[:, 2],'r--') + plt.show(block=True) + + # # Print the results and elapsed time + print("Calculated invariants:", calc_invariants) + # print("Calculated Moving Frame:") + # print(calc_movingframes) + # print("Calculated Trajectory:") + # print(calc_trajectory) + # print("Elapsed Time:", elapsed_time, "seconds") \ No newline at end of file diff --git a/invariants_py/dynamics_screw_invariants.py b/invariants_py/dynamics_screw_invariants.py index 0d8edfd..b1f1495 100644 --- a/invariants_py/dynamics_screw_invariants.py +++ b/invariants_py/dynamics_screw_invariants.py @@ -22,13 +22,15 @@ def integrate_twist_cas(twist, h): def transform_screw_cas(T, twist): """Return the screw coordinates of the transformed twist in the new frame.""" + R = T[:3,:3] p = T[:3,3] omega = twist[:3] v = twist[3:] omega_new = R @ omega - v_new = R @ (v - cas.cross(omega, p)) + #v_new = R @ (v - cas.cross(omega, p)) + v_new = R @ v - cas.cross(p, R @ omega) return cas.vertcat(omega_new, v_new) @@ -47,7 +49,7 @@ def integrate_screw_invariants_pose(T_isa, T_obj, u, h): T_isa = cas.vertcat(T_isa, cas.horzcat(0,0,0,1)) T_obj = cas.vertcat(T_obj, cas.horzcat(0,0,0,1)) - invariants_isa = cas.vertcat(i3,i2,0,i6,i5,0) + invariants_isa = cas.vertcat(i3,0,i2,i6,0,i5) invariants_obj = cas.vertcat(i1,0,0,i4,0,0) invariants_obj_ref = transform_screw_cas(T_isa, invariants_obj) @@ -57,14 +59,14 @@ def integrate_screw_invariants_pose(T_isa, T_obj, u, h): deltaT_obj = integrate_twist_cas(invariants_obj_ref,h) T_obj_plus1 = cas.mtimes(deltaT_obj,T_obj) - return (T_isa_plus1[:3,:], T_obj_plus1[:3,:]) + return (T_isa_plus1[0:3,:], T_obj_plus1[0:3,:]) def define_integrator_invariants_pose(h): + # System states - T_isa = cas.MX.sym('R_r',3,4) - T_obj = cas.MX.sym('R_obj',3,4) + T_isa = cas.MX.sym('T_isa',3,4) + T_obj = cas.MX.sym('T_obj',3,4) x = cas.vertcat(cas.vec(T_isa), cas.vec(T_obj)) - u = cas.MX.sym('i',6,1) (T_isa_plus1, T_obj_plus1) = integrate_screw_invariants_pose(T_isa, T_obj, u, h) diff --git a/invariants_py/reparameterization.py b/invariants_py/reparameterization.py index cd095f1..6a4377d 100644 --- a/invariants_py/reparameterization.py +++ b/invariants_py/reparameterization.py @@ -2,7 +2,7 @@ import numpy as np import invariants_py.kinematics.orientation_kinematics as SO3 from invariants_py.kinematics.rigidbody_kinematics import inverse_T -from scipy.linalg import expm, logm +from invariants_py.kinematics.rigidbody_kinematics import expm_T, logm_T def interpR(x_n, x_p, R_p): """ @@ -54,7 +54,7 @@ def interpR(x_n, x_p, R_p): return R_n -def interpT(x, T, xq): +def interpT(xq, x, T): """ Interpolates transformation matrices T at query points xq based on given sample points x. @@ -83,20 +83,20 @@ def interpT(x, T, xq): for i in range(M): # Find the segment [x[j], x[j + 1]] that contains xq[i] - while xq[i] > x[j + 1]: + while xq[i] > x[j+1]: j += 1 - x0, x1 = x[j], x[j + 1] - T0, T1 = T[j, :, :], T[j + 1, :, :] + x0, x1 = x[j], x[j+1] + T0, T1 = T[j], T[j+1] if x1 - x0 != 0: # Interpolate using the matrix logarithm and exponential - T_new = T0 @ expm((xq[i] - x0) / (x1 - x0) * logm(inverse_T(T0) @ T1)) + T_new = T0 @ expm_T((xq[i] - x0) / (x1 - x0) * logm_T(inverse_T(T0) @ T1)) else: # Handle the case where x0 == x1 (should not occur if x is strictly increasing) T_new = T0 - T_interpolated[i, :, :] = T_new # Store the interpolated matrix + T_interpolated[i] = T_new # Store the interpolated matrix return T_interpolated diff --git a/pyproject.toml b/pyproject.toml index 52b73c3..4591ce1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ requires-python = ">=3.8" keywords = ["trajectory analysis", "trajectory generation", "invariant", "coordinate-invariant", "trajectory representation", "trajectory optimization", "trajectory planning", "robotics", "dynamics", "kinematics", "screw theory", "optimal control", "geometric optimal control", "differential geometry", "optimization", "casadi", "python"] dependencies = [ # it is possible to have optional dependencies "scipy", - "casadi", + "casadi>=3.6.7", "PyQt5", "matplotlib", "numpy", diff --git a/tests/test_compare_ipopt_fatrop_position.py b/tests/test_compare_ipopt_fatrop_position.py index 6879a0d..6e1c847 100644 --- a/tests/test_compare_ipopt_fatrop_position.py +++ b/tests/test_compare_ipopt_fatrop_position.py @@ -59,6 +59,6 @@ def test_fatrop_solver_comparison3(self): # Run a specifc test # loader = unittest.TestLoader() - # suite = loader.loadTestsFromName('test_compare_ipopt_fatrop_position.TestOCPcalcpos.test_fatrop_solver_comparison2') + # suite = loader.loadTestsFromName('test_compare_ipopt_fatrop_position.TestOCPcalcpos.test_fatrop_solver_comparison3') # runner = unittest.TextTestRunner() # runner.run(suite) \ No newline at end of file