-
Notifications
You must be signed in to change notification settings - Fork 3
/
pf_functions.py
331 lines (254 loc) · 9.81 KB
/
pf_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
#!/usr/bin/env python
import rospy
import rosservice
import numpy as np
import numpy.matlib as nm
from math import *
from tf.transformations import euler_from_quaternion, quaternion_from_euler
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
from gazebo_msgs.msg import ModelState
from pf_classes import *
COLLISION_DISTANCE = 0.12 # collision distance in [m]
def resample_particles(particles, map):
"""
Resample particles from distribution based on weights w_t
"""
weights = np.array([p.w for p in particles])
resampled_particles = []
while not (len(resampled_particles) == len(particles)):
rnd = np.random.rand() # unifrom distribution between 0-1
cumsum = np.cumsum(weights) # cumulative sum of particles weights
# find index of selected particle
ind = np.where(cumsum > rnd)[0][0]
# resample new particle around selected particle
mu_x = particles[ind].X[0]
sigma_x = 0.01 # in [m]
mu_y = particles[ind].X[1]
sigma_y = 0.01 # in [m]
mu_theta = particles[ind].X[2]
sigma_theta = 0.06 # in [radians]
X_res_particle = [np.random.normal(mu_x, sigma_x),
np.random.normal(mu_y, sigma_y),
np.random.normal(mu_theta, sigma_theta)]
w_res_particle = 1 / len(particles) # Not important at the moment, will be recalculated
res_particle = Particle(X_res_particle, w_res_particle)
# if resampled particle position is valid, add it to the list of resampled particles
if not map.check_invalid_position(res_particle):
resampled_particles.append(res_particle)
return resampled_particles
def normalize_particle_weights(particles):
"""
Normalize weights of particles to satisfy: sum(weights) = 1.0
"""
cumsum = 0.0
for p in particles:
cumsum += p.w
for p in particles:
p.w /= (cumsum + 1e-6)
return particles
def calculate_particle_weight(p, angles, distances, map):
"""
Calculate particle weights based on measurements: w_t = p(z_t|x_t)
"""
p_distances = np.array([])
p_angles = angles
for ang in angles:
distances_to_obstacles = []
for w in map.walls:
# fit line parameters r and alpha
(r, alpha) = fit_line(np.array([w.x, w.y]), np.array([p.X[0], p.X[1], p.X[2]]))
# distance to object
obst_dist = r / np.cos(ang - alpha)
# object bounds
x_min = np.min(w.x)
x_max = np.max(w.x)
y_min = np.min(w.y)
y_max = np.max(w.y)
# corners tolerance
x_min -= (0.02 * np.sign(x_min)) * x_min
x_max += (0.02 * np.sign(x_max)) * x_max
y_min -= (0.02 * np.sign(y_min)) * y_min
y_max += (0.02 * np.sign(y_max)) * y_max
# intersection in (x,y) coordinates
intersection_x = p.X[0] + obst_dist * np.cos(ang + p.X[2])
intersection_y = p.X[1] + obst_dist * np.sin(ang + p.X[2])
# object parallel to lidar beam
if obst_dist < 100:
parallel = False
else:
parallel = True
# object behind lidar beam
if obst_dist > 0:
behind = False
else:
behind = True
# lidar beam hits the object - bounded line
if x_min < intersection_x < x_max and y_min < intersection_y < y_max:
out_of_bounds = False
else:
out_of_bounds = True
if not out_of_bounds and not parallel and not behind:
distances_to_obstacles.append(obst_dist)
# if lidar beam hits near the corner it will intersect 2 obstacles -> choose closest
min_obst_dist = min(distances_to_obstacles)
# limit measurement
if min_obst_dist > MAX_LIDAR_DISTANCE:
min_obst_dist = MAX_LIDAR_DISTANCE
p_distances = np.append(p_distances, min_obst_dist)
# euc dist between estimated lidar measurements of particle and ground truth
lidar_est_dist = np.linalg.norm(distances - p_distances)
# max euc dist between estimated lidar measurements of particle and ground truth
max_lidar_est_dist = np.linalg.norm(MAX_LIDAR_DISTANCE * np.ones(len(distances)) - COLLISION_DISTANCE * np.ones(len(distances)))
# weight of particle - not normalized
p.w = (max_lidar_est_dist - lidar_est_dist) ** 2
return p
def apply_state_transision(p, dX):
"""
Apply state transision model p(x_t|x_t_1, u_t) to particle
"""
traveled_distance = np.sqrt(dX[0] ** 2 + dX[1] ** 2)
dX_particle = [traveled_distance * np.cos(p.X[2] + dX[2]),
traveled_distance * np.sin(p.X[2] + dX[2]),
dX[2]]
p.X = list(np.array(p.X) + np.array(dX_particle))
# normalize to 0 - 2 pi
if p.X[2] > 2 * np.pi:
p.X[2] = p.X[2] % (2 * np.pi)
return p
def generate_uniform_particles(map, N):
"""
Generate N uniformly distributed particles on the map
"""
x_min, y_min, x_max, y_max = 100, 100, -100, -100
for w in map.walls:
x_min = np.min(np.append(w.x, x_min))
x_max = np.max(np.append(w.x, x_max))
y_min = np.min(np.append(w.y, y_min))
y_max = np.max(np.append(w.y, y_max))
x_min = x_min + (COLLISION_DISTANCE + 0.03)
x_max = x_max - (COLLISION_DISTANCE + 0.03)
y_min = y_min + (COLLISION_DISTANCE + 0.03)
y_max = y_max - (COLLISION_DISTANCE + 0.03)
x_rand = np.random.uniform(low = x_min, high = x_max, size = N)
y_rand = np.random.uniform(low = y_min, high = y_max, size = N)
theta_rand = np.random.uniform(low = 0, high = 2*np.pi, size = N)
particles = []
for i in range(N):
X = [x_rand[i], y_rand[i], theta_rand[i]]
w = 1.0 / N
particles.append(Particle(X, w))
return particles
def copy_particles(particles):
"""
Copy particles function - for appending list of lists of particles
DATA_PARTICLES = [particles_0, particles_1, ...]
"""
new_particles = []
for p in particles:
new_particles.append(Particle(p.X, p.w))
return new_particles
def fit_line(X_line, X_robot):
"""
Fit line parameters r and alpha from X,Y points
"""
# Transformation to robot's local coordinate frame
R = np.array([[+np.cos(X_robot[2]), +np.sin(X_robot[2])],
[-np.sin(X_robot[2]), +np.cos(X_robot[2])]])
X_line_local = np.matmul(R, X_line - np.reshape(X_robot[0:2], (2,1)))
# line center
line_center_x = sum(X_line_local[0]) / len(X_line_local[0])
line_center_y = sum(X_line_local[1]) / len(X_line_local[1])
num = 0
denum = 0
for i in range(len(X_line_local[0])):
num += (X_line_local[0][i] - line_center_x) * (X_line_local[1][i] - line_center_y)
denum += (X_line_local[1][i] - line_center_y) ** 2 - (X_line_local[0][i] - line_center_x) ** 2
num = -2 * num
alpha = atan2(num, denum) / 2
r = line_center_x * cos(alpha) + line_center_y * sin(alpha)
if r < 0:
alpha = alpha + pi
if alpha > pi:
alpha = alpha - 2 * pi
r = -r
return (r, alpha)
def get_estimation_step(est_csv_name):
"""
Get estimation step from csv file with name 'PARTICLES_step_i.csv', step == i
"""
return int(est_csv_name.split('_')[-1].split('.')[0])
def transform_orientation(orientation_q):
"""
Transform theta to [radians] from [quaternion orientation]
"""
orientation_list = [ orientation_q.x, orientation_q.y, orientation_q.z, orientation_q.w]
(roll, pitch, yaw) = euler_from_quaternion(orientation_list)
if yaw < 0:
yaw = 2 * np.pi + yaw # 0->360 degrees >> 0->2pi
return yaw
def get_odom_orientation(msgOdom):
""""
Get theta from Odometry msg in [radians]
"""
orientation_q = msgOdom.pose.pose.orientation
theta = transform_orientation(orientation_q)
return theta
def get_odom_position(msgOdom):
"""
Get (x,y) coordinates from Odometry msg in [m]
"""
x = msgOdom.pose.pose.position.x
y = msgOdom.pose.pose.position.y
return (x, y)
def robot_set_pos(setPosPub, x, y, theta):
"""
Set robot position and orientation
"""
checkpoint = ModelState()
checkpoint.model_name = 'turtlebot3'
checkpoint.pose.position.x = x
checkpoint.pose.position.y = y
checkpoint.pose.position.z = 0.0
[x_q,y_q,z_q,w_q] = quaternion_from_euler(0.0,0.0,theta)
checkpoint.pose.orientation.x = x_q
checkpoint.pose.orientation.y = y_q
checkpoint.pose.orientation.z = z_q
checkpoint.pose.orientation.w = w_q
checkpoint.twist.linear.x = 0.0
checkpoint.twist.linear.y = 0.0
checkpoint.twist.linear.z = 0.0
checkpoint.twist.angular.x = 0.0
checkpoint.twist.angular.y = 0.0
checkpoint.twist.angular.z = 0.0
setPosPub.publish(checkpoint)
return (x, y, theta)
def lidar_scan(msgScan, max_lidar_distance):
"""
Convert LaserScan msg to array
"""
distances = np.array([])
angles = np.array([])
for i in range(len(msgScan.ranges)):
angle = i * msgScan.angle_increment
if ( msgScan.ranges[i] > max_lidar_distance ):
distance = max_lidar_distance
elif ( msgScan.ranges[i] < msgScan.range_min ):
distance = msgScan.range_min
# For real robot - protection
if msgScan.ranges[i] < 0.01:
distance = max_lidar_distance
else:
distance = msgScan.ranges[i]
distances = np.append(distances, distance)
angles = np.append(angles, angle)
# distances in [m], angles in [radians]
return ( distances, angles )
def check_crash(distances):
"""
Check for crash
"""
if np.min(distances) <= COLLISION_DISTANCE:
return True
else:
return False