Skip to content

Commit

Permalink
Refactoring services
Browse files Browse the repository at this point in the history
  • Loading branch information
akadlec committed Dec 7, 2021
1 parent 0ef01dd commit 360eadd
Show file tree
Hide file tree
Showing 19 changed files with 340 additions and 443 deletions.
2 changes: 1 addition & 1 deletion fb_bus_connector_plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
FastyBird BUS connector module
"""

__version__ = "0.1.3"
__version__ = "0.2.0"
2 changes: 1 addition & 1 deletion fb_bus_connector_plugin/api/v1validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def validate(cls, payload: bytearray) -> bool:
return True

if (
cls.validate_read_single_register(payload=payload)
cls.validate_read_single_register(payload=payload) # pylint: disable=too-many-boolean-expressions
or cls.validate_read_multiple_registers(payload=payload)
or cls.validate_write_single_register(payload=payload)
or cls.validate_write_multiple_registers(payload=payload)
Expand Down
12 changes: 2 additions & 10 deletions fb_bus_connector_plugin/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
from fb_bus_connector_plugin.clients.client import Client, ClientFactory
from fb_bus_connector_plugin.connector import FbBusConnector
from fb_bus_connector_plugin.consumers.consumer import Consumer
from fb_bus_connector_plugin.handlers.apiv1 import ApiV1Handler
from fb_bus_connector_plugin.handlers.handler import Handler
from fb_bus_connector_plugin.logger import Logger
from fb_bus_connector_plugin.pairing.apiv1 import ApiV1Pairing
from fb_bus_connector_plugin.pairing.pairing import DevicesPairing
Expand Down Expand Up @@ -105,13 +103,6 @@ def create_container(logger: logging.Logger = logging.getLogger("dummy")) -> Non
di[Receiver] = Receiver(logger=di[Logger])
di["fb-bus-connector-plugin_receiver-proxy"] = di[Receiver]

# Clients handlers
di[ApiV1Handler] = ApiV1Handler(parser=di[V1Parser], receiver=di[Receiver], logger=di[Logger])
di["fb-bus-connector-plugin_bus-handler-api-v1"] = di[ApiV1Handler]

di[Handler] = Handler() # type: ignore[call-arg]
di["fb-bus-connector-plugin_bus-handler-proxy"] = di[Handler]

# Clients publishers
di[ApiV1Publisher] = ApiV1Publisher(
devices_registry=di[DevicesRegistry],
Expand All @@ -127,7 +118,8 @@ def create_container(logger: logging.Logger = logging.getLogger("dummy")) -> Non
# Connector clients factory
di[ClientFactory] = ClientFactory(
client=di[Client],
handler=di[Handler],
receiver=di[Receiver],
parser=di[V1Parser],
logger=di[Logger],
)
di["fb-bus-connector-plugin_data-client-factory"] = di[ClientFactory]
Expand Down
16 changes: 11 additions & 5 deletions fb_bus_connector_plugin/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
# Library dependencies
from kink import inject

from fb_bus_connector_plugin.api.v1parser import V1Parser
from fb_bus_connector_plugin.clients.base import BaseClient
from fb_bus_connector_plugin.clients.pjon import PjonClient
from fb_bus_connector_plugin.exceptions import InvalidArgumentException
from fb_bus_connector_plugin.handlers.handler import Handler
from fb_bus_connector_plugin.logger import Logger
from fb_bus_connector_plugin.receivers.receiver import Receiver
from fb_bus_connector_plugin.types import ClientType, ProtocolVersion


Expand Down Expand Up @@ -94,7 +95,7 @@ def send_packet(

# -----------------------------------------------------------------------------

def handle(self) -> int:
def loop(self) -> int:
"""Handle communication from client"""
packets_to_be_sent = 0

Expand Down Expand Up @@ -181,6 +182,8 @@ class ClientFactory: # pylint: disable=too-few-public-methods
"""

__client: Client
__receiver: Receiver
__parser: V1Parser

__logger: Logger

Expand All @@ -189,11 +192,13 @@ class ClientFactory: # pylint: disable=too-few-public-methods
def __init__(
self,
client: Client,
handler: Handler,
receiver: Receiver,
parser: V1Parser,
logger: Logger,
) -> None:
self.__client = client
self.__handler = handler
self.__receiver = receiver
self.__parser = parser

self.__logger = logger

Expand All @@ -217,7 +222,8 @@ def create( # pylint: disable=too-many-arguments
client_interface=client_interface,
client_state=True,
protocol_version=protocol_version,
handler=self.__handler,
receiver=self.__receiver,
parser=self.__parser,
logger=self.__logger,
)

Expand Down
55 changes: 44 additions & 11 deletions fb_bus_connector_plugin/clients/pjon.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
from kink import inject

# Library libs
from fb_bus_connector_plugin.api.v1parser import V1Parser
from fb_bus_connector_plugin.api.v1validator import V1Validator
from fb_bus_connector_plugin.clients.base import BaseClient
from fb_bus_connector_plugin.handlers.handler import Handler
from fb_bus_connector_plugin.exceptions import ParsePayloadException
from fb_bus_connector_plugin.logger import Logger
from fb_bus_connector_plugin.receivers.receiver import Receiver
from fb_bus_connector_plugin.types import Packet, PacketContent, ProtocolVersion
from fb_bus_connector_plugin.utilities.helpers import PacketsHelpers

Expand All @@ -50,7 +53,8 @@ class PjonClient(BaseClient, pjon.ThroughSerialAsync):
__state: bool = True
__version: ProtocolVersion

__handler: Handler
__receiver: Receiver
__parser: V1Parser

__MASTER_ADDRESS: int = 254
__SERIAL_BAUD_RATE: int = 38400
Expand All @@ -66,7 +70,8 @@ def __init__( # pylint: disable=too-many-arguments
client_interface: Optional[str],
client_state: bool,
protocol_version: ProtocolVersion,
handler: Handler,
receiver: Receiver,
parser: V1Parser,
logger: Logger,
) -> None:
BaseClient.__init__(self=self, logger=logger)
Expand All @@ -84,7 +89,8 @@ def __init__( # pylint: disable=too-many-arguments
self.__state = client_state
self.__version = protocol_version

self.__handler = handler
self.__receiver = receiver
self.__parser = parser

# -----------------------------------------------------------------------------

Expand Down Expand Up @@ -179,7 +185,7 @@ def send_packet(self, address: int, payload: List[int], waiting_time: float = 0.
# -----------------------------------------------------------------------------

def handle(self) -> int:
"""Process clients"""
"""Process client"""
if not self.__state:
return 0

Expand Down Expand Up @@ -255,12 +261,12 @@ def receive(self, received_payload: bytes, length: int, packet_info: Dict) -> No
sender_address,
)

self.__handler.on_message(
payload=bytearray(payload),
length=len(payload),
address=sender_address,
client_id=self.id,
)
if self.__version == ProtocolVersion.V1:
self.__handle_api_v1_message(
payload=bytearray(payload),
length=len(payload),
address=sender_address,
)

# -----------------------------------------------------------------------------

Expand Down Expand Up @@ -303,3 +309,30 @@ def __crc16(calculate_data: bytes, poly: int = 0x8408) -> int:
crc = (crc << 8) | ((crc >> 8) & 0xFF)

return crc & 0xFFFF

# -----------------------------------------------------------------------------

def __handle_api_v1_message(self, payload: bytearray, length: int, address: Optional[int]) -> None:
if V1Validator.validate_version(payload=payload) is False:
return

if V1Validator.validate(payload=payload) is False:
self._logger.warning("Received message is not valid FIB v1 convention message: %s", payload)

return

try:
entity = self.__parser.parse_message(
payload=payload,
length=length,
address=address,
client_id=self.__id,
)

except ParsePayloadException as ex:
self._logger.error("Received message could not be successfully parsed to entity")
self._logger.exception(ex)

return

self.__receiver.append(entity=entity)
73 changes: 29 additions & 44 deletions fb_bus_connector_plugin/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
"""

# Python base dependencies
import time
import uuid
from threading import Thread
from typing import List, Optional, Set
from typing import Optional, Set

# Library libs
from fb_bus_connector_plugin.clients.client import Client, ClientFactory
Expand All @@ -35,7 +33,7 @@
from fb_bus_connector_plugin.types import ClientType, ConnectionState, ProtocolVersion


class FbBusConnector(Thread): # pylint: disable=too-many-instance-attributes
class FbBusConnector: # pylint: disable=too-many-instance-attributes
"""
FastyBird BUS connector
Expand All @@ -48,7 +46,6 @@ class FbBusConnector(Thread): # pylint: disable=too-many-instance-attributes
__stopped: bool = False

__packets_to_be_sent: int = 0
__processed_devices: List[str] = []

__connectors_identifiers: Set[uuid.UUID] = set()

Expand Down Expand Up @@ -78,12 +75,6 @@ def __init__( # pylint: disable=too-many-arguments
pairing_handler: DevicesPairing,
logger: Logger,
) -> None:
Thread.__init__(
self,
name="FB Bus connector plugin thread",
daemon=True,
)

self.__receiver = receiver
self.__publisher = publisher
self.__consumer = consumer
Expand Down Expand Up @@ -141,7 +132,12 @@ def start(self) -> None:
"""Start connector services"""
self.__stopped = False

super().start()
# When connector is closing...
for device in self.__devices_registry:
# ...set device state to disconnected
self.__devices_registry.set_state(device=device, state=ConnectionState.UNKNOWN)

self.__logger.info("Connector FB BUS has been started.")

# -----------------------------------------------------------------------------

Expand All @@ -158,42 +154,31 @@ def stop(self) -> None:

# -----------------------------------------------------------------------------

def run(self) -> None:
"""Run connector service"""
self.__stopped = False

while True: # pylint: disable=too-many-nested-blocks
self.__receiver.receive()
self.__consumer.consume()

# Check is pairing enabled
if self.__pairing_helper.is_enabled() is True:
self.__pairing_helper.handle()
def has_unfinished_tasks(self) -> bool:
"""Check if connector has some unfinished task"""
return not self.__receiver.is_empty() or not self.__consumer.is_empty()

# Pairing is not enabled...
else:
# Check packets queue
if self.__packets_to_be_sent == 0:
# ...continue in communication

# Check for processing queue
if len(self.__processed_devices) == len(self.__devices_registry):
self.__processed_devices = []
# -----------------------------------------------------------------------------

# Continue processing devices
for device in self.__devices_registry:
if device.id.__str__() not in self.__processed_devices and device.enabled and device.ready:
if self.__publisher.handle(device=device):
self.__processed_devices.append(device.id.__str__())
def loop(self) -> None:
"""Run connector service"""
if self.__stopped and not self.has_unfinished_tasks():
self.__logger.warning("Connector FB BUS is stopped")

continue
return

self.__packets_to_be_sent = self.__client.handle()
self.__receiver.loop()
self.__consumer.loop()

# All records have to be processed before thread is closed
if self.__stopped and self.__receiver.is_empty() and self.__consumer.is_empty():
break
# Check is pairing enabled...
if self.__pairing_helper.is_enabled() is True:
self.__pairing_helper.loop()

time.sleep(0.001)
# Pairing is not enabled...
else:
# Check packets queue...
if self.__packets_to_be_sent == 0:
# Continue processing devices
self.__publisher.loop()

self.__logger.info("Connector FB BUS was closed")
self.__packets_to_be_sent = self.__client.loop()
Loading

0 comments on commit 360eadd

Please sign in to comment.