插件
目录
插件¶
Dask 的插件系统使您能够在特定事件上运行自定义 Python 代码。您可以使用调度器、工作进程或 nanny 特定的插件。例如,工作进程插件允许您在工作进程生命周期的特定事件(例如工作进程启动时)在所有工作进程上运行自定义 Python 代码。在下面的每个部分中,您将看到如何创建自己的插件或使用 Dask 提供的内建插件。
调度器插件¶
- class distributed.diagnostics.plugin.SchedulerPlugin[source]¶
扩展调度器的接口
插件能够在特定事件发生时运行自定义钩子(hooks)。调度器在运行其对应方法时会运行此插件的方法。这使得用户代码可以在调度器线程中同步运行,并与调度器本身执行任意操作。
插件通常用于诊断和测量,但它们对调度器拥有完全访问权限,原则上可以影响核心调度。
实现一个插件
继承此类
重写其部分方法
使用
Client.register_plugin
注册插件。
idempotent 属性用于控制当存在同名调度器插件时,是否应忽略当前插件的注册。如果为
True
,则忽略该插件;否则,替换现有插件。默认为False
。示例
>>> class Counter(SchedulerPlugin): ... def __init__(self): ... self.counter = 0 ... ... def transition(self, key, start, finish, *args, **kwargs): ... if start == 'processing' and finish == 'memory': ... self.counter += 1 ... ... def restart(self, scheduler): ... self.counter = 0
>>> plugin = Counter() >>> scheduler.add_plugin(plugin)
- add_worker(scheduler: Scheduler, worker: str) None | Awaitable[None] [source]¶
新工作进程加入集群时运行
如果此方法是同步的,它会立即同步执行,
Scheduler.add_worker
不会向事件循环让步。如果它是异步的,它会在所有同步的SchedulerPlugin.add_worker
钩子执行后被等待。警告
不能保证各个
SchedulerPlugin.add_worker
钩子之间的执行顺序,并且该顺序可能会在没有弃用周期的情况下发生改变。
- remove_worker(scheduler: Scheduler, worker: str, *, stimulus_id: str, **kwargs: Any) None | Awaitable[None] [source]¶
工作进程离开集群时运行
如果此方法是同步的,它会立即同步执行,
Scheduler.remove_worker
不会向事件循环让步。如果它是异步的,它会在所有同步的SchedulerPlugin.remove_worker
钩子执行后被等待。警告
不能保证各个
SchedulerPlugin.remove_worker
钩子之间的执行顺序,并且该顺序可能会在没有弃用周期的情况下发生改变。
- transition(key: Key, start: SchedulerTaskStateState, finish: SchedulerTaskStateState, *args: Any, stimulus_id: str, **kwargs: Any) None [source]¶
任务状态改变时运行
有关转换机制和可用状态的说明,请参阅 调度器任务状态。
警告
这是一个高级功能,转换机制和任务状态的细节可能会在没有弃用周期的情况下发生改变。
- 参数
- key
- start
转换的起始状态。可以是 released, waiting, processing, memory, error 中的一个。
- finish
转换的最终状态。
- stimulus_id
引起转换的刺激 ID。
- *args, **kwargs
转换时传递的更多选项,可能包括工作进程 ID、计算时间等。
- update_graph(scheduler: Scheduler, *, client: str, keys: set[Key], tasks: list[Key], annotations: dict[str, dict[Key, Any]], priority: dict[Key, tuple[int | float, ...]], stimulus_id: str, **kwargs: Any) None [source]¶
新图/任务进入调度器时运行
- 参数
- scheduler
Scheduler 实例。
- client
客户端的唯一 ID。
- keys
客户端调用 update_graph 时关注的键。
- tasks
任务
- annotations
应用于任务的完全解析的注解,格式为
{ "annotation": { "key": "value, ... }, ... }
- priority
分配给任务的任务计算优先级。
- stimulus_id
引起图更新的刺激 ID
- **kwargs
建议允许插件接受更多参数以确保未来的兼容性。
RabbitMQ 示例¶
RabbitMQ 是一个分布式消息队列,我们可以用它来发布关于任务转换的更新。通过将转换发布到 RabbitMQ,我们可以让其他机器处理转换,并将调度器的处理工作量降至最低。有关 RabbitMQ 以及如何消费消息的更多信息,请参阅 RabbitMQ 教程。
import json
from distributed.diagnostics.plugin import SchedulerPlugin
import pika
class RabbitMQPlugin(SchedulerPlugin):
def __init__(self):
# Update host to be your RabbitMQ host
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue='dask_task_status', durable=True)
def transition(self, key, start, finish, *args, **kwargs):
message = dict(
key=key,
start=start,
finish=finish,
)
self.channel.basic_publish(
exchange='',
routing_key='dask_task_status',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
@click.command()
def dask_setup(scheduler):
plugin = RabbitMQPlugin()
scheduler.add_plugin(plugin)
运行方式:dask scheduler --preload <filename.py>
访问完整任务状态¶
如果您想访问存储在调度器中的完整 distributed.scheduler.TaskState
,可以通过传递并存储对调度器的引用来实现,如下所示:
from distributed.diagnostics.plugin import SchedulerPlugin
class MyPlugin(SchedulerPlugin):
def __init__(self, scheduler):
self.scheduler = scheduler
def transition(self, key, start, finish, *args, **kwargs):
# Get full TaskState
ts = self.scheduler.tasks[key]
@click.command()
def dask_setup(scheduler):
plugin = MyPlugin(scheduler)
scheduler.add_plugin(plugin)
内建调度器插件¶
- class distributed.diagnostics.plugin.PipInstall(packages: list[str], pip_options: list[str] | None = None, restart_workers: bool = False)[source]¶
用于 pip 安装一组包的插件
这接受一组要在所有工作进程上安装的包以及安装时使用的选项。您还可以选择是否在安装完成后重新启动工作进程。
注意
这会增加每个工作进程启动所需的时间。如果可能,我们建议将库包含在工作进程环境或镜像中。这主要用于实验和调试。
- 参数
- packages
要使用 pip 安装的包列表。包应遵循 requirements file format 中定义的结构。包还可以包含 environment variables。
- pip_options
传递给 pip 的附加选项
- restart_workers
安装包后是否重新启动工作进程;仅在工作进程附加了 nanny 进程时有效。
示例
>>> from dask.distributed import PipInstall >>> plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"]) >>> client.register_plugin(plugin)
使用
TOKEN
环境变量从私有仓库安装包。>>> from dask.distributed import PipInstall >>> plugin = PipInstall(packages=["private_package@git+https://${TOKEN}@github.com/dask/private_package.git]) >>> client.register_plugin(plugin)
- class distributed.diagnostics.plugin.CondaInstall(packages: list[str], conda_options: list[str] | None = None, restart_workers: bool = False)[source]¶
用于 conda 安装一组包的插件
这接受一组要在调度器和所有工作进程上安装的包以及安装时使用的选项。
您还可以选择是否在安装完成后重新启动工作进程。
注意
这会增加集群启动所需的时间。如果可能,我们建议将库包含在集群环境或镜像中。这主要用于实验和调试。
- 参数
- packages
要使用 conda 安装的包列表(可选包含版本)
- conda_options
传递给 conda 的附加选项
- restart_workers
安装包后是否重新启动工作进程,仅在工作进程附加了 nanny 进程时有效
另请参阅
示例
>>> from dask.distributed import CondaInstall >>> plugin = CondaInstall(packages=["scikit-learn"], conda_options=["--update-deps"])
>>> client.register_plugin(plugin)
- class distributed.diagnostics.plugin.InstallPlugin(install_fn: Callable[[], None], restart_workers: bool)[source]¶
用于在集群上安装软件的调度器插件
这接受一个在调度器和所有工作进程上安装软件的函数。您还可以选择是否在安装完成后重新启动工作进程。
注意
这会增加每个工作进程启动所需的时间。如果可能,我们建议将软件包含在工作进程环境或镜像中。这主要用于实验和调试。
- 参数
- install_fn
用于安装软件的可调用对象;必须是幂等的。
- restart_workers
安装包后是否重新启动工作进程,仅在工作进程附加了 nanny 进程时有效
另请参阅
工作进程插件¶
distributed.diagnostics.plugin.WorkerPlugin
提供了一个创建自己的工作进程插件的基础类。此外,Dask 提供了一些 内建插件。
观看下面的视频,了解一个使用 WorkerPlugin
添加 concurrent.futures.ProcessPoolExecutor
的示例。
- class distributed.diagnostics.plugin.WorkerPlugin[source]¶
扩展工作进程的接口
工作进程插件能够在工作进程生命周期的不同阶段运行自定义代码。
插件能够在工作进程生命周期的每个步骤中运行自定义代码。无论何时发生此类事件,都会调用此类的相应方法。请注意,用户代码始终在工作进程的主线程中运行。
实现一个插件
继承此类
重写其部分方法
使用
Client.register_plugin
注册插件。
idempotent 属性用于控制当存在同名工作进程插件时,是否应忽略当前插件的注册。如果为
True
,则忽略该插件;否则,替换现有插件。默认为False
。示例
>>> class ErrorLogger(WorkerPlugin): ... def __init__(self, logger): ... self.logger = logger ... ... def setup(self, worker): ... self.worker = worker ... ... def transition(self, key, start, finish, *args, **kwargs): ... if finish == 'error': ... ts = self.worker.tasks[key] ... exc_info = (type(ts.exception), ts.exception, ts.traceback) ... self.logger.error( ... "Error during computation of '%s'.", key, ... exc_info=exc_info ... )
>>> import logging >>> plugin = ErrorLogger(logging) >>> client.register_plugin(plugin)
- setup(worker: Worker) None | Awaitable[None] [source]¶
插件附加到工作进程时运行。这发生在插件注册并附加到现有工作进程时,或者在插件注册后创建工作进程时。
- transition(key: Key, start: WorkerTaskStateState, finish: WorkerTaskStateState, **kwargs: Any) None [source]¶
在任务的整个生命周期中(参阅 工作进程状态),工作进程根据调度器的指示计算特定任务,从而导致每个任务状态的转换。然后,拥有该任务的工作进程会收到状态转换通知。
每当任务改变状态时,都会调用此方法。
警告
这是一个高级功能,转换机制和任务状态的细节可能会在没有弃用周期的情况下发生改变。
- 参数
- key
- start
转换的起始状态。可以是 waiting, ready, executing, long-running, memory, error 中的一个。
- finish
转换的最终状态。
- kwargs
转换时传递的更多选项
Nanny 插件¶
- class distributed.diagnostics.plugin.NannyPlugin[source]¶
扩展 Nanny 的接口
工作进程插件能够在工作进程生命周期的不同阶段运行自定义代码。Nanny 插件也做同样的事情,但它的优势在于能够在工作进程启动之前运行代码,或者在必要时重新启动工作进程。
实现一个插件
继承此类
重写其部分方法
使用
Client.register_plugin
注册插件。
idempotent 属性用于控制当存在同名 nanny 插件时,是否应忽略当前插件的注册。如果为
True
,则忽略该插件;否则,替换现有插件。默认为False
。restart 属性用于控制注册插件时是否需要重新启动正在运行的
Worker
。
内建 Nanny 插件¶
- class distributed.diagnostics.plugin.UploadDirectory(path, restart_workers=False, update_path=False, skip_words=('.git', '.github', '.pytest_cache', 'tests', 'docs'), skip=(<function UploadDirectory.<lambda>>, ), mode='workers'))[source]¶
将本地目录上传到集群的 Nanny 插件。
- 参数
- path
要上传的目录路径
- scheduler
是否将目录上传到调度器
示例
>>> from distributed.diagnostics.plugin import UploadDirectory >>> client.register_plugin(UploadDirectory("/path/to/directory"))