# tello.py
from contextlib import asynccontextmanager
from typing import (
AsyncIterator,
Awaitable,
TypeAlias,
Callable,
)
import asyncio
import cv2
import av
# Default drone address; used as the default `ip` argument of conn().
TELLO_IP = '192.168.10.1'
# Same number as the UDP port Drone.stream() listens on for video.
VIDEO_PORT = 11111
# Not referenced in this part of the file; presumably the drone's
# state/telemetry port — TODO confirm.
STATE_PORT = 8888
# Remote UDP port that conn() sends commands to and receives replies from.
COMMAND_PORT = 8889
class Protocol(asyncio.Protocol):
__slots__ = ('queue',)
def __init__(self) -> None:
self.queue = asyncio.Queue[bytes]()
def connection_made(self, _) -> None:
print('Connection Made')
def datagram_received(self, data: bytes, _) -> None:
self.queue.put_nowait(data)
def error_received(self, exc: Exception) -> None:
raise exc
def connection_lost(self, exc: Exception | None) -> None:
print('Connection Lost')
if exc is not None:
raise exc
def timeout(*, length=5.0, retries=0):
def decorator(fn: Callable):
async def wrapper(*args, **kwargs) -> Callable:
for _ in range(retries):
try:
async with asyncio.timeout(length):
return await fn(*args, **kwargs)
except asyncio.TimeoutError:
continue
raise asyncio.TimeoutError(f'{fn.__name__} timed out after max retries: {retries}')
return wrapper
return decorator
class Drone:
Send: TypeAlias = Callable[[bytes], None]
Recv: TypeAlias = Callable[[], Awaitable[bytes]]
__slots__ = ('_send', '_recv')
def __init__(self, _send: Send, _recv: Recv) -> None:
self._send = _send
self._recv = _recv
async def send(self, msg: str) -> str:
self._send(msg.encode('utf-8'))
resp = await self._recv()
await asyncio.sleep(.1)
try:
return resp.decode()
except UnicodeDecodeError:
return None
async def stream(self) -> AsyncIterator:
await self.send('streamon')
await asyncio.sleep(1)
container = av.open('udp://@0.0.0.0:11111')
try:
for frame in container.decode(video=0):
frame = frame.to_ndarray(format='bgr24')
frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
yield frame
finally:
await self.send('streamoff')
@asynccontextmanager
async def conn(ip=TELLO_IP) -> AsyncIterator:
    """Open the command channel to a drone and yield a ``Drone`` bound to it.

    Creates a UDP endpoint whose remote address is ``(ip, COMMAND_PORT)``,
    using ``Protocol`` to queue incoming replies. The transport is closed
    when the context exits, whether normally or through an exception.

    Args:
        ip: Address of the drone.

    Yields:
        A ``Drone`` whose send/receive callables use this endpoint.

    Raises:
        Exception: Anything raised inside the ``async with`` body is
            reported on stdout and re-raised unchanged.
    """
    loop = asyncio.get_running_loop()
    cmd_transport, cmd_protocol = await loop.create_datagram_endpoint(
        Protocol,
        remote_addr=(ip, COMMAND_PORT),
    )

    def send(data: bytes) -> None:
        cmd_transport.sendto(data)

    @timeout(retries=4)
    async def recv() -> bytes:
        return await cmd_protocol.queue.get()

    async def keepalive() -> None:
        # Currently unused (the task creation below is disabled). Note that
        # it reads from the same queue as recv(), so enabling it alongside
        # in-flight commands could steal their replies.
        while True:
            await asyncio.sleep(10)
            if cmd_protocol.queue.empty():
                print('Keepalive')
                # send() passes its argument straight to sendto(), which
                # requires bytes, not str.
                send(b'keepalive')
                await cmd_protocol.queue.get()

    #task = asyncio.create_task(keepalive())
    try:
        yield Drone(send, recv)
    except Exception:
        print('Ran into an error during drone connection')
        raise
    finally:
        #task.cancel()
        cmd_transport.close()
# Disabled alternative implementation of a `stream` method, kept as a bare
# string literal so it is never executed. It feeds decoded frames through a
# bounded queue and hands them to a `process` callable in batches.
'''
async def stream(self, *, process) -> AsyncIterator:
queue = asyncio.Queue(maxsize=10)
async def batch():
while True:
batch = [await queue.get()]
batch_task = asyncio.create_task(process(batch))
while not queue.empty() and not batch_task.done():
print('Adding frame to batch')
batch.append(await queue.get())
yield await batch_task
async def fetch():
container = av.open('udp://@0.0.0.0:11111')
for frame in container.decode(video=0):
try:
await asyncio.wait_for(queue.put(frame), timeout=.2)
print('Putting frame in queue')
except asyncio.TimeoutError:
print('Dropping frame')
fetch_task = asyncio.create_task(fetch())
try:
async for pred in batch():
yield pred
finally:
fetch_task.cancel()
await self.send('streamoff')
'''