-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebsockettest.py
38 lines (28 loc) · 1.1 KB
/
websockettest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import cv2
import websockets
import base64
async def send_video_stream(websocket, camera_index=0):
    """Stream camera frames to *websocket* as base64-encoded JPEG payloads.

    Frames are read from an OpenCV capture device until the device is no
    longer open or a read fails. Each frame is JPEG-encoded, base64-encoded
    and sent as one WebSocket message.

    Args:
        websocket: Connection object exposing an awaitable ``send`` method.
        camera_index: Device index passed to ``cv2.VideoCapture``; ``0``
            selects the default camera (change it if you have several).

    The capture device is released on every exit path, including when
    ``websocket.send`` raises.
    """
    cap = cv2.VideoCapture(camera_index)
    try:
        while cap.isOpened():
            ret, frame = cap.read()  # Read a frame from the camera
            if not ret:
                break

            # Convert frame to JPEG; the first return value reports success.
            encoded, buffer = cv2.imencode('.jpg', frame)
            if not encoded:
                # Skip a frame the encoder rejected rather than sending a bad
                # payload; yield once so the event loop is not starved.
                await asyncio.sleep(0)
                continue

            # NOTE(review): b64encode returns bytes, so this is sent as a
            # bytes payload, not a str -- confirm which one the server expects.
            jpg_as_text = base64.b64encode(buffer)

            # Send the base64-encoded frame over the WebSocket
            await websocket.send(jpg_as_text)
    finally:
        # Always free the camera, even if sending failed mid-stream.
        cap.release()
async def connect_to_websocket(uri="wss://api.hume.ai/v0/stream/models"):
    """Connect to the WebSocket server at *uri* and stream camera video to it.

    Args:
        uri: WebSocket server URL. Replace the default with your own server
            URL if needed.

    Errors are reported on stdout instead of being raised. The message
    distinguishes a failure to establish the connection from a failure that
    happens after the connection was opened (i.e. while streaming).
    """
    connected = False
    try:
        async with websockets.connect(uri) as websocket:
            # Connection established
            connected = True
            print("Connected to WebSocket server")

            # Start sending video stream
            await send_video_stream(websocket)
    except Exception as e:
        # Top-level boundary of this script: report the error and return.
        if connected:
            print("WebSocket video stream stopped with error:", e)
        else:
            print("Failed to connect to WebSocket server:", e)
# Run the WebSocket client only when executed as a script, so that importing
# this module does not start the camera or open a network connection.
if __name__ == "__main__":
    asyncio.run(connect_to_websocket())