Worker 状态机

任务状态

当调度器请求 Worker 计算一个任务时,Worker 通过一个 distributed.worker_state_machine.TaskState 对象来跟踪它——不要与调度器端的匹配类 distributed.scheduler.TaskState 混淆。

该类有一个关键属性 TaskState.state,它可以取以下值:

released

已知但未主动计算或不在内存中。当调度器要求遗忘某个任务,但该任务在同一个 worker 上有依赖它的任务时,该任务会保持在此状态。

waiting

调度器已将任务添加到 worker 队列中。它的所有依赖项都在集群中的某个地方的内存中,但并非所有依赖项都在当前 worker 的内存中,因此需要获取它们。

fetch

该任务在一个或多个对等 worker 的内存中,但不在当前 worker 的内存中。其数据已排队等待通过网络传输,原因可能是它是处于 waiting 状态的任务的依赖项,或者 活跃内存管理器 请求在此复制。该任务可以在 WorkerState.data_needed 堆中找到。

missing

类似于 fetch,但调度器列出的所有对等 worker 要么无法访问,要么已回应它们实际上没有任务数据。worker 将定期询问调度器是否知道额外的副本;如果知道,任务将再次转换到 fetch。该任务可以在 WorkerState.missing_dep_flight 集合中找到。

flight

任务数据当前正通过网络从另一个 worker 传输过来。该任务可以在 WorkerState.in_flight_tasksWorkerState.in_flight_workers 集合中找到。

ready

任务已准备好计算;它的所有依赖项都在当前 worker 的内存中,正在等待可用的线程。该任务可以在 WorkerState.ready 堆中找到。

constrained

类似于 ready,但用户为此任务指定了资源限制。该任务可以在 WorkerState.constrained 队列中找到。

executing

任务当前正在一个线程上计算。可以在 WorkerState.executing 集合和 distributed.worker.Worker.active_threads 字典中找到它。

long-running

类似于 executing,但用户代码调用了 distributed.secede(),因此该任务不再计入最大并发任务数。可以在 WorkerState.long_running 集合和 distributed.worker.Worker.active_threads 字典中找到它。

rescheduled

任务刚刚抛出 Reschedule 异常。这是一个瞬态,不会被永久存储。

cancelled

调度器要求遗忘此任务,但目前技术上不可能。参见 任务取消。可以在其 previous 状态所在的任何集合中找到该任务。

resumed

任务已从 cancelled 状态恢复。参见 任务取消。可以在其 previous 状态所在的任何集合中找到该任务。

memory

任务执行完成,或任务已成功从另一个 worker 传输过来,现在保存在 WorkerState.dataWorkerState.actors 中。

error

任务执行失败。或者,任务执行成功完成,或任务数据通过网络成功传输,但未能序列化或反序列化。完整的异常和回溯存储在任务本身中,以便在客户端上重新抛出。

forgotten

调度器要求此 worker 遗忘该任务,并且在同一个 worker 上既没有依赖它的任务,也没有它依赖的任务。任务一旦达到此状态,就会立即从 WorkerState 中取消引用,并很快被垃圾回收。这是唯一一种可能出现两个具有相同 keyTaskState 对象同时(瞬态地)存在于同一个解释器中的情况。

获取依赖项

Worker states for dependencies

当需要计算的任务到达 Worker 时,任何尚不在同一个 worker 内存中的依赖项都会被一个 TaskState 对象包装,并包含一个 worker 列表 (TaskState.who_has) 以从中收集结果。

这些 TaskState 对象的状体被设置为 fetch,被放入 data_needed 堆中,并通过网络逐步传输。对于每个依赖项,我们随机选择一个拥有该数据的 worker,并从该 worker 收集依赖项。为了提高带宽,我们会机会性地收集已知位于该 worker 上的其他任务的其他依赖项,最多达 50MB 数据(transfer_message_bytes_limit,该值从配置键 distributed.worker.transfer.message-bytes-limit 获取)——数据太少会影响带宽,数据太多会影响响应性。我们使用固定的 50 个连接数(transfer_incoming_count_limit,该值从配置键 distributed.worker.connections.outgoing 获取),以避免过度分散网络带宽。

如果两个 worker 之间的网络通信饱和,依赖任务可能会在 fetchflight 之间循环,直到成功收集。还可能发生对等 worker 回应不再有请求数据的副本的情况;最后,对等 worker 可能无法访问或无响应。发生这种情况时,该对等 worker 会从 who_has 中移除,任务转换回 fetch,以便 Worker 尝试从不同的对等 worker 获取相同的 key。如果在此过程中 who_has 变空,任务将转换到 missing 状态,Worker 开始定期询问调度器是否有额外的对等 worker 可用。

用于获取依赖项的同一系统也用于活跃内存管理器复制。

注意

对于任何给定的对等 worker,在任何给定时间最多只有一个 gather_dep() asyncio 任务运行。如果所有持有处于 fetch 状态的任务副本的 worker 都已经在 flight 状态,则该任务将保持在 fetch 状态,直到有 worker 再次可用。

计算任务

需要计算的 TaskState 在 Worker 上通过以下管道进行处理。它定义了其 run_spec,用于指示 worker 如何执行它。

Worker states for computing tasks

当任务的所有依赖项都在内存中后,任务从 waiting 转换到 readyconstrained,并被添加到 ready 堆中。

一旦有线程可用,我们就从堆顶弹出一个任务,并将任务放入本地线程池中的一个线程中执行。

可选地,在运行时,该任务可能会将自身标识为长时间运行的任务(参见从任务启动任务),此时它会从线程池中退出并更改状态为 long-runningexecutinglong-running 几乎是相同的状态,唯一的区别是后者不计入同时并行运行的最大任务数。

任务可以通过三种方式终止:

在所有情况下,结果都会被发送回调度器。

分散的数据

分散的数据 遵循更简单的路径,直接进入 memory 状态

Worker states for scattered data

遗忘任务

一旦任务处于 memoryerror 状态,Worker 将无限期地持有它,直到调度器明确要求 Worker 遗忘它。这发生在没有客户端再持有对 key 的引用,且没有等待任务(即尚未计算的依赖任务)时。此外,活跃内存管理器 可能会要求丢弃任务的额外副本。

rescheduled 的情况下,任务会立即转换到 released,然后转换到 forgotten,无需等待调度器。

Worker states for computing tasks

非正常流程

上述流程图有一些重要的例外情况:

  • 任务被窃取,在这种情况下,它会直接从 waitingreadyconstrained 转换到 released。请注意,对当前正在执行的任务的窃取请求会被拒绝。

  • 调度器介入,调度器将之前分配给其他 worker 的任务重新分配给新的 worker。这最常发生在计算过程中worker 死亡时。

  • 客户端介入,客户端显式释放 Future 或超出其范围;或者整个客户端可能关闭或无响应。当没有客户端再持有对 key 或其依赖项的引用时,调度器将释放它。

简而言之

重要

任务可以从任何状态转换到 released,而不仅仅是上面图示中的状态。

如果没有依赖它的任务,任务会立即转换到 forgotten 并超出作用域。但是,有一个重要的例外,任务取消

任务取消

Worker 可能会在 key 处于 flightexecutinglong-running 状态时收到释放该 key 的请求。由于取消 Python 线程的技术限制,以及当前从对等 worker 获取数据的方式,此类事件无法立即中止相关的 asyncio 任务(以及在 executing / long-running 的情况下运行用户代码的线程)。相反,处于这三种状态的任务会转换到另一个状态 cancelled,这意味着 asyncio 任务将继续执行直到完成(结果无关紧要),然后* Dask 任务才会被释放。

cancelled 状态有一个子状态 previous,它被设置为上述三种状态之一。常用表示法是 <state>(<previous>),例如 cancelled(flight)

当任务被取消时,会发生以下三种情况之一:

  • 在 asyncio 任务完成之前,什么都不会发生;例如,调度器没有改变主意,仍然希望 Worker 在最后彻底遗忘该任务。发生这种情况时,任务从 cancelled 转换到 released,并且通常是 forgotten

  • 调度器切换回其原始请求

    • 调度器要求 Worker 获取当前处于 cancelled(flight) 状态的任务;此时任务将立即恢复到 flight 状态,忘记取消曾发生过,并继续等待已在运行的数据获取;

    • 调度器要求 Worker 计算当前处于 cancelled(executing)cancelled(long-running) 状态的任务。Worker 将完全忽略新的 run_spec(如果已更改),切换回 previous 状态,并等待已经在执行的线程完成。

  • 调度器切换到相反的请求,从获取到计算或反之亦然。

为了处理最后一个用例,还有一个特殊的状体 resumed。任务只能从 cancelled 状态进入 resumed 状态。resumed 保留了来自 cancelled 状态的 previous 属性,并添加了另一个属性 next,该属性总是:

  • fetch,如果 previousexecutinglong-running

  • waiting,如果 previousflight

总结一下,以下是处理已取消任务的所有可能的状态和子状态组合:

状态

previous

next

cancelled

flight

None

cancelled

executing

None

cancelled

long-running

None

resumed

flight

waiting

resumed

executing

fetch

resumed

long-running

fetch

如果一个 resumed 任务成功完成,它将转换到 memory(与 cancelled 任务不同,后者会忽略输出),并且调度器将收到一个伪造的终止消息,如果任务是 resumed(executing->fetch)resumed(long-running->fetch),则这是 flight 状态的预期结束消息;如果任务是 resumed(flight->waiting),则这是 execute 状态的预期结束消息。

如果任务失败或抛出 Reschedule,Worker 将静默忽略异常并切换到其预定路径,因此 resumed(executing->fetch)resumed(long-running->fetch) 将转换到 fetch 状态,而 resumed(flight->waiting) 将转换到 waiting 状态。

最后,调度器可以在任务的整个生命周期内多次改变主意,因此一个 resumed(executing->fetch)resumed(long-running->fetch) 任务可能会被要求再次转换到 waiting 状态,此时它将恢复到其 previous 状态并忘记整个事件;同样,一个 resumed(flight->waiting) 任务也可能被要求再次转换到 fetch 状态,因此它将改为转换到 flight 状态。

Worker states for cancel/resumeWorker states for cancel/resume

一个常见的实际用例:

  1. 集群中至少有两个 worker,A 和 B。

  2. 任务 x 在 worker A 上成功计算。

  3. 当任务 x 在 worker A 上转换到 memory 状态时,调度器要求 worker B 计算任务 y,任务 y 依赖于任务 x。

  4. B 开始从 A 获取 key x,这使得任务进入 flight 模式。

  5. Worker A 崩溃,并且由于某种原因,调度器在 worker B 之前注意到了。

  6. 调度器将释放任务 y(因为它正在等待在内存中不再存在的依赖项),并在集群中的其他地方重新调度任务 x。任务 x 在 worker A 上将转换到 cancelled(flight) 状态。

  7. 如果调度器随机选择 worker A 来计算任务 X,任务将转换到 resumed(flight->waiting) 状态。

  8. 当,且仅当,从 A 到 B 的 TCP 套接字崩溃(例如,由于超时)时,任务将转换到 waiting 状态,并最终在 A 上重新计算。

重要

对于任何给定的 key,您始终有最多一个 compute()gather_dep() asyncio 任务运行;您绝不会同时拥有两者。

调度器与 Worker 之间的任务状态映射

调度器和 worker 上的任务状态是不同的,它们的映射关系有些微妙

调度器状态

典型的 worker 状态

边缘情况 worker 状态

  • released

  • waiting

  • no-worker

  • (未知)

  • released

  • cancelled

  • processing

  • waiting

  • ready

  • constrained

  • executing

  • long-running

  • resumed(waiting)

  • memory

  • memory

  • fetch

  • flight

  • error

  • missing

  • resumed(fetch)

  • erred

  • error

除了上述状态之外,worker 可能完全不知道某个特定任务。相反的情况,即 worker 知道某个任务但在调度器上找不到它,仅在任务取消的情况下发生。

还需要考虑竞态条件,即 worker(或某些 worker)在调度器之前知道某些事情,或反之亦然。例如:

  • 任务在 worker 上总是先从 executing 转换到 memory,然后才能在调度器上从 processing 转换到 memory

  • 任务总是先在调度器上转换到 releasedforgotten,只有当消息到达 worker 后,它才会在那里被释放。

流程控制

Worker state machine control flow

worker 状态机涉及几个类:

TaskState 包含与单个任务相关的所有信息;它还包含对依赖任务和被依赖任务的引用。这只是一个数据容器,没有修改方法。请注意,这与 distributed.scheduler.TaskState 是不同的类。

WorkerState 封装了 worker 的整体状态。它在其 tasks 字典和几个其他辅助集合中持有对 TaskState 的引用。至关重要的是,此类对 asyncio、网络、磁盘 I/O、线程等没有任何了解或可见性。请注意,这与 distributed.scheduler.WorkerState 是不同的类。

WorkerState.handle_stimulus() 提供了一个用于改变状态的单一方法:handle_stimulus()。状态不得以任何其他方式更改。该方法接收一个 StateMachineEvent,也称为刺激,它是一个数据类,用于确定发生了可能导致 worker 状态改变的事件。刺激可以来自调度器(例如计算任务的请求)或 worker 本身(例如任务已完成计算)。

WorkerState.handle_stimulus() 改变内部状态(例如,它可以将任务从 executing 转换到 memory),并返回一个 Instruction 对象列表,这些是 worker 需要执行但与状态本身无关的外部操作:

  • 向调度器发送消息

  • 计算任务

  • 从对等 worker 收集任务

WorkerState.handle_stimulus()BaseWorker.handle_stimulus() 包装,后者消费 Instruction 对象。BaseWorker 处理 asyncio 任务的创建、跟踪和清理,但并未实际实现任务的执行或收集;相反,它暴露了抽象的 async 方法 execute()gather_dep(),这些方法由其子类 Worker 重写,Worker 实际运行任务并执行网络 I/O。当实现的方法完成时,它们必须返回一个 StateMachineEvent,该事件会被反馈回 BaseWorker.handle_stimulus()

注意

这可能会在 worker 内部产生一个(可能很长)的事件链;例如,如果 ready 队列中的任务多于线程数,那么一个任务的终止 StateMachineEvent 将触发执行下一个任务的 Instruction

总结如下

内部状态转换

在内部,WorkerState.handle_stimulus() 的工作方式与调度器端的相同过程非常相似:

  1. WorkerState.handle_stimulus() 调用 WorkerState._handle_<stimulus name>()

  2. 它返回一个元组,包含:

  3. WorkerState.handle_stimulus() 接着将建议传递给 WorkerState._transitions()

  4. 对于每个建议,WorkerState._transitions() 调用 WorkerState._transition()

  5. 后者又调用 WorkerState._transition_<开始状态>_<结束状态>()

  6. 后者又返回一个额外的元组 (recommendations, instructions)

  7. 新的建议被 WorkerState._transitions() 消费,直到不再返回建议。

  8. WorkerState.handle_stimulus() 最后返回指令列表,该列表由转换逐步扩展。

API 文档

class distributed.worker_state_machine.TaskState(key: Key, run_id: int = -1, run_spec: T_runspec | None = None, dependencies: set[TaskState] = <factory>, dependents: set[TaskState] = <factory>, waiting_for_data: set[TaskState] = <factory>, waiters: set[TaskState] = <factory>, state: TaskStateState = 'released', previous: Literal['executing', 'long-running', 'flight', None] = None, next: Literal['fetch', 'waiting', None] = None, priority: tuple[int, ...] | None = None, who_has: set[str] = <factory>, coming_from: str | None = None, resource_restrictions: dict[str, float] = <factory>, exception: Serialize | None = None, traceback: Serialize | None = None, exception_text: str = '', traceback_text: str = '', type: type | None = None, suspicious_count: int = 0, startstops: list[StartStop] = <factory>, start_time: float | None = None, stop_time: float | None = None, metadata: dict = <factory>, nbytes: int | None = None, annotations: dict | None = None, span_id: str | None = None, done: bool = False)[source]

持有与单个 Dask 任务相关的易失性状态。

不要与在调度器端持有类似信息的 distributed.scheduler.TaskState 混淆。

annotations: dict | None

任意任务注解

coming_from: str | None

如果任务处于 flight 状态,当前任务数据来自的 worker

dependencies: set[TaskState]

此 key 运行所需的数据

dependents: set[TaskState]

使用此依赖项的 key

done: bool

如果服务此任务的 execute()gather_dep() 协程已完成,则为 True;否则为 False。此标志会改变从 executingflight 等状态转换的行为。

exception: Serialize | None

任务出错时(序列化)运行任务导致的异常

exception_text: str

异常的字符串表示

key: Key

任务 key。必需。

metadata: dict

与任务相关的元数据。存储的元数据应该是 msgpack 可序列化的(例如 int, string, list, dict)。

nbytes: int | None

任务值的字节大小(如果在内存中)

next: Literal['fetch', 'waiting', None]

任务的下一个状态。仅当 state == resumed 时才非 None。

prefix: str

任务前缀(key 的最左部分)

previous: Literal['executing', 'long-running', 'flight', None]

任务的先前状态。仅当 state 在 (cancelled, resumed) 中时才非 None。

priority: tuple[int, ...] | None

调度器赋予此任务的优先级。决定运行顺序。

resource_restrictions: dict[str, float]

运行任务所需的抽象资源

run_id: int

任务运行 ID。

run_spec: T_runspec | None

一个元组,包含与此 TaskState 实例相关的 functionargskwargstask。它默认为 None,如果它是此 worker 将从另一个 worker 接收的依赖项,则可以保持为空。

span_id: str | None

唯一的 span id(参见 distributed.spans)。与 distributed.scheduler.TaskState.group.span_id 匹配。

start_time: float | None

任务开始运行的时间

startstops: list[StartStop]

任务的传输、加载和计算时间的日志

state: TaskStateState

任务的当前状态

stop_time: float | None

任务完成运行的时间

suspicious_count: int

依赖项不在预期位置的次数

traceback: Serialize | None

任务出错时(序列化)运行任务导致的回溯

traceback_text: str

回溯的字符串表示

type: type | None

特定数据块的类型

waiters: set[TaskState]

未在内存中的依赖项的子集

waiting_for_data: set[TaskState]

未在内存中的依赖项的子集

who_has: set[str]

我们认为拥有此数据的 worker 地址

class distributed.worker_state_machine.WorkerState(*, nthreads: int = 1, address: str | None = None, data: MutableMapping[Key, object] | None = None, threads: dict[Key, int] | None = None, plugins: dict[str, WorkerPlugin] | None = None, resources: Mapping[str, float] | None = None, transfer_incoming_count_limit: int = 9999, validate: bool = True, transition_counter_max: int | Literal[False] = False, transfer_incoming_bytes_limit: float = inf, transfer_message_bytes_limit: float = inf)[source]

状态机,封装了工作节点上所有任务的生命周期。

不要与 distributed.scheduler.WorkerState 混淆。

注意

此类的数据属性是实现细节,可能会在没有弃用周期的情况下进行更改。

警告

此类的属性之间高度相关。永远不要直接修改它们,因为这样做非常容易导致状态损坏,进而可能导致整个集群的死锁。

状态应完全通过 handle_stimulus() 进行修改。

actors: dict[Key, object]

Actor 任务。参见 Actor

address: str

工作节点的 <IP 地址>:<端口>。状态机使用此信息进行决策,例如确定对等工作节点是否在同一主机上运行。WorkerState 初始化时此属性可能未知。但必须在首次调用 handle_stimulus() 之前设置。

property all_running_tasks: set[distributed.worker_state_machine.TaskState]

当前正在占用线程的所有任务。它们可能计入也可能不计入最大线程数。

这些是

  • ts.status 在 (executing, long-running) 中

  • ts.status 在 (cancelled, resumed) 中且 ts.previous 在 (executing, long-running) 中

available_resources: dict[str, float]

{资源名称: 数量}。当前未被任务执行消耗的资源。总是小于或等于 total_resources。参见 工作节点资源

busy_workers: set[str]

最近返回忙碌状态的对等工作节点。此集合中的工作节点在一段时间内不会被请求额外的依赖项。

constrained: HeapSet[TaskState]

已准备好运行,但正在等待 GPU 等抽象资源的任务的优先级堆。与 ready 互斥。参见 available_resources工作节点资源

data: MutableMapping[Key, object]

内存中的任务数据。此集合在 WorkerWorkerMemoryManager 和此类之间按引用共享。

data_needed: defaultdict[str, HeapSet[TaskState]]

仍需要数据才能执行且至少在另一个工作节点内存中的任务,按每个工作节点优先级堆排序。所有且仅有 TaskState.state == 'fetch' 的任务在此集合中。在 who_has 中有多个条目的 TaskState 将在此处多次出现。

executed_count: int

此工作节点在其生命周期内运行过的任务数量;包括失败和已取消的任务。另请参阅 executing_count()

executing: set[TaskState]

当前正在运行的任务集合。

此集合仅包含 state == ‘executing’ 的任务,以及 state 在 (‘cancelled’, ‘resumed’) 中且 previous == ‘executing’ 的任务。

另请参阅 executing_count()long_running

property executing_count: int

当前在此工作节点上执行并计入最大线程数的任务计数。

它包括已取消的任务,但不包括长期运行(又称 seceded)的任务。

fetch_count: int

处于 fetch 状态的任务总数。如果一个任务在多个 data_needed 堆中,则只计数一次。

generation: int

调度器每次调用计算任务处理程序时递减的计数器。它被添加到 TaskState.priority 中,并作为调度器上具有相同优先级的任务之间的决胜局,决定它们之间的后进先出顺序。

handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) list[distributed.worker_state_machine.Instruction][source]

处理一个或多个外部事件,将相关任务转换为新状态,并返回作为结果要执行的指令列表。

has_what: defaultdict[str, set[Key]]

{工作节点地址: {ts.key, ...}}。我们关心并认为某个工作节点拥有的数据。

in_flight_tasks: set[TaskState]

在当前对等连接中正在传输给我们的任务。

此集合仅包含 state == ‘flight’ 的任务,以及 state 在 (‘cancelled’, ‘resumed’) 中且 previous == ‘flight’ 的任务。

另请参阅 in_flight_tasks_count()

property in_flight_tasks_count: int

当前正在从其他工作节点复制到此工作节点的任务数量。

in_flight_workers: dict[str, set[Key]]

{工作节点地址: {ts.key, ...}} 我们当前正在从中收集数据的工作节点以及我们期望从这些连接中获得的依赖项。在此 dict 中的工作节点在当前查询返回之前不会被请求额外的依赖项。

log: deque[tuple]

转换日志:[(..., stimulus_id: str | None, timestamp: float), ...] 记录的刺激数量有限制。另请参阅 story()stimulus_log

long_running: set[TaskState]

当前正在运行且已调用 secede() 的任务集合,因此它们不再计入最大并发任务数 (nthreads)。这些任务不会出现在 executing 集合中。

此集合仅包含 state == ‘long-running’ 的任务,以及 state 在 (‘cancelled’, ‘resumed’) 中且 previous == ‘long-running’ 的任务。

missing_dep_flight: set[TaskState]

所有且仅有 TaskState.state == 'missing' 的任务。

nbytes: int

内存中所有任务的总大小。

nthreads: int

可以并行执行的任务数量。在任何给定时间,executing_count() <= nthreads。

plugins: dict[str, WorkerPlugin]

{名称: 工作节点插件}。此集合在 Worker 和此类之间按引用共享。Worker 管理插件的添加和移除,而 WorkerState 在 WorkerPlugin.transition 方法可用时调用它。

ready: HeapSet[TaskState]

已准备好运行且没有资源限制的任务的优先级堆。与 constrained 互斥。

rng: random.Random

静态种子随机状态,用于在需要伪随机选择时保证确定性。

running: bool

如果状态机应在有可用槽位时开始执行更多任务并获取依赖项,则为 True。此属性必须与 Worker 保持一致:WorkerState.running == (Worker.status is Status.running)

stimulus_log: deque[StateMachineEvent]

handle_stimulus() 接收到的所有刺激的日志。记录的事件数量有限制。另请参阅 logstimulus_story()

stimulus_story(*keys_or_tasks: Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) list[distributed.worker_state_machine.StateMachineEvent][source]

返回涉及一个或多个任务的所有状态机事件。

story(*keys_or_tasks_or_stimuli: Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) list[tuple][source]

返回转换日志中涉及一个或多个任务或 stimulus_id 的所有记录。

task_counter: TaskCounter

当前任务数量以及每个状态中的累计耗时,两者都按 prefix 分解。

tasks: dict[Key, TaskState]

{key: TaskState}。当前在此工作节点上执行的任务(以及这些任务的任何依赖项)。

threads: dict[Key, int]

{ts.key: 线程 ID}。此集合在 Worker 和此类之间按引用共享。虽然 WorkerState 不关心线程,但在某些情况下仍然需要访问此信息。此集合由 distributed.worker.Worker.execute() 填充。 WorkerState 工作并不需要填充此集合。

total_resources: dict[str, float]

{资源名称: 数量}。可用于任务执行的总资源。参见 :doc: resources

transfer_incoming_bytes: int

当前从其他工作节点开放数据传输的总大小。

transfer_incoming_bytes_limit: float

传入数据传输的字节限制;用于节流。

transfer_incoming_bytes_throttle_threshold: int

只要 transfer_incoming_bytes 小于此值,就忽略 transfer_incoming_count_limit

property transfer_incoming_count: int

当前从其他工作节点开放数据传输的数量。

transfer_incoming_count_limit: int

从其他工作节点最大并发传入数据传输数量。另请参阅 distributed.worker.Worker.transfer_outgoing_count_limit

transfer_incoming_count_total: int

自工作节点启动以来,从其他工作节点进行的数据传输总数。

transfer_message_bytes_limit: float

在单次调用 BaseWorker.gather_dep() 中从同一工作节点收集的字节数。只要其总大小不超过此值,就可以从同一工作节点收集的多个小任务将打包在单个指令中。如果第一个要收集的任务超过此限制,它仍将被收集以确保进度。因此,此限制不是绝对的。

transition_counter: int

迄今为止状态转换的总数。另请参阅 logtransition_counter_max

transition_counter_max: int | Literal[False] = False

如果 transition_counter 达到此值时引发错误。这仅用于调试,以捕获无限递归循环。在生产环境中,应始终设置为 False。

validate: bool

如果为 True,则启用昂贵的内部一致性检查。在生产环境中通常禁用。

waiting: set[TaskState]

当前正在等待数据的任务。

class distributed.worker_state_machine.BaseWorker(state: distributed.worker_state_machine.WorkerState)[source]

围绕 WorkerState 的包装器,用于实现指令处理。这是一个抽象类,包含多个 @abc.abstractmethod 方法,供 Worker 和单元测试模拟类继承。

abstract batched_send(msg: dict[str, Any]) None[source]

通过批量通信向调度器发送即发即弃消息。

参数
msg: dict

msgpack 可序列化的消息,发送给调度器。必须有一个在 Scheduler.stream_handlers 中注册的 ‘op’ 键。

async close(timeout: float = 30) None[source]

取消所有异步指令。

abstract digest_metric(name: collections.abc.Hashable, value: float) None[source]

记录任意数值指标。

abstract async execute(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[source]

执行任务。

abstract async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[source]

从拥有依赖项的工作节点收集任务的依赖项。

参数
workerstr

要从中收集依赖项的工作节点地址。

to_gatherlist

要从工作节点收集的依赖项的 Key - 这不一定等同于 dep 的完整依赖项列表,因为某些依赖项可能已在此工作节点上存在。

total_nbytesint

to_gather 中所有依赖项的总字节数。

handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None[source]

将一个或多个外部刺激转发到 WorkerState.handle_stimulus() 并处理返回的指令,调用相关的 Worker 回调(下面的 @abc.abstractmethod 方法)。

为所有异步指令生成 asyncio 任务并开始跟踪它们。

abstract async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent[source]

等待一段时间,然后将对等工作节点从忙碌状态中移除。

class distributed.worker_state_machine.StateMachineEvent(*args: Any, **kwargs: Any)[source]

可修改工作节点状态的所有刺激的抽象基类。

static from_dict(d: dict) distributed.worker_state_machine.StateMachineEvent[source]

recursive_to_dict 的输出转换回原始对象。输出对象对于重建状态机有意义,但不一定与原始对象完全相同。

stimulus_id: str

事件的唯一 ID。

to_loggable(*, handled: float) distributed.worker_state_machine.StateMachineEvent[source]

生成一个自身的变体版本,该版本足够小,可以在中等时间内存储在内存中,并包含有意义的调试信息。

class distributed.worker_state_machine.Instruction(stimulus_id: str)[源代码]

工作器状态机响应事件后发送给工作器的命令

classmethod match(**kwargs: Any) distributed.worker_state_machine._InstructionMatch[源代码]

生成一个局部匹配,用于与 Instruction 实例进行比较。典型用法是将 WorkerState.handle_stimulus() 返回的指令列表或 WorkerState.stimulus_log 中的指令列表与期望的匹配列表进行比较。

示例

instructions = ws.handle_stimulus(...)
assert instructions == [
    TaskFinishedMsg.match(key="x"),
    ...
]

注意

StateMachineEventInstruction 是抽象类,有许多子类,为简洁起见在此不一一列出。完整列表请参阅实现模块 distributed.worker_state_machine