Skip to content

Commit

Permalink
Merge branch 'testing' into 'main'
Browse files Browse the repository at this point in the history
Testing

See merge request robotgenskill/public_code/invariants_py!36
  • Loading branch information
maximvochten committed Dec 3, 2024
2 parents f3d2bf9 + 412184d commit 9acc8c9
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 103 deletions.
3 changes: 1 addition & 2 deletions examples/calculate_invariants_position_longtrial.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@
#/*********************************************************************************************************************/
#/* Option 1: Calculate invariants using discretized analytical formulas
#/*********************************************************************************************************************/

progress_step = np.mean(np.diff(timestamps))#.reshape(-1,1)
progress_step = np.mean(np.diff(timestamps))
from invariants_py import discretized_vector_invariants as dvi
invariants_init, trajectory_init, moving_frames_init = dvi.calculate_discretized_invariants(trajectory, progress_step)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ def __init__(self, window_len = 100, bool_unsigned_invariants = False, rms_error

def calculate_invariants(self,trajectory_meas,stepsize, choice_initialization=2):

from invariants_py.ocp_initialization import calculate_velocity_from_discrete_rotations, calculate_vector_invariants
from invariants_py.discretized_vector_invariants import calculate_moving_frames
from invariants_py.ocp_initialization import calculate_velocity_from_discrete_rotations
from invariants_py.discretized_vector_invariants import calculate_moving_frames, calculate_vector_invariants
from invariants_py.dynamics_vector_invariants import reconstruct_rotation_traj
from invariants_py.kinematics.screw_kinematics import average_vector_orientation_frame

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import rockit
from invariants_py import ocp_helper, ocp_initialization
from invariants_py.ocp_helper import check_solver
from invariants_py.dynamics_vector_invariants import integrate_vector_invariants_position
from invariants_py.dynamics_vector_invariants import integrate_vector_invariants_position, integrate_vector_invariants_position_seq
from invariants_py import discretized_vector_invariants as dvi

class OCP_calc_pos:

def __init__(self, window_len=100, rms_error_traj=10**-3, fatrop_solver=False, bool_unsigned_invariants=False, planar_task=False, solver_options = {}, geometric=False):
def __init__(self, window_len=100, rms_error_traj=10**-3, fatrop_solver=False, bool_unsigned_invariants=False, planar_task=False, solver_options = {}, geometric=False, periodic=False):
"""
Initializes an instance of the RockitCalculateVectorInvariantsPosition class.
It specifies the optimal control problem (OCP) for calculating the invariants of a trajectory in a symbolic way.
Expand Down Expand Up @@ -76,15 +77,20 @@ def __init__(self, window_len=100, rms_error_traj=10**-3, fatrop_solver=False, b

# Lower bounds on controls
if bool_unsigned_invariants:
ocp.subject_to(invars[0,:]>=0) # velocity always positive
#ocp.subject_to(invars[1,:]>=0) # curvature rate always positive
ocp.subject_to(invars[0]>=0) # velocity always positive
#ocp.subject_to(invars[1]>=0) # curvature rate always positive

#ocp.subject_to(-np.pi <= invars[1,:]*h) # curvature rate bounded by 1
#ocp.subject_to(-np.pi <= invars[2,:]*h) # curvature rate bounded by 1
#ocp.subject_to(invars[1,:]*h <= np.pi) # curvature rate bounded by 1
#ocp.subject_to(invars[2,:]*h <= np.pi) # curvature rate bounded by 1

# Measurement fitting constraint
# TODO: what about specifying the tolerance per sample instead of the sum?
ek = cas.dot(p_obj - p_obj_m, p_obj - p_obj_m) # squared position error
if not fatrop_solver:
# sum of squared position errors in the window should be less than the specified tolerance rms_error_traj
total_ek = ocp.sum(ek,grid='control',include_last=True)
total_ek = ocp.sum(ek,grid='control',include_last=True)
ocp.subject_to(total_ek < (N*rms_error_traj**2))

#ocp.subject_to(ek < 0.01**2)
Expand All @@ -103,11 +109,16 @@ def __init__(self, window_len=100, rms_error_traj=10**-3, fatrop_solver=False, b
#ocp.subject_to(p_obj - p_obj_m < np.array((0.0001,0.0001,0.0001))) # squared position error at each sample should be less than the specified tolerance rms_error_traj


# TODO this is still needed because last sample is not included in the sum now
#total_ek = ocp.state() # total sum of squared error
#ocp.set_next(total_ek, total_ek)
#ocp.subject_to(ocp.at_tf(total_ek == running_ek + ek))
# total_ek_scaled = total_ek/N/rms_error_traj**2 # scaled total error
#ocp.subject_to(ek < rms_error_traj**2) # squared position error should be less than the specified tolerance rms_error_traj

# # TODO this is still needed because last sample is not included in the sum now
# #total_ek = ocp.state() # total sum of squared error
# #ocp.set_next(total_ek, total_ek)
# #ocp.subject_to(ocp.at_tf(total_ek == running_ek + ek))
# # total_ek_scaled = total_ek/N/rms_error_traj**2 # scaled total error

#total_ek = ocp.sum(ek,grid='control',include_last=True)
#ocp.add_objective(total_ek/N)

#ocp.subject_to(total_ek/N < rms_error_traj**2)
#total_ek_scaled = running_ek/N/rms_error_traj**2 # scaled total error
Expand All @@ -128,6 +139,12 @@ def __init__(self, window_len=100, rms_error_traj=10**-3, fatrop_solver=False, b
L = ocp.state() # introduce extra state L for the speed
ocp.set_next(L, L) # enforce constant speed
ocp.subject_to(invars[0] - L == 0, include_last=False) # relate to first invariant

#if periodic:
# ocp.subject_to(ocp.at_tf(p_obj) - ocp.at_t0(p_obj) == 0)
# # Periodic boundary conditions (optional)
# ocp.subject_to(ocp.at_tf(R_t) == ocp.at_t0(R_t))


""" Objective function """

Expand Down Expand Up @@ -232,7 +249,7 @@ def calculate_invariants(self, measured_positions, stepsize, use_previous_soluti

# print(measured_positions.T)
# print(stepsize)
# print(*self.values_variables)
# print(*self.values_variables)False

self.values_variables = self.ocp_function(measured_positions.T, stepsize, *self.values_variables)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ class OCP_calc_pos:

def __init__(self,
window_len=100,
w_pos = 1, w_rot = 1,
w_deriv = (10**-6)*np.array([1.0, 1.0, 1.0]),
w_abs = (10**-10)*np.array([1.0, 1.0]),
bool_unsigned_invariants = False,
planar_task = False,
geometric = False,
fatrop_solver = True,
solver_options = {}):

# TODO change "planar_task" to "planar_trajectory"
# TODO change bool_unsigned_invariants to positive_velocity

Expand Down Expand Up @@ -46,7 +43,7 @@ def __init__(self,

# System dynamics (integrate current states + controls to obtain next states)
# this relates the states/controls over the whole window
(R_t_plus1, p_obj_plus1) = integrate_vector_invariants_position_seq(R_t, p_obj, invars, h)
(R_t_plus1, p_obj_plus1) = integrate_vector_invariants_position(R_t, p_obj, invars, h)
ocp.set_next(p_obj,p_obj_plus1)
ocp.set_next(R_t,R_t_plus1)

Expand Down Expand Up @@ -120,9 +117,9 @@ def __init__(self,
""" Objective function """

# Minimize moving frame invariants to deal with singularities and noise
objective_reg = ocp.sum(cas.dot(invars[:3],invars[:3]))
objective = 10e-10*objective_reg/(N-1)
ocp.add_objective(objective)
# objective_reg = ocp.sum(cas.dot(invars[:3],invars[:3]))
# objective = 10e-10*objective_reg/(N-1)
# ocp.add_objective(objective)

# objective_reg2 = ocp.sum(cas.dot(invars_deriv,invars_deriv))
# objective_reg2_scaled = 0.000001*objective_reg2/(N-1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,3 @@
'''
This class is used to calculate the invariants of a measured position trajectory using the Rockit optimal control problem (OCP) framework.
The invariants are calculated by finding the invariant control inputs that reconstruct a trajectory such that it lies within a specified tolerance from the measured trajectory, while minimizing the curvature and torsion rate of the trajectory to deal with noise and singularities.
Usage:
# Example data (helix)
N = 100
t = np.linspace(0, 4, N)
measured_positions = np.column_stack((1 * np.cos(t), 1 * np.sin(t), 0.1 * t))
stepsize = t[1]-t[0]
# Specify optimal control problem (OCP)
OCP = OCP_calc_pos(window_len=N, rms_error_traj=10**-3)
# Calculate invariants
invariants,trajectory,moving-frames = OCP.calculate_invariants(measured_positions, stepsize)
'''

import numpy as np
import casadi as cas
import rockit
Expand All @@ -26,18 +7,17 @@

class OCP_calc_pos:

def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, bool_unsigned_invariants=False, planar_task=False, geometric = False, solver_options = {}):
"""
Initializes an instance of the RockitCalculateVectorInvariantsPosition class.
It specifies the optimal control problem (OCP) for calculating the invariants of a trajectory in a symbolic way.
def __init__(self,
window_len=100,
w_pos = 1, w_rot = 1,
w_deriv = (10**-6)*np.array([1.0, 1.0, 1.0]),
w_abs = (10**-10)*np.array([1.0, 1.0]),
fatrop_solver=False,
bool_unsigned_invariants=False,
planar_task=False,
geometric = False,
solver_options = {}):

Args:
window_len (int, optional): The length of the window of trajectory measurmeents in the optimization problem. Defaults to 100.
rms_error_traj (float, optional): The tolerance for the squared RMS error of the trajectory. Defaults to 10**-2.
fatrop_solver (bool, optional): Flag indicating whether to use the Fatrop solver. Defaults to False.
bool_unsigned_invariants (bool, optional): Flag indicating whether to enforce unsigned invariants. Defaults to False.
planar_task (bool, optional): Flag indicating whether the task is planar. Defaults to False.
"""
# TODO change "planar_task" to "planar_trajectory"
# TODO change bool_unsigned_invariants to positive_velocity

Expand Down Expand Up @@ -84,29 +64,9 @@ def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, b

# Measurement fitting constraint
# TODO: what about specifying the tolerance per sample instead of the sum?
ek = cas.dot(p_obj - p_obj_m, p_obj - p_obj_m) # squared position error
if not fatrop_solver:
# sum of squared position errors in the window should be less than the specified tolerance rms_error_traj
total_ek = ocp.sum(ek,grid='control',include_last=True)
ocp.subject_to(total_ek/N < rms_error_traj**2)
#else:
# # Fatrop does not support summing over grid points inside the constraint, so we implement it differently to achieve the same result
# running_ek = ocp.state() # running sum of squared error
# ocp.subject_to(ocp.at_t0(running_ek == 0))
# ocp.set_next(running_ek, running_ek + ek)
# ocp.subject_to(ocp.at_tf(1000*running_ek/N < 1000*rms_error_traj**2)) # scaling to avoid numerical issues in fatrop

# # TODO this is still needed because last sample is not included in the sum now
# total_ek = ocp.state() # total sum of squared error
# ocp.set_next(total_ek, total_ek)
# ocp.subject_to(ocp.at_tf(total_ek == running_ek + ek))
# # total_ek_scaled = total_ek/N/rms_error_traj**2 # scaled total error
# ocp.subject_to(1000*total_ek/N < 1000*rms_error_traj**2)
# #total_ek_scaled = running_ek/N/rms_error_traj**2 # scaled total error
# #ocp.subject_to(ocp.at_tf(total_ek_scaled < 1))

#ocp.subject_to(ek < rms_error_traj**2)


#total_ek = ocp.sum(ek,grid='control',include_last=True
# Boundary conditions (optional, but may help to avoid straight line fits)
#ocp.subject_to(ocp.at_t0(p_obj == p_obj_m)) # fix first position to measurement
#ocp.subject_to(ocp.at_tf(p_obj == p_obj_m)) # fix last position to measurement
Expand All @@ -126,16 +86,17 @@ def __init__(self, window_len=100, rms_error_traj=10**-2, fatrop_solver=False, b
""" Objective function """

# Minimize moving frame invariants to deal with singularities and noise
objective_reg = ocp.sum(cas.dot(invars[1:3],invars[1:3]))
objective = 0.000001*objective_reg/(N-1)
ocp.add_objective(objective)

objective_reg2 = ocp.sum(cas.dot(invars_deriv,invars_deriv))
objective_reg2_scaled = 0.000001*objective_reg2/(N-1)
ocp.add_objective(objective_reg2_scaled)
deriv_invariants_weighted = w_deriv**(0.5)*invars_deriv
cost_deriv_invariants = ocp.sum(cas.dot(deriv_invariants_weighted,deriv_invariants_weighted))/(N-1)
ocp.add_objective(cost_deriv_invariants)
abs_invariants_weighted = w_abs**(0.5)*invars[1:3]
cost_absolute_invariants = ocp.sum(cas.dot(abs_invariants_weighted,abs_invariants_weighted))/(N-1)
ocp.add_objective(cost_absolute_invariants)

objective_fit = ocp.sum(ek, include_last=True)
ocp.add_objective(objective_fit)
pos_error_weighted = w_pos**(0.5)*(p_obj_m - p_obj)
cost_fitting = ocp.sum(cas.dot(pos_error_weighted, pos_error_weighted),grid='control',include_last=True)/N
ocp.add_objective(cost_fitting)

""" Solver definition """

Expand Down Expand Up @@ -289,18 +250,18 @@ def calculate_invariants_OLD(self, measured_positions, stepsize):
# Example data for measured positions and stepsize
N = 100
t = np.linspace(0, 4, N)
measured_positions = np.column_stack((1 * np.cos(t), 1 * np.sin(t), 0.1 * t))
measured_positions = np.column_stack((1 * np.cos(t), 1 * np.sin(t), 0.1 * t)) + np.random.normal(0, 0.001, (N, 3))
stepsize = t[1]-t[0]

# Test the functionalities of the class
OCP = OCP_calc_pos(window_len=N, rms_error_traj=10**-3, fatrop_solver=True)
OCP = OCP_calc_pos(window_len=N, fatrop_solver=True)

# Call the calculate_invariants function and measure the elapsed time
#start_time = time.time()
calc_invariants, calc_trajectory, calc_movingframes = OCP.calculate_invariants_OLD(measured_positions, stepsize)
calc_invariants, calc_trajectory, calc_movingframes = OCP.calculate_invariants(measured_positions, stepsize)
#elapsed_time = time.time() - start_time

ocp_helper.solution_check_pos(measured_positions,calc_trajectory,rms = 10**-3)
#ocp_helper.solution_check_pos(measured_positions,calc_trajectory,rms = 10**-3)

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
Expand Down
10 changes: 5 additions & 5 deletions invariants_py/discretized_vector_invariants.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ def calculate_binormal(vector_traj,tangent, tolerance_singularity, reference_vec

return binormal

def calculate_moving_frames(vector_traj, tolerance_singularity = 1e-2):
def calculate_moving_frames(vector_traj, tolerance_singularity_vel = 1e-2, tolerance_singularity_curv = 1e-2):

N = np.size(vector_traj, 0)

# Calculate first axis
e_tangent = calculate_tangent(vector_traj, tolerance_singularity)
e_tangent = calculate_tangent(vector_traj, tolerance_singularity_vel)

# Calculate third axis
e_binormal = calculate_binormal(vector_traj,e_tangent,tolerance_singularity,reference_vector=np.array([0,0,1]))
e_binormal = calculate_binormal(vector_traj,e_tangent,tolerance_singularity_curv,reference_vector=np.array([0,0,1]))

# Calculate second axis as cross product of third and first axis
e_normal = np.array([ np.cross(e_binormal[i,:],e_tangent[i,:]) for i in range(N) ])
Expand Down Expand Up @@ -214,7 +214,7 @@ def calculate_vector_invariants(R_mf_traj,vector_traj,progress_step):

return invariants

def calculate_discretized_invariants(measured_positions, progress_step, tolerance_singularity = 1e0):
def calculate_discretized_invariants(measured_positions, progress_step, tolerance_singularity_vel = 1e-1, tolerance_singularity_curv = 1e-2):
"""
Calculate the vector invariants of a measured position trajectory
based on a discrete approximation of the moving frame.
Expand All @@ -236,7 +236,7 @@ def calculate_discretized_invariants(measured_positions, progress_step, toleranc
#print(vel_vector)

# Calculate the moving frame
R_mf_traj = calculate_moving_frames(vel_vector, tolerance_singularity)
R_mf_traj = calculate_moving_frames(vel_vector, tolerance_singularity_vel, tolerance_singularity_curv)
#print(R_mf_traj)

# Calculate the invariants
Expand Down
25 changes: 15 additions & 10 deletions invariants_py/ocp_initialization.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import numpy as np
import invariants_py.kinematics.orientation_kinematics as SO3
from invariants_py.reparameterization import interpR
from invariants_py.discretized_vector_invariants import calculate_vector_invariants, calculate_velocity_from_discrete_rotations
from invariants_py.discretized_vector_invariants import calculate_discretized_invariants, calculate_velocity_from_discrete_rotations


def initial_trajectory_movingframe_rotation(R_obj_start,R_obj_end,N=100):
Expand Down Expand Up @@ -204,26 +204,31 @@ def estimate_initial_frames(vector_traj):
def initialize_VI_pos2(measured_positions,stepsize):

N = np.size(measured_positions,0)
Pdiff = np.diff(measured_positions, axis=0)
Pdiff = np.vstack((Pdiff, Pdiff[-1]))
invariants, meas_pos, mf = calculate_discretized_invariants(measured_positions,stepsize)

#Pdiff = np.diff(measured_positions, axis=0)
#Pdiff = np.vstack((Pdiff, Pdiff[-1]))

[ex,ey,ez] = estimate_initial_frames(Pdiff)
#[ex,ey,ez] = estimate_initial_frames(Pdiff)

R_t_init2 = np.zeros((N,3,3))
for i in range(N):
R_t_init2[i,:,:] = np.column_stack((ex[i,:],ey[i,:],ez[i,:]))
#R_t_init2 = np.zeros((N,3,3))
#for i in range(N):
# R_t_init2[i,:,:] = np.column_stack((ex[i,:],ey[i,:],ez[i,:]))
#print(R_t_init2)
invars = calculate_vector_invariants(R_t_init2,Pdiff,stepsize) + 1e-12*np.ones((N,3))
#invars = calculate_vector_invariants(R_t_init2,Pdiff,stepsize) + 1e-12*np.ones((N,3))
#print(invars)

R_t_init = np.zeros((9,N))
for i in range(N):
R_t_init[:,i] = np.hstack([ex[i,:],ey[i,:],ez[i,:]])
R_t_init[:,i] = np.hstack([mf[i,:,0],mf[i,:,1],mf[i,:,2]])

#print(R_t_init)

p_obj_sol = measured_positions.T

#invars = np.vstack((1e0*np.ones((1,N-1)),1e-1*np.ones((1,N-1)), 1e-12*np.ones((1,N-1))))

return [invars[:-1,:].T, p_obj_sol, R_t_init]
return [invariants[:-1,:].T, p_obj_sol, R_t_init]

def initialize_VI_rot(input_trajectory):

Expand Down

0 comments on commit 9acc8c9

Please sign in to comment.