Skip to content

Commit

Permalink
Added atexitq: priority based shutdown functions to main.
Browse files Browse the repository at this point in the history
This solves threading/networking with atexit.
  • Loading branch information
smrg-lm committed Feb 24, 2020
1 parent e2ccfa4 commit b0f87f5
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 59 deletions.
10 changes: 6 additions & 4 deletions sc3/base/_oscinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ def start(self):
self._server_thread.daemon = True
self._server_thread.start()
self._running = True
atexit.register(self.stop)
_libsc3.main._atexitq.add(
_libsc3.main._atexitprio.NETWORKING + 1, self.stop)

def stop(self):
if not self._running:
Expand All @@ -240,7 +241,7 @@ def stop(self):
self._server = None
self._running = False
self._server_thread = None
atexit.unregister(self.stop)
_libsc3.main._atexitq.remove(self.stop)

def running(self):
return self._running
Expand Down Expand Up @@ -286,7 +287,8 @@ def connect(self, target):
target=self._tcp_run, name=str(self))
self._tcp_thread.daemon = True
self._tcp_thread.start()
atexit.register(self.disconnect)
_libsc3.main._atexitq.add(
_libsc3.main._atexitprio.NETWORKING, self.disconnect)

def _tcp_run(self):
self._run_thread = True
Expand Down Expand Up @@ -339,7 +341,7 @@ def disconnect(self):
self._socket.shutdown(socket.SHUT_RDWR)
self._is_connected = False # Is sync.
self._socket.close() # OSError if underlying error.
atexit.unregister(self.disconnect)
_libsc3.main._atexitq.remove(self.disconnect)

def send(self, msg, _=None):
self._socket.send(msg.size.to_bytes(4, 'big'))
Expand Down
26 changes: 17 additions & 9 deletions sc3/base/main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Kernel.sc & Main.sc"""

import enum
import threading
import atexit
import time
import random
import sys

from ..seq import _taskq as tsq
from ..seq import stream as stm
from ..seq import clock as clk
from . import platform as plf
Expand All @@ -27,6 +29,17 @@ class Process(type):
RT = 0
NRT = 1

class _atexitprio(enum.IntEnum):
''' Library predefined _atexitq priority numbers.'''
USER = 0
SERVERS = 500
PLATFORM = 700
CLOCKS = 800
NETWORKING = 900

_atexitq = tsq.TaskQueue()
'''Functions registered in atexit with order by priority numbers.'''

def __init__(cls, name, bases, dict):
cls._main_lock = threading.RLock()
cls._switch_cond = threading.Condition(cls._main_lock)
Expand All @@ -47,6 +60,7 @@ def _init_platform(cls):
else:
raise RuntimeError('platform not defined')
cls._platform._startup()
cls._atexitq.add(cls._atexitprio.PLATFORM, cls.platform._shutdown)

def _init_rt(cls):
# In sclang these two are the same clock, it obtains time_since_epoch
Expand Down Expand Up @@ -131,15 +145,9 @@ def stop(cls):
...

def shutdown(cls):
cls.platform._shutdown()
# TODO: VER: PyrLexer shutdownLibrary, ahí llama sched_stop de SystemClock (acá) y TempoClock stop all entre otras cosas.
# TODO: sched_stop tiene join, no se puede usasr con atexit?
#cls._run_thread = False
# BUG: no me acuerdo cuál era el problema con atexit y los locks, una solución es demonizarlas.
# with cls._main_lock:
# cls._main_lock.notify_all()
#cls._thread.join()
# BUG: probablemente tenga que hacer lo mismo con todos los relojes.
while not cls._atexitq.empty():
cls._atexitq.pop()[1]()
atexit.unregister(cls.shutdown)

def add_osc_recv_func(cls, func):
cls._osc_interface.add_recv_func(func)
Expand Down
86 changes: 47 additions & 39 deletions sc3/seq/clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def init_func(cls):
daemon=True)
cls._thread.start()
cls._sched_init()
_libsc3.main._atexitq.add(
_libsc3.main._atexitprio.CLOCKS, cls._sched_stop)

utl.ClassLibrary.add(cls, init_func)

Expand Down Expand Up @@ -119,8 +121,8 @@ def _sched_init(cls):
cls._resync_cond = threading.Condition()
cls._resync_thread = threading.Thread(
target=cls._resync_thread_func,
name=f'{cls.__name__}.resync')
cls._resync_thread.daemon = True
name=f'{cls.__name__}.resync',
daemon=True)
cls._resync_thread.start()

@classmethod
Expand Down Expand Up @@ -183,12 +185,6 @@ def _resync_thread_func(cls): # L408
cls._host_start_nanos * cls._NANOS_TO_OSC
) + cls._host_osc_offset

@classmethod
def _sched_cleanup(cls): # L265
with cls._resync_cond:
cls._run_resync = False
cls._resync_cond.notify()

@classmethod
def elapsed_time_to_osc(cls, elapsed: float) -> int: # int64
return int(
Expand Down Expand Up @@ -218,19 +214,19 @@ def _sched_add(cls, secs, task): # L353
cls._sched_cond.notify_all() # Call with acquired lock.

@classmethod
def _sched_stop(cls): # Shouldn't be stopped.
with cls._sched_cond:
cls._sched_cleanup()
if cls._run_sched:
cls._run_sched = False
cls._sched_cond.notify_all()

@classmethod
def sched_clear(cls): # L387, called by schedClearUnsafe() with gLangMutex
def _sched_stop(cls):
if not cls._run_sched:
return
with cls._sched_cond:
if cls._run_sched:
cls._task_queue.clear()
cls._sched_cond.notify_all()
with cls._resync_cond:
if cls._run_resync:
cls._run_resync = False
cls._resync_cond.notify()
cls._task_queue.clear()
cls._run_sched = False
cls._sched_cond.notify_all()
cls._resync_thread.join()
cls._thread.join()

@classmethod
def _run(cls):
Expand Down Expand Up @@ -438,6 +434,8 @@ def init_func(cls):
name=cls.__name__,
daemon=True)
cls._thread.start()
_libsc3.main._atexitq.add(
_libsc3.main._atexitprio.CLOCKS + 1, cls._stop)

utl.ClassLibrary.add(cls, init_func)

Expand Down Expand Up @@ -485,10 +483,13 @@ def tick(cls):
return cls._scheduler.queue.peek()[0]

@classmethod
def _stop(cls): # Shouldn't be stopped.
def _stop(cls):
if not cls._run_sched:
return
with cls._tick_cond:
cls._run_sched = False
cls._tick_cond.notify()
cls._thread.join()

# NOTE: Este comentario es un recordatorio.
# def _sched_notify(cls):
Expand All @@ -507,6 +508,8 @@ def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.start()
_libsc3.main._atexitq.add(
_libsc3.main._atexitprio.CLOCKS + 3, self.stop)

def run(self):
self._run_sched = True
Expand All @@ -533,9 +536,14 @@ def add(self, time, clock_task):
self._sched_cond.notify()

def stop(self):
if not self._run_sched:
return
with self._sched_cond:
self.queue.clear()
self._run_sched = False
self._sched_cond.notify()
# self.join() # Who calls???
_libsc3.main._atexitq.remove(self.stop)


class ClockTask():
Expand Down Expand Up @@ -718,6 +726,8 @@ def __init__(self, tempo=None, beats=None, seconds=None):
name=f'{type(self).__name__} id: {id(self)}',
daemon=True)
self._thread.start()
_libsc3.main._atexitq.add(
_libsc3.main._atexitprio.CLOCKS + 2, self._stop)

def _run(self):
with self._sched_cond:
Expand Down Expand Up @@ -800,29 +810,27 @@ def stop(self):
if not self.running():
_logger.debug(f'{self} is not running')
return

# StopAndDelete
def stop_func(clock):
# Stop
with clock._sched_cond: # lock_guard
clock._run_sched = False # NOTE: son daemon y se liberan solas cuando terminan sin join.
type(clock)._all.remove(clock)
clock._sched_cond.notify_all() # NOTE: en TempoClock::Stop, es notify_all
# *** BUG: No estoy seguro de si en C++ notify_all notifica a todas
# *** BUG: las condiciones sobre el mismo lock. En Python no funciona
# *** BUG: así y clock._sched_cond trabaja sobre un solo hilo.
# *** BUG: Pero también puede ser que notifique varias veces a
# *** BUG: la misma condición por los distintos wait? Desconocimiento.
clock._thread = None
clock._sched_cond = None

# StopReq
stop_thread = threading.Thread(
target=stop_func,
args=(self,),
target=self._stop,
name=f'{type(self).__name__}.stop_thread id: {id(self)}',
daemon=True)
stop_thread.start()
_libsc3.main._atexitq.remove(self._stop)

def _stop(self):
# StopAndDelete
# Stop
if not self._run_sched:
return
with self._sched_cond:
self._task_queue.clear()
type(self)._all.remove(self)
self._run_sched = False
self._sched_cond.notify_all() # In TempoClock::Stop is notify_all
self._thread.join()
self._thread = None
self._sched_cond = None

# def __del__(self):
# # BUG: self needs to be stoped to be gc collected because is in
Expand Down
14 changes: 7 additions & 7 deletions sc3/synth/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,8 @@ def register(self, on_complete=None, on_failure=None):
def _on_complete(server):
self._status_watcher._server_registering = False
fn.value(on_complete, self)
_atexit.register(self._unregister_atexit)
_libsc3.main._atexitq.add(
_libsc3.main._atexitprio.SERVERS, self._unregister_atexit)

def _on_failure(server):
if self.addr.proto == 'tcp':
Expand Down Expand Up @@ -727,7 +728,7 @@ def _on_complete(server):
self.addr.disconnect()
self._status_watcher._server_unregistering = False
fn.value(on_complete, self)
_atexit.unregister(self._unregister_atexit)
_libsc3.main._atexitq.remove(self._unregister_atexit)

def _on_failure(server):
self._status_watcher._server_unregistering = False
Expand Down Expand Up @@ -764,7 +765,8 @@ def _on_complete(server):
server._status_watcher._server_booting = False
server._boot_init()
fn.value(on_complete, server)
_atexit.register(self._quit_atexit)
_libsc3.main._atexitq.add(
_libsc3.main._atexitprio.SERVERS, self._quit_atexit)

def _on_failure(server):
if self.addr.proto == 'tcp':
Expand Down Expand Up @@ -836,9 +838,7 @@ def _quit_atexit(self):
event = _threading.Event()
set_func = lambda: event.set()
self.quit(set_func, set_func)
event.wait(0.2) # _MainThread, set_func runs in AppClock thread.
# *** BUG: PUEDE NO DESBLOQUEAR SI APPCLOCK DAEMON SE LIBERA ANTES.
# *** BUG: VER SI SE PUEDE AJUSTAR EL ORDEN EN VEZ DE USAR TIMEOUT.
event.wait(5) # _MainThread, set_func runs in AppClock thread.

def reboot(self, func=None, on_failure=None):
# // func is evaluated when server is off.
Expand Down Expand Up @@ -879,7 +879,7 @@ def _on_complete():
if self.addr.proto == 'tcp':
self.addr.disconnect()
self._status_watcher._server_quitting = False
_atexit.unregister(self._quit_atexit)
_libsc3.main._atexitq.remove(self._quit_atexit)
fn.value(on_complete, self)

def _on_failure():
Expand Down

0 comments on commit b0f87f5

Please sign in to comment.