diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c56c436
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,10 @@
+*.egg-info
+__pycache__
+.DS_Store
+*.cpython*
+*.pyc
+*.pyd
+*.pyo
+*.pyz
+
+PKG-INFO
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..bce85be
--- /dev/null
+++ b/README.md
@@ -0,0 +1,5 @@
+# Verlab Demos
+
+## List of Demos
+
+- [Kinova Gen3 Face Follow](kinova-demos/README.md) by @felipecadar
diff --git a/kinova-demos/README.md b/kinova-demos/README.md
new file mode 100644
index 0000000..4247014
--- /dev/null
+++ b/kinova-demos/README.md
@@ -0,0 +1,26 @@
+# Kinova-Gen3-Face-Follow
+![kinova mp4](https://user-images.githubusercontent.com/22056265/167654474-709d0f7e-6b22-4d16-bdd7-5ef0a0927964.gif)
+
+A face-following demo for the Kinova Gen3 robot arm: the arm keeps a detected face centered in its camera view.
+
+## Requirements
+
+Create a virtual environment and install the package:
+
+```bash
+conda create -n kinova python=3.9
+conda activate kinova
+pip install -e .
+```
+
+## Usage
+
+### Show Camera with YOLO Face Detector
+
+`python src/camera.py`
+
+### Follow Faces
+
+`python src/follow.py`
+
+
diff --git a/kinova-demos/pyproject.toml b/kinova-demos/pyproject.toml
new file mode 100644
index 0000000..09281c4
--- /dev/null
+++ b/kinova-demos/pyproject.toml
@@ -0,0 +1,30 @@
+[project]
+name = "kinova"
+description = "Kinova Gen3 Face Follow"
+version = "0.0"
+authors = [
+    {name = "Felipe Cadar Chamone"},
+]
+readme = "README.md"
+requires-python = ">=3.8"
+license = {file = "LICENSE"}
+classifiers = [
+    "Programming Language :: Python :: 3",
+    "Operating System :: OS Independent",
+]
+dependencies = [
+    "ultralytics==8.3.24",
+    "opencv-python==4.10.0.84",
+    "fast-rtsp==0.0.2",
+    "kortex_api @ file://assets/kortex_api-2.6.0.post3-py3-none-any.whl"
+]
+
+[build-system]
+requires = ["setuptools"]
+build-backend = "setuptools.build_meta"
+
+[tool.setuptools.packages.find]
+include = ["src*"]
+
+[tool.isort]
+profile = "black"
\ No newline at end of file
diff --git a/kinova-demos/src/__init__.py b/kinova-demos/src/__init__.py
new file mode 100644
index 0000000..e69de29
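Reviewer note: the `kortex_api` wheel is resolved from a local `assets/` path and the YOLO face weights are not fetched by `pip install -e .`, so a quick post-install smoke test saves a debugging round trip. A minimal sketch, not part of this patch — the weights path `assets/yolov8n-face.pt` and the robot stream URL are assumptions taken from the sources below:

```python
# Hypothetical smoke test for the installed environment.
import cv2
from ultralytics import YOLO

model = YOLO("assets/yolov8n-face.pt")  # raises if the weights file is missing
cap = cv2.VideoCapture("rtsp://192.168.1.10/color")  # robot's color stream
ok, frame = cap.read()
print("stream ok:", ok)
if ok:
    results = model(frame)
    print("faces detected:", len(results[0].boxes))
cap.release()
```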
diff --git a/kinova-demos/src/camera.py b/kinova-demos/src/camera.py
new file mode 100644
index 0000000..a24b19d
--- /dev/null
+++ b/kinova-demos/src/camera.py
@@ -0,0 +1,59 @@
+"""Terminal commands to preview the Gen3 camera streams with GStreamer:
+
+color: gst-launch-1.0 rtspsrc location=rtsp://192.168.1.10/color latency=30 ! rtph264depay ! avdec_h264 ! autovideosink
+
+depth: gst-launch-1.0 rtspsrc location=rtsp://192.168.1.10/depth latency=30 ! rtpgstdepay ! videoconvert ! autovideosink
+"""
+
+import time
+
+import cv2
+from ultralytics import YOLO
+from fast_rtsp import RTSPCapture
+
+def find_one_face(frame, detector, FACTOR):
+    """Run the detector on a downscaled frame and return the first face box
+    rescaled to the original frame's coordinates, or None if no face is found."""
+    results = detector(frame)
+    if len(results) > 0 and len(results[0].boxes) > 0:
+        bbx = results[0].boxes[0].xyxy.cpu().numpy()[0]
+        return int(bbx[0] / FACTOR), int(bbx[1] / FACTOR), int(bbx[2] / FACTOR), int(bbx[3] / FACTOR)
+    return None
+
+if __name__ == "__main__":
+
+    video_capture = cv2.VideoCapture(0)
+    # To use the robot's camera instead of the local webcam:
+    # video_capture = cv2.VideoCapture("rtsp://192.168.1.10/color")
+    # video_capture = RTSPCapture("rtsp://192.168.1.10/color")
+    detector = YOLO("assets/yolov8n-face.pt")
+    FACTOR = 0.3  # Detection runs on a downscaled frame for speed
+
+    while True:
+        # Capture frame-by-frame
+        ret, frame = video_capture.read()
+        if not ret:
+            break
+        start = time.time()
+
+        # Detect on a downscaled copy; find_one_face rescales the box back
+        res = cv2.resize(frame, None, fx=FACTOR, fy=FACTOR)
+        face = find_one_face(res, detector, FACTOR)
+        if face:
+            cv2.rectangle(frame, face[:2], face[2:], (0, 255, 0), 2)
+
+        # Display the video output
+        cv2.imshow('Video', frame)
+
+        # Quit by pressing Q
+        if cv2.waitKey(1) & 0xFF == ord('q'):
+            break
+
+        end = time.time()
+
+        print(chr(27) + "[2J")  # Clear terminal
+        print("FPS: {}".format(1 / (end - start)))
+
+    video_capture.release()
+    cv2.destroyAllWindows()
+
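The `FACTOR` trick above (detect on a 0.3x copy, then divide box coordinates by the factor) is the part most likely to bite when reused elsewhere. A worked sketch of the mapping, independent of any camera:

```python
# Worked example of the FACTOR rescaling used by find_one_face.
# A detection at (x1, y1) on the downscaled frame corresponds to
# (x1 / FACTOR, y1 / FACTOR) on the original full-size frame.
FACTOR = 0.3
box_small = (96, 54, 192, 162)                       # xyxy on the 0.3x frame
box_full = tuple(int(v / FACTOR) for v in box_small)  # back to full size
print(box_full)                                       # (320, 180, 640, 540)
```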
diff --git a/kinova-demos/src/follow.py b/kinova-demos/src/follow.py
new file mode 100644
index 0000000..4be8315
--- /dev/null
+++ b/kinova-demos/src/follow.py
@@ -0,0 +1,184 @@
+import time
+
+import cv2
+import numpy as np
+from ultralytics import YOLO
+
+from videocaptureasync import VideoCaptureAsync
+
+from kortex_api.TCPTransport import TCPTransport
+from kortex_api.RouterClient import RouterClient
+from kortex_api.SessionManager import SessionManager
+from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
+from kortex_api.autogen.messages import Session_pb2, Base_pb2
+
+
+def find_one_face(frame, detector, FACTOR):
+    """Return the first face box (x1, y1, x2, y2) rescaled to the original
+    frame, or None if no face is detected with confidence above 0.5."""
+    results = detector(frame)
+    if len(results) > 0 and len(results[0].boxes) > 0:
+        if results[0].boxes[0].conf > 0.5:
+            bbx = results[0].boxes[0].xyxy.cpu().numpy()[0]
+            return int(bbx[0] / FACTOR), int(bbx[1] / FACTOR), int(bbx[2] / FACTOR), int(bbx[3] / FACTOR)
+    return None
+
+
+def close_gripper(base_client_service):
+    # Create the GripperCommand we will send
+    gripper_command = Base_pb2.GripperCommand()
+    finger = gripper_command.gripper.finger.add()
+    gripper_command.mode = Base_pb2.GRIPPER_POSITION
+    finger.value = 1
+    base_client_service.SendGripperCommand(gripper_command)
+
+
+def open_gripper(base_client_service):
+    # Create the GripperCommand we will send
+    gripper_command = Base_pb2.GripperCommand()
+    finger = gripper_command.gripper.finger.add()
+    gripper_command.mode = Base_pb2.GRIPPER_POSITION
+    finger.value = 0
+    base_client_service.SendGripperCommand(gripper_command)
+
+
+def send_home(base_client_service):
+    print('Going Home....')
+    action_type = Base_pb2.RequestedActionType()
+    action_type.action_type = Base_pb2.REACH_JOINT_ANGLES
+    action_list = base_client_service.ReadAllActions(action_type)
+    action_handle = None
+
+    for action in action_list.action_list:
+        if action.name == "Home":
+            action_handle = action.handle
+
+    if action_handle is None:
+        print('No "Home" action found on the robot.')
+        return
+
+    base_client_service.ExecuteActionFromReference(action_handle)
+    time.sleep(6)
+    print("Done!")
+
+
+def get_distance(p1, p2):
+    return (p2[0] - p1[0], p2[1] - p1[1])
+
+
+def twist_command(base_client_service, cmd):
+    command = Base_pb2.TwistCommand()
+    command.duration = 0  # Unlimited time to execute
+
+    x, y, z, tx, ty, tz = cmd
+
+    twist = command.twist
+    twist.linear_x = x
+    twist.linear_y = y
+    twist.linear_z = z
+    twist.angular_x = tx
+    twist.angular_y = ty
+    twist.angular_z = tz
+    base_client_service.SendTwistCommand(command)
+
+
+if __name__ == "__main__":
+
+    model = YOLO("assets/yolov8n-face.pt")
+
+    DEVICE_IP = "192.168.1.10"
+    DEVICE_PORT = 10000
+
+    # Setup API
+    error_callback = lambda kException: print("_________ callback error _________ {}".format(kException))
+    transport = TCPTransport()
+    router = RouterClient(transport, error_callback)
+    transport.connect(DEVICE_IP, DEVICE_PORT)
+
+    # Create session
+    print("Creating session for communication")
+    session_info = Session_pb2.CreateSessionInfo()
+    session_info.username = 'admin'
+    session_info.password = 'admin'
+    session_info.session_inactivity_timeout = 60000  # (milliseconds)
+    session_info.connection_inactivity_timeout = 2000  # (milliseconds)
+
+    session_manager = SessionManager(router)
+    session_manager.CreateSession(session_info)
+    print("Session created")
+
+    # Create required services
+    base_client_service = BaseClient(router)
+
+    send_home(base_client_service)
+
+    video_capture = VideoCaptureAsync("rtsp://192.168.1.10/color", use_rtsp=True)
+
+    FACTOR = 0.3  # Detection runs on a downscaled frame for speed
+    VEL = 100
+    # Twist directions; scaled by the pixel error and clipped below
+    movs = {
+        "look_up": np.array((0, 0, 0, -VEL, 0, 0)),
+        "look_left": np.array((0, 0, 0, 0, VEL, 0)),
+        "stop": np.array((0, 0, 0, 0, 0, 0))
+    }
+
+    video_capture.start()
+    while True:
+        ret, frame = video_capture.read()
+        if not ret:
+            continue
+
+        center_X = int(frame.shape[1] / 2)
+        center_Y = int(frame.shape[0] / 2)
+
+        start = time.time()
+
+        res = cv2.resize(frame, None, fx=FACTOR, fy=FACTOR)
+
+        cmd = np.zeros(6)
+        face = find_one_face(res, model, FACTOR)
+
+        if face is not None:
+            print("Face found")
+            face_center = (int((face[0] + face[2]) / 2), int((face[1] + face[3]) / 2))
+
+            cv2.rectangle(frame, face[:2], face[2:], (0, 255, 0), 2)
+            cv2.circle(frame, (center_X, center_Y), 2, (255, 0, 0), 2)
+
+            # Proportional control with a 50-pixel deadband around the center
+            distance = get_distance(face_center, (center_X, center_Y))
+            if distance[0] > 50 or distance[0] < -50:
+                cmd += movs["look_left"] * distance[0] * 0.5
+
+            if distance[1] > 50 or distance[1] < -50:
+                cmd += movs["look_up"] * distance[1] * 0.5
+
+        # Keep the commanded twist gentle; with no face, cmd stays zero (stop)
+        cmd = np.clip(cmd, -5, 5)
+
+        twist_command(base_client_service, list(cmd))
+
+        cv2.imshow('Video', frame)
+
+        key = cv2.waitKey(1) & 0xFF
+        if key == ord('q'):
+            break
+        if key == ord('h'):
+            send_home(base_client_service)
+
+        end = time.time()
+
+        print(chr(27) + "[2J")  # Clear terminal
+        print("Center = {}\nVel Vec = {}\nFPS: {}".format((center_X, center_Y), cmd, 1 / (end - start)))
+
+    video_capture.stop()
+    cv2.destroyAllWindows()
+
+    send_home(base_client_service)
+
+    print('Closing Session..')
+    session_manager.CloseSession()
+    router.SetActivationStatus(False)
+    transport.disconnect()
+    print('Done!')
\ No newline at end of file
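The control loop above is a plain proportional controller with a deadband: the pixel error between the face center and the frame center is scaled into an angular twist, then clipped. Note that with VEL=100 and gain 0.5 the command saturates at the clip bound for any error past the deadband, so it effectively behaves bang-bang. A distilled standalone sketch — deadband and clip values taken from follow.py, unit directions replacing the VEL scale since the result saturates anyway, and the helper name is hypothetical:

```python
import numpy as np

# Unit twist directions, as in follow.py: index 3 is angular_x, 4 is angular_y.
LOOK_UP = np.array((0, 0, 0, -1, 0, 0), dtype=float)
LOOK_LEFT = np.array((0, 0, 0, 0, 1, 0), dtype=float)

def face_error_to_twist(face_center, frame_center, gain=0.5, deadband=50, clip=5):
    """Map a pixel error to a clipped 6-DOF twist vector (illustrative helper)."""
    dx = frame_center[0] - face_center[0]
    dy = frame_center[1] - face_center[1]
    cmd = np.zeros(6)
    if abs(dx) > deadband:          # ignore small horizontal errors
        cmd += LOOK_LEFT * dx * gain
    if abs(dy) > deadband:          # ignore small vertical errors
        cmd += LOOK_UP * dy * gain
    return np.clip(cmd, -clip, clip)

# Face right of and above center -> pan and tilt both saturate at the clip.
print(face_error_to_twist((800, 300), (640, 360)))
```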
diff --git a/kinova-demos/src/videocaptureasync.py b/kinova-demos/src/videocaptureasync.py
new file mode 100644
index 0000000..9d1a505
--- /dev/null
+++ b/kinova-demos/src/videocaptureasync.py
@@ -0,0 +1,60 @@
+# file: videocaptureasync.py
+import threading
+
+import cv2
+from fast_rtsp import RTSPCapture
+
+
+class VideoCaptureAsync:
+    """Read frames on a background thread so the consumer always gets the
+    latest frame instead of a stale, buffered one."""
+
+    def __init__(self, src=0, width=640, height=480, use_rtsp=False):
+        self.src = src
+        if use_rtsp:
+            self.cap = RTSPCapture(self.src)
+        else:
+            self.cap = cv2.VideoCapture(self.src)
+            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
+            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
+
+        self.grabbed, self.frame = self.cap.read()
+        self.started = False
+        self.read_lock = threading.Lock()
+
+    def set(self, var1, var2):
+        self.cap.set(var1, var2)
+
+    def start(self):
+        if self.started:
+            print('[!] Asynchronous video capturing has already been started.')
+            return None
+        self.started = True
+        self.thread = threading.Thread(target=self.update, args=())
+        self.thread.start()
+        return self
+
+    def update(self):
+        # Continuously overwrite the stored frame with the newest one
+        while self.started:
+            grabbed, frame = self.cap.read()
+            with self.read_lock:
+                self.grabbed = grabbed
+                self.frame = frame
+
+    def read(self):
+        # Return a copy so callers can draw on the frame safely
+        with self.read_lock:
+            frame = self.frame.copy()
+            grabbed = self.grabbed
+        return grabbed, frame
+
+    def stop(self):
+        self.started = False
+        self.thread.join()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.cap.release()
\ No newline at end of file
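For reference, a minimal usage sketch of the class above, mirroring what follow.py does — the RTSP URL is the robot default assumed throughout this patch:

```python
# Hypothetical standalone usage of VideoCaptureAsync.
from videocaptureasync import VideoCaptureAsync

cap = VideoCaptureAsync("rtsp://192.168.1.10/color", use_rtsp=True)
cap.start()                      # spawn the background reader thread
try:
    for _ in range(100):
        ok, frame = cap.read()   # always the newest frame, never a stale one
finally:
    cap.stop()                   # join the reader thread before exiting
```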