From 1aa2baedea2e0737e09ca502997d079df7ce666c Mon Sep 17 00:00:00 2001 From: Fabio Battaglia Date: Sun, 8 Sep 2024 19:46:31 +0200 Subject: [PATCH] Squashed commit of the following: commit fcf0930e9375f5c763e25b0756dc6f56d6246593 Author: Fabio Battaglia Date: Tue Sep 3 07:28:34 2024 +0200 Better check for CXFER response after a transfer commit d6027346df16367ac69af1f6bb7f0133ed8731cd Author: Fabio Battaglia Date: Mon Sep 2 20:56:34 2024 +0200 replace some magic numbers with named fields commit 837ca6be3816897ccfbd47d496c9c50cd2e3dec1 Author: Fabio Battaglia Date: Mon Sep 2 16:33:58 2024 +0200 Correctly purge remaining data in serial port, and use correct endianneess for received checksum commit cd3f467a5de0eebb9f1b50dae6d14f0551cbce3c Author: Fabio Battaglia Date: Mon Sep 2 15:47:59 2024 +0200 pad the cxfer commands, and transfer blocks of 1024k by default commit 23f327e7f66872d3359f5592a9befd40a6ece507 Author: Fabio Battaglia Date: Mon Sep 2 13:26:26 2024 +0200 Add the correct command code to cxfer commands commit a09684545144901eba1b172a25d68d036288b752 Author: Fabio Battaglia Date: Mon Sep 2 13:02:03 2024 +0200 update board command interface for cxfer_read commit 460edaf7654e5fac19f6dda851a616dd81857e7d Author: Fabio Battaglia Date: Mon Sep 2 11:39:35 2024 +0200 bump version and changelog commit bfc56ed176c1b278dbc935b8bc0bc2ecf41cd774 Author: Fabio Battaglia Date: Mon Sep 2 11:36:49 2024 +0200 wire up the call to the cxfer read command commit 0c67be24c4907d05d3724f8a392006153b5481c7 Author: Fabio Battaglia Date: Mon Sep 2 10:55:09 2024 +0200 Implement code to read data with a cxfer commit e8966be6d52dd2ef679acb2348306db5fdae9d20 Author: Fabio Battaglia Date: Sun Sep 1 19:56:29 2024 +0200 send command to start the transfer commit 3e1c9f355ccdc2d0f168585e15b49682c43a4ef5 Author: Fabio Battaglia Date: Sun Sep 1 19:51:36 2024 +0200 Make checksum calculator available to everyone, add option to ignore response, test the grouper commit d3385a8deb4fba3ff63a27c8b249eb2a4bc7b782 Author: Fabio Battaglia Date: Sun Sep 1 18:54:34 2024 +0200 send address and shift map commit 6350ded8260ea320b148501dc5cb0dd567b9c8c0 Author: Fabio Battaglia Date: Sun Sep 1 18:39:14 2024 +0200 Add new utils file with utility code commit 418d70d1923d81277e5529aa8db342679315f710 Author: Fabio Battaglia Date: Sun Sep 1 17:56:31 2024 +0200 cxfer: send clear command commit 9cdbac2665aee79434944e9e03aa40080e17babf Author: Fabio Battaglia Date: Sun Sep 1 17:51:48 2024 +0200 begin writing cxfer code commit 3b40abd88039980cfda8879850146666a1c6258c Author: Fabio Battaglia Date: Sun Sep 1 16:34:53 2024 +0200 removed useless code from hardware_board_commands, define empty cxfer_read in m3_board_commands commit 5406509e09c8b7171961f0fcf4ee762001fe9226 Author: Fabio Battaglia Date: Sun Sep 1 16:27:55 2024 +0200 bump version, begin defining the cxfer read command --- CHANGELOG.md | 4 + dupicolib/board_commands_interface.py | 18 +++- .../board_interfaces/m3_board_commands.py | 63 +++++++++++- .../board_interfaces/special_modes/cxfer.py | 96 +++++++++++++++++++ dupicolib/board_utilities.py | 25 +++-- dupicolib/hardware_board_commands.py | 71 -------------- dupicolib/utils.py | 16 ++++ pyproject.toml | 2 +- tests/test_board_utilities.py | 11 ++- tests/test_utils.py | 23 +++++ 10 files changed, 244 insertions(+), 85 deletions(-) create mode 100644 dupicolib/board_interfaces/special_modes/cxfer.py create mode 100644 dupicolib/utils.py create mode 100644 tests/test_utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e1f46a..ffaa331 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog Changelog for dupicolib +## [0.5.0] - 2024-09-02 +### Added +- Support for CXFER read mode + ## [0.4.2] - 2024-08-18 ### Added - Added pin '0' toboard pin translation map to indicate an 'NC' pin diff --git a/dupicolib/board_commands_interface.py b/dupicolib/board_commands_interface.py index bf63d6b..6560301 100644 --- a/dupicolib/board_commands_interface.py +++ b/dupicolib/board_commands_interface.py @@ -1,6 +1,6 @@ """This module is an abstract class to set the shape for classes providing higher-level interface to the boards""" -from typing import Dict +from typing import Callable, Dict from abc import ABC import serial @@ -99,6 +99,22 @@ def detect_osc_pins(reads: int, ser: serial.Serial | None = None) -> int | None: int | None: A bitmask with bits set to 1 for pins that were detected as flipping """ raise NotImplementedError() + + @classmethod + def cxfer_read(cls, address_pins: list[int], data_pins: list[int], hi_pins: list[int], update_callback: Callable[[int], None] | None, ser: serial.Serial | None = None) -> bytes | None: + """Uses the "Clever Transfer" mode on the dupico to read the content of an IC. + + Args: + address_pins (list[int]): List of the pins composing the address, in order, starting from A0, and already mapped on the dupico socket + data_pins (list[int]): List of the pins composing the data, in order, starting from D0, and already mapped on the dupico socket + hi_pins (list[int]): List of the pins that must be always set to a high logic level during the transfer. + update_callback (Callable[[int], None] | None): A callback that will receive periodic updates of bytes read. + ser (serial.Serial | None, optional): Serial port on which to send the commands. Defaults to None. + + Returns: + bytes | None: A bytes object containing the data read from the IC + """ + raise NotImplementedError() @classmethod def map_value_to_pins(cls, pins: list[int], value: int) -> int: diff --git a/dupicolib/board_interfaces/m3_board_commands.py b/dupicolib/board_interfaces/m3_board_commands.py index 0c565e5..a2a7c1e 100644 --- a/dupicolib/board_interfaces/m3_board_commands.py +++ b/dupicolib/board_interfaces/m3_board_commands.py @@ -1,13 +1,17 @@ """This module contains higher-level code for board interfacing""" -from typing import Dict, final +from typing import Callable, Dict, final import struct from enum import Enum import serial +from dupicolib.board_interfaces.special_modes.cxfer import CXFERTransfer from dupicolib.board_utilities import BoardUtilities from dupicolib.hardware_board_commands import HardwareBoardCommands +import dupicolib.utils as DPUtils + +_CXFER_SHIFT_BLOCK_SIZE: int = 16 class CommandCode(Enum): WRITE = 0 @@ -16,6 +20,8 @@ class CommandCode(Enum): POWER = 3 TEST = 5 OSC_DET = 8 + CXFER = 9 + @final class M3BoardCommands(HardwareBoardCommands): @@ -126,6 +132,61 @@ def detect_osc_pins(reads: int, ser: serial.Serial) -> int | None: return struct.unpack(' bytes | None: + address_shift_map: list[int] = [] + data_shift_map: list[int] = [] + hi_pin_mask: int + data_pin_mask: int + + for pin in address_pins: + address_shift_map.append(cls._PIN_NUMBER_TO_INDEX_MAP[pin]) + + for pin in data_pins: + data_shift_map.append(cls._PIN_NUMBER_TO_INDEX_MAP[pin]) + + hi_pin_mask = cls.map_value_to_pins(hi_pins, 0xFFFFFFFFFFFFFFFF) + data_pin_mask = cls.map_value_to_pins(data_pins, 0xFFFFFFFFFFFFFFFF) + + # Clear the configuration for CXFER on the board + BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.CLEAR.value, *([0] * 16)]), 1) + + # Set the address shift map + for idx, addr_chunk in enumerate(DPUtils.iter_grouper(address_shift_map, _CXFER_SHIFT_BLOCK_SIZE, 0)): + BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.SET_ADDR_MAP_0.value + idx, *struct.pack(f'{len(addr_chunk)}B', *addr_chunk)]), 1) + + # Set the data shift map + for idx, data_chunk in enumerate(DPUtils.iter_grouper(data_shift_map, _CXFER_SHIFT_BLOCK_SIZE, 0)): + BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.SET_DATA_MAP_0.value + idx, *struct.pack(f'{len(data_chunk)}B', *data_chunk)]), 1) + + # Set the hi-out mask + BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.SET_HI_OUT_MASK.value, *struct.pack(' int: diff --git a/dupicolib/board_interfaces/special_modes/cxfer.py b/dupicolib/board_interfaces/special_modes/cxfer.py new file mode 100644 index 0000000..001f928 --- /dev/null +++ b/dupicolib/board_interfaces/special_modes/cxfer.py @@ -0,0 +1,96 @@ +"""Code to support the CXFER transfer modes""" + +from enum import Enum +import logging +import struct +from typing import Callable, final + +import serial + +from dupicolib.board_utilities import BoardUtilities + +_LOGGER = logging.getLogger(__name__) + +@final +class CXFERTransfer: + _XMIT_BLOCK_SIZE: int = 1024 + _XFER_RESPONSE_SIZE: int = 4 + _XFER_CHECKSUM_SIZE: int = 2 + + class CXFERSubCommand(Enum): + SET_ADDR_MAP_0 = 0x00 + SET_ADDR_MAP_1 = 0x01 + SET_ADDR_MAP_2 = 0x02 + SET_ADDR_MAP_3 = 0x03 + SET_DATA_MAP_0 = 0x10 + SET_DATA_MAP_1 = 0x11 + SET_DATA_MAP_2 = 0x12 + SET_DATA_MAP_3 = 0x13 + SET_HI_OUT_MASK = 0xE0 + SET_DATA_MASK = 0xE1 + SET_ADDR_WIDTH = 0xE2 + SET_DATA_WIDTH = 0xE3 + CLEAR = 0xF0 + EXECUTE_READ = 0xFF + + class CXFERResponse(Enum): + XFER_PKT_START = 0xDEADBEEF + XFER_PKT_FAIL = 0xBAADF00D + XFER_DONE = 0xC00FFFEE + + @classmethod + def read(cls, command_code: int, ser: serial.Serial, update_callback: Callable[[int], None] | None = None) -> bytes | None: + file_data: bytearray = bytearray() + data_block: bytearray + resp: int + + # Start the transfer! + BoardUtilities.send_binary_command(ser, bytes([command_code, cls.CXFERSubCommand.EXECUTE_READ.value, *([0] * 16)]), 0) + + while True: + data: bytes = ser.read(cls._XFER_RESPONSE_SIZE) + + if (data_len := len(data)) != cls._XFER_RESPONSE_SIZE: + raise IOError(f'Received {data_len} data for starting block!') + + resp, = struct.unpack('>I', data) + + if resp == cls.CXFERResponse.XFER_PKT_START.value: + _LOGGER.debug(f'Received a XFER_PKT_START packet, current file size {len(file_data)}') + elif resp == cls.CXFERResponse.XFER_DONE.value: + _LOGGER.info(f'Received a XFER_DONE packet, current file size {len(file_data)}') + break + else: + raise IOError(f'Received {resp:0{4}X} while expecting a start block.') + + data_block = bytearray() + rem_data: int = cls._XMIT_BLOCK_SIZE + + while rem_data > 0: + data = ser.read(16) + + if (data_len := len(data)) <= 0: + raise IOError('Timed out while waiting to read data...') + + rem_data = rem_data - data_len + data_block.extend(data) + + calc_checksum: int = BoardUtilities.cxfer_checksum_calculator(bytes(data_block)) + data = ser.read(cls._XFER_CHECKSUM_SIZE) + if (data_len := len(data)) != cls._XFER_CHECKSUM_SIZE: + raise IOError(f'Received {data_len} data for checksum!') + resp, = struct.unpack(' bool: return False @classmethod - def send_binary_command(cls, ser: serial.Serial, cmd: bytes, resp_data_len: int) -> bytes | None: - chks: int = cls._checksum_calculator(cmd) + def send_binary_command(cls, ser: serial.Serial, cmd: bytes, resp_data_len: int = 0) -> bytes | None: + chks: int = cls.command_checksum_calculator(cmd) ser.write(bytes([*cmd, chks])) - expected_resp = cmd[0] | cls._BINARY_COMMAND_RESPONSE_FLAG + # In this case, we just send the command and ignore any response, that we expect to be handled by the caller + if resp_data_len <= 0: + cls._LOGGER.debug(f'Sending command {cmd}, ignoring any response.') + return None + + expected_resp = cmd[0] | cls.BINARY_COMMAND_RESPONSE_FLAG resp_code: bytes = ser.read(1) if (len(resp_code) != 1 or resp_code[0] != expected_resp): cls._LOGGER.error(f'Got response {resp_code[0]:0{2}X} while expected was {expected_resp:0{2}X}') @@ -91,13 +96,17 @@ def send_binary_command(cls, ser: serial.Serial, cmd: bytes, resp_data_len: int) ser.reset_input_buffer() return None - if cls._checksum_calculator(resp_code + resp_data) != 0: + if cls.command_checksum_calculator(resp_code + resp_data) != 0: cls._LOGGER.error(f'Command has wrong checksum') ser.reset_input_buffer() return None return resp_data[:-1] # Avoid returning the checksum - @classmethod - def _checksum_calculator(cls, data: bytes) -> int: + @staticmethod + def command_checksum_calculator(data: bytes) -> int: return reduce(operator.sub, bytes([0, *data])) & 0xFF + + @staticmethod + def cxfer_checksum_calculator(data: bytes) -> int: + return reduce(operator.add, bytes([0, *data])) & 0xFFFF diff --git a/dupicolib/hardware_board_commands.py b/dupicolib/hardware_board_commands.py index e659715..49ddef4 100644 --- a/dupicolib/hardware_board_commands.py +++ b/dupicolib/hardware_board_commands.py @@ -49,73 +49,6 @@ def get_version(ser: serial.Serial) -> str | None: else: return None - @staticmethod - def test_board(ser: serial.Serial) -> bool | None: - """Perform a minimal self-test of the board - - Args: - ser (serial.Serial): serial port on which to send the command - - Returns: - bool | None: True if test passed correctly, False otherwise - """ - raise NotImplementedError() - - @staticmethod - def set_power(state: bool, ser: serial.Serial) -> bool | None: - """Enable or disable the power on the socket VCC - - Args: - state (bool): True if we wish power applied, False otherwise - ser (serial.Serial): serial port on which to send the command - - Returns: - bool | None: True if power was applied, False otherwise, None in case we did not read the response correctly - """ - raise NotImplementedError() - - @staticmethod - def write_pins(pins: int, ser: serial.Serial) -> int | None: - """Toggle the specified pins and read their status back - - Args: - ser (serial.Serial): serial port on which to send the command - pins (int): value that the pins will be set to. A bit set to '1' means that the pin will be pulled high - - Returns: - int | None: The value we read back from the pins, or None in case of parsing issues - """ - raise NotImplementedError() - - @staticmethod - def read_pins(ser: serial.Serial) -> int | None: - """Read the value of the pins - - Args: - ser (serial.Serial): serial port on which to send the command - - Returns: - int | None: The value we read back from the pins, or None in case of parsing issues - """ - raise NotImplementedError() - - @staticmethod - def detect_osc_pins(reads: int, ser: serial.Serial) -> int | None: - """Repeat reads a number of times and reports which pins changed their state in at least one of the reads - - Args: - tries (int): Number of reads to perform - ser (serial.Serial | None, optional): serial port on which to send the command. Defaults to None. - - Returns: - int | None: A bitmask with bits set to 1 for pins that were detected as flipping - """ - raise NotImplementedError() - - @classmethod - def map_value_to_pins(cls, pins: list[int], value: int) -> int: - raise NotImplementedError() - @staticmethod def _map_value_to_pins(map: Dict[int, int], pins: list[int], value: int) -> int: """This method takes a number to set on selected pins and uses a list of said pins to @@ -138,10 +71,6 @@ def _map_value_to_pins(map: Dict[int, int], pins: list[int], value: int) -> int: ret_val = ret_val | (1 << pin_pos) return ret_val - - @classmethod - def map_pins_to_value(cls, pins: list[int], value: int) -> int: - raise NotImplementedError() @staticmethod def _map_pins_to_value(map: Dict[int, int], pins: list[int], value: int) -> int: diff --git a/dupicolib/utils.py b/dupicolib/utils.py new file mode 100644 index 0000000..91687d7 --- /dev/null +++ b/dupicolib/utils.py @@ -0,0 +1,16 @@ +"""Miscellaneous utilities used by the library""" + +from itertools import zip_longest +from collections.abc import Iterable +from typing import Iterator, Tuple, TypeVar + +T = TypeVar('T') + + +# From https://stackoverflow.com/questions/434287/how-to-iterate-over-a-list-in-chunks +# See https://docs.python.org/3/library/itertools.html#itertools.zip_longest +def iter_grouper(iterable: Iterable[T], group_size: int, fillvalue: T | None = None) -> Iterator[Tuple[T]]: + # We are feeding multiple copies of the same iterator to zip_longest, so it will consume from the same + # source when zipping values, resulting in grouping the values + args = [iter(iterable)] * group_size + return zip_longest(*args, fillvalue=fillvalue) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 0c387b8..db0b08a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "dupicolib" -version = "0.4.2" +version = "0.5.0" description = "Library to interface with the dupico hardware" authors = [ { name = "Fabio Battaglia", email = "hkzlabnet@gmail.com" } diff --git a/tests/test_board_utilities.py b/tests/test_board_utilities.py index d099513..686e4a2 100644 --- a/tests/test_board_utilities.py +++ b/tests/test_board_utilities.py @@ -9,7 +9,12 @@ import pytest -def test_checksum_calculator(valid_semver_complete): +def test_command_checksum_calculator(valid_semver_complete): """Execute a checksum calculation""" - assert BoardUtilities._checksum_calculator(bytes([4])) == 252 - assert BoardUtilities._checksum_calculator(bytes([4, 252])) == 0 + assert BoardUtilities.command_checksum_calculator(bytes([4])) == 252 + assert BoardUtilities.command_checksum_calculator(bytes([4, 252])) == 0 + +def test_cxfer_checksum_calculator(valid_semver_complete): + """Execute a 16 bit checksum calculation""" + assert BoardUtilities.cxfer_checksum_calculator(bytes([4])) == 4 + assert BoardUtilities.cxfer_checksum_calculator(bytes([4, 252])) == 256 diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..11d0c7c --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,23 @@ +"""Tests for Utilities""" + +# pylint: disable=wrong-import-position,wrong-import-order + +import sys +from typing import Tuple +sys.path.insert(0, '.') # Make VSCode happy... + +import dupicolib.utils as DPUtils +import pytest + +def test_iter_grouper(): + """Test iter_grouper""" + example_list: list[int] = [0, 1, 2, 3, 4, 5, 6, 7] + + grouped_a: list[Tuple[int]] = list(DPUtils.iter_grouper(example_list, 3, fillvalue=255)) + grouped_b: list[Tuple[int]] = list(DPUtils.iter_grouper(example_list, 3)) + + assert len(grouped_a) == 3 + assert len(grouped_b) == 3 + + assert grouped_a == [(0, 1, 2), (3, 4, 5), (6, 7, 255)] + assert grouped_b == [(0, 1, 2), (3, 4, 5), (6, 7, None)]