编写自定义计划
如在以上简单自定义中提到,count()和scan()的"预装配"计划是从更小的"plan stubs"构建的。我们组合和匹配"stubs"和/或"预装配"计划来构建自定义计划。
有很多计划stubs,因此导入整个模块并且使用它是非常方便的。
import bluesky.plan_stubs as bps
并行移动
在编写一个协调多个设备移动的自定义计划前,考虑你的使用情况是否能用内建的多维扫描之一解决。
我们先前介绍了mv()计划,它移动一个或多个设备并且等待它们都到达。也有用于相对当前位置移动的mvr()。
from ophyd.sim import motor1, motor2
from bluesky import RunEngine
from bluesky.callbacks.best_effort import BestEffortCallbackbec = BestEffortCallback()
RE = RunEngine({})RE.subscribe(bec)# 相对于它们当前位置正方向移动motor1到1个单位和motor2到10个单位.
# 并且等待它们两个到达.
RE(bps.mvr(motor1, 1, motor2, 10))
某些场景需要在等待发生时更底层控制。对于这些,我们使用wait()和abs_set()("绝对设置")或rel_set()("相对设置")。
这是一个需要自定义方案的场景:我们想要一次设置若干电机,包括多个快速电机和一个慢速电机。我们想要等待快速电机到达,打印一条消息,接着等待慢速电机到达,并且打印第二天消息。
def staggered_wait(fast_motors, slow_motor):# 立即启动所有电机,快速和慢速# 放置所有快速电机在一个组...for motor in fast_motors:yield from bps.abs_set(motor, 5, group='A')# ...而在一个单独组中放置慢速电机yield from bps.abs_set(slow_motor, 5, group='B')# 等待所有快速电机print('Waiting on the fast motors.')yield from bps.wait('A')print('Fast motors are in place. Just waiting on the slow one now.')# 接着等待慢速电机yield from bps.wait('B')
睡眠(定时延迟)
注意:如果你需要等待电机结束移动,温度完成平衡,或者快门完成打开,插入延时到计划不是做这件事的最好方法。准确地报告它何时结束应该是设备的事情,包括为稳定或平衡的任何额外的填充。在某些设备上,注入EpicsMotor,这可以像motor.setting_time=3被设置。
对于定时延迟,bluesky有一个特殊计划,它允许RunEngine在睡眠过程中继续它的事情。
from ophyd.sim import motordef sleepy_plan(motor, positions):# 在步与步之间1秒延时,步进一个电机通过一个位置列表i = 1for position in positions:print("step %d" % (i,))yield from bps.mv(motor, position)yield from bps.sleep(1)i = i + 1RE(sleepy_plan(motor, [1,2,3]))
你应该总是使用这个计划,不要Python的内建函数:func:'time.sleep'。为什么?RunEngine使用一个事件循环并发管理很多任务。它认为那些任务中没有任务阻塞非常长。(对应"非常长"代表性数值是0.2秒)。因而,你不应该在你的计划中包含长阻塞函数,诸如time.sleep(1)。
捕获数据
产生数据的任何计划必须包含把读取分组到事件的指令(即,在一个表格中的行)并且把那些事件分组到Runs(被赋予一个"扫描ID"的数据集)。通过示例最好地解释了这个。
import bluesky.plan_stubs as bps
from ophyd.sim import det1, det2def one_run_one_event(detectors):# 声明一个新run地开始yield from bps.open_run()yield from bps.declare_stream(*detectors, name='primary')# 触发每个探测器并且等待触发结束.# 接着读取探测器并且把这些读取捆绑到一个事件# (即,在表格中一行)yield from bps.trigger_and_read(detectors)# 声明这个run的结束yield from bps.close_run()
像这样执行这个计划:
RE(one_run_one_event([det1, det2]))
我们观察:
1、一个表格(一个Run)
2、一行(一个Event)
3、两列(每个探测器一列)
再次,这是相同的计划,将trigger_and_read移动到一个for循环中。
import bluesky.plan_stubs as bps
from ophyd.sim import det1, det2def one_run_multi_event(detectors, num):# 声明一个新run地开始yield from bps.open_run()yield from bps.declare_stream(*detectors, name='primary')for i in range(num):yield from bps.trigger_and_read(detectors)# 声明这个run的结束yield from bps.close_run()
像这样执行这个计划:
我们观察:
1、一个表格(一个Run)
2、三行(三个事件)
3、两列(每个探测器1列)
最后,我们添加另一个在其循环中重复使用one_run_multi_events的循环。
def multi_runs_multi_events(detectors, num, num_runs):for i in range(num_runs):yield from one_run_multi_event(detectors,num)
我们观察:
1、两个表格(两个Run)
2、三行(三个事件)
3、两列(每个探测器1列)
我们也注意到,从RunEngine输出的返回值是一个二个唯一IDs的元组,由此计划产生每个Run一个唯一ID。
为了关注Event和Run的作用域,我们省去了一个重要细节,在下面章节讲述,在真是设备上尝试这些计划前,它必需包含。
stage和unstage
复杂设备在它们可以用于数据采集前经常需要某些初始设置,使它们从一个空闲状态转变为它们为一个准备采集数据的状态。Bluesky通过允许每个Device实现一个可选的stage()方法和一个对应的unstage()方法,用一种通用的方法提供了此特性。计划应该stage每个设备它们准确到达一次并且在结束时unstage一次。如果一个设备没有stage()方法,RunEngine将只是跳过它。
修改我们最简单的示例one_run_one_event:
import bluesky.plan_stubs as bps
from ophyd.sim import det1, det2def one_run_one_event(detectors):yield from bps.open_run()yield from bps.declare_stream(*detectors, name='primary')yield from bps.trigger_and_read(detectors)yield from bps.close_run()
我们像这样组合了staging:
import bluesky.plan_stubs as bps
from ophyd.sim import det1, det2def one_run_one_event(detectors):# ‘Stage’每个设备for det in detectors:yield from bps.stage(det)yield from bps.open_run()yield from bps.declare_stream(*detectors, name='primary')yield from bps.trigger_and_read(detectors)yield from bps.close_run()# 'unstage'每个设备for det in detectors:yield from bps.unstage(det)
这是开始获取详细说明。到此,我们会想要接受某些额外复杂性以换取简洁,并且确保我们不忘记在匹配括号中使用这些计划。到此,这个计划相当于:
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from ophyd.sim import det1, det2def one_run_one_event(detectors):@bpp.stage_decorator(detectors)def inner():yield from bps.open_run()yield from bps.declare_stream(*detectors, name='primary')yield from bps.trigger_and_read(detectors)yield from bps.close_run()return (yield from inner())
stage_decorator()是一个计划预处理程序,一个消费另一个计划并且修改其指令的计划。在这种情况中,它插入了'stage'和'unstage'消息,取代了stage()和unstage()。我们通过使用run_decorator(),取代了open_run()和close_run(),可以还可以减少冗长内容。
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from ophyd.sim import det1, det2def one_run_one_event(detectors):@bpp.stage_decorator(detectors)@bpp.run_decorator()def inner():yield from bps.declare_stream(*detectors, name='primary')yield from bps.trigger_and_read(detectors)return (yield from inner())
在有关baseline读取的部分中,回忆我们已经在本教程中遇到过一个预处理器。supplementaldata是一个预处理程序。
添加元数据
为了使搜索由计划产生的数据变得容易,并且检查之后做了什么,我们应该包含某些元数据。我们创建了一个字典并且传递它给run_decorator()(或者,用更详细的公式,传给open_run)。RunEngine将组合元数据和由用户提供的任何信息,如在有关元数据更早章节中展示的。
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from ophyd.sim import det1, det2def one_run_one_event(detectors):md = {# 探测器Devices的人性化名称(对搜索有用)'detectors': [det.name for det in detectors],# python 'repr'的每个参数给这个计划'plan_args':{'detectors':list(map(repr, detectors))},# 这个计划的名称'plan_name': 'one_run_one_event',}@bpp.stage_decorator(detectors)@bpp.run_decorator()def inner():yield from bps.declare_stream(*detectors, name='primary')yield from bps.trigger_and_read(detectors)return (yield from inner())
警告:在元数据字段中的值必须仅是字符串,数值,lists/arrays,或者字典。元数据不能包含任意Python类型,因为下游消费者(如数据库)不知道对那些做什么并且将出错。
出于礼貌,我们应该允许用户重写这些元数据。为此目的,所有bluesky的"预组装"计划(count(), scan()等)提供了一个可选的md参数,像这样实现:
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bppdef one_run_one_event(detectors, md=None):md = {# 探测器Devices的人性化名称(对搜索有用)'detectors': [det.name for det in detectors],# python 'repr'的每个参数给这个计划'plan_args':{'detectors':list(map(repr, detectors))},# 这个计划的名称'plan_name': 'one_run_one_event',}# 如果在md中存在一个键,它用在_md中重写默认值_md.update(md or {})@bpp.stage_decorator(detectors)@bpp.run_decorator()def inner():yield from bps.declare_stream(*detectors, name='primary')yield from bps.trigger_and_read(detectors)return (yield from inner())
在元数据中添加"提示"
元数据字典可以可选地包含一个名为'hints'的键。这个键对BestEffortCallback和潜在地其它下游消费者尤其重要,它们使用它尝试推断表示数据地有用方式。当前,它解决两个特别问题。
1、缩小较大范围读数范围到放入表格中可管理数目的最重要读数
2、标识数据维度(1D扫描?2D网格?N-D网格)和应变和自变参数,用于可视化和峰拟合目的。
由每个设备来解决(1)。计划在那中没有作用。每个设备有一个可选的hints属性,如{'fields':[...]}的一个值来回答这个问题。“你产生的所有读数,最重要读数色名称是什么?”
我们需要计划在(2)帮助我们。只有计划能够分类哪些设备被用于"独立"轴而哪些被作为应变量被测量。单看设备是不明确的,因为任何可移动的设备取决于上下文可以用作一个轴或者一个"探测器"-count([motor])是一件要做的完美有效事情。
计划的提示元数据的方案是:
{'dimensions': [([<FIELD>, ...], <STREAM_NAME>),([<FIELD>, ...], <STREAM_NAME>),...]}
示例:
# 在X上1维扫描
{'dimensions': [(['x'], 'primary')]}# 在X和Y上2网格维扫描
{'dimensions': [(['x'], 'primary'),(['y'], 'primary')]}# 一个沿一条对角线一起移动X和Y的扫描
{'dimensions': [(['x', 'y'], 'primary')]}# 一条在温度上的1D扫描,用C和K为单位表示
{'dimensions': [(['C', 'K'], 'primary')]}# 在能量上的一维扫描,用能量和衍射仪未知测量
{'dimensions': [(['E', 'dcm'], 'primary')]}# 特殊情况:一个读取序列,在其中自变轴仅是时间
{'dimensions': [(['time'], 'primary')]}
在外层列表中的每个条目表示一个独立维度。一个维度可能由多个字段表示,要么来自用通过计划(['x', 'y'])协调方式移动的不同设备,表示为来自一个设备(['C', 'K'])完全冗余信息,或者来自两个子设备(['E', 'dcm'])的耦合信息。
在每个条目中第二个元素是这个流名称:在每个以上示例中'primary'。这应该对应在此计划中传递给trigger_and_read()或create()的name。默认名称是primary。
放置它们都在一起,这个计划为它们的重要字段请求设备被用作独立轴并且构建一个像这样的维度列表:
dimensions = [(motor.hints['fields'], 'primary')]
我们必须考虑hints是可选的实际。一个指定设备可能完全没有hints属性,并且即使它有,提示可能不包含我们感兴趣的'fields'字段。如果设备没有提供必要信息,这种模式悄悄地忽略维度提示:
def scan(..., md=None):_md = {...}_md.update(md or {})try:dimensions = [(motor.hints['fields'], 'primary')]except (AttributeError, KeyError):passelse:_md['hints'].setdefault('dimensions', dimensions)...
最后,通过使用setdefault,如果它们更了解传入scan(..., md={'hint':...}),我们运行用户重写这些hints。
在计划中自适应逻辑
在生成器和RunEngine之间,双向通信是可能的。例如,trigger_and_read()计划用其读数响应。我们可以使用它做有关是否继续或者停止的即时决策。
import bluesky.preprocessors as bpp
import bluesky.plan_stubs as bps
from ophyd.sim import det, motor
def conditional_break(threshold):"""在探测器读数强度小于阈值前,设置,触发,读取 """@bpp.stage_decorator([det, motor])@bpp.run_decorator()def inner():i = 0yield from bps.declare_stream(det, name='primary')while True:yield from bps.mv(motor, i)readings = yield from bps.trigger_and_read([det])if readings['det']['value'] < threshold:breaki += 1return (yield from inner())
演示:
在本例中重要行是:
readings = yield from bps.trigger_and_read([det])
此操作像这样运行:
1、此计划生成一个'read'消息给RunEngine
2、RunEngine读取这个探测器
3、RunEngine发回读数给这个计划,并且给这个变量reading分配那个响应。
响应reading被格式化如下:
{<name>: {'value': <value>, 'timestamp': <timestamp>}, ...}
需要此消息和它们响应的详细技术描述,见消息协议。
计划"清理"(异常处理)
如果产生了一个异常,RunEngine给计划捕获异常的机会,并且要么处理它或者在再次产生这个异常并且杀死计划执行前仅产生一些"清理"消息。(从一行暂停,继续,挂起回忆这个)。
这是一般想法:
# 本例是说明的,单它不是完全正确。
# 使用'finalize_wrapper'替代(或者读取它的源代码)
def plan_with_cleanup():def main_plan():# do stuff...def cleanup_plan():# do other stuff...try:yield from main_plan()finally:# 即使产生了一个异常,做这件事yield from cleanup_plan()
出现的异常可能源自这个计划自身或者来自RunEngine尝试执行一指定命令时的RunEngine。
finalize_wrapper()预处理程序提供了使用这种通用模式的简洁和完全正确的方式。
import bluesky.preprocessors as bppdef plan_with_cleanup():yield from bpp.finalize_wrapper(main_plan(), cleanup_plan())