-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathstreaming.py
180 lines (147 loc) · 6.24 KB
/
streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# ----------------------------------------------------------------
# [^∆^] --> Created on 8:10 PM Nov 7 2019
# [•∆•] --> Author : Kumm & Google
# [^∆^] --> Last edited on 8:10 PM Nov 7 2019
# [•∆•] --> current Functions
# [^∆^] ------> RTSP - No Auth
# [•∆•] ------> Face Recognition
# [^∆^] ------> Local Face Repo
# [•∆•] ------> Offline Survillence
# [^∆^] ------> Integrate with firebase
# [•∆•] ------> NOT DRY JUST KISS
# ----------------------------------------------------------------
import os
import gi
import sys
import cv2
import time
import pickle
import signal
import imutils
import platform
import threading
gi.require_version("Gst", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import Gst, GstRtspServer, GLib
Gst.init()
def running_on_jetson_nano():
# To make the same code work on a laptop or on a Jetson Nano, we'll detect when we are running on the Nano
# so that we can access the camera correctly in that case.
# On a normal Intel laptop, platform.machine() will be "x86_64" instead of "aarch64"
return platform.machine() == "aarch64"
# ------------------------------------------------------------------ #
def route2stream(client, loop, url):
# ------------------------------------------------------------------ #
"""
This is callback function which is signaled when the new client
is added to the stream. Works for a single client
"""
# ------------------------------------------------------------------ #
if debug:
print("Streaming started")
global offline
offline = False
# ------------------------------------------------------------------ #
class SensorFactory(GstRtspServer.RTSPMediaFactory):
# ------------------------------------------------------------------------------------#
"""
Gstreamer Pipeline to stream data from camera with face recogniition via rtsp & udp
"""
# ------------------------------------------------------------------------------------#
global camera, debug
def __init__(self, **properties):
super(SensorFactory, self).__init__(**properties)
self.cap = camera
self.number_frames = 0
self.fps = 2
self.duration = 1 / self.fps * Gst.SECOND # duration of a frame in nanoseconds
self.launch_string = (
"appsrc name=source is-live=true block=true format=GST_FORMAT_TIME "
"caps=video/x-raw,format=BGR,width=1280,height=720,framerate={}/1 "
"! videoconvert ! video/x-raw,format=I420 "
"! x264enc speed-preset=ultrafast tune=zerolatency "
"! rtph264pay config-interval=1 name=pay0 pt=96".format(self.fps)
)
def on_need_data(self, src, lenght):
global offline
# if self.cap.isOpened():
_, frame = self.cap.read()
# if ret:
# Resize frame of video to 1/4 size for faster face recognition processing
assert _==True
data = frame.tostring()
buf = Gst.Buffer.new_allocate(None, len(data), None)
buf.fill(0, data)
buf.duration = self.duration
timestamp = self.number_frames * self.duration
buf.pts = buf.dts = int(timestamp)
buf.offset = timestamp
self.number_frames += 1
retval = src.emit("push-buffer", buf)
if retval != Gst.FlowReturn.OK and retval == Gst.FlowReturn.FLUSHING:
offline = True
if debug:
print("Going offline, streaming stopped")
def do_create_element(self, url):
if debug:
print("Gstreamer Pipeline Created\nURL : ", url.get_request_uri())
return Gst.parse_launch(self.launch_string)
def do_configure(self, rtsp_media):
self.number_frames = 0
appsrc = rtsp_media.get_element().get_child_by_name(
"source"
) # bin | pipe - series of elements with the launch string --> by definition bin is a element
appsrc.connect("need-data", self.on_need_data)
# ------------------------------------------------------------------ #
class GstServer(GstRtspServer.RTSPServer):
def __init__(self, **properties):
super(GstServer, self).__init__(**properties)
self.factory = SensorFactory()
self.factory.set_shared(True)
self.get_mount_points().add_factory("/video", self.factory)
self.attach(None)
# ------------------------------------------------------------------ #
"""
GstRtspServer.RTSPServer Configuration :
ip : 0.0.0.0
port : 8554
uri : rtsp://0.0.0.0:8554/video
"""
# ------------------------------------------------------------------ #
# ------------------------------------------------------------------ #
# Global Variables #
# ------------------------------------------------------------------ #
# To start with debug use the following command
# $ python3 $filename.py debug=true
if __name__ == "__main__":
try:
debug = True if (sys.argv[-1].split("="))[-1].lower() == "true" else False
thread_counter = 0
except:
pass
camera = None
if running_on_jetson_nano():
camera = cv2.VideoCapture(get_jetson_gstreamer_source(), cv2.CAP_GSTREAMER)
else:
camera = cv2.VideoCapture(0)
if debug:
if camera is not None:
deviceSet = True
print("Camera Initialized")
else:
deviceSet = False
print("Error Initializing Camera..\nExiting")
exit()
server = GstServer()
server.connect("client-connected", route2stream, None)
if debug:
print("RTSP off , Survillence starting")
mainloop = GLib.MainLoop()
# ------------------------------------------------------------------ #
# Awesome Loop #
# ------------------------------------------------------------------ #
mainloop.run()
# ------------------------------------------------------------------ #
# All the magic happens here #
# Event driven with signals. #
# ------------------------------------------------------------------ #