-
Notifications
You must be signed in to change notification settings - Fork 0
/
reverse_gpu.py
92 lines (71 loc) · 2.92 KB
/
reverse_gpu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import cv2
from ultralytics import YOLO
import supervision as sv
import numpy as np
import torch
import os
import pyttsx3
import threading
import multiprocessing
# global settings for audio output
tt_to_speech = pyttsx3.init()
tt_to_speech.setProperty('rate', 150)
tt_to_speech.setProperty('volume', 0.9)
# Cords corresponding to i/p vdo
polygon = np.array([
[290, 706],[518, 494],[686, 494],[838, 710],[290, 706]
])
# gpu
os.environ['CUDA_VISIBLE_DEVICES'] = "0"
print(torch.cuda.is_available())
videopath = "test_vdo_city.m4v"
def voice_output(text):
tt_to_speech.say(text)
tt_to_speech.runAndWait()
def main():
video_info = sv.VideoInfo.from_video_path(videopath)
zone = sv.PolygonZone(polygon=polygon, frame_resolution_wh=video_info.resolution_wh)
box_annotator = sv.BoxAnnotator(
thickness=2,
text_thickness=1,
text_scale=0.5
)
model = YOLO("yolov8n.pt")
for result in model.track(source=videopath, stream=True, agnostic_nms=True, device=0):
frame = result.orig_img
detections = sv.Detections.from_yolov8(result)
# prevent the error when no object is detected in the frame
if result.boxes.id is not None:
detections.tracker_id = result.boxes.id.cpu().numpy().astype(int)
#filtering the detections with higher confidence
detections = detections[(detections.confidence >= 0.45)]
labels = [
f"{tracker_id} {model.model.names[class_id]} {confidence:0.2f}"
for _, confidence, class_id, tracker_id
in detections
]
frame = box_annotator.annotate(
scene=frame,
detections=detections,
labels=labels
)
zone.trigger(detections=detections)
# audio alerts
print(f'objects behind: {zone.current_count} ')
if(zone.current_count != 0 and zone.current_count > 1):
aud_thread = threading.Thread(target=voice_output, args=(f'{zone.current_count} objects behind the vehicle',))
aud_thread.start()
if(zone.current_count == 1):
aud_thread = threading.Thread(target=voice_output, args=(f'{zone.current_count} object behind the vehicle',))
aud_thread.start()
box_annotator = sv.BoxAnnotator(thickness=1, text_thickness=1, text_scale=0.2)
zone_annotator = sv.PolygonZoneAnnotator(zone=zone, color=sv.Color.white(), thickness=4, text_thickness=4, text_scale=3)
frame = box_annotator.annotate(scene=frame, detections=detections)
frame = zone_annotator.annotate(scene=frame)
cv2.imshow("REAR MONITOR / REAR PARKING ASSIST", frame)
if (cv2.waitKey(30) == 27):
break
if __name__ == "__main__":
proc_main = multiprocessing.Process(target=main)
proc_main.start()
proc_main.join()