-
Notifications
You must be signed in to change notification settings - Fork 0
/
basic-tut-8.py
239 lines (205 loc) · 7.71 KB
/
basic-tut-8.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
#!/usr/bin/env python3
"""GStreamer basic tutorial 8 in Python: short-cutting the pipeline.

Feeds generated audio into an `appsrc`, splits it with a `tee` into an
audible branch, a `wavescope` video branch, and an `appsink` branch.
"""
import sys
import logging
import gi
import signal
import struct

# Pin the GObject-introspection API versions before importing the bindings.
gi.require_version("GLib", "2.0")
gi.require_version("GObject", "2.0")
gi.require_version("Gst", "1.0")
gi.require_version("GstBase", "1.0")
gi.require_version("GstAudio", "1.0")
from gi.repository import GLib, GObject, Gst, GstAudio

logging.basicConfig(
    level=logging.DEBUG, format="[%(name)s] [%(levelname)8s] - %(message)s"
)
logger = logging.getLogger(__name__)
class CustomData:
    """Holds the pipeline, its elements, and the waveform-generator state."""

    # (attribute name, element factory, element instance name)
    _ELEMENT_SPECS = (
        ("app_source", "appsrc", "audio_source"),
        ("tee", "tee", "tee"),
        ("audio_queue", "queue", "audio_queue"),
        ("audio_convert1", "audioconvert", "audio_convert1"),
        ("audio_resample", "audioresample", "audio_resample"),
        ("audio_sink", "autoaudiosink", "audio_sink"),
        ("video_queue", "queue", "video_queue"),
        ("audio_convert2", "audioconvert", "audio_convert2"),
        ("visual", "wavescope", "visual"),
        ("video_convert", "videoconvert", "video_convert"),
        ("video_sink", "autovideosink", "video_sink"),
        ("app_queue", "queue", "app_queue"),
        ("app_sink", "appsink", "app_sink"),
    )

    def __init__(self):
        # Create every element from the table; a failed make() leaves None,
        # which main() checks for before wiring the pipeline.
        for attr, factory, name in self._ELEMENT_SPECS:
            setattr(self, attr, Gst.ElementFactory.make(factory, name))
        self.pipeline = Gst.Pipeline.new("test-pipeline")
        # State for waveform generation (see push_data).
        self.a = self.b = self.c = self.d = 0
        # Id of the GLib idle source feeding appsrc; 0 means "not feeding".
        self.sourceid = 0
        # Number of samples generated so far (for time stamp generation).
        self.num_samples = 0
        self.main_loop = None
def push_data(data):
    """Feed one chunk of generated S16 mono audio into the appsrc.

    Runs as a GLib idle handler while appsrc keeps asking for data.
    Returns True to stay scheduled, False to remove the idle source
    (i.e. on a push-buffer flow error).
    """
    chunk_size = 1024
    # BUGFIX: was 41000, but main() sets caps at 44100 Hz; a mismatched
    # rate here produces wrong pts/duration timestamps (44100 * 2 bytes
    # = 88200 bytes per second, matching the original comment).
    sample_rate = 44100
    num_samples = chunk_size // 2  # because each S16 sample is 16 bits

    # Create a new buffer and timestamp it so downstream can sync playback.
    buffer = Gst.Buffer.new_allocate(None, chunk_size, None)
    buffer.pts = Gst.util_uint64_scale(data.num_samples, Gst.SECOND, sample_rate)
    buffer.duration = Gst.util_uint64_scale(num_samples, Gst.SECOND, sample_rate)

    # Generate some psychedelic waveforms (port of the C tutorial's generator).
    # BUGFIX: the previous code copied the mapped data into a Python list,
    # wrote 4-byte float packs into it (one per *byte* index of an S16
    # stream), and never unmapped or wrote anything back, so the buffer
    # stayed silent/uninitialized. Build the bytes locally and copy them
    # into the buffer with fill() instead.
    raw = bytearray(chunk_size)
    data.c += data.d
    data.d -= data.c // 1000
    freq = 1100 + 1000 * data.d
    for i in range(num_samples):
        data.a += data.b
        data.b -= data.a // freq
        # "<H" with a 16-bit mask emulates the C code's (gint16) cast,
        # wrapping instead of raising on overflow.
        struct.pack_into("<H", raw, i * 2, (500 * data.a) & 0xFFFF)
    buffer.fill(0, bytes(raw))
    data.num_samples += num_samples

    # Push the buffer into the appsrc ("push-buffer" is an appsrc action signal).
    ret = data.app_source.emit("push-buffer", buffer)
    return ret == Gst.FlowReturn.OK
def start_feed(source, length, data):
    """appsrc "need-data" callback: start the idle handler that feeds data.

    `source` is the appsrc element; `length` is a byte-count hint that is
    ignored here (fixed-size chunks are pushed instead). Renamed from
    `object`/`arg0`, which shadowed a builtin and hid the signal contract.
    """
    if data.sourceid == 0:
        print("Start feeding\n")
        data.sourceid = GLib.idle_add(push_data, data)
def stop_feed(source, data):
    """appsrc "enough-data" callback: remove the idle handler feeding data.

    `source` is the appsrc element (renamed from `object`, which shadowed
    the builtin of that name).
    """
    if data.sourceid != 0:
        print("Stop feeding\n")
        GLib.source_remove(data.sourceid)
        data.sourceid = 0
def new_sample(sink, data):
    """appsink "new-sample" callback: pull the sample and mark progress."""
    sample = sink.emit("pull-sample")
    if not sample:
        return Gst.FlowReturn.ERROR
    # One marker per sample received, mirroring the C tutorial's output.
    print("*")
    return Gst.FlowReturn.OK
def error_cb(bus, msg, data):
    """Bus "message::error" callback: log the error and quit the main loop."""
    err, debug_info = msg.parse_error()
    source_name = msg.src.get_name()
    # Lazy %-style args avoid formatting work when the level is filtered out.
    logger.error("Error received from element %s: %s", source_name, err.message)
    logger.error("Debugging information: %s", debug_info or "none")
    data.main_loop.quit()
def main():
    """Build and run the appsrc → tee → (audio | wavescope | appsink) pipeline.

    Exits with status 1 if element creation or linking fails. On shutdown
    (loop quit or Ctrl-C) the tee request pads are released and the
    pipeline is set back to NULL — the original leaked both.
    """
    Gst.init(None)

    data = CustomData()
    # Non-zero initial "velocities" so the waveform generator oscillates.
    data.b = 1
    data.d = 1

    elements = (
        data.pipeline,
        data.app_source,
        data.tee,
        data.audio_queue,
        data.audio_convert1,
        data.audio_resample,
        data.audio_sink,
        data.video_queue,
        data.audio_convert2,
        data.visual,
        data.video_convert,
        data.video_sink,
        data.app_queue,
        data.app_sink,
    )
    if not all(elements):
        logger.error("Not all elements could be created")
        sys.exit(1)

    # Configure wavescope
    data.visual.set_property("shader", 0)
    data.visual.set_property("style", 0)

    # Configure appsrc: raw S16 mono audio at 44.1 kHz (= 88200 bytes/second)
    sample_rate = 44100
    info = GstAudio.AudioInfo()
    info.set_format(
        GstAudio.AudioFormat.S16, sample_rate, 1, None
    )  # Signed 16-bit, native layout
    audio_caps = info.to_caps()
    data.app_source.set_property("caps", audio_caps)
    data.app_source.set_property("format", Gst.Format.TIME)
    data.app_source.connect("need-data", start_feed, data)
    data.app_source.connect("enough-data", stop_feed, data)

    # Configure appsink: emit "new-sample" and restrict it to the same caps.
    data.app_sink.set_property("emit-signals", True)
    data.app_sink.set_property("caps", audio_caps)
    data.app_sink.connect("new-sample", new_sample, data)

    # Add all elements to the pipeline
    data.pipeline.add(
        data.app_source,
        data.tee,
        data.audio_queue,
        data.audio_convert1,
        data.audio_resample,
        data.audio_sink,
        data.video_queue,
        data.audio_convert2,
        data.visual,
        data.video_convert,
        data.video_sink,
        data.app_queue,
        data.app_sink,
    )

    # Link all elements that have "Always" pads
    linked = (
        data.app_source.link(data.tee)
        and data.audio_queue.link(data.audio_convert1)
        and data.audio_convert1.link(data.audio_resample)
        and data.audio_resample.link(data.audio_sink)
        and data.video_queue.link(data.audio_convert2)
        and data.audio_convert2.link(data.visual)
        and data.visual.link(data.video_convert)
        and data.video_convert.link(data.video_sink)
        and data.app_queue.link(data.app_sink)
    )
    if not linked:
        logger.error("Elements could not be linked")
        sys.exit(1)

    # Manually link the tee, which has "Request" pads
    tee_audio_pad = data.tee.get_request_pad("src_%u")
    logger.info("Obtained request pad %s for audio branch", tee_audio_pad.get_name())
    tee_video_pad = data.tee.get_request_pad("src_%u")
    logger.info("Obtained request pad %s for video branch", tee_video_pad.get_name())
    tee_app_pad = data.tee.get_request_pad("src_%u")
    logger.info("Obtained request pad %s for app branch", tee_app_pad.get_name())

    if (
        tee_audio_pad.link(data.audio_queue.get_static_pad("sink"))
        != Gst.PadLinkReturn.OK
        or tee_video_pad.link(data.video_queue.get_static_pad("sink"))
        != Gst.PadLinkReturn.OK
        or tee_app_pad.link(data.app_queue.get_static_pad("sink"))
        != Gst.PadLinkReturn.OK
    ):
        logger.error("Tee could not be linked")
        sys.exit(1)

    # Watch the bus so pipeline errors stop the main loop (see error_cb)
    bus = data.pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message::error", error_cb, data)

    # Play pipeline
    data.pipeline.set_state(Gst.State.PLAYING)
    print("Pipeline play")

    # Create a GLib main loop and run it until quit (or Ctrl-C)
    data.main_loop = GLib.MainLoop(None)
    try:
        data.main_loop.run()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the tee's request pads and tear the pipeline down.
        data.tee.release_request_pad(tee_audio_pad)
        data.tee.release_request_pad(tee_video_pad)
        data.tee.release_request_pad(tee_app_pad)
        data.pipeline.set_state(Gst.State.NULL)


if __name__ == "__main__":
    main()