Skip to content

Commit

Permalink
Thread safe instrumenation streams and signal catching globs.
Browse files Browse the repository at this point in the history
dded four new internal signals: ``STOP_FABRIC_SIGNAL``,
``STOP_ACTIVE_OBJECT_SIGNAL``, ``SUBSCRIBE_META_SIGNAL``,
``PUBLISH_META_SIGNAL``.  These signals used to be created by the active
object, but they have been moved into the ``event.py`` class, so that
signal catching globs can be supported by miros.

Writes to Python ``print`` are not thread safe.  Before this release the
default behavior of ``live_spy`` and ``live_trace`` wrote out to
python's ``print`` function directly.  This live instrumentation could
cause an io-block and a resulting run time error. I fixed this threading
issue by changing the miros active objects to write their live stream
messages through a dedicated instrumentation thread.  This thread
ensures that only one thing gets to print at a time.  Also, the active
object now provides a thread safe version of ``print`` which uses the
instrumentation thread.

Miros provides the user with a way to over-write how their live
``trace`` and live ``spy`` streams are written.  The thread managing the
live stream watches a queue, which contains 0 or more
``Instrumentation`` named tuples.  An ``Instrumentation`` named tuple
has a ``fn`` and a ``content`` element.  The ``fn`` describes the way
the ``content`` is intended to be printed, so that the
``register_live_spy_callback`` and ``register_live_trace_callback``
features will continue to work.  The user can change "how" their
instrumentation stream will be represented, but the
function-they-provide-to-do-this-with: ``fn``, must be protected within
a thread.

Since there can be many active objects which all want to print their
live instrumentation information at the same time, but only one
instrumentation thread handler, I made the class which provides the
instrumentation threading a singleton.  The class which provides the
instrumentation threading is called ``InstrumentationWriterClass, `` and
it is turned into the singleton ``InstrumentationWriter`` by the
``SingletonDectorator`` class.  When an active object is being
constructed it creates an ``InstrumentationWriter``.  If a previously
build active object has already done this, a reference to the previously
created ``InstrumentedWriter`` is returned.  Then the instrumentation
writing thread is started if it hasn't been started already.
  • Loading branch information
aleph2c committed Jan 2, 2020
1 parent a980647 commit cc391b9
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 42 deletions.
14 changes: 7 additions & 7 deletions examples/toaster_oven_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ def __init__(self, name):
super().__init__(name)

def light_on(self):
print("light_on")
self.print("light_on")

def light_off(self):
print("light_on")
self.print("light_on")

def heater_on(self):
print("heater_on")
self.print("heater_on")

def heater_off(self):
print("heater_off")
self.print("heater_off")

@spy_on
def door_closed(oven, e):
Expand Down Expand Up @@ -59,7 +59,7 @@ def heating(oven, e):
def baking(oven, e):
status = return_status.UNHANDLED
if(e.signal == signals.ENTRY_SIGNAL):
print("baking")
oven.print("baking")
status = return_status.HANDLED
else:
oven.temp.fun = heating
Expand All @@ -70,7 +70,7 @@ def baking(oven, e):
def toasting(oven, e):
status = return_status.UNHANDLED
if(e.signal == signals.ENTRY_SIGNAL):
print("toasting")
oven.print("toasting")
status = return_status.HANDLED
else:
oven.temp.fun = heating
Expand All @@ -81,7 +81,7 @@ def toasting(oven, e):
def off(oven, e):
status = return_status.UNHANDLED
if(e.signal == signals.ENTRY_SIGNAL):
print("off")
oven.print("off")
status = return_status.HANDLED
else:
oven.temp.fun = door_closed
Expand Down
3 changes: 1 addition & 2 deletions examples/xml_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def trace_callback(self, trace):

def spy_callback(self, spy):
'''spy with machine name pre-pending'''
print(spy)
self.print(spy)
logging.debug("S: [{}] {}".format(self.name, spy))

def clear_log(self):
Expand Down Expand Up @@ -432,4 +432,3 @@ def some_other_state_entry_signal(self, e):
example.post_fifo(Event(signal=signals.e2))
time.sleep(0.10)

code
103 changes: 95 additions & 8 deletions miros/activeobject.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# from standard library
import sys
import uuid
import time

Expand All @@ -23,20 +24,14 @@
def pp(item):
pprint(item)


# Add to different signals to signal if they aren't there already
Signal().append("STOP_FABRIC_SIGNAL") # named like any other internal signal
Signal().append("STOP_ACTIVE_OBJECT_SIGNAL") # "
Signal().append("SUBSCRIBE_META_SIGNAL")
Signal().append("PUBLISH_META_SIGNAL")

PublishEvent = namedtuple('PublishEvent', ['event', 'priority'])
SubscribeEvent = namedtuple('SubscribeEvent', ['event_or_signal', 'queue_type'])

class SourceThreadEvent(ThreadEvent):
pass


# The FiberThreadEvent is a singleton. We want to be able to call
# `FiberThreadEvent()` and get the same ThreadEvent (threading.Event) object in
# many different places in this file.
Expand All @@ -54,6 +49,61 @@ def __lt__(self, other):
def __eq__(self, other):
return self.priority == other.priority

Instrumention = namedtuple('Instrumention', ['fn', 'content'])

def _print(content):
print(content)
sys.stdout.flush()

class InstrumenationWriterClass():

def __init__(self):
super().__init__()
self._event = FiberThreadEvent()
self._queue = Queue()
self._thread = None

def start(self):
self._event.set()

def thread_runner(self):
while self._event.is_set():
# i is an Instrumentation namedtuple
i = self._queue.get()
fn = i.fn
content = i.content

fn(content)
self._queue.task_done()

self._thread = Thread(
target=thread_runner,
args=(self,),
daemon=True
)
self._thread.start()

def _print(self, fn, content):
self._queue.put(
Instrumention(
fn=fn,
content=content
)
)

def is_alive(self):
if self._thread is None:
result = False
else:
result = self._thread.is_alive()
return result

def stop(self):
self._event.clear()
self._queue.put(None)
self._thread.join()

InstrumentionWriter = SingletonDecorator(InstrumenationWriterClass)

class ActiveFabricSource():
'''A (pub-sub) event dispatcher for active objects.
Expand Down Expand Up @@ -481,6 +531,9 @@ def __init__(self, name=None, instrumented=None):
# post directly into our locking_deque object, and as a result, provide the
# 'get' method of this object to unlock our task.
self.fabric = ActiveFabric()
# for writing live instrumentation
self.writer = InstrumentionWriter()

self.thread = None
self.name = name

Expand All @@ -504,6 +557,15 @@ def __init__(self, name=None, instrumented=None):
]
)

self.register_live_spy_callback(
self.__class__.live_spy_callback_default)

self.register_live_trace_callback(
self.__class__.live_trace_callback_default)

self.last_live_trace_datetime = len(self.full.trace)


def top(self, *args):
'''top most state given to all HSMs; treat it as an outside function'''
chart, event = args[0], args[1]
Expand All @@ -528,8 +590,6 @@ def __thread_running(self):
result = True if self.thread.is_alive() else False
return result



def append_subscribe_to_spy(fn):
'''instrument the full spy with our subscription request'''
@wraps(fn)
Expand Down Expand Up @@ -716,6 +776,9 @@ def start_thread(self):
if self.fabric.is_alive() is False:
self.fabric.start()

if not self.writer.is_alive():
self.writer.start()

self.fabric_task_event = FiberThreadEvent()

thread = Thread(target=self.run_event,
Expand Down Expand Up @@ -1040,6 +1103,30 @@ def cancel_events(self, e):
else:
self.posted_events_queue.rotate(1)

def register_live_spy_callback(self, live_spy_callback):
# enclose the live_spy_callback
def _live_spy_callback(line):
self.writer._print(fn=live_spy_callback, content=line)
self.live_spy_callback = _live_spy_callback

def register_live_trace_callback(self, live_trace_callback):
# enclose the live_trace_callback
def _live_trace_callback(line):
self.writer._print(fn=live_trace_callback, content=line)
self.live_trace_callback = _live_trace_callback

@staticmethod
def live_spy_callback_default(spy_line):
print(spy_line)

@staticmethod
def live_trace_callback_default(trace_line):
print(trace_line.replace("\n", ""))

def print(self, content):
self.writer._print(fn=_print, content=content)


class Factory(ActiveObject):

class StateMethodBlueprint():
Expand Down
21 changes: 14 additions & 7 deletions miros/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,18 @@ class SignalSource(OrderedDictWithParams):
'''
def __init__(self, *args, **kwargs):

self['ENTRY_SIGNAL'] = 1
self['EXIT_SIGNAL'] = 2
self['INIT_SIGNAL'] = 3
self['REFLECTION_SIGNAL'] = 4
self['EMPTY_SIGNAL'] = 5
self['SEARCH_FOR_SUPER_SIGNAL'] = 6
self['ENTRY_SIGNAL'] = 1
self['EXIT_SIGNAL'] = 2
self['INIT_SIGNAL'] = 3
self['REFLECTION_SIGNAL'] = 4
self['EMPTY_SIGNAL'] = 5
self['SEARCH_FOR_SUPER_SIGNAL'] = 6
self['STOP_FABRIC_SIGNAL'] = 7
self['STOP_ACTIVE_OBJECT_SIGNAL'] = 8
self['SUBSCRIBE_META_SIGNAL'] = 9
self['PUBLISH_META_SIGNAL'] = 10

self.highest_inner_signal = len(self)

def append(self, string):
if string in self:
Expand All @@ -116,7 +122,8 @@ def append(self, string):
def is_inner_signal(self, other):
def is_number_an_internal_signal(number):
result = False
if number in list(self.values())[0:self.SEARCH_FOR_SUPER_SIGNAL]:
#if number in list(self.values())[0:self.SEARCH_FOR_SUPER_SIGNAL]:
if number in list(self.values())[0:self.highest_inner_signal]:
result = True
return result

Expand Down
23 changes: 13 additions & 10 deletions miros/hsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ def __init__(self, maxlen=QUEUE_SIZE, instrumented=True, priority=1):
self.live_spy_callback = self.__class__.live_spy_callback_default
self.live_trace_callback = self.__class__.live_trace_callback_default

self.last_live_trace_datetime = len(self.full.trace)
self.last_live_trace_datetime = stdlib_datetime.now()

@staticmethod
def live_spy_callback_default(spy_line):
Expand Down Expand Up @@ -1230,9 +1230,10 @@ def _print_trace_if_live(self, initial_state):
result = fn(self, initial_state)
if self.instrumented and self.live_trace:
strace = ""
tr = self.full.trace[-1]
strace += self.trace_tuple_to_formatted_string(tr)
self.live_trace_callback(strace)
if len(self.full.trace) != 0:
tr = self.full.trace[-1]
strace += self.trace_tuple_to_formatted_string(tr)
self.live_trace_callback(strace)
if tr is not None:
self.last_live_trace_datetime = tr.datetime
return result
Expand All @@ -1255,12 +1256,14 @@ def _print_trace_if_live(self):
tr = None
# fn is next_rtc/start_at
result = fn(self)
if(self.instrumented and self.live_trace):
tr = self.full.trace[-1]
if tr.datetime != self.last_live_trace_datetime:
strace = "\n"
strace += self.trace_tuple_to_formatted_string(tr)
self.live_trace_callback(strace)
if(self.instrumented and self.live_trace ):
if len(self.full.trace) != 0:
tr = self.full.trace[-1]
if tr.datetime is not None:
if tr.datetime != self.last_live_trace_datetime:
strace = "\n"
strace += self.trace_tuple_to_formatted_string(tr)
self.live_trace_callback(strace)
if tr is not None:
self.last_live_trace_datetime = tr.datetime
return result
Expand Down
39 changes: 39 additions & 0 deletions plan/release_notes.wiki
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
Thread-safe live instrumentation streams, thread-safe print and signal catching glob support.

Added four new internal signals: ``STOP_FABRIC_SIGNAL``,
``STOP_ACTIVE_OBJECT_SIGNAL``, ``SUBSCRIBE_META_SIGNAL``,
``PUBLISH_META_SIGNAL``. These signals used to be created by the active object,
but they have been moved into the ``event.py`` class, so that signal catching
globs can be supported by miros.

Writes to Python ``print`` are not thread safe. Before this release the default
behavior of ``live_spy`` and ``live_trace`` wrote out to python's ``print``
function directly. This live instrumentation could cause an io-block and a
resulting run time error. I fixed this threading issue by changing the miros
active objects to write their live stream messages through a dedicated
instrumentation thread. This thread ensures that only one thing gets to print
at a time. Also, the active object now provides a thread safe version of
``print`` which uses the instrumentation thread.

Miros provides the user with a way to over-write how their live ``trace`` and
live ``spy`` streams are written. The thread managing the live stream watches a
queue, which contains 0 or more ``Instrumentation`` named tuples. An
``Instrumentation`` named tuple has a ``fn`` and a ``content`` element. The
``fn`` describes the way the ``content`` is intended to be printed, so that the
``register_live_spy_callback`` and ``register_live_trace_callback`` features
will continue to work. The user can change "how" their instrumentation stream
will be represented, but the function-they-provide-to-do-this-with: ``fn``, must
be protected within a thread.

Since there can be many active objects which all want to print their live
instrumentation information at the same time, but only one instrumentation
thread handler, I made the class which provides the instrumentation threading a
singleton. The class which provides the instrumentation threading is called
``InstrumentationWriterClass, `` and it is turned into the singleton
``InstrumentationWriter`` by the ``SingletonDectorator`` class. When an active
object is being constructed it creates an ``InstrumentationWriter``. If a
previously build active object has already done this, a reference to the
previously created ``InstrumentedWriter`` is returned. Then the instrumentation
writing thread is started if it hasn't been started already.


9 changes: 6 additions & 3 deletions plan/release_plan.wiki
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@
* [ ] Re-run all tests
* [ ] Uprev release number in setup.py file
* [ ] Write your release notes
* [ ] Commit and push changes to github
* [ ] In the github release page, place your release notes, tag using the release number
in setup.py file ex. v5.1.2
* [ ] Commit
* [ ] Merge back into master
* [ ] Tag the release
* [ ] Push changes and tab to github
* [ ] In the github release page, place your release notes, use the tag you
just created within your local repo
* [ ] Open cmd window
* [ ] Navigate to miros project
* [ ] > python setup.py sdist
Expand Down
3 changes: 3 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ markers =
rtc: test the run to completion event
complete_circuit: test the complete_circuit method
event: test the events
signal: test the signals
payload: test event payloads
post_event: test the post_event method
post_add: can we augment a statechart once it is made?
is_in: test the is_in active object feature
Expand Down Expand Up @@ -84,4 +86,5 @@ markers =
thread_safe_attributes: test thread safe attributes provided by the MetaThreadSafeAttributes
isolated: for debugging specific tests
scxml_bugs: bugs found while building miros-scxml
snipe: isolated testing
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
py_modules=['miros'],

# https://packaging.python.org/en/latest/single_source_version.html
version='4.2.0',
version='4.2.1',

description='A statechart library for Python',
long_description=long_description,
Expand Down
1 change: 1 addition & 0 deletions test/active_objects_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def test_start_stop_c(fabric_fixture):
pp(ao1.spy_full())


@pytest.mark.snipe
@pytest.mark.aos
def test_publish_subscribe(fabric_fixture):
c1 = ActiveObject("c1")
Expand Down
Loading

0 comments on commit cc391b9

Please sign in to comment.