Skip to content

Commit

Permalink
Merge pull request DiamondLightSource#160 from DiamondLightSource/hyp…
Browse files Browse the repository at this point in the history
…erion_796_xbpm_feedback

Add XBPM feedback for i03
  • Loading branch information
DominicOram committed Sep 15, 2023
2 parents 7058d11 + 3d2a48e commit d384abd
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 4 deletions.
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ classifiers = [
]
description = "Ophyd devices and other utils that could be used across DLS beamlines"
dependencies = [
"ophyd@git+https://github.com/Tom-Willemsen/ophyd@40e4b72c582e65e63d13a1650f76de709e5c70bb", # Switch back to just "ophyd" once <relevant PR> merged.
"ophyd@git+https://github.com/DominicOram/ophyd@31a1762435bcb275d2555068233549885eab02e7", # Switch back to just "ophyd" once ophyd#1148, ophyd#1155, ophyd#1140 merged.
"bluesky",
"pyepics",
"dataclasses-json",
"pillow",
"requests",
"graypy",
"pydantic<2.0",
"opencv-python-headless", # For pin-tip detection.
"aioca", # Required for CA support with Ophyd V2.
"p4p", # Required for PVA support with Ophyd V2.
"opencv-python-headless", # For pin-tip detection.
"aioca", # Required for CA support with Ophyd V2.
"p4p", # Required for PVA support with Ophyd V2.
]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down
16 changes: 16 additions & 0 deletions src/dodal/beamlines/i03.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dodal.devices.smargon import Smargon
from dodal.devices.synchrotron import Synchrotron
from dodal.devices.undulator import Undulator
from dodal.devices.xbpm_feedback import XBPMFeedback
from dodal.devices.xspress3_mini.xspress3_mini import Xspress3Mini
from dodal.devices.zebra import Zebra
from dodal.log import set_beamline as set_log_beamline
Expand Down Expand Up @@ -284,3 +285,18 @@ def flux(wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False) ->
wait_for_connection,
fake_with_ophyd_sim,
)


def xbpm_feedback(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> Flux:
"""Get the i03 XBPM feeback device, instantiate it if it hasn't already been.
If this is called when already instantiated in i03, it will return the existing object.
"""
return device_instantiation(
XBPMFeedback,
"xbpm_feedback",
"",
wait_for_connection,
fake_with_ophyd_sim,
)
13 changes: 13 additions & 0 deletions src/dodal/devices/xbpm_feedback.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO
from ophyd.status import StatusBase, SubscriptionStatus


class XBPMFeedback(Device):
"""The XBPM feedback device is an IOC that moves the DCM, HFM and VFM to automatically
hold the beam into place, as measured by the XBPM sensor."""

# We need to wait for the beam to be locked into position for this amount of time
# until we are certain that it is stable.
STABILITY_TIME = 3

pos_ok: EpicsSignalRO = Component(EpicsSignalRO, "-EA-FDBK-01:XBPM2POSITION_OK")
pos_stable: EpicsSignalRO = Component(EpicsSignalRO, "-EA-FDBK-01:XBPM2_STABLE")
pause_feedback: EpicsSignal = Component(EpicsSignal, "-EA-FDBK-01:FB_PAUSE")
x: EpicsSignalRO = Component(EpicsSignalRO, "-EA-XBPM-02:PosX:MeanValue_RBV")
y: EpicsSignalRO = Component(EpicsSignalRO, "-EA-XBPM-02:PosY:MeanValue_RBV")

def trigger(self) -> StatusBase:
return SubscriptionStatus(
self.pos_stable,
lambda *, old_value, value, **kwargs: value == 1,
timeout=60,
)
32 changes: 32 additions & 0 deletions tests/devices/unit_tests/test_xbpm_feedback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import pytest
from ophyd.sim import make_fake_device

from dodal.devices.xbpm_feedback import XBPMFeedback


@pytest.fixture
def fake_xbpm_feedback():
FakeXBPMFeedback = make_fake_device(XBPMFeedback)
return FakeXBPMFeedback(name="xbpm")


def test_given_pos_stable_when_xbpm_feedback_kickoff_then_return_immediately(
fake_xbpm_feedback: XBPMFeedback,
):
fake_xbpm_feedback.pos_stable.sim_put(1)
status = fake_xbpm_feedback.trigger()
status.wait(0.1)
assert status.done and status.success


def test_given_pos_not_stable_and_goes_stable_when_xbpm_feedback_kickoff_then_return(
fake_xbpm_feedback: XBPMFeedback,
):
fake_xbpm_feedback.pos_stable.sim_put(0)
status = fake_xbpm_feedback.trigger()

assert not status.done

fake_xbpm_feedback.pos_stable.sim_put(1)
status.wait(0.1)
assert status.done and status.success

0 comments on commit d384abd

Please sign in to comment.