diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 60fc23f9a..4f652dd2b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,3 +22,12 @@ repos: entry: ruff format --force-exclude types: [python] require_serial: true + + # Type checking + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.4.1 + hooks: + - id: mypy + files: 'src/.*\.py$' + additional_dependencies: [types-requests, types-redis] + args: ["--ignore-missing-imports", "--no-strict-optional"] diff --git a/docs/developer/index.rst b/docs/developer/index.rst index 5038b495d..0b4bf99ba 100644 --- a/docs/developer/index.rst +++ b/docs/developer/index.rst @@ -29,3 +29,15 @@ Documentation is split general and topic-specific sections. Each section is spli +++ Documentation relating to serial crystallography on I24 + + .. grid-item-card:: :material-regular:`apps;3em` + + .. toctree:: + :caption: Murko Integration + :maxdepth: 1 + + murko-integration/index + + +++ + + Documentation relating to integrating the murko image recognition system diff --git a/docs/developer/murko-integration/explanations/architecture.rst b/docs/developer/murko-integration/explanations/architecture.rst new file mode 100644 index 000000000..a00c27625 --- /dev/null +++ b/docs/developer/murko-integration/explanations/architecture.rst @@ -0,0 +1,18 @@ +Murko Architecture +------------------ + +The architecture of how Murko is integrated is still in flux but as of 07/08 the following has been tried on the beamline. + +.. image:: ../images/murko_setup.drawio.png + +The mx_bluesky code is deployed into the `beamline kubernetes cluster `_ behind a `blueAPI `_ REST interface. Alongside this an instance of murko is also deployed to the k8s cluster. + +When GDA reaches a point where it will thaw the pin (usually just after robot load), if the ``gda.mx.bluesky.thaw`` property has been set to True it will instead call out to the thaw_and_center plan inside mx_bluesky. + +The thawing plan will first create a ``MurkoCallback`` callback, this will stream metadata into a key in a redis database running on i04-control (this should be moved in `#145 `_). + +It will then trigger the ``OAVToRedisForwarder`` device in ``dodal`` that will stream image data to redis in the form of pickled RGB numpy arrays. Each image is given a uuid by this device, which is used to correlate the images with the other metadata, which could be streamed with a different frequency. + +The image streaming must be done with an ophyd device as there is too much data for it all to be emitted in bluesky documents. + +When the data is entered into redis it will publish a message to the redis ``murko`` channel. This will get picked up by the `socket_handler `_, which will forward the data to murko. Currently, during testing the socket_handler is just manually run on a workstation, `#146 `_ should fix this. diff --git a/docs/developer/murko-integration/images/murko_setup.drawio.png b/docs/developer/murko-integration/images/murko_setup.drawio.png new file mode 100644 index 000000000..481097d2d Binary files /dev/null and b/docs/developer/murko-integration/images/murko_setup.drawio.png differ diff --git a/docs/developer/murko-integration/index.rst b/docs/developer/murko-integration/index.rst new file mode 100644 index 000000000..3b69aaafb --- /dev/null +++ b/docs/developer/murko-integration/index.rst @@ -0,0 +1,22 @@ +Murko Integration +================= + +The `Murko system `_ uses ML to analyse images of sample from optical cameras and determine where the sample is. + +Its integration at DLS, using Bluesky, is still a work in progress but this documentation aims to give an overview of how it works. + + +.. grid:: 2 + :gutter: 2 + + .. grid-item-card:: :material-regular:`apartment;3em` + + .. toctree:: + :caption: Explanations + :maxdepth: 1 + + explanations/architecture + + +++ + + Explanations of how and why the architecture is why it is. \ No newline at end of file diff --git a/src/mx_bluesky/i04/__init__.py b/src/mx_bluesky/i04/__init__.py index ad869ba61..1de75fce0 100644 --- a/src/mx_bluesky/i04/__init__.py +++ b/src/mx_bluesky/i04/__init__.py @@ -1,3 +1,3 @@ -from mx_bluesky.i04.thawing_plan import thaw +from mx_bluesky.i04.thawing_plan import thaw, thaw_and_center -__all__ = ["thaw"] +__all__ = ["thaw", "thaw_and_center"] diff --git a/src/mx_bluesky/i04/callbacks/murko_callback.py b/src/mx_bluesky/i04/callbacks/murko_callback.py new file mode 100644 index 000000000..55df2abdf --- /dev/null +++ b/src/mx_bluesky/i04/callbacks/murko_callback.py @@ -0,0 +1,46 @@ +import copy +import json +from typing import Optional + +from bluesky.callbacks import CallbackBase +from dodal.log import LOGGER +from event_model.documents import Event, RunStart +from redis import StrictRedis + + +class MurkoCallback(CallbackBase): + def __init__(self, redis_host: str, redis_password: str, redis_db: int = 0): + self.redis_client = StrictRedis( + host=redis_host, password=redis_password, db=redis_db + ) + self.last_uuid = None + + def start(self, doc: RunStart) -> Optional[RunStart]: + self.murko_metadata = { + "zoom_percentage": doc.get("zoom_percentage"), + "microns_per_x_pixel": doc.get("microns_per_x_pixel"), + "microns_per_y_pixel": doc.get("microns_per_y_pixel"), + "beam_centre_i": doc.get("beam_centre_i"), + "beam_centre_j": doc.get("beam_centre_j"), + "sample_id": doc.get("sample_id"), + } + self.last_uuid = None + return doc + + def event(self, doc: Event) -> Event: + if latest_omega := doc["data"].get("smargon-omega"): + if self.last_uuid is not None: + self.call_murko(self.last_uuid, latest_omega) + elif (uuid := doc["data"].get("oav_to_redis_forwarder-uuid")) is not None: + self.last_uuid = uuid + return doc + + def call_murko(self, uuid: str, omega: float): + metadata = copy.deepcopy(self.murko_metadata) + metadata["omega_angle"] = omega + metadata["uuid"] = uuid + + # Send metadata to REDIS and trigger murko + self.redis_client.hset("test-metadata", uuid, json.dumps(metadata)) + self.redis_client.publish("murko", json.dumps(metadata)) + LOGGER.info("Metadata sent to redis") diff --git a/src/mx_bluesky/i04/thawing_plan.py b/src/mx_bluesky/i04/thawing_plan.py index 1908ebe33..baa49edb5 100644 --- a/src/mx_bluesky/i04/thawing_plan.py +++ b/src/mx_bluesky/i04/thawing_plan.py @@ -1,16 +1,58 @@ import bluesky.plan_stubs as bps import bluesky.preprocessors as bpp +from bluesky.preprocessors import run_decorator, subs_decorator from dls_bluesky_core.core import MsgGenerator +from dodal.beamlines.i04 import MURKO_REDIS_DB, REDIS_HOST, REDIS_PASSWORD from dodal.common import inject +from dodal.devices.oav.oav_detector import OAV +from dodal.devices.oav.oav_to_redis_forwarder import OAVToRedisForwarder +from dodal.devices.robot import BartRobot from dodal.devices.smargon import Smargon from dodal.devices.thawer import Thawer, ThawerStates +from mx_bluesky.i04.callbacks.murko_callback import MurkoCallback + + +def thaw_and_center( + time_to_thaw: float, + rotation: float = 360, + robot: BartRobot = inject("robot"), + thawer: Thawer = inject("thawer"), + smargon: Smargon = inject("smargon"), + oav: OAV = inject("oav"), + oav_to_redis_forwarder: OAVToRedisForwarder = inject("oav_to_redis_forwarder"), +) -> MsgGenerator: + zoom_percentage = yield from bps.rd(oav.zoom_controller.percentage) + sample_id = yield from bps.rd(robot.sample_id) + + yield from bps.abs_set(oav.zoom_controller.level, "1.0x", wait=True) + + @subs_decorator(MurkoCallback(REDIS_HOST, REDIS_PASSWORD, MURKO_REDIS_DB)) + @run_decorator( + md={ + "microns_per_x_pixel": oav.parameters.micronsPerXPixel, + "microns_per_y_pixel": oav.parameters.micronsPerYPixel, + "beam_centre_i": oav.parameters.beam_centre_i, + "beam_centre_j": oav.parameters.beam_centre_j, + "zoom_percentage": zoom_percentage, + "sample_id": sample_id, + } + ) + def _thaw_and_center(): + yield from bps.kickoff(oav_to_redis_forwarder, wait=True) + yield from bps.monitor(smargon.omega.user_readback, name="smargon") + yield from bps.monitor(oav_to_redis_forwarder.uuid, name="oav") + yield from thaw(time_to_thaw, rotation, thawer, smargon) + yield from bps.complete(oav_to_redis_forwarder) + + yield from _thaw_and_center() + def thaw( time_to_thaw: float, rotation: float = 360, - thawer: Thawer = inject("thawer"), # type: ignore - smargon: Smargon = inject("smargon"), # type: ignore + thawer: Thawer = inject("thawer"), + smargon: Smargon = inject("smargon"), ) -> MsgGenerator: """Rotates the sample and thaws it at the same time. diff --git a/tests/i04/callbacks/test_murko_callback.py b/tests/i04/callbacks/test_murko_callback.py new file mode 100644 index 000000000..2e160df15 --- /dev/null +++ b/tests/i04/callbacks/test_murko_callback.py @@ -0,0 +1,107 @@ +import json +from unittest.mock import MagicMock + +import pytest +from event_model import Event + +from mx_bluesky.i04.callbacks.murko_callback import MurkoCallback + +test_oav_uuid = "UUID" +test_smargon_data = 90 + + +def event_template(data_key, data_value) -> Event: + return { + "descriptor": "bd45c2e5-2b85-4280-95d7-a9a15800a78b", + "time": 1666604299.828203, + "data": {data_key: data_value}, + "timestamps": {data_key: 1666604299.8220396}, + "seq_num": 1, + "uid": "29033ecf-e052-43dd-98af-c7cdd62e8173", + "filled": {}, + } + + +test_oav_event = event_template("oav_to_redis_forwarder-uuid", test_oav_uuid) +test_smargon_event = event_template("smargon-omega", test_smargon_data) + +test_start_document = { + "uid": "event_uuid", + "zoom_percentage": 80, + "microns_per_x_pixel": 1.2, + "microns_per_y_pixel": 2.5, + "beam_centre_i": 158, + "beam_centre_j": 452, + "sample_id": 12345, +} + + +@pytest.fixture +def murko_callback() -> MurkoCallback: + callback = MurkoCallback("", "") + callback.redis_client = MagicMock() + return callback + + +@pytest.fixture +def murko_with_mock_call(murko_callback) -> MurkoCallback: + murko_callback.call_murko = MagicMock() + return murko_callback + + +def test_when_oav_data_arrives_then_murko_not_called( + murko_with_mock_call: MurkoCallback, +): + murko_with_mock_call.event(test_oav_event) + murko_with_mock_call.call_murko.assert_not_called() # type: ignore + + +def test_when_smargon_data_arrives_then_murko_not_called( + murko_with_mock_call: MurkoCallback, +): + murko_with_mock_call.event(test_smargon_event) + murko_with_mock_call.call_murko.assert_not_called() # type: ignore + + +def test_when_smargon_data_arrives_before_oav_data_then_murko_not_called( + murko_with_mock_call: MurkoCallback, +): + murko_with_mock_call.event(test_smargon_event) + murko_with_mock_call.event(test_oav_event) + murko_with_mock_call.call_murko.assert_not_called() # type: ignore + + +def test_when_smargon_data_arrives_before_oav_data_then_murko_called_with_smargon_data( + murko_with_mock_call: MurkoCallback, +): + murko_with_mock_call.event(test_oav_event) + murko_with_mock_call.event(test_smargon_event) + murko_with_mock_call.call_murko.assert_called_once_with( # type: ignore + test_oav_uuid, test_smargon_data + ) + + +def test_when_murko_called_with_event_data_then_meta_data_put_in_redis( + murko_callback: MurkoCallback, +): + murko_callback.start(test_start_document) # type: ignore + murko_callback.event(test_oav_event) + murko_callback.event(test_smargon_event) + + expected_metadata = { + "zoom_percentage": 80, + "microns_per_x_pixel": 1.2, + "microns_per_y_pixel": 2.5, + "beam_centre_i": 158, + "beam_centre_j": 452, + "sample_id": 12345, + "omega_angle": test_smargon_data, + "uuid": test_oav_uuid, + } + + murko_callback.redis_client.hset.assert_called_once_with( # type: ignore + "test-metadata", test_oav_uuid, json.dumps(expected_metadata) + ) + murko_callback.redis_client.publish.assert_called_once_with( # type: ignore + "murko", json.dumps(expected_metadata) + ) diff --git a/tests/i04/test_thawing.py b/tests/i04/test_thawing.py index a85436c71..0f7e832b2 100644 --- a/tests/i04/test_thawing.py +++ b/tests/i04/test_thawing.py @@ -1,21 +1,45 @@ from collections.abc import AsyncGenerator -from unittest.mock import ANY, MagicMock, call +from typing import Literal +from unittest.mock import ANY, MagicMock, call, patch import pytest +from _pytest.python_api import ApproxBase +from bluesky.run_engine import RunEngine from dodal.beamlines import i04 +from dodal.devices.oav.oav_detector import OAV +from dodal.devices.oav.oav_to_redis_forwarder import OAVToRedisForwarder from dodal.devices.smargon import Smargon from dodal.devices.thawer import Thawer, ThawerStates -from ophyd.sim import NullStatus -from ophyd_async.core import callback_on_mock_put, get_mock_put, set_mock_value +from ophyd.sim import NullStatus, instantiate_fake_device +from ophyd_async.core import ( + DeviceCollector, + callback_on_mock_put, + get_mock_put, + set_mock_value, +) from ophyd_async.epics.motion import Motor -from mx_bluesky.i04.thawing_plan import thaw +from mx_bluesky.i04.thawing_plan import thaw, thaw_and_center + +DISPLAY_CONFIGURATION = "tests/devices/unit_tests/test_display.configuration" +ZOOM_LEVELS_XML = "tests/devices/unit_tests/test_jCameraManZoomLevels.xml" class MyException(Exception): pass +@pytest.fixture +def oav() -> OAV: + oav: OAV = instantiate_fake_device(OAV, params=MagicMock()) + + oav.zoom_controller.zrst.set("1.0x") + + oav.wait_for_connection() + + return oav + + def patch_motor(motor: Motor, initial_position: float = 0): set_mock_value(motor.user_setpoint, initial_position) set_mock_value(motor.user_readback, initial_position) @@ -29,7 +53,7 @@ def patch_motor(motor: Motor, initial_position: float = 0): @pytest.fixture -async def smargon(RE) -> AsyncGenerator[Smargon, None]: +async def smargon(RE: RunEngine) -> AsyncGenerator[Smargon, None]: smargon = Smargon(name="smargon") await smargon.connect(mock=True) @@ -40,15 +64,25 @@ async def smargon(RE) -> AsyncGenerator[Smargon, None]: @pytest.fixture -async def thawer(RE) -> Thawer: +async def thawer(RE: RunEngine) -> Thawer: return i04.thawer(fake_with_ophyd_sim=True) +@pytest.fixture +async def oav_forwarder(RE: RunEngine) -> OAVToRedisForwarder: + with DeviceCollector(mock=True): + oav_forwarder = OAVToRedisForwarder( + "prefix", "host", "password", name="oav_to_redis_forwarder" + ) + return oav_forwarder + + def _do_thaw_and_confirm_cleanup( move_mock: MagicMock, smargon: Smargon, thawer: Thawer, do_thaw_func ): smargon.omega.set = move_mock set_mock_value(smargon.omega.velocity, initial_velocity := 10) + smargon.omega.set = move_mock do_thaw_func() last_thawer_call = get_mock_put(thawer.control).call_args_list[-1] assert last_thawer_call == call(ThawerStates.OFF, wait=ANY, timeout=ANY) @@ -57,7 +91,7 @@ def _do_thaw_and_confirm_cleanup( def test_given_thaw_succeeds_then_velocity_restored_and_thawer_turned_off( - smargon: Smargon, thawer: Thawer, RE + smargon: Smargon, thawer: Thawer, RE: RunEngine ): def do_thaw_func(): RE(thaw(10, thawer=thawer, smargon=smargon)) @@ -68,7 +102,7 @@ def do_thaw_func(): def test_given_moving_smargon_gives_error_then_velocity_restored_and_thawer_turned_off( - smargon: Smargon, thawer: Thawer, RE + smargon: Smargon, thawer: Thawer, RE: RunEngine ): def do_thaw_func(): with pytest.raises(MyException): @@ -88,7 +122,12 @@ def do_thaw_func(): ], ) def test_given_different_rotations_and_times_then_velocity_correct( - smargon: Smargon, thawer: Thawer, time, rotation, expected_speed, RE + smargon: Smargon, + thawer: Thawer, + time: float | Literal[10] | Literal[50], + rotation: Literal[360] | Literal[100] | Literal[-100], + expected_speed: ApproxBase | Literal[72] | Literal[4], + RE: RunEngine, ): RE(thaw(time, rotation, thawer=thawer, smargon=smargon)) first_velocity_call = get_mock_put(smargon.omega.velocity).call_args_list[0] @@ -104,7 +143,12 @@ def test_given_different_rotations_and_times_then_velocity_correct( ], ) def test_given_different_rotations_then_motor_moved_relative( - smargon: Smargon, thawer: Thawer, start_pos, rotation, expected_end, RE + smargon: Smargon, + thawer: Thawer, + start_pos: Literal[0] | Literal[78], + rotation: Literal[360] | Literal[100] | Literal[-100], + expected_end: Literal[360] | Literal[178] | Literal[-100], + RE: RunEngine, ): set_mock_value(smargon.omega.user_readback, start_pos) RE(thaw(10, rotation, thawer=thawer, smargon=smargon)) @@ -112,3 +156,61 @@ def test_given_different_rotations_then_motor_moved_relative( call(expected_end, wait=ANY, timeout=ANY), call(start_pos, wait=ANY, timeout=ANY), ] + + +@patch("mx_bluesky.i04.thawing_plan.MurkoCallback") +def test_thaw_and_centre_adds_murko_callback_and_produces_expected_messages( + patch_murko_callback: MagicMock, + smargon: Smargon, + thawer: Thawer, + oav_forwarder: OAVToRedisForwarder, + oav: OAV, + RE: RunEngine, +): + patch_murko_instance = patch_murko_callback.return_value + RE( + thaw_and_center( + 10, + 360, + thawer=thawer, + smargon=smargon, + oav=oav, + robot=MagicMock(), + oav_to_redis_forwarder=oav_forwarder, + ) + ) + + docs = patch_murko_instance.call_args_list + start_params = [c.args[1] for c in docs if c.args[0] == "start"] + event_params = [c.args[1] for c in docs if c.args[0] == "event"] + assert len(start_params) == 1 + assert len(event_params) == 4 + oav_updates = [ + e for e in event_params if "oav_to_redis_forwarder-uuid" in e["data"].keys() + ] + smargon_updates = [e for e in event_params if "smargon-omega" in e["data"].keys()] + assert len(oav_updates) > 0 + assert len(smargon_updates) > 0 + + +@patch("mx_bluesky.i04.thawing_plan.MurkoCallback.call_murko") +def test_thaw_and_centre_will_produce_events_that_call_murko( + patch_murko_call: MagicMock, + smargon: Smargon, + thawer: Thawer, + oav_forwarder: OAVToRedisForwarder, + oav: OAV, + RE: RunEngine, +): + RE( + thaw_and_center( + 10, + 360, + thawer=thawer, + smargon=smargon, + oav=oav, + robot=MagicMock(), + oav_to_redis_forwarder=oav_forwarder, + ) + ) + patch_murko_call.assert_called()