插件

Dask 的插件系统使您能够在特定事件上运行自定义 Python 代码。您可以使用调度器、工作进程或 nanny 特定的插件。例如,工作进程插件允许您在工作进程生命周期的特定事件(例如工作进程启动时)在所有工作进程上运行自定义 Python 代码。在下面的每个部分中,您将看到如何创建自己的插件或使用 Dask 提供的内建插件。

调度器插件

class distributed.diagnostics.plugin.SchedulerPlugin[source]

扩展调度器的接口

插件能够在特定事件发生时运行自定义钩子(hooks)。调度器在运行其对应方法时会运行此插件的方法。这使得用户代码可以在调度器线程中同步运行,并与调度器本身执行任意操作。

插件通常用于诊断和测量,但它们对调度器拥有完全访问权限,原则上可以影响核心调度。

实现一个插件

  1. 继承此类

  2. 重写其部分方法

  3. 使用 Client.register_plugin 注册插件。

idempotent 属性用于控制当存在同名调度器插件时,是否应忽略当前插件的注册。如果为 True,则忽略该插件;否则,替换现有插件。默认为 False

示例

>>> class Counter(SchedulerPlugin):
...     def __init__(self):
...         self.counter = 0
...
...     def transition(self, key, start, finish, *args, **kwargs):
...         if start == 'processing' and finish == 'memory':
...             self.counter += 1
...
...     def restart(self, scheduler):
...         self.counter = 0
>>> plugin = Counter()
>>> scheduler.add_plugin(plugin)  
add_client(scheduler: Scheduler, client: str) None[source]

新客户端连接时运行

add_worker(scheduler: Scheduler, worker: str) None | Awaitable[None][source]

新工作进程加入集群时运行

如果此方法是同步的,它会立即同步执行,Scheduler.add_worker 不会向事件循环让步。如果它是异步的,它会在所有同步的 SchedulerPlugin.add_worker 钩子执行后被等待。

警告

不能保证各个 SchedulerPlugin.add_worker 钩子之间的执行顺序,并且该顺序可能会在没有弃用周期的情况下发生改变。

async before_close() None[source]

在任何调度器关闭逻辑之前运行

async close() None[source]

调度器关闭时运行

这在调度器关闭过程开始时运行,但在已要求工作进程优雅关闭之后。

log_event(topic: str, msg: Any) None[source]

日志事件发生时运行

remove_client(scheduler: Scheduler, client: str) None[source]

客户端断开连接时运行

remove_worker(scheduler: Scheduler, worker: str, *, stimulus_id: str, **kwargs: Any) None | Awaitable[None][source]

工作进程离开集群时运行

如果此方法是同步的,它会立即同步执行,Scheduler.remove_worker 不会向事件循环让步。如果它是异步的,它会在所有同步的 SchedulerPlugin.remove_worker 钩子执行后被等待。

警告

不能保证各个 SchedulerPlugin.remove_worker 钩子之间的执行顺序,并且该顺序可能会在没有弃用周期的情况下发生改变。

restart(scheduler: Scheduler) None[source]

调度器自身重启时运行

async start(scheduler: Scheduler) None[source]

调度器启动时运行

这在调度器启动过程结束时运行

transition(key: Key, start: SchedulerTaskStateState, finish: SchedulerTaskStateState, *args: Any, stimulus_id: str, **kwargs: Any) None[source]

任务状态改变时运行

有关转换机制和可用状态的说明,请参阅 调度器任务状态

警告

这是一个高级功能,转换机制和任务状态的细节可能会在没有弃用周期的情况下发生改变。

参数
key
start

转换的起始状态。可以是 released, waiting, processing, memory, error 中的一个。

finish

转换的最终状态。

stimulus_id

引起转换的刺激 ID。

*args, **kwargs

转换时传递的更多选项,可能包括工作进程 ID、计算时间等。

update_graph(scheduler: Scheduler, *, client: str, keys: set[Key], tasks: list[Key], annotations: dict[str, dict[Key, Any]], priority: dict[Key, tuple[int | float, ...]], stimulus_id: str, **kwargs: Any) None[source]

新图/任务进入调度器时运行

参数
scheduler

Scheduler 实例。

client

客户端的唯一 ID。

keys

客户端调用 update_graph 时关注的键。

tasks

任务

annotations

应用于任务的完全解析的注解,格式为

{
    "annotation": {
        "key": "value,
        ...
    },
    ...
}
priority

分配给任务的任务计算优先级。

stimulus_id

引起图更新的刺激 ID

**kwargs

建议允许插件接受更多参数以确保未来的兼容性。

valid_workers_downscaling(scheduler: Scheduler, workers: list[scheduler_module.WorkerState]) list[scheduler_module.WorkerState][source]

确定可以从集群中移除哪些工作进程

当调度器即将通过移除工作进程来缩减集群时,会调用此方法。该方法应返回可以从集群中移除的工作进程状态集。

参数
workerslist

候选移除的工作进程状态列表。

stimulus_idstr

引起缩减的刺激 ID。

返回
list

可以从集群中移除的工作进程状态列表。

RabbitMQ 示例

RabbitMQ 是一个分布式消息队列,我们可以用它来发布关于任务转换的更新。通过将转换发布到 RabbitMQ,我们可以让其他机器处理转换,并将调度器的处理工作量降至最低。有关 RabbitMQ 以及如何消费消息的更多信息,请参阅 RabbitMQ 教程

import json
from distributed.diagnostics.plugin import SchedulerPlugin
import pika

class RabbitMQPlugin(SchedulerPlugin):
    def __init__(self):
        # Update host to be your RabbitMQ host
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='dask_task_status', durable=True)

    def transition(self, key, start, finish, *args, **kwargs):
        message = dict(
            key=key,
            start=start,
            finish=finish,
        )
        self.channel.basic_publish(
            exchange='',
            routing_key='dask_task_status',
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ))

@click.command()
def dask_setup(scheduler):
    plugin = RabbitMQPlugin()
    scheduler.add_plugin(plugin)

运行方式:dask scheduler --preload <filename.py>

访问完整任务状态

如果您想访问存储在调度器中的完整 distributed.scheduler.TaskState,可以通过传递并存储对调度器的引用来实现,如下所示:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
         self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
         # Get full TaskState
         ts = self.scheduler.tasks[key]

@click.command()
def dask_setup(scheduler):
    plugin = MyPlugin(scheduler)
    scheduler.add_plugin(plugin)

内建调度器插件

class distributed.diagnostics.plugin.PipInstall(packages: list[str], pip_options: list[str] | None = None, restart_workers: bool = False)[source]

用于 pip 安装一组包的插件

这接受一组要在所有工作进程上安装的包以及安装时使用的选项。您还可以选择是否在安装完成后重新启动工作进程。

注意

这会增加每个工作进程启动所需的时间。如果可能,我们建议将库包含在工作进程环境或镜像中。这主要用于实验和调试。

参数
packages

要使用 pip 安装的包列表。包应遵循 requirements file format 中定义的结构。包还可以包含 environment variables

pip_options

传递给 pip 的附加选项

restart_workers

安装包后是否重新启动工作进程;仅在工作进程附加了 nanny 进程时有效。

示例

>>> from dask.distributed import PipInstall
>>> plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"])
>>> client.register_plugin(plugin)

使用 TOKEN 环境变量从私有仓库安装包。

>>> from dask.distributed import PipInstall
>>> plugin = PipInstall(packages=["private_package@git+https://${TOKEN}@github.com/dask/private_package.git])
>>> client.register_plugin(plugin)
class distributed.diagnostics.plugin.CondaInstall(packages: list[str], conda_options: list[str] | None = None, restart_workers: bool = False)[source]

用于 conda 安装一组包的插件

这接受一组要在调度器和所有工作进程上安装的包以及安装时使用的选项。

您还可以选择是否在安装完成后重新启动工作进程。

注意

这会增加集群启动所需的时间。如果可能,我们建议将库包含在集群环境或镜像中。这主要用于实验和调试。

参数
packages

要使用 conda 安装的包列表(可选包含版本)

conda_options

传递给 conda 的附加选项

restart_workers

安装包后是否重新启动工作进程,仅在工作进程附加了 nanny 进程时有效

另请参阅

InstallPlugin
PipInstall

示例

>>> from dask.distributed import CondaInstall
>>> plugin = CondaInstall(packages=["scikit-learn"], conda_options=["--update-deps"])
>>> client.register_plugin(plugin)
class distributed.diagnostics.plugin.InstallPlugin(install_fn: Callable[[], None], restart_workers: bool)[source]

用于在集群上安装软件的调度器插件

这接受一个在调度器和所有工作进程上安装软件的函数。您还可以选择是否在安装完成后重新启动工作进程。

注意

这会增加每个工作进程启动所需的时间。如果可能,我们建议将软件包含在工作进程环境或镜像中。这主要用于实验和调试。

参数
install_fn

用于安装软件的可调用对象;必须是幂等的。

restart_workers

安装包后是否重新启动工作进程,仅在工作进程附加了 nanny 进程时有效

另请参阅

CondaInstall
PipInstall
class distributed.diagnostics.plugin.SchedulerUploadFile(filepath: str, load: bool = True)[source]

工作进程插件

distributed.diagnostics.plugin.WorkerPlugin 提供了一个创建自己的工作进程插件的基础类。此外,Dask 提供了一些 内建插件

观看下面的视频,了解一个使用 WorkerPlugin 添加 concurrent.futures.ProcessPoolExecutor 的示例。

class distributed.diagnostics.plugin.WorkerPlugin[source]

扩展工作进程的接口

工作进程插件能够在工作进程生命周期的不同阶段运行自定义代码。

插件能够在工作进程生命周期的每个步骤中运行自定义代码。无论何时发生此类事件,都会调用此类的相应方法。请注意,用户代码始终在工作进程的主线程中运行。

实现一个插件

  1. 继承此类

  2. 重写其部分方法

  3. 使用 Client.register_plugin 注册插件。

idempotent 属性用于控制当存在同名工作进程插件时,是否应忽略当前插件的注册。如果为 True,则忽略该插件;否则,替换现有插件。默认为 False

示例

>>> class ErrorLogger(WorkerPlugin):
...     def __init__(self, logger):
...         self.logger = logger
...
...     def setup(self, worker):
...         self.worker = worker
...
...     def transition(self, key, start, finish, *args, **kwargs):
...         if finish == 'error':
...             ts = self.worker.tasks[key]
...             exc_info = (type(ts.exception), ts.exception, ts.traceback)
...             self.logger.error(
...                 "Error during computation of '%s'.", key,
...                 exc_info=exc_info
...             )
>>> import logging
>>> plugin = ErrorLogger(logging)
>>> client.register_plugin(plugin)  
setup(worker: Worker) None | Awaitable[None][source]

插件附加到工作进程时运行。这发生在插件注册并附加到现有工作进程时,或者在插件注册后创建工作进程时。

teardown(worker: Worker) None | Awaitable[None][source]

工作进程关闭或插件移除时运行。

transition(key: Key, start: WorkerTaskStateState, finish: WorkerTaskStateState, **kwargs: Any) None[source]

在任务的整个生命周期中(参阅 工作进程状态),工作进程根据调度器的指示计算特定任务,从而导致每个任务状态的转换。然后,拥有该任务的工作进程会收到状态转换通知。

每当任务改变状态时,都会调用此方法。

警告

这是一个高级功能,转换机制和任务状态的细节可能会在没有弃用周期的情况下发生改变。

参数
key
start

转换的起始状态。可以是 waiting, ready, executing, long-running, memory, error 中的一个。

finish

转换的最终状态。

kwargs

转换时传递的更多选项

内建工作进程插件

class distributed.diagnostics.plugin.UploadFile(filepath: str, load: bool = True)[source]

将本地文件上传到工作进程的 WorkerPlugin。

参数
filepath: str

要上传的文件 (.py, egg, or zip) 的路径

示例

>>> from distributed.diagnostics.plugin import UploadFile
>>> client.register_plugin(UploadFile("/path/to/file.py"))  

Nanny 插件

class distributed.diagnostics.plugin.NannyPlugin[source]

扩展 Nanny 的接口

工作进程插件能够在工作进程生命周期的不同阶段运行自定义代码。Nanny 插件也做同样的事情,但它的优势在于能够在工作进程启动之前运行代码,或者在必要时重新启动工作进程。

实现一个插件

  1. 继承此类

  2. 重写其部分方法

  3. 使用 Client.register_plugin 注册插件。

idempotent 属性用于控制当存在同名 nanny 插件时,是否应忽略当前插件的注册。如果为 True,则忽略该插件;否则,替换现有插件。默认为 False

restart 属性用于控制注册插件时是否需要重新启动正在运行的 Worker

setup(nanny)[source]

插件附加到 nanny 时运行。这发生在插件注册并附加到现有 nannies 时,或者在插件注册后创建 nanny 时。

teardown(nanny)[source]

插件附加到的 nanny 关闭时运行

内建 Nanny 插件

class distributed.diagnostics.plugin.Environ(environ: dict | None = None)[source]
class distributed.diagnostics.plugin.UploadDirectory(path, restart_workers=False, update_path=False, skip_words=('.git', '.github', '.pytest_cache', 'tests', 'docs'), skip=(<function UploadDirectory.<lambda>>, ), mode='workers'))[source]

将本地目录上传到集群的 Nanny 插件。

参数
path

要上传的目录路径

scheduler

是否将目录上传到调度器

示例

>>> from distributed.diagnostics.plugin import UploadDirectory
>>> client.register_plugin(UploadDirectory("/path/to/directory"))