Skip to content

Commit

Permalink
rework package structure to prepare for distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
willzhang05 committed Aug 11, 2023
1 parent ed8ec3e commit 1b37a22
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 397 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Python PLIN library
The plin module provides a Python interface for interacting with PEAK devices such as the PCAN-USB Pro and PLIN-USB on Linux using the chardev API provided by the PEAK LIN Linux Beta. The PEAK Linux beta driver is required to use this library and is available [here](https://forum.peak-system.com/viewtopic.php?t=5875).
This library provides a Python interface for interacting with PEAK devices such as the PCAN-USB Pro and PLIN-USB on Linux using the chardev API provided by the PEAK LIN Linux Beta. The PEAK Linux beta driver is required to use this library and is available [here](https://forum.peak-system.com/viewtopic.php?t=5875).

## Installation
The `plin-linux` package is available on [PyPI](https://pypi.org/project/plin-linux/) and can be directly installed with `pip install plin-linux`.

## Examples
Runnable examples are located in the `examples/` directory.
Expand Down Expand Up @@ -76,7 +79,9 @@ while True:
```

## Unit Tests
Unit tests are located in the `unit_tests/` directory and require a PEAK LIN device connected to run.
* Unit tests are located in the `unit_tests/` directory.
* Requires a PEAK LIN device connected to run.
* Can be run with `pytest`.

## License

Expand Down
2 changes: 1 addition & 1 deletion examples/master_test.py → examples/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from plin.enums import (PLINFrameChecksumType, PLINFrameDirection,
PLINFrameFlag, PLINMode)
from plin.plin import PLIN
from plin.device import PLIN

pp = pprint.PrettyPrinter(indent=4)

Expand Down
2 changes: 1 addition & 1 deletion examples/slave_test.py → examples/slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys

from plin.enums import PLINFrameChecksumType, PLINFrameDirection, PLINMode
from plin.plin import PLIN
from plin.device import PLIN


def main():
Expand Down
294 changes: 1 addition & 293 deletions plin/plin.py → plin/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,299 +7,7 @@
from ioctl_opt import IO, IOW, IOWR

from plin.enums import *

PLIN_USB_FILTER_LEN = 8
PLIN_DAT_LEN = 8
PLIN_EMPTY_DATA = b'\xff' * PLIN_DAT_LEN


class PLINMessage(Structure):
'''
Class representing a LIN message.
'''
buffer_length = 32
_fields_ = [
("type", c_uint16),
("flags", c_uint16),
("id", c_uint8),
("len", c_uint8),
("dir", c_uint8),
("cs_type", c_uint8),
("ts_us", c_uint64),
("data", c_uint8 * PLIN_DAT_LEN),
("reserved", c_uint8 * 8)
]

def __setattr__(self, name: str, value: Any) -> None:
if name == "data":
buf = (c_uint8 * PLIN_DAT_LEN)(*value)
return super().__setattr__(name, buf)
else:
return super().__setattr__(name, value)

def __repr__(self) -> str:
return str(self._asdict())

def _asdict(self) -> dict:
result = {field[0]: getattr(self, field[0])
for field in self._fields_[:-2]}
result["type"] = PLINMessageType(self.type)
result["dir"] = PLINFrameDirection(self.dir)
result["flags"] = PLINFrameFlag(self.flags)
result["cs_type"] = PLINFrameChecksumType(self.cs_type)
result["data"] = bytearray(self.data)
return result


class PLINUSBInitHardware(Structure):
_fields_ = [
("baudrate", c_uint16),
("mode", c_uint8),
("unused", c_uint8)
]


class PLINUSBFrameEntry(Structure):
_fields_ = [
("id", c_uint8),
("len", c_uint8),
("direction", c_uint8),
("checksum", c_uint8),
("flags", c_uint16),
("unused", c_uint16),
("d", c_uint8 * PLIN_DAT_LEN)
]

def __setattr__(self, name: str, value: Any) -> None:
if name == "d":
buf = (c_uint8 * PLIN_DAT_LEN)(*value)
return super().__setattr__(name, buf)
else:
return super().__setattr__(name, value)

def __repr__(self) -> str:
return str(self._asdict())

def _asdict(self) -> dict:
result = {field: getattr(self, field)
for field, _ in self._fields_}
result["direction"] = PLINFrameDirection(self.direction)
result["checksum"] = PLINFrameChecksumType(self.checksum)
result["flags"] = PLINFrameFlag(self.flags)
result["d"] = bytearray(self.d)
del result["unused"]
return result


class PLINUSBAutoBaud(Structure):
_fields_ = [
("timeout", c_uint16),
("err", c_uint8),
("unused", c_uint8)
]


class PLINUSBGetBaudrate(Structure):
_fields_ = [
("baudrate", c_uint16),
("unused", c_uint16)
]


class PLINUSBIDFilter(Structure):
_fields_ = [
("id_mask", c_uint8 * PLIN_USB_FILTER_LEN)
]


class PLINUSBGetMode(Structure):
_fields_ = [
("mode", c_uint8),
("unused", c_uint8 * 3)
]


class PLINUSBIDString(Structure):
_fields_ = [
("str", c_char * 48)
]


class PLINUSBFirmwareVersion(Structure):
_fields_ = [
("major", c_uint8),
("minor", c_uint8),
("sub", c_uint16)
]


class PLINUSBKeepAlive(Structure):
_fields_ = [
("err", c_uint8),
("id", c_uint8),
("period_ms", c_uint16)
]


class PLINUSBAddScheduleSlot(Structure):
_fields_ = [
("schedule", c_uint8),
("err", c_uint8),
("unused", c_uint16),
("type", c_uint8), # PLIN_USB_SLOT_xxx
("count_resolve", c_uint8),
("delay", c_uint16),
("id", c_uint8 * PLINUSBSlotNumber.MAX),
("handle", c_uint32)
]


class PLINUSBDeleteSchedule(Structure):
_fields_ = [
("schedule", c_uint8),
("err", c_uint8),
("unused", c_uint16),
]


class PLINUSBGetSlotCount(Structure):
_fields_ = [
("schedule", c_uint8),
("unused", c_uint8),
("count", c_uint16)
]


class PLINUSBGetScheduleSlot(Structure):
_fields_ = [
("schedule", c_uint8), # schedule from which the slot is returned
("slot_idx", c_uint8), # slot index returned
("err", c_uint8), # if 1, no schedule present
("unused", c_uint8),
("type", c_uint8), # PLIN_USB_SLOT_xxx
("count_resolve", c_uint8),
("delay", c_uint16),
("id", c_uint8 * PLINUSBSlotNumber.MAX),
("handle", c_uint32)
]

def __repr__(self) -> str:
return str(self._asdict())

def _asdict(self) -> dict:
result = {field: getattr(self, field)
for field, _ in self._fields_}
result["type"] = PLINUSBSlotType(self.type)
result["id"] = list(self.id)
del result["unused"]
return result


class PLINUSBSetScheduleBreakpoint(Structure):
_fields_ = [
("brkpt", c_uint8), # either 0 or 1
("unused", c_uint8 * 3),
("handle", c_uint32) # slot handle returned
]


class PLINUSBStartSchedule(Structure):
_fields_ = [
("schedule", c_uint8),
("err", c_uint8),
("unused", c_uint16),
]


class PLINUSBResumeSchedule(Structure):
_fields_ = [
("err", c_uint8), # if 1, not master / no schedule started
("unused", c_uint8 * 3),
]


class PLINUSBSuspendSchedule(Structure):
_fields_ = [
("err", c_uint8),
("schedule", c_uint8), # suspended schedule index [0..7]
("unused", c_uint8 * 2),
("handle", c_uint32)
]


class PLINUSBGetStatus(Structure):
_fields_ = [
("mode", c_uint8),
("tx_qfree", c_uint8),
("schd_poolfree", c_uint16),
("baudrate", c_uint16),
("usb_rx_ovr", c_uint16), # USB data overrun counter
("usb_filter", c_uint64),
("bus_state", c_uint8),
("unused", c_uint8 * 3)
]

def __repr__(self) -> str:
return str(self._asdict())

def _asdict(self) -> dict:
result = {field: getattr(self, field)
for field, _ in self._fields_}
result["mode"] = PLINMode(self.mode)
if self.usb_filter == 0:
result["usb_filter"] = bytearray([0] * PLIN_USB_FILTER_LEN)
else:
result["usb_filter"] = bytearray.fromhex(
f"{self.usb_filter:x}").ljust(PLIN_USB_FILTER_LEN, b'\x00')
result["bus_state"] = PLINBusState(self.bus_state)
del result["unused"]
return result


class PLINUSBUpdateData(Structure):
_fields_ = [
("id", c_uint8), # frame ID to update [0..63]
("len", c_uint8), # count of data bytes to update [1..8]
("idx", c_uint8), # data offset [0..7]
("unused", c_uint8),
("d", c_uint8 * PLIN_DAT_LEN) # new data bytes
]

def __setattr__(self, name: str, value: Any) -> None:
if name == "d":
buf = (c_uint8 * PLIN_DAT_LEN)(*value)
return super().__setattr__(name, buf)
else:
return super().__setattr__(name, value)

def __repr__(self) -> str:
return str(self._asdict())

def _asdict(self) -> dict:
result = {field: getattr(self, field)
for field, _ in self._fields_}
result["d"] = bytearray(result["d"])
del result["unused"]
return result


PLIN_USB_RSP_REMAP_ID_LEN = (PLINFrameID.MAX - PLINFrameID.MIN + 1)


class PLINUSBResponseRemap(Structure):
_fields_ = [
("set_get", c_uint8),
("unused", c_uint8 * 3),
("id", c_uint8 * PLIN_USB_RSP_REMAP_ID_LEN)
]


class PLINUSBLEDState(Structure):
_fields_ = [
("on_off", c_uint8), # PLIN_USB_LEDS_xxx
("unused", c_uint8 * 3)
]

from plin.structs import *

PLIOHWINIT = IOW(ord('u'), 0, PLINUSBInitHardware)
PLIORSTHW = IO(ord('u'), 1)
Expand Down
Loading

0 comments on commit 1b37a22

Please sign in to comment.