diff --git a/examples/toaster_oven_2.py b/examples/toaster_oven_2.py index f4dadb6..2d23292 100644 --- a/examples/toaster_oven_2.py +++ b/examples/toaster_oven_2.py @@ -11,16 +11,16 @@ def __init__(self, name): super().__init__(name) def light_on(self): - print("light_on") + self.print("light_on") def light_off(self): - print("light_on") + self.print("light_on") def heater_on(self): - print("heater_on") + self.print("heater_on") def heater_off(self): - print("heater_off") + self.print("heater_off") @spy_on def door_closed(oven, e): @@ -59,7 +59,7 @@ def heating(oven, e): def baking(oven, e): status = return_status.UNHANDLED if(e.signal == signals.ENTRY_SIGNAL): - print("baking") + oven.print("baking") status = return_status.HANDLED else: oven.temp.fun = heating @@ -70,7 +70,7 @@ def baking(oven, e): def toasting(oven, e): status = return_status.UNHANDLED if(e.signal == signals.ENTRY_SIGNAL): - print("toasting") + oven.print("toasting") status = return_status.HANDLED else: oven.temp.fun = heating @@ -81,7 +81,7 @@ def toasting(oven, e): def off(oven, e): status = return_status.UNHANDLED if(e.signal == signals.ENTRY_SIGNAL): - print("off") + oven.print("off") status = return_status.HANDLED else: oven.temp.fun = door_closed diff --git a/examples/xml_chart.py b/examples/xml_chart.py index ad93478..9c2be90 100644 --- a/examples/xml_chart.py +++ b/examples/xml_chart.py @@ -285,7 +285,7 @@ def trace_callback(self, trace): def spy_callback(self, spy): '''spy with machine name pre-pending''' - print(spy) + self.print(spy) logging.debug("S: [{}] {}".format(self.name, spy)) def clear_log(self): @@ -432,4 +432,3 @@ def some_other_state_entry_signal(self, e): example.post_fifo(Event(signal=signals.e2)) time.sleep(0.10) -code diff --git a/miros/activeobject.py b/miros/activeobject.py index aa87c9e..052ab26 100644 --- a/miros/activeobject.py +++ b/miros/activeobject.py @@ -1,4 +1,5 @@ # from standard library +import sys import uuid import time @@ -23,12 +24,7 @@ def pp(item): pprint(item) - # Add to different signals to signal if they aren't there already -Signal().append("STOP_FABRIC_SIGNAL") # named like any other internal signal -Signal().append("STOP_ACTIVE_OBJECT_SIGNAL") # " -Signal().append("SUBSCRIBE_META_SIGNAL") -Signal().append("PUBLISH_META_SIGNAL") PublishEvent = namedtuple('PublishEvent', ['event', 'priority']) SubscribeEvent = namedtuple('SubscribeEvent', ['event_or_signal', 'queue_type']) @@ -36,7 +32,6 @@ def pp(item): class SourceThreadEvent(ThreadEvent): pass - # The FiberThreadEvent is a singleton. We want to be able to call # `FiberThreadEvent()` and get the same ThreadEvent (threading.Event) object in # many different places in this file. @@ -54,6 +49,61 @@ def __lt__(self, other): def __eq__(self, other): return self.priority == other.priority +Instrumention = namedtuple('Instrumention', ['fn', 'content']) + +def _print(content): + print(content) + sys.stdout.flush() + +class InstrumenationWriterClass(): + + def __init__(self): + super().__init__() + self._event = FiberThreadEvent() + self._queue = Queue() + self._thread = None + + def start(self): + self._event.set() + + def thread_runner(self): + while self._event.is_set(): + # i is an Instrumentation namedtuple + i = self._queue.get() + fn = i.fn + content = i.content + + fn(content) + self._queue.task_done() + + self._thread = Thread( + target=thread_runner, + args=(self,), + daemon=True + ) + self._thread.start() + + def _print(self, fn, content): + self._queue.put( + Instrumention( + fn=fn, + content=content + ) + ) + + def is_alive(self): + if self._thread is None: + result = False + else: + result = self._thread.is_alive() + return result + + def stop(self): + self._event.clear() + self._queue.put(None) + self._thread.join() + +InstrumentionWriter = SingletonDecorator(InstrumenationWriterClass) class ActiveFabricSource(): '''A (pub-sub) event dispatcher for active objects. @@ -481,6 +531,9 @@ def __init__(self, name=None, instrumented=None): # post directly into our locking_deque object, and as a result, provide the # 'get' method of this object to unlock our task. self.fabric = ActiveFabric() + # for writing live instrumentation + self.writer = InstrumentionWriter() + self.thread = None self.name = name @@ -504,6 +557,15 @@ def __init__(self, name=None, instrumented=None): ] ) + self.register_live_spy_callback( + self.__class__.live_spy_callback_default) + + self.register_live_trace_callback( + self.__class__.live_trace_callback_default) + + self.last_live_trace_datetime = len(self.full.trace) + + def top(self, *args): '''top most state given to all HSMs; treat it as an outside function''' chart, event = args[0], args[1] @@ -528,8 +590,6 @@ def __thread_running(self): result = True if self.thread.is_alive() else False return result - - def append_subscribe_to_spy(fn): '''instrument the full spy with our subscription request''' @wraps(fn) @@ -716,6 +776,9 @@ def start_thread(self): if self.fabric.is_alive() is False: self.fabric.start() + if not self.writer.is_alive(): + self.writer.start() + self.fabric_task_event = FiberThreadEvent() thread = Thread(target=self.run_event, @@ -1040,6 +1103,30 @@ def cancel_events(self, e): else: self.posted_events_queue.rotate(1) + def register_live_spy_callback(self, live_spy_callback): + # enclose the live_spy_callback + def _live_spy_callback(line): + self.writer._print(fn=live_spy_callback, content=line) + self.live_spy_callback = _live_spy_callback + + def register_live_trace_callback(self, live_trace_callback): + # enclose the live_trace_callback + def _live_trace_callback(line): + self.writer._print(fn=live_trace_callback, content=line) + self.live_trace_callback = _live_trace_callback + + @staticmethod + def live_spy_callback_default(spy_line): + print(spy_line) + + @staticmethod + def live_trace_callback_default(trace_line): + print(trace_line.replace("\n", "")) + + def print(self, content): + self.writer._print(fn=_print, content=content) + + class Factory(ActiveObject): class StateMethodBlueprint(): diff --git a/miros/event.py b/miros/event.py index ee35595..d6af5ff 100644 --- a/miros/event.py +++ b/miros/event.py @@ -100,12 +100,18 @@ class SignalSource(OrderedDictWithParams): ''' def __init__(self, *args, **kwargs): - self['ENTRY_SIGNAL'] = 1 - self['EXIT_SIGNAL'] = 2 - self['INIT_SIGNAL'] = 3 - self['REFLECTION_SIGNAL'] = 4 - self['EMPTY_SIGNAL'] = 5 - self['SEARCH_FOR_SUPER_SIGNAL'] = 6 + self['ENTRY_SIGNAL'] = 1 + self['EXIT_SIGNAL'] = 2 + self['INIT_SIGNAL'] = 3 + self['REFLECTION_SIGNAL'] = 4 + self['EMPTY_SIGNAL'] = 5 + self['SEARCH_FOR_SUPER_SIGNAL'] = 6 + self['STOP_FABRIC_SIGNAL'] = 7 + self['STOP_ACTIVE_OBJECT_SIGNAL'] = 8 + self['SUBSCRIBE_META_SIGNAL'] = 9 + self['PUBLISH_META_SIGNAL'] = 10 + + self.highest_inner_signal = len(self) def append(self, string): if string in self: @@ -116,7 +122,8 @@ def append(self, string): def is_inner_signal(self, other): def is_number_an_internal_signal(number): result = False - if number in list(self.values())[0:self.SEARCH_FOR_SUPER_SIGNAL]: + #if number in list(self.values())[0:self.SEARCH_FOR_SUPER_SIGNAL]: + if number in list(self.values())[0:self.highest_inner_signal]: result = True return result diff --git a/miros/hsm.py b/miros/hsm.py index ee0c9ae..0a9402f 100644 --- a/miros/hsm.py +++ b/miros/hsm.py @@ -1177,7 +1177,7 @@ def __init__(self, maxlen=QUEUE_SIZE, instrumented=True, priority=1): self.live_spy_callback = self.__class__.live_spy_callback_default self.live_trace_callback = self.__class__.live_trace_callback_default - self.last_live_trace_datetime = len(self.full.trace) + self.last_live_trace_datetime = stdlib_datetime.now() @staticmethod def live_spy_callback_default(spy_line): @@ -1230,9 +1230,10 @@ def _print_trace_if_live(self, initial_state): result = fn(self, initial_state) if self.instrumented and self.live_trace: strace = "" - tr = self.full.trace[-1] - strace += self.trace_tuple_to_formatted_string(tr) - self.live_trace_callback(strace) + if len(self.full.trace) != 0: + tr = self.full.trace[-1] + strace += self.trace_tuple_to_formatted_string(tr) + self.live_trace_callback(strace) if tr is not None: self.last_live_trace_datetime = tr.datetime return result @@ -1255,12 +1256,14 @@ def _print_trace_if_live(self): tr = None # fn is next_rtc/start_at result = fn(self) - if(self.instrumented and self.live_trace): - tr = self.full.trace[-1] - if tr.datetime != self.last_live_trace_datetime: - strace = "\n" - strace += self.trace_tuple_to_formatted_string(tr) - self.live_trace_callback(strace) + if(self.instrumented and self.live_trace ): + if len(self.full.trace) != 0: + tr = self.full.trace[-1] + if tr.datetime is not None: + if tr.datetime != self.last_live_trace_datetime: + strace = "\n" + strace += self.trace_tuple_to_formatted_string(tr) + self.live_trace_callback(strace) if tr is not None: self.last_live_trace_datetime = tr.datetime return result diff --git a/plan/release_notes.wiki b/plan/release_notes.wiki index e69de29..fdec9fd 100644 --- a/plan/release_notes.wiki +++ b/plan/release_notes.wiki @@ -0,0 +1,39 @@ +Thread-safe live instrumentation streams, thread-safe print and signal catching glob support. + +Added four new internal signals: ``STOP_FABRIC_SIGNAL``, +``STOP_ACTIVE_OBJECT_SIGNAL``, ``SUBSCRIBE_META_SIGNAL``, +``PUBLISH_META_SIGNAL``. These signals used to be created by the active object, +but they have been moved into the ``event.py`` class, so that signal catching +globs can be supported by miros. + +Writes to Python ``print`` are not thread safe. Before this release the default +behavior of ``live_spy`` and ``live_trace`` wrote out to python's ``print`` +function directly. This live instrumentation could cause an io-block and a +resulting run time error. I fixed this threading issue by changing the miros +active objects to write their live stream messages through a dedicated +instrumentation thread. This thread ensures that only one thing gets to print +at a time. Also, the active object now provides a thread safe version of +``print`` which uses the instrumentation thread. + +Miros provides the user with a way to over-write how their live ``trace`` and +live ``spy`` streams are written. The thread managing the live stream watches a +queue, which contains 0 or more ``Instrumentation`` named tuples. An +``Instrumentation`` named tuple has a ``fn`` and a ``content`` element. The +``fn`` describes the way the ``content`` is intended to be printed, so that the +``register_live_spy_callback`` and ``register_live_trace_callback`` features +will continue to work. The user can change "how" their instrumentation stream +will be represented, but the function-they-provide-to-do-this-with: ``fn``, must +be protected within a thread. + +Since there can be many active objects which all want to print their live +instrumentation information at the same time, but only one instrumentation +thread handler, I made the class which provides the instrumentation threading a +singleton. The class which provides the instrumentation threading is called +``InstrumentationWriterClass, `` and it is turned into the singleton +``InstrumentationWriter`` by the ``SingletonDectorator`` class. When an active +object is being constructed it creates an ``InstrumentationWriter``. If a +previously build active object has already done this, a reference to the +previously created ``InstrumentedWriter`` is returned. Then the instrumentation +writing thread is started if it hasn't been started already. + + diff --git a/plan/release_plan.wiki b/plan/release_plan.wiki index 21fe696..f3284e0 100644 --- a/plan/release_plan.wiki +++ b/plan/release_plan.wiki @@ -32,9 +32,12 @@ * [ ] Re-run all tests * [ ] Uprev release number in setup.py file * [ ] Write your release notes - * [ ] Commit and push changes to github - * [ ] In the github release page, place your release notes, tag using the release number - in setup.py file ex. v5.1.2 + * [ ] Commit + * [ ] Merge back into master + * [ ] Tag the release + * [ ] Push changes and tab to github + * [ ] In the github release page, place your release notes, use the tag you + just created within your local repo * [ ] Open cmd window * [ ] Navigate to miros project * [ ] > python setup.py sdist diff --git a/pytest.ini b/pytest.ini index a979a2d..de3be50 100644 --- a/pytest.ini +++ b/pytest.ini @@ -19,6 +19,8 @@ markers = rtc: test the run to completion event complete_circuit: test the complete_circuit method event: test the events + signal: test the signals + payload: test event payloads post_event: test the post_event method post_add: can we augment a statechart once it is made? is_in: test the is_in active object feature @@ -84,4 +86,5 @@ markers = thread_safe_attributes: test thread safe attributes provided by the MetaThreadSafeAttributes isolated: for debugging specific tests scxml_bugs: bugs found while building miros-scxml + snipe: isolated testing diff --git a/setup.py b/setup.py index 1d848af..e5d8441 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ py_modules=['miros'], # https://packaging.python.org/en/latest/single_source_version.html - version='4.2.0', + version='4.2.1', description='A statechart library for Python', long_description=long_description, diff --git a/test/active_objects_test.py b/test/active_objects_test.py index 44a8125..a3c682d 100644 --- a/test/active_objects_test.py +++ b/test/active_objects_test.py @@ -157,6 +157,7 @@ def test_start_stop_c(fabric_fixture): pp(ao1.spy_full()) +@pytest.mark.snipe @pytest.mark.aos def test_publish_subscribe(fabric_fixture): c1 = ActiveObject("c1") diff --git a/test/comprehensive_hsm_test.py b/test/comprehensive_hsm_test.py index 3e9ddc7..566b2fa 100644 --- a/test/comprehensive_hsm_test.py +++ b/test/comprehensive_hsm_test.py @@ -522,7 +522,7 @@ def trace_driver(chart, event, trace_target): try: assert(target == result[0]) except: - #print(chart.trace()) + print(chart.trace()) assert(target == result[0]) def helper_test_chart_is_working(chart, name, start_at): @@ -2580,22 +2580,29 @@ def get_chart(): chart.live_trace = True return (chart, ss2) + print("A") # chart must work chart, start_at = get_chart() helper_test_chart_is_working(chart, name, start_at) + print("B") # spy_on_decorators: yes # instrumented: yes chart, start_at = get_chart() assert(chart.instrumented == True) chart.start_at(start_at) + + print("C") # instrumented will turn off after start_at time.sleep(0.01) chart, start_at = get_chart() + helper_test_spy_on(chart, start_at) + print("C1") chart, start_at = get_chart() helper_test_trace_on(chart, name, start_at) + print("D") # live_spy: no chart, start_at = get_chart() helper_test_live_spy_off(chart, start_at) diff --git a/test/event_test.py b/test/event_test.py index ba3f9cf..4c70446 100644 --- a/test/event_test.py +++ b/test/event_test.py @@ -39,7 +39,8 @@ def test_inner_signals(): assert(signals.BAKE >= 6) assert(signals.is_inner_signal(1) is True) assert(signals.is_inner_signal(6) is True) - assert(signals.is_inner_signal(7) is False) + #assert(signals.is_inner_signal(7) is False) + assert(signals.is_inner_signal('PUBLISH_META_SIGNAL') is True) assert(signals.is_inner_signal('ENTRY_SIGNAL') is True) assert(signals.is_inner_signal('SEARCH_FOR_SUPER_SIGNAL') is True) assert(signals.is_inner_signal('BAKE') is False) diff --git a/test/post_fifo_in_chart_in_start_at_path_test.py b/test/post_fifo_in_chart_in_start_at_path_test.py index 9f591cf..4da238b 100644 --- a/test/post_fifo_in_chart_in_start_at_path_test.py +++ b/test/post_fifo_in_chart_in_start_at_path_test.py @@ -175,7 +175,7 @@ def test_build_a_small_chart(): [Scxml] <- Queued:(2) Deferred:(0) [Scxml] SUBSCRIBE_META_SIGNAL:Start [Scxml] SUBSCRIBING TO:(Whatever1, TYPE:fifo) -[Scxml] <- Queued:(3) Deferred:(0) +[Scxml] <- Queued:(1) Deferred:(0) [Scxml] SCXML_INIT_SIGNAL:Start [Scxml] SEARCH_FOR_SUPER_SIGNAL:Work [Scxml] SEARCH_FOR_SUPER_SIGNAL:Start @@ -183,7 +183,7 @@ def test_build_a_small_chart(): [Scxml] ENTRY_SIGNAL:Work [Scxml] Hello from 'work' [Scxml] INIT_SIGNAL:Work -[Scxml] <- Queued:(2) Deferred:(0) +[Scxml] <- Queued:(0) Deferred:(0) [Scxml] anything:Work [Scxml] <- Queued:(1) Deferred:(0) [Scxml] to_start:Work