Skip to content

Commit

Permalink
Merge branch 'main' into 736_update_copier
Browse files Browse the repository at this point in the history
  • Loading branch information
DominicOram authored Aug 14, 2024
2 parents 06e2389 + 3c8a934 commit 261d0f0
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 34 deletions.
17 changes: 14 additions & 3 deletions src/dodal/common/udc_directory_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class PandASubdirectoryProvider(UpdatingDirectoryProvider):
"""Directory provider for the HDFPanda. Points to a panda subdirectory within the
directory path provided, which must exist before attempting to arm the PCAP block"""
directory path provided"""

resource_dir = Path("panda")

Expand All @@ -23,9 +23,20 @@ def __init__(self, directory: Path | None = None):
else None
)

async def update(self, *, directory: Path, **kwargs):
async def update(self, *, directory: Path, suffix: str = "", **kwargs):
"""Update the root directory into which panda pcap files are written. This will result in the panda
subdirectory being created if it does not already exist.
Args:
directory: Path instance that identifies the root folder. This folder must exist. The panda will
attempt to write into the "panda" subdirectory which will be created if not already present.
suffix: Optional str that will be appended to the panda device name along with the file
type extension to construct the output filename
"""
output_directory = directory / self.resource_dir
output_directory.mkdir(exist_ok=True)

self._directory_info = DirectoryInfo(
root=directory, resource_dir=self.resource_dir
root=directory, resource_dir=self.resource_dir, suffix=suffix
)

def __call__(self) -> DirectoryInfo:
Expand Down
69 changes: 38 additions & 31 deletions src/dodal/devices/detector/detector_motion.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,44 @@
from enum import IntEnum
from enum import Enum

from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
from ophyd_async.core import Device
from ophyd_async.epics.motion import Motor
from ophyd_async.epics.signal import epics_signal_r, epics_signal_rw


class ShutterState(IntEnum):
CLOSED = 0
OPEN = 1
class ShutterState(str, Enum):
CLOSED = "Closed"
OPEN = "Open"


class DetectorMotion(Device):
"""Physical motion and interlocks for detector travel"""

upstream_x = Cpt(EpicsMotor, "-MO-DET-01:UPSTREAMX")
downstream_x = Cpt(EpicsMotor, "-MO-DET-01:DOWNSTREAMX")
x = Cpt(EpicsMotor, "-MO-DET-01:X")
y = Cpt(EpicsMotor, "-MO-DET-01:Y")
z = Cpt(EpicsMotor, "-MO-DET-01:Z")
yaw = Cpt(EpicsMotor, "-MO-DET-01:YAW")
shutter = Cpt(EpicsSignal, "-MO-DET-01:SET_SHUTTER_STATE") # 0=closed, 1=open
# monitors
shutter_closed_lim = Cpt(
EpicsSignalRO, "-MO-DET-01:CLOSE_LIMIT"
) # on limit = 1, off = 0
shutter_open_lim = Cpt(
EpicsSignalRO, "-MO-DET-01:OPEN_LIMIT"
) # on limit = 1, off = 0
z_disabled = Cpt(
EpicsSignalRO, "-MO-DET-01:Z:DISABLED"
) # robot interlock, 0=ok to move, 1=blocked
crate_power = Cpt(
EpicsSignalRO, "-MO-PMAC-02:CRATE2_HEALTHY"
) # returns 0 if no power
in_robot_load_safe_position = Cpt(
EpicsSignalRO, "-MO-PMAC-02:GPIO_INP_BITS.B2"
) # returns 1 if safe
def __init__(self, prefix: str, name: str = ""):
device_prefix = "-MO-DET-01:"
pmac_prefix = "-MO-PMAC-02:"

self.upstream_x = Motor(f"{prefix}{device_prefix}UPSTREAMX")
self.downstream_x = Motor(f"{prefix}{device_prefix}DOWNSTREAMX")
self.x = Motor(f"{prefix}{device_prefix}X")
self.y = Motor(f"{prefix}{device_prefix}Y")
self.z = Motor(f"{prefix}{device_prefix}Z")
self.yaw = Motor(f"{prefix}{device_prefix}YAW")

self.shutter = epics_signal_rw(
ShutterState, f"{prefix}{device_prefix}SET_SHUTTER_STATE"
)
self.shutter_closed_lim = epics_signal_r(
float, f"{prefix}{device_prefix}CLOSE_LIMIT"
) # on limit = 1, off = 0
self.shutter_open_lim = epics_signal_r(
float, f"{prefix}{device_prefix}OPEN_LIMIT"
) # on limit = 1, off = 0
self.z_disabled = epics_signal_r(
float, f"{prefix}{device_prefix}Z:DISABLED"
) # robot interlock, 0=ok to move, 1=blocked
self.crate_power = epics_signal_r(
float, f"{prefix}{pmac_prefix}CRATE2_HEALTHY"
) # returns 0 if no power
self.in_robot_load_safe_position = epics_signal_r(
int, f"{prefix}{pmac_prefix}GPIO_INP_BITS.B2"
) # returns 1 if safe

super().__init__(name)
33 changes: 33 additions & 0 deletions tests/common/test_udc_directory_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,36 @@ async def test_udc_directory_provider_after_update(initial, tmp_path):
directory_info = provider()
assert directory_info.root == tmp_path
assert directory_info.resource_dir == Path("panda")


async def test_udc_directory_provider_no_suffix(tmp_path):
initial = Path("initial")
provider = PandASubdirectoryProvider(initial)
root_path = tmp_path / "my_data"
root_path.mkdir()
await provider.update(directory=root_path)
directory_info = provider()
assert directory_info.root == root_path
assert directory_info.resource_dir == Path("panda")
assert directory_info.suffix == ""


async def test_udc_directory_provider_with_suffix(tmp_path):
initial = Path("initial")
provider = PandASubdirectoryProvider(initial)
root_path = tmp_path / "my_data"
root_path.mkdir()
await provider.update(directory=root_path, suffix="_123")
directory_info = provider()
assert directory_info.root == root_path
assert directory_info.resource_dir == Path("panda")
assert directory_info.suffix == "_123"


async def test_udc_directory_provider_creates_subdirectory_if_not_exists(tmp_path):
root = tmp_path
subdir = root / Path("panda")
assert not subdir.exists()
provider = PandASubdirectoryProvider(Path("initial"))
await provider.update(directory=root)
assert subdir.exists()

0 comments on commit 261d0f0

Please sign in to comment.