调度器状态机

概览

使用 Dask 进行计算的生命周期可分为以下几个阶段:

  1. 用户使用某些库(例如 dask.delayed 或 dask.dataframe 或客户端上的 submit/map 函数)编写计算图。他们将这些任务提交给调度器。

  2. 调度器将这些任务整合到其跟踪所有任务的计算图中,并且随着它们的依赖项变为可用,调度器会依次要求 Worker 运行每个任务。

  3. Worker 接收关于如何运行任务的信息,与其对等 Worker 进行通信以收集数据依赖项,然后在适当的数据上运行相关函数。它向调度器报告已完成,并将结果存储在其计算所在的 Worker 中。

  4. 调度器向用户报告任务已完成。如果用户需要,则通过调度器从 Worker 获取数据。

最相关的逻辑在于跟踪任务的状态变化,从新提交、等待依赖项、在某个 Worker 上积极运行、在内存中完成,到垃圾回收。跟踪这个过程,以及跟踪此任务对可能依赖于它的其他任务产生的所有影响,是动态任务调度器大部分复杂性所在。本节介绍用于执行此跟踪的系统。

有关调度器所用策略的更多抽象信息,请参见 调度策略

调度器维护关于几种实体类型的内部状态:

  • 调度器已知晓的单个任务

  • 连接到调度器的 Worker

  • 连接到调度器的客户端

注意

本页列出的所有内容都是 Dask 内部操作的详细信息。它可能会在不同版本之间更改,因此您应尽量避免在用户代码中依赖它(包括此处解释的任何 API)。

任务状态

在内部,调度器会在一组固定状态之间移动任务,特别是 releasedwaitingno-workerqueuedprocessingmemoryerred

任务沿以下状态流转,允许的转换如下:

Dask scheduler task states

请注意,任务也可能从任何状态转换为 released (未在图中显示)。

released

已知但未主动计算或不在内存中

waiting

计划计算,等待依赖项进入内存

no-worker

准备计算,但没有合适的 Worker 可用(例如由于资源限制,或根本没有 Worker 连接)。

queued

准备计算,但所有 Worker 都已满负荷。

processing

所有依赖项都可用,任务已分配给 Worker 进行计算(调度器不知道它是在 Worker 队列中还是正在积极计算)。

memory

在一个或多个 Worker 的内存中

erred

任务计算或其某个依赖项发生错误

forgotten

任务不再被任何客户端或依赖任务需要,因此它也从调度器中消失。一旦任务达到此状态,它将立即从调度器中解除引用。

注意

distributed.scheduler.worker_saturation 配置值设置为 1.1(默认值)或任何其他有限值将使多余的根任务在调度器中以 queued 状态入队。这些任务只有在 Worker 有能力处理时才会被分配给 Worker,从而减少 Worker 上的任务队列长度。

distributed.scheduler.worker_saturation 配置值设置为 inf 时,waiting / no-workerprocessing 之间没有中间状态:一旦任务的所有依赖项在集群上的某个地方进入内存,它将立即分配给一个 Worker。这可能导致 Worker 上的任务队列非常长,然后通过 工作窃取 进行动态重新平衡。

除了字面状态之外,还需要保留和更新关于每个任务的其他信息。单个任务状态存储在一个名为 TaskState 的对象中;通过链接查看完整的 API。

调度器使用多个容器跟踪所有 TaskState 对象(不在“已遗忘”状态的对象):

tasks: {str: TaskState}

一个将任务键映射到 TaskState 对象的字典。任务键是调度器与客户端或调度器与 Worker 之间通信任务信息的方式;然后使用此字典查找相应的 TaskState 对象。

unrunnable: {TaskState}

一个处于“无 Worker”状态的 TaskState 对象集合。这些任务的所有 dependencies 都已满足(它们的 waiting_on 集合为空),并且正在等待合适的 Worker 加入网络后才能计算。

一旦任务在 Worker 上排队,它也会在 Worker 端由 Worker 状态机 进行跟踪。

Worker 状态

每个 Worker 的当前状态存储在一个 WorkerState 对象中;通过链接查看完整的 API。

这是调度器端对象,它包含调度器已知晓的关于集群上每个 Worker 的信息,请勿与 distributed.worker-state-machine.WorkerState 混淆。

此信息用于决定 在哪个 Worker 上运行任务

除了单个 Worker 状态外,调度器还维护两个容器以帮助调度任务:

Scheduler.saturated: {WorkerState}

一组 Worker,它们的计算能力(由 WorkerState.nthreads 衡量)已通过处理任务得到充分利用,并且其当前 occupancy 远高于平均水平。

Scheduler.idle: {WorkerState}

一组计算能力未得到充分利用的 Worker。这些 Worker 被认为能够立即开始计算新任务。

这两组是不相交的。此外,一些 Worker 可能既不“空闲”也不“饱和”。在 决定合适的 Worker 来运行新任务 时,将优先选择“空闲”的 Worker。相反,“饱和”的 Worker 可能会通过 工作窃取 来减轻其工作负载。

客户端状态

关于调度器每个独立客户端的信息存储在一个 ClientState 对象中;通过链接查看完整的 API。

理解任务流程

如上所示,有许多关于任务和 Worker 状态的信息片段,其中一些可以在任务转换期间计算、更新或删除。

下表显示了根据任务状态,任务处于哪些状态变量中。带勾选标记 () 的单元格表示任务键 必须 存在于给定状态变量中;带问号 (?) 的单元格表示任务键 可能 存在于给定状态变量中。

状态变量

已释放

等待中

无 Worker

正在处理

内存中

错误

TaskState.dependencies

TaskState.dependents

TaskState.host_restrictions

?

?

?

?

?

?

TaskState.worker_restrictions

?

?

?

?

?

?

TaskState.resource_restrictions

?

?

?

?

?

?

TaskState.loose_restrictions

?

?

?

?

?

?

TaskState.waiting_on

TaskState.waiters

TaskState.processing_on

WorkerState.processing

TaskState.who_has

WorkerState.has_what

TaskState.nbytes (1)

?

?

?

?

?

TaskState.exception (2)

?

TaskState.traceback (2)

?

TaskState.exception_blame

TaskState.retries

?

?

?

?

?

?

TaskState.suspicious_tasks

?

?

?

?

?

?

注释

  1. TaskState.nbytes: 只要任务已经计算过,即使后来被释放,也可以知道此属性。

  2. TaskState.exceptionTaskState.traceback 应在 TaskState.exception_blame 任务上查找。

下表显示了在每次任务状态转换时更新了哪些 Worker 状态变量。

转换

受影响的 Worker 状态

已释放 → 等待中

occupancy, idle, saturated

等待中 → 正在处理

occupancy, idle, saturated, used_resources

等待中 → 内存中

idle, saturated, nbytes

正在处理 → 内存中

occupancy, idle, saturated, used_resources, nbytes

正在处理 → 错误

occupancy, idle, saturated, used_resources

正在处理 → 已释放

occupancy, idle, saturated, used_resources

内存中 → 已释放

nbytes

内存中 → 已被遗忘

nbytes

注意

理解此表的另一种方式是观察到,进入或退出特定任务状态会更新一组明确定义的 Worker 状态变量。例如,进入和退出“内存中”状态会更新 WorkerState.nbytes

实现

状态之间的每次转换都是调度器中的一个独立方法。这些任务转换函数以 transition 作为前缀,然后是开始任务状态和结束任务状态的名称,如下所示:

def transition_released_waiting(self, key, stimulus_id): ...

def transition_processing_memory(self, key, stimulus_id): ...

def transition_processing_erred(self, key, stimulus_id): ...

这些函数各有三个作用:

  1. 它们对调度器状态(20 个字典/列表/集合)执行必要的转换,以在状态之间移动一个键。

  2. 它们返回一个包含推荐的 {key: state} 转换的字典,以便紧随其后对其他键执行。例如,我们将一个键转换到内存中后,可能会发现许多等待中的键现在已准备好从等待中转换为就绪状态。

  3. (可选)它们包含一组验证检查,可以在测试时开启。

与其直接调用这些函数,不如调用中心函数 transition

def transition(self, key, final_state, stimulus_id): ...

此转换函数找到从当前状态到最终状态的适当路径。它还作为日志记录和诊断的中心点。

通常我们希望一次执行多个转换,或者希望持续响应由初始转换推荐的新转换,直到达到稳定状态。为此,我们使用 transitions 函数(注意复数 s)。

def transitions(self, recommendations, stimulus_id):
    recommendations = recommendations.copy()
    while recommendations:
        key, finish = recommendations.popitem()
        new = self.transition(key, finish)
        recommendations.update(new)

此函数运行 transition,接受推荐并也运行它们,重复直到不再推荐进一步的任务转换。

触发因素

转换由触发因素引起,触发因素是来自 Worker 或客户端的状态变更消息。调度器响应以下触发因素:

Worker

task-finished

任务已在 Worker 上完成并已进入内存

task-erred

任务在 Worker 上运行并出错

reschedule

任务在 Worker 上通过抛出 Reschedule 完成

long-running

任务仍在 Worker 上运行,但它调用了 secede()

add-keys

复制完成。一个或多个之前在其他 Worker 内存中的任务,现在已在另一个附加 Worker 的内存中。也用于通知调度器成功的 scatter() 操作。

request-refresh-who-has

所有持有任务副本的对等节点(Worker)在 Worker 已知的范围内都不可用(临时或永久),因此 Worker 无法获取该副本,并正在询问调度器是否知道任何额外的副本。此调用会定期重复,直到出现新的副本。

release-worker-data

Worker 通知调度器它不再在内存中持有该任务

worker-status-change

Worker 的全局状态刚刚发生变化,例如在 runningpaused 之间。

log-event

Worker 上发生了一个通用事件,应集中记录。请注意,这与 Worker 的日志是分开的,客户端可以按需获取该日志(最多达到一定长度)。

keep-alive

Worker 通知它仍在线且响应正常。这使用批处理流通道,与使用专用 RPC 通信的 distributed.worker.Worker.heartbeat()Scheduler.heartbeat_worker() 不同,这是防止防火墙关闭批处理流所必需的。

register-worker

网络中添加了新的 Worker

unregister

现有 Worker 离开了网络

客户端

update-graph

客户端向调度器发送更多任务

client-releases-keys

客户端不再需要某些键的结果。

请注意,还有更多客户端 API 端点(例如用于提供 scatter() 等),此处为简洁起见未列出。

触发因素函数以文本 stimulus 为前缀,并从消息中接受各种关键字参数,示例如下:

def stimulus_task_finished(self, key=None, worker=None, nbytes=None,
                           type=None, compute_start=None, compute_stop=None,
                           transfer_start=None, transfer_stop=None):

def stimulus_task_erred(self, key=None, worker=None,
                        exception=None, traceback=None)

这些函数改变一些非必要的管理状态,然后调用转换函数。

请注意,我们还从 Worker 和客户端接收到其他一些非状态变更消息,例如请求调度器当前状态信息的请求。这些不被视为触发因素。

API

class distributed.scheduler.Scheduler(loop: tornado.ioloop.IOLoop | None = None, services: dict | None = None, service_kwargs: dict | None = None, allowed_failures: int | None = None, extensions: dict | None = None, validate: bool | None = None, scheduler_file: str | None = None, security: dict | distributed.security.Security | None = None, worker_ttl: float | None = None, idle_timeout: float | None = None, interface: str | None = None, host: str | None = None, port: int = 0, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool | None = None, http_prefix: str | None = '/', preload: str | collections.abc.Sequence[str] | None = None, preload_argv: str | collections.abc.Sequence[str] | collections.abc.Sequence[collections.abc.Sequence[str]] = (), plugins: collections.abc.Sequence[distributed.diagnostics.plugin.SchedulerPlugin] = (), contact_address: str | None = None, transition_counter_max: bool | int = False, jupyter: bool = False, **kwargs: Any)[source]

动态分布式任务调度器

调度器跟踪 Worker、数据和计算的当前状态。调度器监听事件并通过适当地控制 Worker 来响应。它不断尝试使用 Worker 执行不断增长的 Dask 计算图。

所有事件都能快速处理,与输入(通常是恒定大小)呈线性时间关系,通常在几毫秒内完成。为了实现这一点,调度器跟踪了大量状态。每个操作都维护此状态的一致性。

调度器通过 Comm 对象与外界通信。即使同时监听多个客户端,它也维护着一个一致且有效的世界视图。

调度器通常通过 dask scheduler 可执行文件启动:

$ dask scheduler
Scheduler started at 127.0.0.1:8786

或在 LocalCluster 中,客户端在没有连接信息的情况下启动:

>>> c = Client()  
>>> c.cluster.scheduler  
Scheduler(...)

用户通常不直接与调度器交互,而是通过客户端对象 Client 进行交互。

contact_address 参数允许向 Worker 公布一个特定的地址用于与调度器通信,该地址与调度器绑定的地址不同。这在调度器监听私有地址(Worker 因此无法联系到它)时很有用。

状态

调度器包含以下状态变量。每个变量及其存储的内容和简要说明如下所示:

  • tasks: {task key: TaskState}

    调度器当前已知晓的任务

  • unrunnable: {TaskState}

    处于“无 Worker”状态的任务

  • workers: {worker key: WorkerState}

    当前连接到调度器的 Worker

  • idle: {WorkerState}

    未充分利用的 Worker 集合

  • saturated: {WorkerState}

    充分利用的 Worker 集合

  • host_info: {hostname: dict}

    关于每个 Worker 主机的信息

  • clients: {client key: ClientState}

    当前连接到调度器的客户端

  • services: {str: port}

    在此调度器上运行的其他服务,例如 Bokeh

  • loop: IOLoop

    正在运行的 Tornado IOLoop

  • client_comms: {client key: Comm}

    对于每个客户端,用于接收任务请求和报告任务状态更新的 Comm 对象。

  • stream_comms: {worker key: Comm}

    对于每个 Worker,既接受触发因素又报告结果的 Comm 对象。

  • task_duration: {key-prefix: time}

    我们预期某些函数所需的时间,例如 {'sum': 0.25}

adaptive_target(target_duration: float | None = None) int[source]

根据当前工作负载所需的 Worker 数量

这查看当前正在运行的任务和内存使用情况,并返回所需 Worker 的数量。这通常用于自适应调度。

参数
target_durationstr

计算所需的期望持续时间。这影响调度器请求缩放的速度。

async add_client(comm: distributed.comm.core.Comm, client: str, versions: dict[str, Any]) None[source]

将客户端添加到网络

我们监听来自此 Comm 的所有未来消息。

add_keys(worker: str, keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] = (), stimulus_id: str | None = None) Literal['OK', 'not found'][source]

知晓 Worker 拥有某些键

实际上不应使用此函数,主要出于历史原因。然而,Worker 会不时发送此消息。

add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) None[source]

将外部插件添加到调度器。

参见 https://distributed.dask.org.cn/en/stable/plugins.html

参数
pluginSchedulerPlugin

要添加的 SchedulerPlugin 实例

idempotentbool

如果为 True,则假定插件已存在且不执行任何操作。

namestr

插件的名称,如果为 None,则检查 Plugin 实例上的 name 属性,如果未找到则生成。

add_replica(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) None

注意 Worker 持有一个状态为 'memory' 的任务副本

async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) None[source]

向集群添加新 Worker

property address: str

可以联系到此 Server 的地址。如果 Server 尚未启动,则会引发 ValueError。

property address_safe: str

可以联系到此 Server 的地址。如果 Server 尚未启动,则返回 "not-running"

async benchmark_hardware() dict[str, dict[str, float]][source]

在 Worker 上运行内存、磁盘和网络带宽基准测试

返回值
result: dict

一个字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是 Worker 在集群上运行计算时多次测量的平均值。

async broadcast(*, msg: dict, workers: collections.abc.Collection[str] | None = None, hosts: collections.abc.Collection[str] | None = None, nanny: bool = False, serializers: Any = None, on_error: Literal['raise', 'return', 'return_pickle', 'ignore'] = 'raise') dict[str, Any][source]

向 Worker 广播消息,返回所有结果

bulk_schedule_unrunnable_after_adding_worker(ws: distributed.scheduler.WorkerState) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']]

将此 Worker 可以处理的 no-worker 任务发送到 processing

返回按优先级排序的建议。

check_idle_saturated(ws: distributed.scheduler.WorkerState, occ: float = - 1.0) None

更新空闲和饱和状态的状态

调度器跟踪以下 Worker:

  • 饱和:有足够的工作以保持忙碌

  • 空闲:没有足够的工作以保持忙碌

如果它们既有足够的任务来占用其所有线程,并且这些任务的预期运行时足够长,则被视为饱和。

如果 distributed.scheduler.worker-saturation 不是 inf(启用调度器端队列),则如果 Worker 正在处理的任务少于 worker-saturation 阈值规定,则被视为空闲。

否则,如果它们正在处理的任务少于线程数,或者其任务的总预期运行时少于相同数量的平均任务的预期运行时的一半,则被视为空闲。

这对于负载均衡和自适应很有用。

client_heartbeat(client: str) None[source]

处理来自客户端的心跳。

client_releases_keys(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, stimulus_id: str | None = None) None[source]

从客户端所需列表中移除键。

client_send(client: str, msg: dict) None[source]

向客户端发送消息。

async close(timeout: float | None = None, reason: str = 'unknown') None[source]

向所有协程发送清理信号,然后等待直到完成。

另请参见

Scheduler.cleanup
close_worker(worker: str) None[source]

要求工作节点自行关闭。不等待其生效。请注意,不能保证工作节点一定会接受该命令。

请注意,如果 close=True,remove_worker() 会在内部发送相同的命令。

coerce_address(addr: str | tuple, resolve: bool = True) str[source]

将可能的输入地址强制转换为规范形式。可以禁用 resolve 以使用伪造主机名进行测试。

处理字符串、元组或别名。

coerce_hostname(host: collections.abc.Hashable) str

强制转换工作节点的主机名。

decide_worker_non_rootish(ts: distributed.scheduler.TaskState) distributed.scheduler.WorkerState | None

为可运行的非根任务选择一个工作节点,同时考虑依赖关系和限制。

从持有 ts 依赖项的符合条件的工作节点中,选择一个工作节点,考虑到工作节点的积压和数据传输成本,该任务预计会尽快开始运行。

返回值
ws: WorkerState | None

要分配任务到的工作节点。如果没有工作节点满足 ts 的限制,或者没有正在运行的工作节点,则返回 None,在这种情况下,任务应转换为 no-worker 状态。

decide_worker_rootish_queuing_disabled(ts: distributed.scheduler.TaskState) distributed.scheduler.WorkerState | None

为可运行的根任务选择一个工作节点,不进行排队。

这会尝试在同一个工作节点上调度同级任务,从而减少未来的数据传输。它不考虑依赖项的位置,因为它们最终都会在每个工作节点上。

它假定它是按优先级顺序在一批任务上被调用的,并在 SchedulerState.last_root_workerSchedulerState.last_root_worker_tasks_left 中维护状态以实现此目的。

这将把每个可运行任务发送到一个工作节点,这通常会导致根任务的过度生产。

返回值
ws: WorkerState | None

要分配任务到的工作节点。如果集群中没有工作节点,则返回 None,在这种情况下,任务应转换为 no-worker 状态。

decide_worker_rootish_queuing_enabled() distributed.scheduler.WorkerState | None

为可运行的根任务选择一个工作节点,前提是并非所有工作节点都忙碌。

空闲 工作节点中选择最不忙的工作节点(空闲工作节点的运行任务少于线程数,由 distributed.scheduler.worker-saturation 设置)。它不考虑依赖项的位置,因为它们最终都会在每个工作节点上。

如果所有工作节点都已满载,则返回 None,表示任务应转换为 queued 状态。调度器将等待直到有线程可用时再将其发送到工作节点。这确保下游任务总是在新的根任务开始之前运行。

这不会尝试在同一个工作节点上调度同级任务;事实上,它通常会做相反的事情。尽管这会增加后续的数据传输,但通过消除根任务的过度生产,它通常会减少整体内存使用。

返回值
ws: WorkerState | None

要分配任务到的工作节点。如果没有空闲工作节点,则返回 None,在这种情况下,任务应转换为 queued 状态。

async delete_worker_data(worker_address: str, keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], stimulus_id: str) None[source]

删除工作节点上的数据并更新相应的工作节点/任务状态。

参数
worker_address: str

要删除键的工作节点地址。

keys: list[Key]

要在指定工作节点上删除的键列表。

async dump_cluster_state_to_url(url: str, exclude: collections.abc.Collection[str], format: Literal['msgpack', 'yaml'], **storage_options: dict[str, Any]) None[source]

将集群状态转储写入与 fsspec 兼容的 URL。

async feed(comm: distributed.comm.core.Comm, function: bytes | None = None, setup: bytes | None = None, teardown: bytes | None = None, interval: str | float = '1s', **kwargs: Any) None[source]

向外部请求者提供数据通讯(Comm)。

注意:这会在调度器上运行任意 Python 代码。这最终应该被淘汰。它主要用于诊断。

async finished()

等待直到服务器完成。

async gather(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], serializers: list[str] | None = None) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][source]

从工作节点收集数据到调度器。

async gather_on_worker(worker_address: str, who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) set[source]

将键从多个工作节点点对点复制到单个工作节点。

参数
worker_address: str

接收键的工作节点地址。

who_has: dict[Key, list[str]]

{key: [sender address, sender address, …], key: …}

返回值
返回值

复制失败的键集合。

async get_cluster_state(exclude: collections.abc.Collection[str]) dict[source]

生成集群状态转储中使用的状态字典。

get_comm_cost(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) float

获取在给定工作节点上计算任务的估计通信成本(以秒为单位)。

get_connection_counters() dict[str, int]

包含各种连接计数器的字典。

另请参见

Server.incoming_comms_open
Server.incoming_comms_active
Server.outgoing_comms_open
Server.outgoing_comms_active
get_logs(start=0, n=None, timestamps=False)

获取此节点的日志条目。

参数
start浮点数, 可选

开始过滤日志条目的时间(以秒为单位)。

n整数, 可选

从过滤结果中返回的最大日志条目数。

timestamps布尔值, 默认为 False

日志条目是否应包含生成时间?

返回值
包含每个过滤条目的日志级别、消息和(可选)时间戳的元组列表,最新条目在前。
async get_story(keys_or_stimuli: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) list[distributed.scheduler.Transition][source]

SchedulerState.story() 的 RPC 钩子。

请注意,msgpack 序列化/反序列化往返会将 Transition 命名元组转换为普通元组。

get_worker_service_addr(worker: str, service_name: str, protocol: bool = False) tuple[str, int] | str | None[source]

获取工作节点上命名服务的 (主机, 端口) 地址。如果服务不存在,则返回 None。

参数
worker地址
service_name字符串

常见服务包括 ‘bokeh’ 和 ‘nanny’。

protocol布尔值

是否包含带有协议的完整地址 (True) 或仅包含 (主机, 端口) 对。

handle_comm(comm: distributed.comm.core.Comm) distributed.utils.NoOpAwaitable

启动一个后台任务,将新的通讯分派给协程处理程序。

handle_long_running(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, run_id: int, compute_duration: float | None, stimulus_id: str) None[source]

一个任务已脱离线程池。

我们阻止该任务将来被窃取,并更改任务持续时间计算,就像任务已停止一样。

handle_request_refresh_who_has(keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], worker: str, stimulus_id: str) None[source]

来自工作节点的请求,要求刷新某些键的 who_has 信息。不要与 scheduler.who_has 混淆,后者是来自客户端的专用通讯 RPC 请求。

async handle_worker(comm: distributed.comm.core.Comm, worker: str) None[source]

监听来自单个工作节点的响应。

这是调度器与工作节点交互的主循环。

另请参见

Scheduler.handle_client

客户端的等效协程。

property host

此服务器运行所在的主机。

如果服务器正在监听非基于 IP 的协议,这将引发 ValueError。

identity(n_workers: int = - 1) dict[str, Any][source]

关于自身和集群的基本信息。

property incoming_comms_active: int

当前正在处理远程 RPC 的连接数。

property incoming_comms_open: int

监听远程 RPC 的总传入连接数。

property is_idle: bool

当且仅当没有尚未完成计算的任务时返回 True。

与测试 self.total_occupancy 不同,如果存在长时间运行、无工作节点或排队任务(由于没有任何工作节点),此属性返回 False。

不要与 idle 混淆。

is_rootish(ts: distributed.scheduler.TaskState) bool

ts 是否是根任务或类似根的任务。

根任务是比集群大得多的组的一部分,并且很少或没有依赖项。任务也可以被明确标记为根任务以覆盖此启发式规则。

property listen_address

此服务器正在监听的地址。这可能是通配符地址,例如 tcp://0.0.0.0:1234

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[source]

在给定主题下记录事件。

参数
topic字符串, 字符串列表

记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。

msg

要记录的事件消息。请注意,这必须是 msgpack 可序列化的。

另请参见

Client.log_event
new_task(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], spec: dask._task_spec.GraphNode | None = None, state: Literal['released', 'waiting','no-worker','queued','processing','memory','erred','forgotten'], computation: distributed.scheduler.Computation | None = None) distributed.scheduler.TaskState

创建一个新任务及相关状态。

property outgoing_comms_active: int

当前用于执行 RPC 的传出连接数。

property outgoing_comms_open: int

当前已打开并等待远程 RPC 的连接数。

property port

此服务器正在监听的端口号。

如果服务器正在监听非基于 IP 的协议,这将引发 ValueError。

async proxy(msg: dict, worker: str, serializers: Any = None) Any[source]

通过调度器代理通讯到其他工作节点。

async rebalance(keys: collections.abc.Iterable[Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] | None = None, workers: collections.abc.Iterable[str] | None = None, stimulus_id: str | None = None) dict[source]

重新平衡键,以便每个工作节点最终具有大致相同的进程内存(托管内存+非托管内存)。

警告

此操作通常未在调度器正常运行的情况下得到充分测试。不建议在等待计算时使用此操作。

算法

  1. 计算集群的平均占用率,定义为 dask 管理的数据 + 已存在至少 30 秒的非托管进程内存 (distributed.worker.memory.recent-to-old-time)。这使我们可以忽略由任务堆使用引起的临时峰值。

    另外,您可以通过 distributed.worker.memory.rebalance.measure 更改单个工作节点以及计算平均值时测量内存的方式。这对于忽略不准确的操作系统内存测量特别有用。

  2. 丢弃占用率在平均集群占用率的 5% 以内的工作节点 (distributed.worker.memory.rebalance.sender-recipient-gap / 2)。这有助于避免数据在集群中反复弹跳。

  3. 高于平均值的工作节点是发送方;低于平均值的是接收方。

  4. 丢弃绝对占用率低于 30% 的发送方 (distributed.worker.memory.rebalance.sender-min)。换句话说,只要所有工作节点都低于 30%,无论是否失衡,都不会移动数据。

  5. 丢弃绝对占用率高于 60% 的接收方 (distributed.worker.memory.rebalance.recipient-max)。请注意,默认情况下此阈值与 distributed.worker.memory.target 相同,以防止工作节点接收数据后立即将其溢出到磁盘。

  6. 迭代地选择离平均值最远的发送方和接收方,并在两者之间移动最近最少插入的键,直到所有发送方或所有接收方的占用率都落在平均值的 5% 以内。

    如果接收方已经拥有数据的副本,则会跳过该接收方。换句话说,此方法不会降低复制等级。如果没有具有足够内存且尚未持有副本的接收方可用,则会跳过某个键。

最近最少插入 (LRI) 策略是一种贪心选择,其优点是 O(1) 复杂度,易于实现(它依赖于 Python 字典的插入排序),并且在大多数情况下都足够好。被丢弃的替代策略包括:

  • 最大优先。O(n*log(n)),但需要非平凡的额外数据结构,并且有导致最大数据块像弹球一样在集群中反复移动的风险。

  • 最近最少使用 (LRU)。此信息目前仅在工作节点上可用,在调度器上复制并不简单;通过网络传输此信息会非常昂贵。此外,请注意 dask 会不遗余力地最大限度地减少中间键在内存中停留的时间,因此在这种情况下,LRI 是 LRU 的一个近似值。

参数
keys: 可选

应考虑移动的 dask 键白名单。所有其他键将被忽略。请注意,这不保证某个键实际会被移动(例如,因为它不必要或没有可行的接收工作节点)。

workers: 可选

应考虑作为发送方或接收方的工作节点地址白名单。所有其他工作节点将被忽略。平均集群占用率将仅使用白名单中的工作节点计算。

async register_nanny_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage][source]

在所有正在运行和未来的 nanny 上注册 nanny 插件。

async register_scheduler_plugin(plugin: bytes | distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None) None[source]

在调度器上注册插件。

async register_worker_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage][source]

在所有正在运行和未来的工作节点上注册工作节点插件。

remove_all_replicas(ts: distributed.scheduler.TaskState) None

从所有工作节点移除任务的所有副本。

remove_client(client: str, stimulus_id: str | None = None) None[source]

从网络中移除客户端。

remove_plugin(name: str | None = None) None[source]

从调度器中移除外部插件。

参数
namestr

要移除的插件名称。

remove_replica(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) None

请注意,工作节点不再持有任务的副本。

remove_worker(address: str, *, stimulus_id: str, expected: bool = False, close: bool = True) Literal['OK', 'already-removed'][source]

从集群中移除工作节点。

当工作节点报告计划离开或看起来无响应时,我们会执行此操作。这可能会将其任务返回到 released 状态。

async replicate(keys: list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], n: int | None = None, workers: collections.abc.Iterable | None = None, branching_factor: int = 2, delete: bool = True, stimulus_id: str | None = None) dict | None[source]

在整个集群中复制数据。

这会在整个网络中对每个数据块单独执行数据的树状复制。

参数
keys: 可迭代对象

要复制的键列表。

n: 整数

期望在集群中看到的复制数量。

branching_factor: 整数, 可选

每一代中可以复制数据的工作节点数量。分支因子越大,我们在单个步骤中复制的数据越多,但给定工作节点被数据请求淹没的风险也越大。

另请参见

Scheduler.rebalance
report(msg: dict, ts: distributed.scheduler.TaskState | None = None, client: str | None = None) None[源代码]

向所有监听队列 (Queues) 和通信通道 (Comms) 发布更新

如果消息包含一个 key,则我们只将消息发送给那些关心该 key 的通信通道。

request_acquire_replicas(addr: str, keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None[源代码]

异步请求工作节点从其他工作节点获取列出的键的副本。这是一个即发即弃操作,不提供成功或失败的反馈,旨在用于内部维护而非计算。

request_remove_replicas(addr: str, keys: list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None[源代码]

异步请求工作节点丢弃其持有的列出键的副本。绝不能使用此操作销毁键的最后一个副本。这是一个即发即弃操作,旨在用于内部维护而非计算。

副本会立即从调度器端的 TaskState.who_has 中消失;如果工作节点拒绝删除(例如,因为该任务是其上运行的另一个任务的依赖项),它将(也是异步地)通知调度器将其重新添加到 who_has 中。如果工作节点同意丢弃任务,则没有反馈。

async restart(*, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, stimulus_id: str) None[源代码]

忘记所有任务并对所有工作节点调用 restart_workers。

参数
timeout

参见 restart_workers

wait_for_workers

参见 restart_workers

async restart_workers(workers: list[str] | None = None, *, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, on_error: Literal['raise', 'return'] = 'raise', stimulus_id: str) dict[str, Literal['OK', 'removed', 'timed out']][源代码]

重启选定的工作节点。可选地等待工作节点返回。

没有 nanny 的工作节点会被关闭,希望外部部署系统会重启它们。因此,如果不使用 nanny 且您的部署系统不自动重启工作节点,restart 将只会关闭所有工作节点,然后超时!

restart 后,所有连接的工作节点都是新的,无论是否抛出了 TimeoutError。未能及时关闭的任何工作节点都会被移除,并且将来可能会或可能不会自行关闭。

参数
workers

要重启的工作节点地址列表。如果省略,则重启所有工作节点。

timeout

如果 wait_for_workers 为 True,则等待工作节点关闭并重新上线的时间,否则仅等待工作节点关闭的时间。如果超出此时间,则抛出 asyncio.TimeoutError

wait_for_workers

是否等待所有工作节点重新连接,或仅等待它们关闭(默认为 True)。结合使用 restart(wait_for_workers=False)Client.wait_for_workers() 以精细控制要等待的工作节点数量。

on_error

如果为 'raise'(默认),则在重启工作节点时任何 nanny 超时,都会抛出异常。如果为 'return',则返回错误消息。

返回值
{工作节点地址:“OK”、“no nanny”或“timed out”或错误消息}
async retire_workers(workers: list[str], *, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, distributed.utils.Any][源代码]
async retire_workers(*, names: list, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, distributed.utils.Any]
async retire_workers(*, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None', memory_ratio: int | float | None = 'None', n: int | None = 'None', key: Callable[[WorkerState], Hashable] | bytes | None = 'None', minimum: int | None = 'None', target: int | None = 'None', attribute: str = "'address'") dict[str, distributed.utils.Any]

优雅地将工作节点从集群中下线。仅在下线工作节点内存中的任何键都会被复制到其他地方。

参数
workers: list[str] (可选)

要下线的工作节点地址列表。

names: list (可选)

要下线的工作节点名称列表。与 workers 互斥。如果既未提供 workers 也未提供 names,我们将调用 workers_to_close,它会找到一组合适的工作节点。

close_workers: bool (默认为 False)

是否真正从此处显式关闭工作节点。否则,我们期望某个外部作业调度器来结束工作节点。

remove: bool (默认为 True)

是否立即移除工作节点元数据,否则等待工作节点联系我们。

如果 close_workers=False 且 remove=False,此方法仅将内存中的任务从工作节点中刷新出去,然后返回。如果 close_workers=True 且 remove=False,此方法将在工作节点仍在集群中时返回,尽管它们不再接受新任务。如果 close_workers=False 或者由于某种原因工作节点不接受关闭命令,它将永久无法接受新任务,并期望通过其他方式关闭它。

**kwargs: dict

要传递给 workers_to_close 的额外选项,用于确定应丢弃哪些工作节点。仅在省略 workersnames 时接受。

返回值
字典,将工作节点 ID/地址映射到关于
每个下线工作节点的信息字典。
如果存在仅在待下线工作节点内存中的键,并且
无法将它们复制到其他地方(例如,因为没有
其他运行中的工作节点),则持有这些键的工作节点将不会被下线,并且
也不会出现在返回的字典中。
run_function(comm: distributed.comm.core.Comm, function: collections.abc.Callable, args: tuple = (), kwargs: dict | None = None, wait: bool = True) Any[源代码]

在此进程内运行一个函数

async scatter(data: dict, workers: collections.abc.Iterable | None, client: str, broadcast: bool = False, timeout: float = 2) list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]][源代码]

将数据发送到工作节点

另请参见

Scheduler.broadcast
send_all(client_msgs: dict[str, list[dict[str, Any]]], worker_msgs: dict[str, list[dict[str, Any]]]) None[源代码]

向客户端和工作节点发送消息

send_task_to_worker(worker: str, ts: distributed.scheduler.TaskState) None[源代码]

向工作节点发送单个计算任务

start_http_server(routes, dashboard_address, default_port=0, ssl_options=None)

这将在当前节点上创建一个 HTTP 服务器

start_periodic_callbacks()

一致地启动周期性回调

如果 self.periodic_callbacks 中存储的所有周期性回调尚未运行,则此方法会启动它们。它通过检查是否正在使用正确的事件循环来安全地执行此操作。

async start_unsafe() Self[源代码]

清除旧状态并重启所有正在运行的协程

stimulus_cancel(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, force: bool, reason: str, msg: str) None[源代码]

停止对键列表的执行

stimulus_queue_slots_maybe_opened(*, stimulus_id: str) None[源代码]

响应一个可能释放了工作节点线程池上位置的事件

根据工作节点上可用的任务槽总数(可能为 0),从队列头部选择适当数量的任务,并将其状态转换为 processing

注释

与此刺激相关的其他状态转换应提前完全处理,以便任何变为可运行的任务已处于 processing 状态。否则,如果排队中的任务在下游任务之前被调度,可能会发生过度生产。

必须在 check_idle_saturated 之后调用;即 idle_task_count 必须是最新状态。

stimulus_task_erred(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, exception: Any, stimulus_id: str, traceback: Any, run_id: str, **kwargs: Any) tuple[dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]][源代码]

标记任务在特定工作节点上出错

stimulus_task_finished(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, stimulus_id: str, run_id: int, **kwargs: Any) tuple[dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]][源代码]

标记任务在特定工作节点上已完成执行

story(*keys_or_tasks_or_stimuli: Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.scheduler.TaskState]) list[distributed.scheduler.Transition]

获取涉及输入键或 stimulus_id 之一的所有状态转换

transition(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], finish: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], stimulus_id: str, **kwargs: Any) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']][源代码]

将键的状态从当前状态转换为完成状态

返回值
用于未来状态转换的推荐字典

另请参见

Scheduler.transitions

此函数的传递版本

示例

>>> self.transition('x', 'waiting')
{'x': 'processing'}
transitions(recommendations: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], stimulus_id: str) None[源代码]

处理状态转换直到没有剩余

这包括来自先前状态转换的反馈,并持续到达到稳定状态

async unregister_nanny_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][源代码]

注销工作节点插件

async unregister_scheduler_plugin(name: str) None[源代码]

在调度器上注销插件。

async unregister_worker_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][源代码]

注销工作节点插件

update_data(*, who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]], nbytes: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], int], client: str | None = None) None[源代码]

获悉新数据已从外部源进入网络

valid_workers(ts: distributed.scheduler.TaskState) set[distributed.scheduler.WorkerState] | None

返回当前对键有效的工作节点集合

如果所有工作节点都有效,则返回 None,在这种情况下,可以使用任何运行中的工作节点。否则,返回对此任务有效的运行中工作节点的子集。这会检查并跟踪以下状态:

  • worker_restrictions

  • host_restrictions

  • resource_restrictions

worker_objective(ts: distributed.scheduler.TaskState, ws: distributed.scheduler.WorkerState) tuple

用于确定哪个工作节点应获取任务的目标函数

最小化预期开始时间。如果出现平局,则根据数据存储来打破平局。

worker_send(worker: str, msg: dict[str, Any]) None[源代码]

向工作节点发送消息

这也通过添加回调来处理连接失败,以便在下一个周期移除工作节点。

workers_list(workers: collections.abc.Iterable[str] | None) list[str][源代码]

符合条件的工作节点列表

接受工作节点地址或主机名列表。返回所有匹配的工作节点地址列表

workers_to_close(memory_ratio: int | float | None = None, n: int | None = None, key: collections.abc.Callable[[distributed.scheduler.WorkerState], collections.abc.Hashable] | bytes | None = 'None', minimum: int | None = 'None', target: int | None = 'None', attribute: str = 'address') list[str][源代码]

查找可以以低成本关闭的工作节点

这返回一个适合下线的工作节点列表。这些工作节点没有运行任何任务,并且相对于其对等节点存储的数据相对较少。如果所有工作节点都处于空闲状态,我们仍然会保留足够多的工作节点,以拥有足够的 RAM 来存储我们的数据,并留有足够的缓冲。

这适用于诸如 distributed.deploy.adaptive 之类的系统。

参数
memory_ratio数字

我们想要为存储的数据保留的额外空间量。默认为 2,即我们想要拥有当前数据量两倍的内存。

n整数

要关闭的工作节点数量

minimum整数

要保留的最小工作节点数量

key可调用对象(WorkerState)

一个可选的可调用对象,将 WorkerState 对象映射到组归属。分组将一起关闭。这在必须集体关闭工作节点时很有用,例如按主机名。

target整数

关闭后目标工作节点数量

attribute字符串

要返回的 WorkerState 对象属性,例如“address”或“name”。默认为“address”。

返回值
to_close: 可以安全关闭的工作节点地址列表

示例

>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

在关闭前按主机名分组工作节点

>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

移除两个工作节点

>>> scheduler.workers_to_close(n=2)

保留足够的工作节点,以拥有我们所需内存的两倍。

>>> scheduler.workers_to_close(memory_ratio=2)
class distributed.scheduler.TaskState(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], run_spec: dask._task_spec.GraphNode | None, state: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], group: distributed.scheduler.TaskGroup, validate: bool)[source]

一个简单的对象,用于保存有关任务的信息。

不要与 distributed.worker_state_machine.TaskState 混淆,后者在工作节点端保存类似的信息。

actor: bool

此任务是否为 Actor

add_dependency(other: distributed.scheduler.TaskState) None[source]

将另一个任务添加为此任务的依赖

annotations: dict[str, Any] | None

任务注解

dependencies: set[distributed.scheduler.TaskState]

此任务正确执行所依赖的任务集合。此集合中仅列出了仍存活的任务。如果出于某种原因,此任务还依赖于已遗忘的任务,则会设置 has_lost_dependencies 标志。

一个任务只有在其所有依赖项都已成功执行并在至少一个工作节点上存储了结果后才能执行。这通过逐步清空 waiting_on 集合来跟踪。

dependents: set[distributed.scheduler.TaskState]

依赖于此任务的任务集合。此集合中仅列出了仍存活的任务。这是 dependencies 的反向映射。

erred_on: set[str] | None

出现错误的工作节点地址,导致此任务处于错误状态。

exception: distributed.protocol.serialize.Serialized | None

如果此任务执行失败,异常对象将存储在此处。

exception_blame: distributed.scheduler.TaskState | None

如果此任务或其某个依赖项执行失败,失败的任务将存储在此处(可能就是它自身)。

exception_text: str | None

异常的字符串表示

group: distributed.scheduler.TaskGroup

此任务所属的任务组

has_lost_dependencies: bool

此任务的任何依赖项是否已被遗忘。出于内存消耗的原因,即使遗忘的任务可能有依赖于它们的任务,也不会将其保留在内存中。因此,当一个任务被遗忘时,其每个依赖者都会将 has_lost_dependencies 属性设置为 True

如果 has_lost_dependencies 为 True,则此任务无法再进入“processing”状态。

host_restrictions: set[str] | None

此任务可以运行的主机名集合(如果为空则为 None)。通常情况下此集合为空,除非任务被明确限制只能在特定主机上运行。一个主机名可能对应一个或多个连接的工作节点。

key: Union[str, int, float, tuple[ForwardRef('Key'), ...]]

Key 是任务的唯一标识符,通常由函数名称加上函数和参数的哈希值构成,例如 'inc-ab31c010444977004d656610d2d421ec'

loose_restrictions: bool
False

每个 host_restrictionsworker_restrictionsresource_restrictions 都是硬约束:如果没有满足这些限制的工作节点可用,任务就不能进入“processing”状态,而是会进入“no-worker”状态。

True

上述限制仅为偏好:如果没有满足这些限制的工作节点可用,任务仍然可以进入“processing”状态并发送到其他已连接的工作节点执行。

metadata: dict[str, Any] | None

与任务相关的元数据

nbytes: int

已完成任务结果的字节数,由 sizeof 确定。此数字用于诊断并帮助确定工作优先级。对于未完成的任务,此值设置为 -1。

property prefix: distributed.scheduler.TaskPrefix

此任务所属的大类,如“inc”或“read_csv”

priority: tuple[float, ...] | None

优先级为每个任务提供一个相对排名,用于在考虑执行多个任务时打破僵局。

此排名通常是一个包含两个元素的元组。第一个(主要)元素对应于任务提交的时间。通常,较早的任务优先。第二个元素由客户端确定,是一种在大图中优先处理重要任务(例如,位于关键路径上或执行后能释放许多依赖的任务)的方式。这在调度策略中有更详细的解释。

processing_on: distributed.scheduler.WorkerState | None

如果此任务处于“processing”状态,则表示当前正在处理此任务的工作节点。此属性与 WorkerState.processing 保持同步。

resource_restrictions: dict[str, float] | None

此任务所需的资源,例如 {'gpu': 1}{'memory': 1e9}。这些是用户定义的名称,将与每个 WorkerState.resources 字典的内容进行匹配。

retries: int

此任务在失败时可以自动重试的次数。如果任务执行失败(工作节点返回错误),则会检查其 retries 属性。如果等于 0,则任务被标记为“erred”。如果大于 0,则 retries 属性会递减,并再次尝试执行。

run_id: int | None

任务特定执行的唯一标识符。此标识符用于标记任务,以便分配的工作节点应在任务完成消息中返回相同的标识符。这用于关联响应。只有最近分配的工作节点是可信的。所有其他结果都将被拒绝。

run_spec: dask._task_spec.GraphNode | None

如何运行任务的规范。此值的类型和含义对于调度器来说是不透明的,因为它仅由接收任务执行的工作节点解释。

作为一种特殊情况,此属性也可能为 None,在这种情况下,任务是“纯数据”(例如,使用 Client.scatter() 加载到调度器中的一段数据)。“纯数据”任务如果其值丢失,则无法再次计算。

property state: Literal['released', 'waiting', 'no-worker', 'queued','processing', 'memory', 'erred', 'forgotten']

此任务的当前状态。有效状态包括 released (已释放)、waiting (等待中)、no-worker (无工作节点)、processing (处理中)、memory (内存中)、erred (出错) 和 forgotten (已遗忘)。如果状态是 forgotten,任务将不再存储在 tasks 字典中,并且可能很快从内存中消失。

suspicious: int

此任务导致工作节点死亡的次数。

某些任务可能会导致工作节点死亡(例如调用 os._exit(0))。当工作节点死亡时,该工作节点上的所有任务都会重新分配给其他工作节点。这种行为组合可能导致一个不良任务灾难性地逐个销毁集群中的所有工作节点。每当工作节点死亡时,我们会将当前在该工作节点上处理的每个任务(由 WorkerState.processing 记录)标记为“可疑”(suspicious)。如果一个任务导致了三次工作节点死亡(或其他固定常数),则将其标记为 erred (出错)。

traceback: distributed.protocol.serialize.Serialized | None

如果此任务执行失败,堆栈跟踪对象将存储在此处。

traceback_text: str | None

堆栈跟踪的字符串表示

type: str

对象的类型,以字符串形式表示。仅存在于已计算的任务。

waiters: set[distributed.scheduler.TaskState] | None

需要此任务保持存活的任务集合。这始终是 dependents 的一个子集。每当其中一个依赖者完成处理时,就会从 waiters 集合中移除。

一旦 waiterswho_wants 都变为空,调度器和 who_has 中的任何工作节点都可以释放此任务(如果它有一个非空的 run_spec)或遗忘此任务(否则)。

注意

与直觉相反,waiting_onwaiters 不是彼此的反向映射。

waiting_on: set[distributed.scheduler.TaskState] | None

此任务在执行之前正在等待的任务集合。这始终是 dependencies 的一个子集。每当其中一个依赖项完成处理时,就会从 waiting_on 集合中移除。

一旦 waiting_on 变为空,此任务就可以从“waiting”状态转移到“processing”状态(除非其中一个依赖项出错,在这种情况下此任务会被标记为“erred”)。

who_has: set[distributed.scheduler.WorkerState] | None

内存中拥有此任务结果的工作节点集合。仅当任务处于“memory”状态时,此集合非空。例如,如果使用了 Client.scatter()Client.replicate(),则此集合中可以有多个工作节点。

这是 WorkerState.has_what 的反向映射。

who_wants: set[distributed.scheduler.ClientState] | None

希望此任务结果保持存活的客户端集合。这是 TaskState.who_wants 的反向映射。

当客户端向调度器提交图时,它也会指定期望保留哪些输出任务,以便它们的结果不会从内存中释放。

一旦任务完成执行(即进入“memory”或“erred”状态),就会通知 who_wants 中的客户端。

一旦 waiterswho_wants 都变为空,调度器和 who_has 中的任何工作节点都可以释放此任务(如果它有一个非空的 run_spec)或遗忘此任务(否则)。

worker_restrictions: set[str] | None

此任务可以运行的完整工作节点地址集合(如果为空则为 None)。通常情况下此集合为空,除非任务被明确限制只能在特定工作节点上运行。请注意,这跟踪的是工作节点地址,而不是工作节点状态,因为特定工作节点此时可能未连接。

class distributed.scheduler.WorkerState(*, address: str, status: distributed.core.Status, pid: int, name: object, nthreads: int = 0, memory_limit: int, local_directory: str, nanny: str | None = None, server_id: str, services: dict[str, int] | None = None, versions: dict[str, Any] | None = None, extra: dict[str, Any] | None = None, scheduler: distributed.scheduler.SchedulerState | None = None)[source]

一个简单的对象,用于保存有关工作节点的信息。

不要与 distributed.worker_state_machine.WorkerState 混淆。

actors: set[distributed.scheduler.TaskState]

此工作节点上所有是 actor 的 TaskStates 集合。这仅包括其状态实际驻留在此工作节点上的 actor,不包括此工作节点引用的 actor。

add_replica(ts: distributed.scheduler.TaskState) None[source]

工作节点获取了任务的一个副本

add_to_processing(ts: distributed.scheduler.TaskState) None[source]

将任务分配给此工作节点进行计算。

address: Final[str]

此工作节点的唯一 key。它可以是其连接地址(例如 "tcp://127.0.0.1:8891")或别名(例如 "alice")。

clean() distributed.scheduler.WorkerState[source]

返回此对象适合用于序列化的版本

executing: dict[distributed.scheduler.TaskState, float]

当前在此工作节点上运行的任务字典。每个任务状态都关联着任务已运行的秒数。

extra: dict[str, Any]

添加到 identity() 的任意附加元数据

property has_what: collections.abc.Set[distributed.scheduler.TaskState]

当前驻留在此工作节点上的任务的有序集合。此处的任务都处于“memory”状态。这是 TaskState.who_has 的反向映射。

这是一个只读的公共访问器。数据实现为一个没有值的字典,因为 rebalance() 依赖于字典是按插入顺序排序的。

host: Final[str]

标识此工作节点所在机器的主机名/IP地址。

last_seen: float

我们在本地调度器时间中最后一次收到此工作节点心跳的时间。

long_running: set[distributed.scheduler.TaskState]

调用了 distributed.secede() 的运行中任务

property memory: distributed.scheduler.MemoryState

工作节点的优化内存指标。

关于受管理内存的设计说明

受管理内存有两种可用的度量方式

  • self.nbytes

  • self.metrics["managed_bytes"]

在静止状态下,这两个数字必须相同。然而,一旦每个任务进入工作节点的内存,self.nbytes 会通过批处理通信立即更新;而 self.metrics["managed_bytes"] 则由心跳更新,这可能会滞后几秒钟。

下面我们将 self.nbytes 中可能更新的受管理内存信息与心跳中的进程内存和溢出内存混合使用。这是有意为之,以便更频繁地更新受管理内存总量。

受管理内存直接且立即影响乐观内存,而乐观内存又用于活动内存管理器启发式算法(在撰写本文时;未来可能会添加更多用途)。因此,保持其最新状态非常重要;比进程内存重要得多。

调度器一旦得知任务完成就拥有最新的受管理内存信息,这也极大地简化了单元测试。

这种设计的另一面是它可能在 unmanaged_recent 度量中引入一些噪声。例如:

  1. 删除 100MB 受管理数据

  2. 更新后的受管理内存比更新后的进程内存更快到达调度器

  3. 调度器会出现一个瞬间的误判,认为 unmanaged_recent 突然增加了 100MB,因为进程内存没有变化,但受管理内存减少了 100MB

  4. 当心跳到达时,进程内存下降,unmanaged_recent 也随之下降。

这是可以接受的——将 unmanaged_recent / unmanaged_old 分开的主要原因之一正是将所有噪声集中在 unmanaged_recent 中,并将其排除在用于启发式算法的乐观内存之外。

不太理想但也不那么常见的情况是,突然删除溢出键会导致受管理内存出现负向波动

  1. 删除 100MB 溢出数据

  2. 更新后的受管理内存总量比更新后的溢出部分更快到达调度器

  3. 这会导致受管理内存在短时间内骤降,并被 unmanaged_recent 取代,而溢出内存保持不变

  4. 当心跳到达时,受管理内存回升,unmanaged_recent 回降,溢出内存也像最初应该发生的那样减少 100MB。

GH#6002 将使我们能够解决此问题。

memory_limit: int

工作节点可用的内存,以字节为单位

nanny: str | None

关联的 Nanny 地址,如果存在

nbytes: int

此工作节点在内存中持有的任务(即此工作节点 has_what 中的任务)使用的总内存大小,以字节为单位。

needs_what: dict[distributed.scheduler.TaskState, int]

可能需要获取到此工作节点的键,以及需要它们的任务数量。所有这些任务目前都在其他工作节点的 memory 状态。与 processing 非常相似,这并不完全反映工作节点状态:此处的键可能排队等待获取、正在传输中,或已在工作节点内存中。

nthreads: int

此工作节点上可用的 CPU 线程数

processing: set[distributed.scheduler.TaskState]

此处的任务都处于“processing”状态。此属性与 TaskState.processing_on 保持同步。

remove_from_processing(ts: distributed.scheduler.TaskState) None[source]

从工作节点的处理中移除任务

remove_replica(ts: distributed.scheduler.TaskState) None[source]

工作节点内存中不再有该任务

resources: dict[str, float]

此工作节点上可用的资源,例如 {"GPU": 2}。这些是抽象的数量,限制某些任务不能在此工作节点上同时运行。

status: distributed.core.Status

只读的工作节点状态,从远程 Worker 对象单向同步

used_resources: dict[str, float]

分配给此工作节点的所有任务所使用的每种资源的总和。此字典中的数字只能小于或等于此工作节点 resources 中的数字。

versions: dict[str, Any]

工作节点上 distributed.versions.get_versions() 的输出

class distributed.scheduler.ClientState(client: str, *, versions: dict[str, Any] | None = None)[source]

一个简单的对象,用于保存有关客户端的信息。

client_key: str

此客户端的唯一标识符。这通常是由客户端自身生成的非透明字符串。

last_seen: float

我们在本地调度器时间中最后一次收到此客户端心跳的时间。

versions: dict[str, Any]

客户端上 distributed.versions.get_versions() 的输出

wants_what: set[distributed.scheduler.TaskState]

此客户端希望保留在内存中的任务集合,以便在需要时下载其结果。这是 TaskState.who_wants 的反向映射。当客户端空间中对应的对象(例如 Future 或 Dask collection)被垃圾回收时,任务通常会从此集合中移除。

distributed.scheduler.decide_worker(ts: distributed.scheduler.TaskState, all_workers: set[distributed.scheduler.WorkerState], valid_workers: set[distributed.scheduler.WorkerState] | None, objective: collections.abc.Callable[[distributed.scheduler.WorkerState], Any]) distributed.scheduler.WorkerState | None[source]

决定哪个工作节点应该执行任务 ts

我们选择持有 ts 依赖数据的那个工作节点。

如果多个工作节点持有依赖项,则我们选择负载较低的工作节点。

可选地提供允许执行任务的 valid_workers 集合(如果所有工作节点都允许执行任务,则传入 None)。

如果任务需要数据通信,因为没有符合条件的工作节点已经持有所有依赖项,那么我们选择最小化工作节点之间发送的字节数。这通过调用 objective 函数来确定。

class distributed.scheduler.MemoryState(*, process: int, unmanaged_old: int, managed: int, spilled: int)[source]

工作节点或整个集群的内存读数。

请参阅 工作节点内存管理

属性 / 特性

managed_total

工作节点在内存中持有的所有 dask 键的 sizeof() 输出总和,加上溢出到磁盘的字节数

managed

RAM 中持有的 dask 键的 sizeof() 输出总和。请注意,这可能不准确,从而导致非管理内存不准确(见下文)。

spilled

溢出到硬盘的 dask 键的字节数。请注意,这是磁盘上的大小;由于压缩和 sizeof() 的不准确性,内存中的大小可能不同。换句话说,对于相同的键,“managed”将根据键是驻留在内存中还是溢出到磁盘而变化。

process

OS 测量的此工作节点进程的总 RSS 内存。这总是精确等于 managed + unmanaged。

unmanaged

process - managed。这是以下各项的总和:

  • Python 解释器和模块

  • 全局变量

  • 当前正在运行的 dask 任务临时分配的内存

  • 内存碎片

  • 内存泄漏

  • 尚未被垃圾回收的内存

  • 尚未由 Python 内存管理器释放给 OS 的内存

unmanaged_old

在过去 distributed.memory.recent-to-old-time 秒内‘unmanaged’度量的最小值

unmanaged_recent

unmanaged - unmanaged_old;换句话说,最近分配但未被 dask 计入的进程内存;希望这大部分是临时峰值。

optimistic

managed + unmanaged_old;换句话说,乐观地假设所有 unmanaged_recent 内存都是临时峰值,进程长期持有的内存。