From 26143eaaf637a775b909bf7d27ca0785bd612bfe Mon Sep 17 00:00:00 2001 From: Software Delivery Bot Date: Wed, 4 Dec 2024 20:43:59 +0100 Subject: [PATCH 1/2] bug: correct print statement --- examples/bmi323/bmi323_i2c_read_write.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/bmi323/bmi323_i2c_read_write.py b/examples/bmi323/bmi323_i2c_read_write.py index 7c7ed6b..8ef6b7c 100644 --- a/examples/bmi323/bmi323_i2c_read_write.py +++ b/examples/bmi323/bmi323_i2c_read_write.py @@ -30,7 +30,7 @@ def setup_logging(level: int = logging.DEBUG) -> logging.Logger: logger.info(f"chip_id=0x{shuttle.sensor.chip_id:04X}") logger.info(f"err_reg=0x{shuttle.sensor.err_reg:04X}") - logger.info(f"err_reg=0x{shuttle.sensor.status:04X}") + logger.info(f"status=0x{shuttle.sensor.status:04X}") a_x, a_y, a_z = shuttle.sensor.acc_data logger.info(f"acceleration=({a_x=:04X}, {a_y=:04X}, {a_z=:04X})") From 89350a335ce45940c9e0470d604011d5525115da Mon Sep 17 00:00:00 2001 From: Software Delivery Bot Date: Wed, 11 Dec 2024 10:15:45 +0100 Subject: [PATCH 2/2] feat: support for BMP390 Add support for BMP390 sensor and shuttle. Add examples: * Register read/write via I2C/SPi; * Interrupt streaming via I2C/SPI; * Polling streaming via I2C/SPI. Update documentation and add unit tests. --- examples/README.md | 27 +- examples/bmp390/__init__.py | 0 .../bmp390/bmp390_i2c_interrupt_streaming.py | 56 +++ .../bmp390/bmp390_i2c_polling_streaming.py | 56 +++ examples/bmp390/bmp390_i2c_read_write.py | 49 +++ .../bmp390/bmp390_spi_interrupt_streaming.py | 57 +++ .../bmp390/bmp390_spi_polling_streaming.py | 58 +++ examples/bmp390/bmp390_spi_read_write.py | 50 +++ src/umrx_app_v3/sensors/bmp390.py | 366 ++++++++++++++++++ .../shuttle_board/bmp390/__init__.py | 0 .../shuttle_board/bmp390/bmp390_shuttle.py | 215 ++++++++++ tests/conftest.py | 6 + tests/sensors/test_bmp390.py | 58 +++ 13 files changed, 992 insertions(+), 6 deletions(-) create mode 100644 examples/bmp390/__init__.py create mode 100644 examples/bmp390/bmp390_i2c_interrupt_streaming.py create mode 100644 examples/bmp390/bmp390_i2c_polling_streaming.py create mode 100644 examples/bmp390/bmp390_i2c_read_write.py create mode 100644 examples/bmp390/bmp390_spi_interrupt_streaming.py create mode 100644 examples/bmp390/bmp390_spi_polling_streaming.py create mode 100644 examples/bmp390/bmp390_spi_read_write.py create mode 100644 src/umrx_app_v3/sensors/bmp390.py create mode 100644 src/umrx_app_v3/shuttle_board/bmp390/__init__.py create mode 100644 src/umrx_app_v3/shuttle_board/bmp390/bmp390_shuttle.py create mode 100644 tests/sensors/test_bmp390.py diff --git a/examples/README.md b/examples/README.md index b6b0629..daeb3a1 100644 --- a/examples/README.md +++ b/examples/README.md @@ -27,12 +27,27 @@ show different communication features for the [BMI323 shuttle](https://www.bosch-sensortec.com/media/boschsensortec/downloads/shuttle_board_flyer/bst-bmi323-sf000.pdf) board: -* [`bmi088/bmi323_i2c_read_write.py`](./bmi323/bmi323_i2c_read_write.py) -* [`bmi088/bmi323_i2c_polling_streaming.py`](./bmi323/bmi323_i2c_polling_streaming.py) -* [`bmi088/bmi323_i2c_interrupt_streaming.py`](./bmi323/bmi323_i2c_interrupt_streaming.py) -* [`bmi088/bmi323_spi_read_write.py`](./bmi323/bmi323_spi_read_write.py) -* [`bmi088/bmi323_spi_polling_streaming.py`](./bmi323/bmi323_spi_polling_streaming.py) -* [`bmi088/bmi323_spi_interrupt_streaming.py`](./bmi323/bmi323_spi_interrupt_streaming.py) +* [`bmi323/bmi323_i2c_read_write.py`](./bmi323/bmi323_i2c_read_write.py) +* [`bmi323/bmi323_i2c_polling_streaming.py`](./bmi323/bmi323_i2c_polling_streaming.py) +* [`bmi323/bmi323_i2c_interrupt_streaming.py`](./bmi323/bmi323_i2c_interrupt_streaming.py) +* [`bmi323/bmi323_spi_read_write.py`](./bmi323/bmi323_spi_read_write.py) +* [`bmi323/bmi323_spi_polling_streaming.py`](./bmi323/bmi323_spi_polling_streaming.py) +* [`bmi323/bmi323_spi_interrupt_streaming.py`](./bmi323/bmi323_spi_interrupt_streaming.py) + +## [`bmp390`](https://www.bosch-sensortec.com/products/environmental-sensors/pressure-sensors/bmp390/) + +The examples in the [`bmp390`](./bmp390) folder +show different communication features for the +[BMP390 shuttle](https://www.bosch-sensortec.com/media/boschsensortec/downloads/shuttle_board_flyer/application_board_3_1/bst-bmp390-sf000.pdf) +board: + +* [`bmp390/bmp390_i2c_read_write.py`](./bmp390/bmp390_i2c_read_write.py) +* [`bmp390/bmp390_i2c_polling_streaming.py`](./bmp390/bmp390_i2c_polling_streaming.py) +* [`bmp390/bmp390_i2c_interrupt_streaming.py`](./bmp390/bmp390_i2c_interrupt_streaming.py) +* [`bmp390/bmp390_spi_read_write.py`](./bmp390/bmp390_spi_read_write.py) +* [`bmp390/bmp390_spi_polling_streaming.py`](./bmp390/bmp390_spi_polling_streaming.py) +* [`bmp390/bmp390_spi_interrupt_streaming.py`](./bmp390/bmp390_spi_interrupt_streaming.py) + ## Need a specific example or do not know how to read data from your sensor? diff --git a/examples/bmp390/__init__.py b/examples/bmp390/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/bmp390/bmp390_i2c_interrupt_streaming.py b/examples/bmp390/bmp390_i2c_interrupt_streaming.py new file mode 100644 index 0000000..9715c04 --- /dev/null +++ b/examples/bmp390/bmp390_i2c_interrupt_streaming.py @@ -0,0 +1,56 @@ +import logging +import struct +import sys +import time +from pathlib import Path + +from umrx_app_v3.shuttle_board.bmp390.bmp390_shuttle import BMP390Shuttle + + +def setup_logging(level: int = logging.DEBUG) -> logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + # This example is for Application Board 3.1 hardware + shuttle = BMP390Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_i2c() + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + assert shuttle.sensor.chip_id == 0x60 + _ = shuttle.sensor.compensate_temperature(0x0000) # cache temperature NVM registers + _ = shuttle.sensor.compensate_pressure(0x00, 24.0) # cache pressure NVM registers + shuttle.configure_interrupt_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_interrupt_streaming_multiple(includes_mcu_timestamp=False): + sensor_id, packet, time_stamp, payload = streaming + pressure_b0, pressure_b1, pressure_b2, temp_b0, temp_b1, temp_b2, time_b0, time_b1, time_b2 = struct.unpack( + " logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + # This example is for Application Board 3.1 hardware + shuttle = BMP390Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_i2c() + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:04X}") + assert shuttle.sensor.chip_id == 0x60 + _ = shuttle.sensor.compensate_temperature(0x0000) # caching NVM registers + _ = shuttle.sensor.compensate_pressure(0x00, 24.0) # caching NVM registers + shuttle.configure_polling_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_polling_streaming_multiple(): + sensor_id, payload = streaming + (pressure_b0, pressure_b1, pressure_b2, temp_b0, temp_b1, temp_b2, time_b0, time_b1, time_b2) = ( + struct.unpack(" logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + shuttle = BMP390Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_i2c() + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + logger.info(f"rev_id=0x{shuttle.sensor.rev_id:02X}") + logger.info(f"err_reg=0x{shuttle.sensor.err_reg:02X}") + logger.info(f"status=0x{shuttle.sensor.status:02X}") + logger.info(f"pwr_ctrl=0b{shuttle.sensor.pwr_ctrl:08b}") + logger.info(f"odr=0b{shuttle.sensor.odr:08b}") + logger.info(f"osr=0b{shuttle.sensor.osr:08b}") + + shuttle.sensor.pwr_ctrl = (1 << 0) | (1 << 1) | (0b11 << 4) + time.sleep(0.1) + logger.info(f"pwr_ctrl=0b{shuttle.sensor.pwr_ctrl:08b}") + + raw_temperature = shuttle.sensor.temperature + compensated_temperature = shuttle.sensor.compensate_temperature(raw_temperature) + logger.info(f"temperature(raw)=0x{raw_temperature:06X}, temperature(C)={compensated_temperature}") + raw_pressure = shuttle.sensor.pressure + compensated_pressure = shuttle.sensor.compensate_pressure(raw_pressure, compensated_temperature) + logger.info(f"pressure(raw)=0x{raw_pressure:06X}, pressure(Pa)={compensated_pressure}") + logger.info(f"sensor_time=0x{shuttle.sensor.sensor_time:06X}") diff --git a/examples/bmp390/bmp390_spi_interrupt_streaming.py b/examples/bmp390/bmp390_spi_interrupt_streaming.py new file mode 100644 index 0000000..cbd22b9 --- /dev/null +++ b/examples/bmp390/bmp390_spi_interrupt_streaming.py @@ -0,0 +1,57 @@ +import logging +import struct +import sys +import time +from pathlib import Path + +from umrx_app_v3.shuttle_board.bmp390.bmp390_shuttle import BMP390Shuttle + + +def setup_logging(level: int = logging.DEBUG) -> logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + shuttle = BMP390Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_spi() + _ = shuttle.board.read_spi(shuttle.CS, 0, 1) # dummy read is required, do not delete + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + assert shuttle.sensor.chip_id == 0x60 + _ = shuttle.sensor.compensate_temperature(0x0000) # cache temperature NVM registers + _ = shuttle.sensor.compensate_pressure(0x00, 24.0) # cache pressure NVM registers + shuttle.configure_interrupt_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_interrupt_streaming_multiple(includes_mcu_timestamp=False): + sensor_id, packet, time_stamp, payload = streaming + pressure_b0, pressure_b1, pressure_b2, temp_b0, temp_b1, temp_b2, time_b0, time_b1, time_b2 = struct.unpack( + " logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + # This example is for Application Board 3.1 hardware + shuttle = BMP390Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_spi() + _ = shuttle.board.read_spi(shuttle.CS, 0, 1) # dummy read is required, do not delete + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + assert shuttle.sensor.chip_id == 0x60 + _ = shuttle.sensor.compensate_temperature(0x0000) # caching NVM registers + _ = shuttle.sensor.compensate_pressure(0x00, 24.0) # caching NVM registers + shuttle.configure_polling_streaming() + shuttle.start_streaming() + time.sleep(0.1) + for idx in range(1000): + for streaming in shuttle.board.receive_polling_streaming_multiple(): + sensor_id, payload = streaming + (pressure_b0, pressure_b1, pressure_b2, temp_b0, temp_b1, temp_b2, time_b0, time_b1, time_b2) = ( + struct.unpack(" logging.Logger: + logger = logging.getLogger() + logger.setLevel(level) + stdout_handler = logging.StreamHandler(sys.stdout) + log_format = "(%(asctime)s) [%(levelname)-8s] %(filename)s:%(lineno)d: %(message)s" + log_formatter = logging.Formatter(log_format) + stdout_handler.setFormatter(log_formatter) + file_handler = logging.FileHandler(f"{Path(__file__).parent / Path(__file__).stem}.log", mode="w") + file_handler.setFormatter(log_formatter) + logger.addHandler(stdout_handler) + logger.addHandler(file_handler) + return logger + + +if __name__ == "__main__": + logger = setup_logging() + shuttle = BMP390Shuttle.on_hardware_v3_rev1() + shuttle.initialize() + shuttle.check_connected_hw() + + shuttle.configure_spi() + _ = shuttle.board.read_spi(shuttle.CS, 0, 1) # dummy read is required, do not delete + + logger.info(f"chip_id=0x{shuttle.sensor.chip_id:02X}") + logger.info(f"rev_id=0x{shuttle.sensor.rev_id:02X}") + logger.info(f"err_reg=0x{shuttle.sensor.err_reg:02X}") + logger.info(f"status=0x{shuttle.sensor.status:02X}") + logger.info(f"pwr_ctrl=0b{shuttle.sensor.pwr_ctrl:08b}") + logger.info(f"odr=0b{shuttle.sensor.odr:08b}") + logger.info(f"osr=0b{shuttle.sensor.osr:08b}") + + shuttle.sensor.pwr_ctrl = (1 << 0) | (1 << 1) | (0b11 << 4) + time.sleep(0.1) + logger.info(f"pwr_ctrl=0b{shuttle.sensor.pwr_ctrl:08b}") + + raw_temperature = shuttle.sensor.temperature + compensated_temperature = shuttle.sensor.compensate_temperature(raw_temperature) + logger.info(f"temperature(raw)=0x{raw_temperature:06X}, temperature(C)={compensated_temperature}") + raw_pressure = shuttle.sensor.pressure + compensated_pressure = shuttle.sensor.compensate_pressure(raw_pressure, compensated_temperature) + logger.info(f"pressure(raw)=0x{raw_pressure:06X}, pressure(Pa)={compensated_pressure}") + logger.info(f"sensor_time=0x{shuttle.sensor.sensor_time:06X}") diff --git a/src/umrx_app_v3/sensors/bmp390.py b/src/umrx_app_v3/sensors/bmp390.py new file mode 100644 index 0000000..338141e --- /dev/null +++ b/src/umrx_app_v3/sensors/bmp390.py @@ -0,0 +1,366 @@ +import struct +from collections.abc import Callable +from enum import Enum + +from cryptography.utils import cached_property + + +class BMP390Addr(Enum): + chip_id = 0x00 + rev_id = 0x01 + err_reg = 0x02 + status = 0x03 + data_0 = 0x04 + data_1 = 0x05 + data_2 = 0x06 + data_3 = 0x07 + data_4 = 0x08 + data_5 = 0x09 + sensor_time_0 = 0x0C + sensor_time_1 = 0x0D + sensor_time_2 = 0x0E + event = 0x10 + int_status = 0x11 + fifo_length_0 = 0x12 + fifo_length_1 = 0x13 + fifo_data = 0x14 + fifo_wtm_0 = 0x15 + fifo_wtm_1 = 0x16 + fifo_config_1 = 0x17 + fifo_config_2 = 0x18 + int_ctrl = 0x19 + int_conf = 0x1A + pwr_ctrl = 0x1B + osr = 0x1C + odr = 0x1D + config = 0x1F + cmd = 0x7E + + +class BMP390NVMAddr(Enum): + nvm_par_t1 = 0x31 + nvm_par_t2 = 0x33 + nvm_par_t3 = 0x35 + nvm_par_p1 = 0x36 + nvm_par_p2 = 0x38 + nvm_par_p3 = 0x3A + nvm_par_p4 = 0x3B + nvm_par_p5 = 0x3C + nvm_par_p6 = 0x3E + nvm_par_p7 = 0x40 + nvm_par_p8 = 0x41 + nvm_par_p9 = 0x42 + nvm_par_p10 = 0x44 + nvm_par_p11 = 0x45 + + +class BMP390: + def __init__(self) -> None: + self.read: Callable | None = None + self.write: Callable | None = None + + def assign_callbacks(self, read_callback: Callable, write_callback: Callable) -> None: + self.read = read_callback + self.write = write_callback + + @property + def chip_id(self) -> int: + return self.read(BMP390Addr.chip_id) + + @property + def rev_id(self) -> int: + return self.read(BMP390Addr.rev_id) + + @property + def err_reg(self) -> int: + return self.read(BMP390Addr.err_reg) + + @property + def status(self) -> int: + return self.read(BMP390Addr.status) + + @property + def pressure(self) -> int: + byte_0, byte_1, byte_2 = self.read(BMP390Addr.data_0, 3) + return (byte_2 << 16) | (byte_1 << 8) | byte_0 + + @property + def temperature(self) -> int: + byte_0, byte_1, byte_2 = self.read(BMP390Addr.data_3, 3) + return (byte_2 << 16) | (byte_1 << 8) | byte_0 + + @property + def sensor_time(self) -> int: + byte_0, byte_1, byte_2 = self.read(BMP390Addr.sensor_time_0, 3) + return (byte_2 << 16) | (byte_1 << 8) | byte_0 + + @property + def event(self) -> int: + return self.read(BMP390Addr.event) + + @property + def int_status(self) -> int: + return self.read(BMP390Addr.int_status) + + @property + def fifo_length(self) -> int: + # TODO: interpret the payload + return self.read(BMP390Addr.fifo_length_0, 2) + + @property + def fifo_data(self) -> int: + return self.read(BMP390Addr.fifo_data) + + @property + def fifo_wtm_0(self) -> int: + return self.read(BMP390Addr.fifo_wtm_0) + + @fifo_wtm_0.setter + def fifo_wtm_0(self, value: int) -> None: + self.write(BMP390Addr.fifo_wtm_0, value) + + @property + def fifo_wtm_1(self) -> int: + return self.read(BMP390Addr.fifo_wtm_1) + + @fifo_wtm_1.setter + def fifo_wtm_1(self, value: int) -> None: + self.write(BMP390Addr.fifo_wtm_1, value) + + @property + def fifo_config_1(self) -> int: + return self.read(BMP390Addr.fifo_config_1) + + @fifo_config_1.setter + def fifo_config_1(self, value: int) -> None: + self.write(BMP390Addr.fifo_config_1, value) + + @property + def fifo_config_2(self) -> int: + return self.read(BMP390Addr.fifo_config_2) + + @fifo_config_2.setter + def fifo_config_2(self, value: int) -> None: + self.write(BMP390Addr.fifo_config_2, value) + + @property + def int_ctrl(self) -> int: + return self.read(BMP390Addr.int_ctrl) + + @int_ctrl.setter + def int_ctrl(self, value: int) -> None: + self.write(BMP390Addr.int_ctrl, value) + + @property + def int_conf(self) -> int: + return self.read(BMP390Addr.int_conf) + + @int_conf.setter + def int_conf(self, value: int) -> None: + self.write(BMP390Addr.int_conf, value) + + @property + def pwr_ctrl(self) -> int: + return self.read(BMP390Addr.pwr_ctrl) + + @pwr_ctrl.setter + def pwr_ctrl(self, value: int) -> None: + self.write(BMP390Addr.pwr_ctrl, value) + + @property + def osr(self) -> int: + return self.read(BMP390Addr.osr) + + @osr.setter + def osr(self, value: int) -> None: + self.write(BMP390Addr.osr, value) + + @property + def odr(self) -> int: + return self.read(BMP390Addr.odr) + + @odr.setter + def odr(self, value: int) -> None: + self.write(BMP390Addr.odr, value) + + @property + def config(self) -> int: + return self.read(BMP390Addr.config) + + @config.setter + def config(self, value: int) -> None: + self.write(BMP390Addr.config, value) + + @property + def cmd(self) -> int: + return self.read(BMP390Addr.cmd) + + @cmd.setter + def cmd(self, value: int) -> None: + self.write(BMP390Addr.cmd, value) + + @cached_property + def nvm_par_t1(self) -> int: + payload = self.read(BMP390NVMAddr.nvm_par_t1, 2) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_t2, 2) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_t3) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p1, 2) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p2, 2) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p3) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p4) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p5, 2) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p6, 2) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p7) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p8) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p9, 2) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p10) + (coefficient,) = struct.unpack(" int: + payload = self.read(BMP390NVMAddr.nvm_par_p11) + (coefficient,) = struct.unpack(" float: + return self.nvm_par_t1 / (2**-8) + + @cached_property + def par_t2(self) -> float: + return self.nvm_par_t2 / (2**30) + + @cached_property + def par_t3(self) -> float: + return self.nvm_par_t3 / (2**48) + + @cached_property + def par_p1(self) -> float: + return (self.nvm_par_p1 - 2**14) / (2**20) + + @cached_property + def par_p2(self) -> float: + return (self.nvm_par_p2 - 2**14) / (2**29) + + @cached_property + def par_p3(self) -> float: + return self.nvm_par_p3 / (2**32) + + @cached_property + def par_p4(self) -> float: + return self.nvm_par_p4 / (2**37) + + @cached_property + def par_p5(self) -> float: + return self.nvm_par_p5 / (2**-3) + + @cached_property + def par_p6(self) -> float: + return self.nvm_par_p6 / (2**6) + + @cached_property + def par_p7(self) -> float: + return self.nvm_par_p7 / (2**8) + + @cached_property + def par_p8(self) -> float: + return self.nvm_par_p8 / (2**15) + + @cached_property + def par_p9(self) -> float: + return self.nvm_par_p9 / (2**48) + + @cached_property + def par_p10(self) -> float: + return self.nvm_par_p10 / (2**48) + + @cached_property + def par_p11(self) -> float: + return self.nvm_par_p11 / (2**65) + + def compensate_temperature(self, raw_temperature: int) -> float: + p_data1 = raw_temperature - self.par_t1 + p_data2 = p_data1 * self.par_t2 + return p_data2 + (p_data1**2) * self.par_t3 + + def compensate_pressure(self, raw_pressure: int, compensated_temperature: float) -> float: + p_out1 = ( + self.par_p5 + + self.par_p6 * compensated_temperature + + self.par_p7 * (compensated_temperature**2) + + self.par_p8 * (compensated_temperature**3) + ) + + p_out2 = raw_pressure * ( + self.par_p1 + + self.par_p2 * compensated_temperature + + self.par_p3 * (compensated_temperature**2) + + self.par_p4 * (compensated_temperature**3) + ) + + p_data4 = (self.par_p9 + self.par_p10 * compensated_temperature) * (raw_pressure**2) + self.par_p11 * ( + raw_pressure**3 + ) + + return p_out1 + p_out2 + p_data4 diff --git a/src/umrx_app_v3/shuttle_board/bmp390/__init__.py b/src/umrx_app_v3/shuttle_board/bmp390/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/umrx_app_v3/shuttle_board/bmp390/bmp390_shuttle.py b/src/umrx_app_v3/shuttle_board/bmp390/bmp390_shuttle.py new file mode 100644 index 0000000..8491739 --- /dev/null +++ b/src/umrx_app_v3/shuttle_board/bmp390/bmp390_shuttle.py @@ -0,0 +1,215 @@ +import logging +import time +from array import array +from typing import Any, Self + +from umrx_app_v3.mcu_board.app_board_v3_rev0 import ApplicationBoardV3Rev0 +from umrx_app_v3.mcu_board.app_board_v3_rev1 import ApplicationBoardV3Rev1 +from umrx_app_v3.mcu_board.bst_app_board import ApplicationBoard +from umrx_app_v3.mcu_board.bst_protocol_constants import ( + I2CMode, + MultiIOPin, + PinDirection, + PinValue, + SPIBus, + StreamingSamplingUnit, +) +from umrx_app_v3.mcu_board.commands.spi import SPIConfigureCmd +from umrx_app_v3.sensors.bmp390 import BMP390, BMP390Addr, BMP390NVMAddr + +logger = logging.getLogger(__name__) + + +class BMP390ShuttleError(Exception): ... + + +class BMP390Shuttle: + # 1-wire PROM + SHUTTLE_ID = 0x173 + # Pins + SDO = MultiIOPin.MINI_SHUTTLE_PIN_2_3 + CS = MultiIOPin.MINI_SHUTTLE_PIN_2_1 + INT1 = MultiIOPin.MINI_SHUTTLE_PIN_1_6 + # I2C addresses + I2C_DEFAULT_ADDRESS = 0x76 # when SDO -> GND + I2C_ALTERNATIVE_ADDRESS = 0x77 # when SDO -> VDDIO + + def __init__(self, **kw: Any) -> None: + self.board: ApplicationBoard | None = kw["board"] if kw.get("board") else None + self.sensor: BMP390 = BMP390() + self.is_initialized: bool = False + self.is_i2c_configured: bool = False + self.is_spi_configured: bool = False + self.is_polling_streaming_configured: bool = False + self.is_interrupt_streaming_configured: bool = False + + def attach_to(self, board: ApplicationBoard) -> None: + self.board = board + + @classmethod + def on_hardware_v3_rev0(cls) -> Self: + return cls(board=ApplicationBoardV3Rev0()) + + @classmethod + def on_hardware_v3_rev1(cls) -> Self: + return cls(board=ApplicationBoardV3Rev1()) + + def initialize(self) -> None: + self.board.initialize() + self.board.start_communication() + self.is_initialized = True + + def check_connected_hw(self) -> None: + board_info = self.board.board_info + if board_info.shuttle_id != self.SHUTTLE_ID: + error_message = f"Expect shuttle_id={self.SHUTTLE_ID} got {board_info.shuttle_id}" + raise BMP390ShuttleError(error_message) + + def assign_sensor_callbacks(self) -> None: + self.sensor.assign_callbacks(read_callback=self.read_register, write_callback=self.write_register) + + def configure_i2c(self) -> None: + self.board.set_pin_config(self.SDO, PinDirection.OUTPUT, PinValue.LOW) + self.board.set_pin_config(self.CS, PinDirection.OUTPUT, PinValue.HIGH) + self.board.set_vdd_vddio(3.3, 3.3) + time.sleep(0.01) + self.board.configure_i2c(I2CMode.FAST_MODE) + self.assign_sensor_callbacks() + self.is_i2c_configured = True + self.is_spi_configured = False + + def configure_spi(self) -> None: + self.board.set_pin_config(self.CS, PinDirection.OUTPUT, PinValue.HIGH) + self.board.set_vdd_vddio(3.3, 3.3) + time.sleep(0.2) + if isinstance(self.board, ApplicationBoardV3Rev1): + SPIConfigureCmd.set_bus(SPIBus.BUS_1) + self.board.configure_spi() + self.assign_sensor_callbacks() + self.is_spi_configured = True + self.is_i2c_configured = False + + def read_register(self, reg_addr: int, bytes_to_read: int = 1) -> array[int] | int: + if isinstance(reg_addr, (BMP390Addr | BMP390NVMAddr)): + reg_addr = reg_addr.value + if self.is_i2c_configured: + values = self.board.read_i2c(self.I2C_DEFAULT_ADDRESS, reg_addr, bytes_to_read) + if bytes_to_read == 1: + return values[0] + return values + if self.is_spi_configured: + if bytes_to_read == 1: + return self.read_single_register_spi(reg_addr) + return self.read_multiple_spi(reg_addr, bytes_to_read) + + error_message = "Configure I2C or SPI protocol prior to reading registers" + raise BMP390ShuttleError(error_message) + + def read_single_register_spi(self, reg_addr: int) -> int: + values = self.board.read_spi(self.CS, reg_addr, 2) + return values[1] + + def read_multiple_spi(self, start_register_addr: int, bytes_to_read: int) -> array[int]: + values = self.board.read_spi(self.CS, start_register_addr, bytes_to_read + 1) + return values[1:] + + def write_register(self, reg_addr: int, value: int) -> None: + if isinstance(reg_addr, (BMP390Addr | BMP390NVMAddr)): + reg_addr = reg_addr.value + if self.is_i2c_configured: + return self.board.write_i2c(self.I2C_DEFAULT_ADDRESS, reg_addr, array("B", (value,))) + if self.is_spi_configured: + return self.board.write_spi(self.CS, reg_addr, array("B", (value,))) + error_message = "Configure I2C or SPI protocol prior to reading registers" + raise BMP390ShuttleError(error_message) + + def _configure_i2c_polling_streaming( + self, + sampling_time: int, + sampling_unit: StreamingSamplingUnit, + ) -> None: + self.board.streaming_polling_set_i2c_channel( + i2c_address=self.I2C_DEFAULT_ADDRESS, + sampling_time=sampling_time, + sampling_unit=sampling_unit, + register_address=BMP390Addr.data_0.value, + bytes_to_read=(3 + 3 + 2 + 3), + ) + self.board.configure_streaming_polling(interface="i2c") + self.is_polling_streaming_configured = True + + def _configure_spi_polling_streaming( + self, + sampling_time: int, + sampling_unit: StreamingSamplingUnit, + ) -> None: + self.board.streaming_polling_set_spi_channel( + cs_pin=self.CS, + sampling_time=sampling_time, + sampling_unit=sampling_unit, + register_address=BMP390Addr.data_0.value, + bytes_to_read=(1 + 3 + 3 + 2 + 3), + ) + self.board.configure_streaming_polling(interface="spi") + self.is_polling_streaming_configured = True + + def start_measurement(self) -> None: + self.sensor.pwr_ctrl = (1 << 0) | (1 << 1) | (0b11 << 4) + time.sleep(0.1) + + def configure_polling_streaming( + self, + sampling_time: int = 5, + sampling_unit: StreamingSamplingUnit = StreamingSamplingUnit.MILLI_SECOND, + ) -> None: + self.start_measurement() + if self.is_i2c_configured: + return self._configure_i2c_polling_streaming(sampling_time, sampling_unit) + if self.is_spi_configured: + return self._configure_spi_polling_streaming(sampling_time, sampling_unit) + error_message = "Configure I2C or SPI protocol first" + raise BMP390ShuttleError(error_message) + + def _configure_i2c_interrupt_streaming(self) -> None: + self.board.streaming_interrupt_set_i2c_channel( + interrupt_pin=self.INT1, + i2c_address=self.I2C_DEFAULT_ADDRESS, + register_address=BMP390Addr.data_0.value, + bytes_to_read=(3 + 3 + 2 + 3), + ) + self.board.configure_streaming_interrupt(interface="i2c") + self.is_interrupt_streaming_configured = True + + def _configure_spi_interrupt_streaming(self) -> None: + self.board.streaming_interrupt_set_spi_channel( + interrupt_pin=self.INT1, + cs_pin=self.CS, + register_address=BMP390Addr.data_0.value, + bytes_to_read=(1 + 3 + 3 + 2 + 3), + ) + self.board.configure_streaming_interrupt(interface="spi") + self.is_interrupt_streaming_configured = True + + def configure_interrupt_streaming(self) -> None: + self.start_measurement() + self.sensor.int_ctrl = (1 << 1) | (1 << 6) + time.sleep(0.02) + if self.is_i2c_configured: + return self._configure_i2c_interrupt_streaming() + if self.is_spi_configured: + return self._configure_spi_interrupt_streaming() + error_message = "Configure I2C or SPI protocol first" + raise BMP390ShuttleError(error_message) + + def start_streaming(self) -> None: + if self.is_polling_streaming_configured: + return self.board.start_polling_streaming() + if self.is_interrupt_streaming_configured: + return self.board.start_interrupt_streaming() + error_message = "Configure polling or interrupt streaming before streaming start" + raise BMP390ShuttleError(error_message) + + def stop_streaming(self) -> None: + self.board.stop_polling_streaming() + time.sleep(0.15) + self.board.stop_interrupt_streaming() diff --git a/tests/conftest.py b/tests/conftest.py index 4381f5b..524cb55 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,6 +22,7 @@ from umrx_app_v3.mcu_board.commands.timer import TimerCmd from umrx_app_v3.sensors.bmi088 import BMI088 from umrx_app_v3.sensors.bmi323 import BMI323 +from umrx_app_v3.sensors.bmp390 import BMP390 from umrx_app_v3.shuttle_board.bmi088.bmi088_shuttle import BMI088Shuttle from umrx_app_v3.shuttle_board.bmi323.bmi323_shuttle import BMI323Shuttle @@ -163,3 +164,8 @@ def bmi323_shuttle(app_board_v3_rev1: ApplicationBoardV3Rev1) -> BMI323Shuttle: @pytest.fixture(scope="session", autouse=True) def bmi323() -> BMI323: return BMI323() + + +@pytest.fixture(scope="session", autouse=True) +def bmp390() -> BMP390: + return BMP390() diff --git a/tests/sensors/test_bmp390.py b/tests/sensors/test_bmp390.py new file mode 100644 index 0000000..a30353d --- /dev/null +++ b/tests/sensors/test_bmp390.py @@ -0,0 +1,58 @@ +import struct +from typing import Any +from unittest.mock import patch + +from umrx_app_v3.sensors.bmp390 import BMP390 + +# def test_bmp390_cached_properties(bmp390: BMP390) -> None: +# all_properties = {key: value for key, value in bmp390.__class__.__dict__.items() if isinstance(value, property)} +# write_only_properties = [ +# key for key, value in all_properties.items() if (value.fset is not None) and (value.fget is None) +# ] +# readable_properties = all_properties.keys() - write_only_properties +# +# for readable_property in readable_properties: +# with patch.object(bmp390, "read") as mocked_read, patch.object(struct, "unpack", return_value=(1, 2, 3)): +# getattr(bmp390, readable_property) +# if readable_property == "sensor_time": +# assert mocked_read.call_count == 2 +# else: +# mocked_read.assert_called_once() + + +def test_bmp390_read_properties(bmp390: BMP390) -> None: + all_properties = {key: value for key, value in bmp390.__class__.__dict__.items() if isinstance(value, property)} + write_only_properties = [ + key for key, value in all_properties.items() if (value.fset is not None) and (value.fget is None) + ] + readable_properties = all_properties.keys() - write_only_properties + + def fake_read(*args: Any) -> int | bytes: + if len(args) <= 1: + return 42 + return b"b" * args[-1] + + for readable_property in readable_properties: + with ( + patch.object(bmp390, "read", side_effect=fake_read) as mocked_read, + patch.object(struct, "unpack", return_value=(42,)), + ): + if not readable_property.startswith("par_"): + getattr(bmp390, readable_property) + mocked_read.assert_called_once() + + for readable_property in readable_properties: + if readable_property.startswith("par_"): + with patch.object(bmp390, "read", side_effect=fake_read) as mocked_read: + getattr(bmp390, readable_property) + mocked_read.assert_not_called() + + +def test_bmp390_write_properties(bmp390: BMP390) -> None: + all_properties = {key: value for key, value in bmp390.__class__.__dict__.items() if isinstance(value, property)} + writable_properties = [key for key, value in all_properties.items() if value.fset is not None] + + for writable_property in writable_properties: + with patch.object(bmp390, "write") as mocked_write: + setattr(bmp390, writable_property, 123) + mocked_write.assert_called_once()