Skip to content

Commit

Permalink
Fixes issue #5
Browse files Browse the repository at this point in the history
- Starts the handshake complete timeout
- Resets the handshake timeout on disconnection
- Makes 'stop' methods more robust for test conds.
  In automated testing we stress some edge cases
  that should not occur in real-life situations.
- Resets agent version on disconnection
  • Loading branch information
Alessandro Baldo committed May 28, 2019
1 parent 665344b commit 890a18b
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions iottly_sdk/iottly.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import time
from collections import namedtuple
from functools import wraps
from threading import Thread, Condition, Event, Lock
from threading import Thread, Condition, Event, Lock, Timer
try:
from queue import Queue, Full
except:
Expand Down Expand Up @@ -125,6 +125,8 @@ def __init__(self, name,
# iottly agent <= 1.8.0 doesn't provide a version.
self._agent_version_state_lock = Lock()
self._agent_version = None
self._handshake_ended = Event()
self._handshake_timeout_timer = None

# Pre-computed messages (JSON strings)
# NOTE literal curly braces are double-up to use format spec-language
Expand Down Expand Up @@ -314,18 +316,25 @@ def stop(self):
self._sdk_stopped.set()
# Wake the connection thread so it can exit properly
self._disconnected_from_agent.set()
self._connection_t.join()
self._connection_t.join(2.0)
# Cancel handshake time if any
if self._handshake_timeout_timer:
self._handshake_timeout_timer.cancel()
# Wake the drainer thread so it can exit properly
with self._buffer_full:
self._buffer_full.notify()
self._drainer_t.join()
self._drainer_t.join(2.0)
# Wake up consumer thread waiting on empty buffer
self._buffer.put(None, False)
try:
self._buffer.put(None, False, timeout=2.0)
except Full:
# This is usefull during tests
pass
# Wake up the consumer and receiver threads so they can exit properly
with self._connected_to_agent:
self._connected_to_agent.notifyAll()
self._consumer_t.join()
self._receiver_t.join()
self._consumer_t.join(2.0)
self._receiver_t.join(2.0)


# ======================================================================== #
Expand Down Expand Up @@ -361,23 +370,32 @@ def _connect_to_agent(self):

self._socket = s
self._agent_linked = True
# Exec callback
if self._on_agent_status_changed_cb:
self._on_agent_status_changed_cb('started')
# Send notification of connected app to the iottly agent
self._buffer.put(Msg(self._app_start_msg, True, None)) # Signalling
self._handshake_ended.clear()
# Notify the other threads that require the connection
self._disconnected_from_agent.clear()
# Exec agent_status_changed_cb once the handshake with the agent
# is complete or a timeout is expired (agent <= 1.8.0)
self._handshake_timeout_timer = Timer(
1.0, self._invoke_initial_agent_status_changed_cb,
kwargs={'timeout': True})
self._handshake_timeout_timer.start()
self._connected_to_agent.notifyAll()
# Wait until the unix socket is broken
# and the event _disconnected_from_agent is fired
self._disconnected_from_agent.wait()
if self._handshake_timeout_timer:
self._handshake_timeout_timer.cancel()
with self._connected_to_agent:
# Set the disconnected state flag
self._agent_linked = False
if self._socket:
self._socket.close()
self.socket = None
# Reset the version on disconnection (ie. handle agent upgrade)
with self._agent_version_state_lock:
self._agent_version = None

# Exec callback
if self._on_agent_status_changed_cb:
Expand Down Expand Up @@ -503,6 +521,7 @@ def _handle_signals_from_agent(self, signal):
version = signal['sdkinit']['version']
with self._agent_version_state_lock:
self._agent_version = version
self._invoke_initial_agent_status_changed_cb()
else:
# NOTE ignore invalid signals to ensure retrocompatibility.
return
Expand All @@ -525,6 +544,15 @@ def _handle_cmd_from_agent(self, cmd):
# TODO handle invalid commands
pass

def _invoke_initial_agent_status_changed_cb(self, timeout=False):
if not self._handshake_ended.is_set():
self._handshake_ended.set()
if self._on_agent_status_changed_cb:
self._on_agent_status_changed_cb('started')
if not timeout:
self._handshake_timeout_timer.cancel()
self._handshake_timeout_timer = None

def _msg_serialize(self, msg, channel=None):
# Prepare message to be sent on a socket
if channel:
Expand Down

0 comments on commit 890a18b

Please sign in to comment.