diff --git a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position.py b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position.py index 8d200b8..ec90c37 100644 --- a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position.py +++ b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_position.py @@ -58,9 +58,6 @@ def __init__(self, window_len = 100, bool_unsigned_invariants = False, rms_error for k in range(window_len-2): opti.subject_to(U[0,k+1] == U[0,k]) - #opti.subject_to(p_obj[0] == p_obj_m[0]) # Fix first measurement - #opti.subject_to(p_obj[-1] == p_obj_m[-1]) # Fix last measurement - # Measurement fitting constraint trajectory_error = 0 for k in range(window_len): @@ -69,8 +66,8 @@ def __init__(self, window_len = 100, bool_unsigned_invariants = False, rms_error opti.subject_to(trajectory_error/window_len < rms_error_traj**2) # Boundary constraints - #pti.subject_to(self.p_obj[0] == self.p_obj_m[0]) # Fix first measurement - #opti.subject_to(self.p_obj[N-1] == self.p_obj_m[N-1]) # Fix last measurement + #opti.subject_to(self.p_obj[0] == self.p_obj_m[0]) # Fix first measurement + #opti.subject_to(self.p_obj[-1] == self.p_obj_m[-1]) # Fix last measurement #opti.subject_to(U[1,-1] == U[1,-2]) # Last sample has no impact on RMS error ''' Objective function ''' @@ -82,6 +79,8 @@ def __init__(self, window_len = 100, bool_unsigned_invariants = False, rms_error objective_reg = objective_reg + cas.dot(err_abs,err_abs) # cost term objective = objective_reg/(window_len-1) # normalize with window length + #objective = objective + 10e-8*trajectory_error/window_len # add trajectory error to objective function + ''' Solver ''' opti.minimize(objective) opti.solver('ipopt',{"print_time":False,"expand":True},{ @@ -180,20 +179,20 @@ def calculate_invariants_online(self,trajectory_meas,stepsize,sample_jump): self.opti.set_value(self.p_obj_m[k], measured_positions[k]) # Set other parameters equal to the measurements in that window - self.opti.set_value(self.R_t_0, self.sol.value(self.R_t[sample_jump])); - #self.opti.set_value(self.p_obj_m[0], self.sol.value(self.p_obj[sample_jump])); + self.opti.set_value(self.R_t_0, self.sol.value(self.R_t[sample_jump])) + #self.opti.set_value(self.p_obj_m[0], self.sol.value(self.p_obj[sample_jump])) - self.opti.set_value(self.h,stepsize); + self.opti.set_value(self.h,stepsize) ''' First part of window initialized using results from earlier solution ''' # Initialize states for k in range(N-sample_jump-1): self.opti.set_initial(self.R_t[k], self.sol.value(self.R_t[sample_jump+k])) - self.opti.set_initial(self.p_obj[k], self.sol.value(self.p_obj[sample_jump+k])); + self.opti.set_initial(self.p_obj[k], self.sol.value(self.p_obj[sample_jump+k])) # Initialize controls for k in range(N-sample_jump-1): - self.opti.set_initial(self.U[:,k], self.sol.value(self.U[:,sample_jump+k])); + self.opti.set_initial(self.U[:,k], self.sol.value(self.U[:,sample_jump+k])) ''' Second part of window initialized uses default initialization ''' # Initialize states @@ -229,7 +228,7 @@ def calculate_invariants_online(self,trajectory_meas,stepsize,sample_jump): stepsize = t[1]-t[0] # Test the functionalities of the class - OCP = OCP_calc_pos(window_len=np.size(measured_positions,0), rms_error_traj=10**-3) + OCP = OCP_calc_pos(window_len=np.size(measured_positions,0), rms_error_traj=10**-5) # Call the calculate_invariants function and measure the elapsed time #start_time = time.time() @@ -244,7 +243,7 @@ def calculate_invariants_online(self,trajectory_meas,stepsize,sample_jump): # # Print the results and elapsed time # print("Calculated invariants:") - #print(calc_invariants) + print(calc_invariants) # print("Calculated Moving Frame:") # print(calc_movingframes) # print("Calculated Trajectory:") diff --git a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_rotation.py b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_rotation.py index eff24f6..1dc4fdb 100644 --- a/invariants_py/calculate_invariants/opti_calculate_vector_invariants_rotation.py +++ b/invariants_py/calculate_invariants/opti_calculate_vector_invariants_rotation.py @@ -99,7 +99,7 @@ def __init__(self, window_len = 100, bool_unsigned_invariants = False, rms_error def calculate_invariants(self,trajectory_meas,stepsize, choice_initialization=2): - from invariants_py.initialization import calculate_velocity_from_discrete_rotations, estimate_movingframes, estimate_rotational_invariants + from invariants_py.initialization import calculate_velocity_from_discrete_rotations, estimate_movingframes, estimate_vector_invariants from invariants_py.dynamics_vector_invariants import reconstruct_rotation_traj from invariants_py.kinematics.screw_kinematics import average_vector_orientation_frame @@ -136,7 +136,7 @@ def calculate_invariants(self,trajectory_meas,stepsize, choice_initialization=2) Rdiff = calculate_velocity_from_discrete_rotations(measured_orientation,timestamps=np.arange(N)) invariants = np.hstack((1*np.ones((N-1,1)),1e-1*np.ones((N-1,2)))) R_r_traj = estimate_movingframes(Rdiff) - invariants = estimate_rotational_invariants(R_r_traj,Rdiff) + invariants = estimate_vector_invariants(R_r_traj,Rdiff,stepsize) R_obj_traj = measured_orientation elif choice_initialization == 3: diff --git a/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position.py b/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position.py index 78b8ea6..2a7a44b 100644 --- a/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position.py +++ b/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position.py @@ -85,7 +85,6 @@ def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, b if not fatrop_solver: # sum of squared position errors in the window should be less than the specified tolerance rms_error_traj total_ek = ocp.sum(ek,grid='control',include_last=True) - ocp.subject_to(total_ek/N < rms_error_traj**2) else: # Fatrop does not support summing over grid points inside the constraint, so we implement it differently to achieve the same result running_ek = ocp.state() # running sum of squared error @@ -98,9 +97,10 @@ def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, b ocp.set_next(total_ek, total_ek) ocp.subject_to(ocp.at_tf(total_ek == running_ek + ek)) # total_ek_scaled = total_ek/N/rms_error_traj**2 # scaled total error - ocp.subject_to(total_ek/N < rms_error_traj**2) - #total_ek_scaled = running_ek/N/rms_error_traj**2 # scaled total error - #ocp.subject_to(ocp.at_tf(total_ek_scaled < 1)) + + ocp.subject_to(total_ek/N < rms_error_traj**2) + #total_ek_scaled = running_ek/N/rms_error_traj**2 # scaled total error + #ocp.subject_to(ocp.at_tf(total_ek_scaled < 1)) # Boundary conditions (optional, but may help to avoid straight line fits) #ocp.subject_to(ocp.at_t0(p_obj == p_obj_m)) # fix first position to measurement @@ -128,16 +128,17 @@ def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, b ocp._method.set_expand(True) else: ocp.method(rockit.MultipleShooting(N=N-1)) - ocp.solver('ipopt', {'expand':True, 'ipopt.tol':tolerance,'ipopt.print_info_string':'yes', 'ipopt.max_iter':max_iter,'ipopt.print_level':print_level, 'ipopt.ma57_automatic_scaling':'no', 'ipopt.linear_solver':'mumps'}) + ocp.solver('ipopt', {'expand':True, 'print_time':False, 'ipopt.tol':tolerance,'ipopt.print_info_string':'yes', 'ipopt.max_iter':max_iter,'ipopt.print_level':print_level, 'ipopt.ma57_automatic_scaling':'no', 'ipopt.linear_solver':'mumps'}) """ Encapsulate solver in a casadi function so that it can be easily reused """ # Solve already once with dummy values so that Fatrop can do code generation (Q: can this step be avoided somehow?) ocp.set_initial(R_t, np.eye(3)) - ocp.set_initial(invars, np.array([1,0.01,0.01])) - ocp.set_value(p_obj_m, np.vstack((np.linspace(0, 1, N), np.ones((2, N))))) + ocp.set_initial(invars, np.array([1,0.00001,0.00001])) + ocp.set_initial(p_obj, np.vstack((np.linspace(1, 2, N), np.ones((2, N))))) + ocp.set_value(p_obj_m, np.vstack((np.linspace(1, 2, N), np.ones((2, N))))) ocp.set_value(h, 0.01) - ocp.solve_limited() # code generation + ocp.solve() # code generation # Set Fatrop solver options (Q: why can this not be done before solving?) if fatrop_solver: @@ -170,7 +171,7 @@ def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, b self.first_window = True self.h = h - def calculate_invariants(self, measured_positions, stepsize, use_previous_solution=True): + def calculate_invariants(self, measured_positions, stepsize, use_previous_solution=False, init_values=None): """ Calculate the invariants for the given measurements. @@ -191,9 +192,22 @@ def calculate_invariants(self, measured_positions, stepsize, use_previous_soluti # Check if this is the first function call if not use_previous_solution or self.first_time: - # Initialize states and controls using measurements - self.values_variables = initialization.initialize_VI_pos2(measured_positions) - self.first_time = False + + if init_values is not None: + + temp0 = init_values[0][:-1,:].T + temp1 = init_values[1].T + temp2 = init_values[2] + N = np.size(temp2,0) + R_t_init = np.zeros((9,np.size(temp2,0))) + for i in range(N): + R_t_init[:,i] = temp2[i,:,:].reshape(-1) + + self.values_variables = [temp0,temp1,R_t_init] + else: + # Initialize states and controls using measurements + self.values_variables = initialization.initialize_VI_pos2(measured_positions,stepsize) + self.first_time = False # Solve the optimization problem for the given measurements starting from previous solution self.values_variables = self.ocp_function(measured_positions.T, stepsize, *self.values_variables) @@ -210,7 +224,7 @@ def calculate_invariants_OLD(self, measured_positions, stepsize): """ Calculate the invariants for the given measurements. - Note: This function is not recommended for repeated use due to overhead caused by sampling the solution. + ! WARNING: This function is not recommended for repeated use due to overhead caused by sampling the solution. ! Parameters: - measured_positions (numpy.ndarray of shape (N, 3)): measured positions @@ -265,15 +279,17 @@ def calculate_invariants_OLD(self, measured_positions, stepsize): measured_positions = np.column_stack((1 * np.cos(t), 1 * np.sin(t), 0.1 * t)) stepsize = t[1]-t[0] + rms_val = 10**-3 + # Test the functionalities of the class - OCP = OCP_calc_pos(window_len=N, rms_error_traj=10**-3, fatrop_solver=True) + OCP = OCP_calc_pos(window_len=N, rms_error_traj=rms_val, fatrop_solver=False, solver_options={'max_iter': 100}) # Call the calculate_invariants function and measure the elapsed time #start_time = time.time() calc_invariants, calc_trajectory, calc_movingframes = OCP.calculate_invariants_OLD(measured_positions, stepsize) #elapsed_time = time.time() - start_time - ocp_helper.solution_check_pos(measured_positions,calc_trajectory,rms = 10**-3) + ocp_helper.solution_check_pos(measured_positions,calc_trajectory,rms = rms_val) fig = plt.figure() ax = fig.add_subplot(111, projection='3d') diff --git a/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_mf.py b/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_mf.py index ad4d267..f11aece 100644 --- a/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_mf.py +++ b/invariants_py/calculate_invariants/rockit_calculate_vector_invariants_position_mf.py @@ -183,7 +183,7 @@ def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, b self.first_window = True self.h = h - def calculate_invariants(self, measured_positions, stepsize, use_previous_solution=True): + def calculate_invariants(self, measured_positions, stepsize, use_previous_solution=False, init_values=None): """ Calculate the invariants for the given measurements. @@ -200,9 +200,23 @@ def calculate_invariants(self, measured_positions, stepsize, use_previous_soluti # Check if this is the first function call if not use_previous_solution or self.first_time: - # Initialize states and controls using measurements - self.values_variables = initialization.initialize_VI_pos2(measured_positions) - self.first_time = False + + if init_values is not None: + + temp0 = init_values[0][:-1,:].T + temp1 = init_values[1].T + temp2 = init_values[2] + N = np.size(temp2,0) + R_t_init = np.zeros((9,np.size(temp2,0))) + for i in range(N): + R_t_init[:,i] = temp2[i,:,:].reshape(-1) + + self.values_variables = [temp0,temp1,R_t_init] + else: + + # Initialize states and controls using measurements + self.values_variables = initialization.initialize_VI_pos2(measured_positions) + self.first_time = False # Solve the optimization problem for the given measurements starting from previous solution self.values_variables = self.ocp_function(measured_positions.T, stepsize, *self.values_variables) diff --git a/invariants_py/dynamics_vector_invariants.py b/invariants_py/dynamics_vector_invariants.py index 82fcd3f..281f1f7 100644 --- a/invariants_py/dynamics_vector_invariants.py +++ b/invariants_py/dynamics_vector_invariants.py @@ -106,6 +106,27 @@ def integrate_vector_invariants_position(R_t, p_obj, u, h): return (R_t_plus1, p_obj_plus1) +def integrate_vector_invariants_position_seq(R_t, p_obj, u, h): + """ + Discrete dynamics of the vector invariants for position. Integrate invariants over interval h starting from a current state (object pose + moving frames) + + This function is used to integrate the invariants sequentially, i.e. one invariant at a time. + """ + i_kappa = u[1]@h ; i_tau = u[2]@h ; i_vel = u[0]@h + + rot_z = cas.vcat([cas.hcat([cas.cos(i_kappa),-cas.sin(i_kappa),0]),\ + cas.hcat([cas.sin(i_kappa),cas.cos(i_kappa),0]),\ + cas.hcat([0,0,1])]) + + rot_x = cas.vcat([cas.hcat([1,0,0]),\ + cas.hcat([0,cas.cos(i_tau),-cas.sin(i_tau)]),\ + cas.hcat([0,cas.sin(i_tau),cas.cos(i_tau)])]) + + R_t_plus1 = R_t @ (rot_z @ rot_x) + p_obj_plus1 = R_t[:,0] * i_vel + p_obj + + return (R_t_plus1, p_obj_plus1) + def integrate_vector_invariants_rotation(R_r, R_obj, u, h): """Integrate invariants over interval h starting from a current state (object pose + moving frames)""" # Define a geometric integrator for eFSI, diff --git a/invariants_py/initialization.py b/invariants_py/initialization.py index f93ddbd..a1c332d 100644 --- a/invariants_py/initialization.py +++ b/invariants_py/initialization.py @@ -1,6 +1,7 @@ import numpy as np import invariants_py.kinematics.orientation_kinematics as SO3 from invariants_py.reparameterization import interpR +from math import atan2 def initial_trajectory_movingframe_rotation(R_obj_start,R_obj_end,N=100): @@ -174,10 +175,10 @@ def calculate_tangent(vector_traj): return tangent -def calculate_binormal(vector_traj,tangent): +def calculate_binormal(vector_traj,tangent, reference_vector=None): """ - Estimate the second axis of the moving frame based on the given trajectory. - The second axis is calculated by normalizing the cross product of the trajectory vector and the tangent. + Estimate the third axis of the moving frame based on the given trajectory. + The third axis is calculated by normalizing the cross product of the trajectory vector and the tangent. For vectors with a norm close to zero, the binormal of the previous vector is used. For vectors before the first non-zero norm, the binormal of the first non-zero norm is used. If no non-zero norm is found, all binormals are initialized with [0, 1, 0]. @@ -204,13 +205,14 @@ def calculate_binormal(vector_traj,tangent): first_nonzero_norm_index = np.where(~np.isclose(norm_binormal_vec, 0))[0] if first_nonzero_norm_index.size == 0: # choose a non-collinear vector - a = np.array([0, 1, 0]) if not np.isclose(tangent[1], 0) else np.array([0, 0, 1]) + a = np.array([0, 0, 1]) if not np.isclose(tangent[0,2], 1) else np.array([0, 1, 0]) # take cross-product to get perpendicular perp = np.cross(tangent[0,:], a, axis=0) # normalize - binormal = perp / np.linalg.norm(perp) + for i in range(N): + binormal[i, :] = perp / np.linalg.norm(perp) else: first_nonzero_norm_index = first_nonzero_norm_index[0] @@ -224,7 +226,12 @@ def calculate_binormal(vector_traj,tangent): # For each sample before the first non-zero norm index for i in range(first_nonzero_norm_index): binormal[i, :] = binormal[first_nonzero_norm_index, :] - + + if reference_vector is not None: + for i in range(N): + if np.dot(binormal[i, :], reference_vector) < 0: + binormal[i, :] = -binormal[i, :] + return binormal def estimate_movingframes(vector_traj): @@ -236,13 +243,13 @@ def estimate_movingframes(vector_traj): # Calculate binormal vector N = np.size(vector_traj, 0) - binormal_vec = np.zeros((N,3)) - for i in range(N-1): - binormal_vec[i,:] = np.cross(vector_traj[i,:],vector_traj[i+1,:]) - binormal_vec[-1,:] = binormal_vec[-2,:] + # binormal_vec = np.zeros((N,3)) + # for i in range(N-1): + # binormal_vec[i,:] = np.cross(vector_traj[i,:],vector_traj[i+1,:]) + # binormal_vec[-1,:] = binormal_vec[-2,:] # Estimate second axis - e_binormal = calculate_binormal(vector_traj,e_tangent) + e_binormal = calculate_binormal(vector_traj,e_tangent,reference_vector=np.array([0,0,1])) # Calculate third axis e_normal = np.array([ np.cross(e_binormal[i,:],e_tangent[i,:]) for i in range(N) ]) @@ -252,23 +259,27 @@ def estimate_movingframes(vector_traj): R[i,:,:] = np.column_stack((e_tangent[i,:],e_normal[i,:],e_binormal[i,:])) return R -def angle_between_vectors(u, v): +def angle_between_vectors(u, v, rot_axis = None): """ - Calculate the angle between two vectors. + Calculate the angle between two vectors in a robust way. Input: u: first vector v: second vector Output: - angle: angle in rad between the two vectors + angle: angle [rad] between the two vectors """ - from math import atan2 - angle = atan2(np.linalg.norm(np.cross(u,v)),np.dot(u,v)) - #angle = np.arccos(np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v))) + cross_prod = np.cross(u,v) + angle = atan2(np.linalg.norm(cross_prod),np.dot(u,v)) + + if rot_axis is not None: + sign = np.sign(np.dot(cross_prod, rot_axis)) + angle = sign*angle + return angle -def estimate_rotational_invariants(R_mf_traj,vector_traj): +def estimate_vector_invariants(R_mf_traj,vector_traj,stepsize): ''' ''' @@ -276,20 +287,20 @@ def estimate_rotational_invariants(R_mf_traj,vector_traj): N = np.size(vector_traj,0) invariants = np.zeros((N,3)) - # first invariant is the norm of the dot-product between the rotational velocity Rdiff and the first axis of the moving frame + # first invariant is the norm of the dot-product between the trajectory vector and the first axis of the moving frame for i in range(N): invariants[i,0] = np.dot(vector_traj[i,:],R_mf_traj[i,:,0]) # second invariant is the angle between successive first axes of the moving frame for i in range(N-1): - invariants[i,1] = angle_between_vectors(R_mf_traj[i,:,0], R_mf_traj[i+1,:,0]) + invariants[i,1] = angle_between_vectors(R_mf_traj[i,:,0], R_mf_traj[i+1,:,0], R_mf_traj[i,:,2])/stepsize # third invariant is the angle between successive third axes of the moving frame for i in range(N-1): - invariants[i,2] = angle_between_vectors(R_mf_traj[i,:,2], R_mf_traj[i+1,:,2]) + invariants[i,2] = angle_between_vectors(R_mf_traj[i,:,2], R_mf_traj[i+1,:,2], R_mf_traj[i,:,0])/stepsize invariants[-1,1:] = invariants[-2,1:] # copy last values - print(invariants) + #print(invariants) return invariants @@ -329,7 +340,7 @@ def initialize_VI_pos(input_trajectory): invars = np.vstack((1e0*np.ones((1,N-1)),1e-1*np.ones((1,N-1)), 1e-12*np.ones((1,N-1)))) return [invars, p_obj_sol, R_t] -def initialize_VI_pos2(measured_positions): +def initialize_VI_pos2(measured_positions,stepsize): N = np.size(measured_positions,0) Pdiff = np.diff(measured_positions, axis=0) @@ -340,9 +351,9 @@ def initialize_VI_pos2(measured_positions): R_t_init2 = np.zeros((N,3,3)) for i in range(N): R_t_init2[i,:,:] = np.column_stack((ex[i,:],ey[i,:],ez[i,:])) - print(R_t_init2) - invars = estimate_rotational_invariants(R_t_init2,Pdiff) + 1e-12*np.ones((N,3)) - print(invars) + #print(R_t_init2) + invars = estimate_vector_invariants(R_t_init2,Pdiff,stepsize) + 1e-12*np.ones((N,3)) + #print(invars) R_t_init = np.zeros((9,N)) for i in range(N): @@ -396,4 +407,34 @@ def initialize_VI_rot2(measured_orientation): invars = np.vstack((1e0*np.ones((1,N-1)),1e-1*np.ones((1,N-1)), 1e-12*np.ones((1,N-1)))) - return [invars, R_obj, R_r] \ No newline at end of file + return [invars, R_obj, R_r] + + +def discrete_approximation_invariants(measured_positions, stepsize): + """ + Calculate the vector invariants of a measured position trajectory + based on a discrete approximation of the moving frame. + + Input: + measured_positions: measured positions (Nx3) + stepsize: stepsize of the simulation + Output: + invariants: invariants (Nx3) + trajectory: trajectory of the moving frame (Nx3) + mf: moving frame (Nx3x3) + """ + N = np.size(measured_positions, 0) + + Pdiff = np.diff(measured_positions,axis=0) + + # Calculate the trajectory of the moving frame + R_mf_traj = estimate_movingframes(Pdiff) + + # Calculate the invariants based on the trajectory + invariants = estimate_vector_invariants(R_mf_traj,Pdiff/stepsize,stepsize) + 1e-6*np.ones((N-1,3)) + invariants = np.vstack((invariants, invariants[-1,:])) + + # append last value to mf + R_mf_traj = np.concatenate((R_mf_traj, R_mf_traj[-1,:,:][np.newaxis, :, :]), axis=0) + + return invariants, measured_positions, R_mf_traj \ No newline at end of file diff --git a/tests/test_compare_ipopt_fatrop_position.py b/tests/test_compare_ipopt_fatrop_position.py index 1a450ea..6879a0d 100644 --- a/tests/test_compare_ipopt_fatrop_position.py +++ b/tests/test_compare_ipopt_fatrop_position.py @@ -56,9 +56,9 @@ def test_fatrop_solver_comparison3(self): if __name__ == '__main__': # Run all tests unittest.main() -# + # Run a specifc test # loader = unittest.TestLoader() - # suite = loader.loadTestsFromName('test_compare_ipopt_fatrop_position.TestOCPcalcpos.test_fatrop_solver_comparison3') + # suite = loader.loadTestsFromName('test_compare_ipopt_fatrop_position.TestOCPcalcpos.test_fatrop_solver_comparison2') # runner = unittest.TextTestRunner() # runner.run(suite) \ No newline at end of file