first commit
felipecadar committed Oct 29, 2024
Commit ccf73db (initial commit, no parents)
Showing 8 changed files with 376 additions and 0 deletions.
10 changes: 10 additions & 0 deletions .gitignore
@@ -0,0 +1,10 @@
*.egg-info
__pycache__
.DS_Store
*.cpython*
*.pyc
*.pyd
*.pyo
*.pyz

PKG-INFO
5 changes: 5 additions & 0 deletions README.md
@@ -0,0 +1,5 @@
# Verlab Demos

## List of Demos

- [Kinova Gen3 Face Follow](kinova-demos/README.md) by @felipecadar
26 changes: 26 additions & 0 deletions kinova-demos/README.md
@@ -0,0 +1,26 @@
# Kinova-Gen3-Face-Follow
![kinova mp4](https://user-images.githubusercontent.com/22056265/167654474-709d0f7e-6b22-4d16-bdd7-5ef0a0927964.gif)

This is a face-following demo for the Kinova Gen3 robot arm: a YOLO detector finds a face in the camera stream and the arm is steered to keep it centered in the image.

## Requirements

Create a virtual environment and install the requirements:

```bash
conda create -n kinova python=3.9
conda activate kinova
pip install -e .
```

## Usage

### Show Camera with YOLO Face Detector

`python src/camera.py`

### Follow Faces

`python src/follow.py`
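
### How the Following Works

Conceptually, `follow.py` detects a face, measures the offset between the face center and the image center, and sends proportional twist (angular-velocity) commands to the arm until the face is centered. Below is a minimal sketch of that centering step, detached from the robot API; `centering_twist` is a hypothetical helper, and the deadband, gain, and clip values mirror the ones used in `src/follow.py`:

```python
import numpy as np

def centering_twist(face_box, frame_shape, vel=100, deadband=50, gain=0.5):
    """Map a face box (x1, y1, x2, y2) to a 6-DoF twist vector
    (vx, vy, vz, wx, wy, wz). Only the two angular axes are driven."""
    face_cx = (face_box[0] + face_box[2]) / 2
    face_cy = (face_box[1] + face_box[3]) / 2
    dx = frame_shape[1] / 2 - face_cx  # +dx: face is left of center
    dy = frame_shape[0] / 2 - face_cy  # +dy: face is above center

    cmd = np.zeros(6)
    if abs(dx) > deadband:  # Ignore small offsets to avoid jitter
        cmd += np.array((0, 0, 0, 0, vel, 0)) * dx * gain   # pan
    if abs(dy) > deadband:
        cmd += np.array((0, 0, 0, -vel, 0, 0)) * dy * gain  # tilt
    return np.clip(cmd, -5, 5)  # Limit angular speed, as follow.py does
```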


30 changes: 30 additions & 0 deletions kinova-demos/pyproject.toml
@@ -0,0 +1,30 @@
[project]
name = "kinova"
description = "Kinova Gen3 Face Follow"
version = "0.0"
authors = [
    {name = "Felipe Cadar Chamone"},
]
readme = "README.md"
requires-python = ">=3.6"
license = {file = "LICENSE"}
classifiers = [
    "Programming Language :: Python :: 3",
    "Operating System :: OS Independent",
]
dependencies = [
    "ultralytics==8.3.24",
    "opencv-python==4.10.0.84",
    "fast-rtsp==0.0.2",
    "kortex_api @ file://assets/kortex_api-2.6.0.post3-py3-none-any.whl",
]

[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.setuptools.packages.find]
include = ["src/*"]

[tool.isort]
profile = "black"
Empty file added: kinova-demos/src/__init__.py
74 changes: 74 additions & 0 deletions kinova-demos/src/camera.py
@@ -0,0 +1,74 @@
"""Terminal command:
color: gst-launch-1.0 rtspsrc location=rtsp://192.168.1.10/color latency=30 ! rtph264depay ! avdec_h264 ! autovideosink
depth: gst-launch-1.0 rtspsrc location=rtsp://192.168.1.10/depth latency=30 ! rtpgstdepay ! videoconvert ! autovideosink
"""

import time

import cv2
from ultralytics import YOLO

from fast_rtsp import RTSPCapture


def find_one_face(frame, detector, FACTOR):
    """Return the first detected face as (x1, y1, x2, y2) mapped back to
    original-frame coordinates, or None if no face is found."""
    results = detector(frame)
    if len(results) > 0 and len(results[0].boxes) > 0:
        bbx = results[0].boxes[0].xyxy.cpu().numpy()[0]
        return int(bbx[0] / FACTOR), int(bbx[1] / FACTOR), int(bbx[2] / FACTOR), int(bbx[3] / FACTOR)
    return None


if __name__ == "__main__":

    video_capture = cv2.VideoCapture(0)
    # Alternative sources:
    # video_capture = cv2.VideoCapture("rtsp://192.168.1.10/color")
    # video_capture = RTSPCapture("rtsp://192.168.1.10/color")
    detector = YOLO("yolov8n-face.pt")
    FACTOR = 0.3  # Downscale factor applied before detection

    while True:
        # Capture frame-by-frame
        ret, frame = video_capture.read()
        if not ret:
            continue
        start = time.time()

        # Detect on a downscaled copy; find_one_face maps the box back
        res = cv2.resize(frame, None, fx=FACTOR, fy=FACTOR)
        face = find_one_face(res, detector, FACTOR)
        if face:
            cv2.rectangle(frame, face[:2], face[2:], (0, 255, 0), 2)

        # Display the video output
        cv2.imshow('Video', frame)

        # Quit by pressing Q
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        end = time.time()

        print(chr(27) + "[2J")  # Clear terminal
        print("FPS: {}".format(1 / (end - start)))

    video_capture.release()
    cv2.destroyAllWindows()

180 changes: 180 additions & 0 deletions kinova-demos/src/follow.py
@@ -0,0 +1,180 @@
import time

import cv2
import numpy as np
from ultralytics import YOLO

from videocaptureasync import VideoCaptureAsync

from kortex_api.TCPTransport import TCPTransport
from kortex_api.RouterClient import RouterClient
from kortex_api.SessionManager import SessionManager
from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.messages import Session_pb2, Base_pb2


def find_one_face(frame, detector, FACTOR):
    """Return the first detected face as (x1, y1, x2, y2) mapped back to
    original-frame coordinates, or None if no confident detection exists."""
    results = detector(frame)
    if len(results) > 0 and len(results[0].boxes) > 0:
        if results[0].boxes[0].conf > 0.5:
            bbx = results[0].boxes[0].xyxy.cpu().numpy()[0]
            return int(bbx[0] / FACTOR), int(bbx[1] / FACTOR), int(bbx[2] / FACTOR), int(bbx[3] / FACTOR)
    return None


def close_gripper(base_client_service):
    # Create the GripperCommand we will send
    gripper_command = Base_pb2.GripperCommand()
    finger = gripper_command.gripper.finger.add()
    gripper_command.mode = Base_pb2.GRIPPER_POSITION
    finger.value = 1
    base_client_service.SendGripperCommand(gripper_command)


def open_gripper(base_client_service):
    # Create the GripperCommand we will send
    gripper_command = Base_pb2.GripperCommand()
    finger = gripper_command.gripper.finger.add()
    gripper_command.mode = Base_pb2.GRIPPER_POSITION
    finger.value = 0
    base_client_service.SendGripperCommand(gripper_command)


def send_home(base_client_service):
    print('Going Home....')
    action_type = Base_pb2.RequestedActionType()
    action_type.action_type = Base_pb2.REACH_JOINT_ANGLES
    action_list = base_client_service.ReadAllActions(action_type)
    action_handle = None

    for action in action_list.action_list:
        if action.name == "Home":
            action_handle = action.handle

    if action_handle is None:
        print('No "Home" action found on the robot.')
        return

    base_client_service.ExecuteActionFromReference(action_handle)
    time.sleep(6)
    print("Done!")


def get_distance(p1, p2):
    # Signed (dx, dy) offset from p1 to p2
    return (p2[0] - p1[0], p2[1] - p1[1])


def twist_command(base_client_service, cmd):
    command = Base_pb2.TwistCommand()
    command.duration = 0  # Unlimited time to execute

    x, y, z, tx, ty, tz = cmd

    twist = command.twist
    twist.linear_x = x
    twist.linear_y = y
    twist.linear_z = z
    twist.angular_x = tx
    twist.angular_y = ty
    twist.angular_z = tz
    base_client_service.SendTwistCommand(command)


if __name__ == "__main__":

    model = YOLO("assets/yolov8n-face.pt")

    DEVICE_IP = "192.168.1.10"
    DEVICE_PORT = 10000

    # Setup API
    error_callback = lambda kException: print("_________ callback error _________ {}".format(kException))
    transport = TCPTransport()
    router = RouterClient(transport, error_callback)
    transport.connect(DEVICE_IP, DEVICE_PORT)

    # Create session
    print("Creating session for communication")
    session_info = Session_pb2.CreateSessionInfo()
    session_info.username = 'admin'
    session_info.password = 'admin'
    session_info.session_inactivity_timeout = 60000  # (milliseconds)
    session_info.connection_inactivity_timeout = 2000  # (milliseconds)

    session_manager = SessionManager(router)
    session_manager.CreateSession(session_info)
    print("Session created")

    # Create required services
    base_client_service = BaseClient(router)

    send_home(base_client_service)

    video_capture = VideoCaptureAsync("rtsp://192.168.1.10/color", use_rtsp=True)

    FACTOR = 0.3  # Downscale factor applied before detection
    VEL = 100     # Base angular velocity scale
    movs = {
        "look_up": np.array((0, 0, 0, -VEL, 0, 0)),
        "look_left": np.array((0, 0, 0, 0, VEL, 0)),
        "stop": np.array((0, 0, 0, 0, 0, 0)),
    }

    video_capture.start()
    while True:
        ret, frame = video_capture.read()
        if not ret or frame is None:
            continue

        center_X = int(frame.shape[1] / 2)
        center_Y = int(frame.shape[0] / 2)

        start = time.time()

        res = cv2.resize(frame, None, fx=FACTOR, fy=FACTOR)

        cmd = np.zeros(6)
        face = find_one_face(res, model, FACTOR)

        if face is not None:
            print("Face found")
            face_center = (int((face[0] + face[2]) / 2), int((face[1] + face[3]) / 2))

            cv2.rectangle(frame, face[:2], face[2:], (0, 255, 0), 2)
            cv2.circle(frame, (center_X, center_Y), 2, (255, 0, 0), 2)

            # Proportional control: steer only when the face is more than
            # 50 px away from the image center on each axis
            distance = get_distance(face_center, (center_X, center_Y))
            if abs(distance[0]) > 50:
                cmd += movs["look_left"] * distance[0] * 0.5
            if abs(distance[1]) > 50:
                cmd += movs["look_up"] * distance[1] * 0.5

        cmd = np.clip(cmd, -5, 5)  # Limit angular speed

        twist_command(base_client_service, list(cmd))

        cv2.imshow('Video', frame)

        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break
        if key == ord('h'):
            send_home(base_client_service)

        end = time.time()

        print(chr(27) + "[2J")  # Clear terminal
        print("Center = {}\nVel Vec = {}\nFPS: {}".format((center_X, center_Y), cmd, 1 / (end - start)))

    video_capture.stop()
    cv2.destroyAllWindows()

    send_home(base_client_service)

    print('Closing Session..')
    session_manager.CloseSession()
    router.SetActivationStatus(False)
    transport.disconnect()
    print('Done!')
51 changes: 51 additions & 0 deletions kinova-demos/src/videocaptureasync.py
@@ -0,0 +1,51 @@
# file: videocaptureasync.py
import threading

import cv2
from fast_rtsp import RTSPCapture


class VideoCaptureAsync:
    """Reads frames on a background thread so that slow consumers always
    get the most recent frame instead of a stale buffered one."""

    def __init__(self, src=0, width=640, height=480, use_rtsp=False):
        self.src = src
        if use_rtsp:
            self.cap = RTSPCapture(self.src)
        else:
            self.cap = cv2.VideoCapture(self.src)
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)

        self.grabbed, self.frame = self.cap.read()
        self.started = False
        self.read_lock = threading.Lock()

    def set(self, var1, var2):
        self.cap.set(var1, var2)

    def start(self):
        if self.started:
            print('[!] Asynchronous video capturing has already been started.')
            return None
        self.started = True
        self.thread = threading.Thread(target=self.update, args=())
        self.thread.start()
        return self

    def update(self):
        # Continuously overwrite the latest frame under the lock
        while self.started:
            grabbed, frame = self.cap.read()
            with self.read_lock:
                self.grabbed = grabbed
                self.frame = frame

    def read(self):
        with self.read_lock:
            frame = self.frame.copy()
            grabbed = self.grabbed
        return grabbed, frame

    def stop(self):
        self.started = False
        self.thread.join()

    def __exit__(self, exec_type, exc_value, traceback):
        self.cap.release()
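
# Minimal usage sketch (hypothetical; the RTSP URL matches the one used by
# follow.py, and any cv2.VideoCapture-compatible source works with
# use_rtsp=False):
#
#     cap = VideoCaptureAsync("rtsp://192.168.1.10/color", use_rtsp=True).start()
#     grabbed, frame = cap.read()  # Always returns the most recent frame
#     cap.stop()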
