-
Notifications
You must be signed in to change notification settings - Fork 4
/
k9_motor_sm.py
308 lines (278 loc) · 9.95 KB
/
k9_motor_sm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
#!/usr/bin/env python
# coding: utf-8
# Author: Richard Hopkins
# Date: 6 April 2022
#
# This program runs the motor state machine for
# the K9 robot and responds to commands received
# by MQTT/Bluetooth.
#
import time
import sys
#import json
import math
#from tkinter.messagebox import NO
import logo # k9 movement library
from state import State # Base FSM State class
import paho.mqtt.client as mqtt
print("MQTT found...")
from queue import Queue
from memory import Memory
from voice import Voice
print("All imports done!")
# Module-level singletons shared by every state class in this file.
# `mem` is queried by the states for sensor readings ("person", "follow")
# and for movement-safety states ("rotate", "forward").
mem = Memory()
# `voice` is used by the states for spoken announcements via voice.speak().
voice = Voice()
# Define K9 Motor States
class ManualControl(State):
    '''
    The state where K9 is waiting for movement commands.

    Constructing this state stops the motors and then polls the local
    command queue forever, passing every received message to on_event().
    A transition happens by on_event() constructing the next state, whose
    own constructor then takes over the polling.
    '''

    # Rotation (radians) handed to Turn_Around for each fixed turn command.
    # NOTE(review): 'Turn90Right' shares math.pi with 'TurnAbout', and the
    # 'Turn180*' commands use 2*pi -- the values look doubled relative to
    # the command names. Kept as-is; confirm against logo.right() before
    # changing them.
    _FIXED_TURNS = {
        'TurnAbout': math.pi,
        'Turn90Right': math.pi,
        'Turn90Left': -math.pi,
        'Turn180Left': -2 * math.pi,
        'Turn180Right': 2 * math.pi,
    }

    def __init__(self):
        super(ManualControl, self).__init__()
        logo.stop()
        while True:
            message = check_queue()
            if message != "no_message":
                self.on_event(message)

    @staticmethod
    def _parse_turn_angle(event):
        '''
        Return the angle carried by a "TurnAngle<number>" event as a float,
        or None when the suffix is not a finite number.
        '''
        try:
            angle_rad = float(event[len("TurnAngle"):])
        except ValueError:
            return None
        return angle_rad if math.isfinite(angle_rad) else None

    def on_event(self, event):
        '''
        Map a command string to the next state.

        'ComeHere' starts Scanning, 'FollowMe' starts Following, the fixed
        turn commands and "TurnAngle<radians>" start Turn_Around.  Any
        other (or malformed) command leaves K9 in manual control.
        '''
        if event == 'ComeHere':
            return Scanning()
        if event == 'FollowMe':
            return Following()
        if event in self._FIXED_TURNS:
            return Turn_Around(self._FIXED_TURNS[event])
        if event.startswith("TurnAngle"):
            angle_rad = self._parse_turn_angle(event)
            if angle_rad is None:
                # Commands arrive over MQTT; a bad payload must not raise
                # out of the polling loop and kill the motor controller.
                print("Ignoring malformed TurnAngle command:", event)
                return self
            return Turn_Around(angle_rad)
        return self
class Scanning(State):
    '''
    The state where K9 is looking for the nearest person to follow.

    The constructor loops forever: it forwards queued commands to
    on_event() and re-reads the latest "person" sensor reading.  As soon
    as a reading has both a non-zero distance and a non-zero angle, a
    'person_found' event is raised, which hands over to Turning.
    '''
    def __init__(self):
        super(Scanning, self).__init__()
        # Defined before the loop so on_event() can always read it, even if
        # a 'person_found' message arrives before the first sensor reading.
        self.target = None
        while True:
            message = check_queue()
            if message != "no_message":
                self.on_event(message)
            self.target = mem.retrieveLastSensorReading("person")
            # Only the reading lookup is guarded.  Wrapping the transition
            # as well would swallow KeyErrors raised by any later state.
            try:
                person_seen = (self.target['distance'] != 0.0
                               and self.target['angle'] != 0.0)
            except (KeyError, TypeError):
                # No usable reading yet; keep scanning.
                continue
            if person_seen:
                self.on_event('person_found')

    def on_event(self, event):
        '''
        On 'person_found' announce the move and start Turning towards the
        stored target; every other event leaves K9 scanning.
        '''
        if event == 'person_found':
            if self.target is None:
                # Nothing has been sensed yet, so there is nowhere to turn.
                return self
            # Was `k9.voice(...)`: `k9` is not bound while the state
            # constructors are running and K9MotorSM has no `voice`.
            voice.speak("Coming master")
            return Turning(self.target)
        return self
class Turning(State):
    '''
    Child state in which K9 rotates to face the target person.

    `target` is a reading with "angle" and "distance" entries.  Angles of
    magnitude 0.2 or less are treated as already on target and go straight
    to 'turn_finished'; otherwise a turn is started and the constructor
    polls until it completes, is blocked, or is cancelled.
    '''
    def __init__(self, target):
        super().__init__()
        self.angle = target["angle"]
        self.distance = target["distance"]
        already_facing = not abs(self.angle) > 0.2
        if already_facing:
            self.on_event('turn_finished')
        else:
            print("Turning: Moving ",self.angle," radians towards target")
            logo.right(self.angle)
        while True:
            queued = check_queue()
            if queued != "no_message":
                self.on_event(queued)
            if logo.finished_move():
                self.on_event('turn_finished')
            # A negative "rotate" state is treated as "not safe to rotate".
            if mem.retrieveState("rotate") < 0.0:
                self.on_event('turn_blocked')

    def on_event(self, event):
        '''
        'turn_finished' moves on to Moving_Forward; 'turn_blocked' and
        'StayHere' are announced and drop back to ManualControl; anything
        else keeps the current state.
        '''
        if event == 'turn_finished':
            return Moving_Forward(self.distance)
        abort_phrases = (
            ('turn_blocked', "Turn blocked"),
            ('StayHere', "Staying put"),
        )
        for abort_event, phrase in abort_phrases:
            if event == abort_event:
                voice.speak(phrase)
                return ManualControl()
        return self
class Turn_Around(State):
    '''
    Child state in which K9 rotates on the spot by the given angle.

    The constructor starts the turn with logo.right(angle) and then polls
    until the move finishes, rotation becomes unsafe, or a 'StayHere'
    command arrives; each of those returns K9 to ManualControl.
    '''
    def __init__(self, angle):
        super().__init__()
        logo.right(angle)
        while True:
            queued = check_queue()
            if queued != "no_message":
                self.on_event(queued)
            if logo.finished_move():
                self.on_event('turn_finished')
            # A negative "rotate" state is treated as "not safe to rotate".
            rotation_unsafe = mem.retrieveState("rotate") < 0.0
            if rotation_unsafe:
                voice.speak("Turn blocked")
                self.on_event('turn_blocked')

    def on_event(self, event):
        '''
        Return ManualControl for 'turn_blocked', 'turn_finished' and
        'StayHere' (the last one is announced); otherwise stay in place.
        '''
        if event == 'StayHere':
            voice.speak("Staying put")
        elif event not in ('turn_blocked', 'turn_finished'):
            return self
        return ManualControl()
class Moving_Forward(State):
    '''
    Child state in which K9 advances towards the target.

    A non-positive distance raises 'target_reached' immediately.  The
    constructor then polls the command queue and raises 'target_reached'
    whenever logo reports the current move as finished.
    '''
    def __init__(self, distance):
        super().__init__()
        self.distance = distance
        nothing_to_do = not self.distance > 0
        if nothing_to_do:
            print("Moving Forward: no need to move")
            self.on_event('target_reached')
        else:
            print("Moving Forward: ",self.distance,"m")
            # NOTE: the drive command itself is disabled in this version:
            # logo.forwards(self.distance)
        while True:
            queued = check_queue()
            if queued != "no_message":
                self.on_event(queued)
            if logo.finished_move():
                self.on_event('target_reached')

    def on_event(self, event):
        '''
        'target_reached' and 'StayHere' are each announced and return K9
        to ManualControl; any other event keeps the current state.
        '''
        announcements = (
            ('target_reached', "Master!"),
            ('StayHere', "Staying put"),
        )
        for known_event, phrase in announcements:
            if event == known_event:
                voice.speak(phrase)
                return ManualControl()
        return self
class Following(State):
    '''
    Having reached the target, now follow it blindly.

    The constructor stops the motors and then loops forever.  Each pass it
    forwards queued commands to on_event(), picks a target (the "follow"
    reading first, then the "person" reading, then the most recent stored
    "follow" reading with a non-zero angle) and issues either a damped turn
    or a damped forward move, subject to the safety states held in memory.
    '''
    def __init__(self):
        super(Following, self).__init__()
        logo.stop()
        angle = 0
        move = 0
        # Loop-invariant tuning values: the detected angle and distance are
        # divided by these before being sent to the motors.
        damp_angle = 3.0
        damp_distance = 2.0
        min_turn = 0.1 * damp_angle
        min_move = 0.05 * damp_distance
        while True:
            message = check_queue()
            if message != "no_message":
                self.on_event(message)
            # read in available target information from memory
            legs = mem.retrieveLastSensorReading("follow")
            person = mem.retrieveLastSensorReading("person")
            legs_angle = self._field(legs, "angle")
            legs_distance = self._field(legs, "distance")
            person_angle = self._field(person, "angle")
            person_distance = self._field(person, "distance")
            # choose the detected legs over person targeting
            if legs_angle != 0 and legs_distance != 0:
                angle = legs_angle
                move = legs_distance
            elif person_angle != 0 and person_distance != 0:
                angle = person_angle
                move = person_distance
            else:
                # if there is nothing detected, then aim for
                # the last detected set of legs
                # NOTE(review): `move` keeps its previous value here, so a
                # stale distance can still drive K9 forward -- confirm
                # that this is intended.
                for past_reading in mem.retrieveSensorReadings("follow"):
                    past_angle = self._field(past_reading, "angle")
                    if past_angle != 0:
                        angle = past_angle
                        break
            # move if the angle or distance is not zero
            if angle != 0 or move != 0:
                print("Following: direction:", angle, "distance:", move)
                if abs(angle) >= min_turn:
                    if mem.retrieveState("rotate") > 0.0:
                        logo.rt(angle / damp_angle, fast = True)
                        print("Turning: ",str(angle / damp_angle))
                    else:
                        voice.speak("Turn blocked")
                        time.sleep(3.0)
                elif abs(move) >= min_move:
                    distance = move / damp_distance
                    safe_forward = mem.retrieveState("forward")
                    # nb should also retrieve a backward state
                    if safe_forward > distance:
                        logo.forward(distance)
                        print("Moving forward detected distance: ", str(distance) )
                    else:
                        logo.forward(safe_forward)
                        print("Moving forward safe distance: ", str(safe_forward))

    @staticmethod
    def _field(reading, key):
        '''
        Return reading[key], or 0 when the reading is missing or lacks the
        key.  Scanning already treats a missing key as "nothing detected";
        doing the same here stops an incomplete reading from raising out
        of the follow loop.
        '''
        try:
            return reading[key]
        except (KeyError, TypeError):
            return 0

    def on_event(self, event):
        '''
        'StayHere' is announced and returns K9 to ManualControl; every
        other event keeps K9 following.
        '''
        if event == 'StayHere':
            voice.speak("Staying here")
            return ManualControl()
        return self
class K9MotorSM:
    '''
    Finite state machine for K9's motors.  It is created in the manual
    control state and replaces its current state with whatever that
    state's on_event() returns.
    '''
    def __init__(self):
        ''' Start K9 off in the manual control state. '''
        self.state = ManualControl()

    def on_event(self,event):
        '''
        Log the event against the current state, then let that state
        handle it; the object it returns becomes the new current state.
        '''
        current = self.state
        state_label = str(current).lower()
        print("Event:",event, "raised in state", state_label)
        self.state = current.on_event(event)
def mqtt_callback(client, userdata, message):
    """
    MQTT on_message handler for the motor state machine.

    Decodes the payload of the incoming message as UTF-8 text, places it
    on the module-level `queue` for the state classes to pick up, and logs
    what was queued.  `client` and `userdata` are accepted to match the
    callback signature but are not used.
    """
    command = str(message.payload.decode("utf-8"))
    queue.put(command)
    print(command, "put on queue by motor state machine")
def check_queue() -> str:
    '''
    Return the next queued MQTT message, skipping any None entries, or
    the string "no_message" once the local queue is empty.
    '''
    while True:
        if queue.empty():
            return "no_message"
        candidate = queue.get()
        if candidate is not None:
            return candidate
# Start-up: create the local message queue, connect to the MQTT broker on
# localhost, subscribe to the motor event topic and start the state
# machine.  Ctrl-C stops the motors and the MQTT network loop.
client = None  # bound before the try so the handler can always test it
try:
    queue = Queue()
    client = mqtt.Client("k9-motor")
    client.connect("localhost")
    client.on_message = mqtt_callback # attach function to callback
    client.subscribe("k9/events/motor", qos=2)
    # self.client.subscribe("/ble/advertise/watch/m")
    client.loop_start()
    print("MQTT subscription interface active")
    print("Creating K9 Motor State Machine instance")
    # The initial state's constructor polls the queue forever, so this call
    # only ends through an exception such as KeyboardInterrupt.
    k9 = K9MotorSM()
except KeyboardInterrupt:
    logo.stop()
    if client is not None:
        client.loop_stop()
    # Was a bare string expression, so the message was never shown.
    print("Motors stopped and MQTT client stopped")
    sys.exit(0)