Skip to content

Commit

Permalink
Merge pull request #10 from migmartri/early-exit
Browse files Browse the repository at this point in the history
feat: close stream on file output
  • Loading branch information
miguelaeh authored Aug 21, 2023
2 parents c420ffc + c849b52 commit abf4b4f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
6 changes: 5 additions & 1 deletion core/src/pipeless_ai/lib/input/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def on_bus_message(bus: Gst.Bus, msg: Gst.Message, loop: GLib.MainLoop):
# by saving the n_workers config option
logger.info('Notifying EOS to worker')
w_socket.ensure_send(m_msg)

if config.get_output().get_video().get_uri_protocol() == 'file':
# Stop after the first stream when using an output file
loop.quit()
elif mtype == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
logger.error(f"Error received from element {msg.src.get_name()}: {err.message}")
Expand Down Expand Up @@ -226,9 +230,9 @@ def pad_added_callback(pad):
finally:
logger.info('Closing pipeline')
pipeline.set_state(Gst.State.NULL)
logger.info('Pipeline closed')
# Rettreive and close the sockets
m_socket = InputOutputSocket('w')
m_socket.close()
s_push = InputPushSocket()
s_push.close()
logger.info('Input finished. Please wait for workers and output.')
6 changes: 6 additions & 0 deletions core/src/pipeless_ai/lib/output/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ def on_bus_message(bus: Gst.Bus, msg: Gst.Message, output: Output):
if mtype == Gst.MessageType.EOS:
logger.info("End of stream reached.")
output.remove_pipeline()

config = Config(None)
if config.get_output().get_video().get_uri_protocol() == 'file':
# Stop after the first stream when using an output file
output.get_mainloop().quit()
elif mtype == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
logger.error(f"Error received from element {msg.src.get_name()}: {err.message}")
Expand Down Expand Up @@ -377,3 +382,4 @@ def output(config_dict):
m_socket.close()
r_socket = OutputPullSocket()
r_socket.close()
logger.info('Output finished.')
10 changes: 7 additions & 3 deletions core/src/pipeless_ai/lib/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,19 @@ def worker(config_dict, user_module_path):
while continue_worker:
continue_worker = fetch_and_process(user_app)
user_app._PipelessApp__after()

if config.get_output().get_video().get_uri_protocol() == 'file':
# Stop after the first stream when using an output file
break
except KeyboardInterrupt:
pass
except Exception:
traceback.print_exc()
finally:
logger.info('Worker finished!')
# Retreive and close the sockets
# Retrieve and close the sockets
logger.debug('Cleaning sockets')
r_socket = InputPullSocket()
r_socket.close()
s_socket = OutputPushSocket()
s_socket.close()
s_socket.close()
logger.info('Worker finished. Please wait for the output.')

0 comments on commit abf4b4f

Please sign in to comment.