Worker 状态机
目录
Worker 状态机¶
任务状态¶
当调度器请求 Worker 计算一个任务时,Worker 通过一个 distributed.worker_state_machine.TaskState
对象来跟踪它——不要与调度器端的匹配类 distributed.scheduler.TaskState
混淆。
该类有一个关键属性 TaskState.state
,它可以取以下值:
- released
已知但未主动计算或不在内存中。当调度器要求遗忘某个任务,但该任务在同一个 worker 上有依赖它的任务时,该任务会保持在此状态。
- waiting
调度器已将任务添加到 worker 队列中。它的所有依赖项都在集群中的某个地方的内存中,但并非所有依赖项都在当前 worker 的内存中,因此需要获取它们。
- fetch
该任务在一个或多个对等 worker 的内存中,但不在当前 worker 的内存中。其数据已排队等待通过网络传输,原因可能是它是处于
waiting
状态的任务的依赖项,或者 活跃内存管理器 请求在此复制。该任务可以在WorkerState.data_needed
堆中找到。- missing
类似于
fetch
,但调度器列出的所有对等 worker 要么无法访问,要么已回应它们实际上没有任务数据。worker 将定期询问调度器是否知道额外的副本;如果知道,任务将再次转换到fetch
。该任务可以在WorkerState.missing_dep_flight
集合中找到。- flight
任务数据当前正通过网络从另一个 worker 传输过来。该任务可以在
WorkerState.in_flight_tasks
和WorkerState.in_flight_workers
集合中找到。- ready
任务已准备好计算;它的所有依赖项都在当前 worker 的内存中,正在等待可用的线程。该任务可以在
WorkerState.ready
堆中找到。- constrained
类似于
ready
,但用户为此任务指定了资源限制。该任务可以在WorkerState.constrained
队列中找到。- executing
任务当前正在一个线程上计算。可以在
WorkerState.executing
集合和distributed.worker.Worker.active_threads
字典中找到它。- long-running
类似于
executing
,但用户代码调用了distributed.secede()
,因此该任务不再计入最大并发任务数。可以在WorkerState.long_running
集合和distributed.worker.Worker.active_threads
字典中找到它。- rescheduled
任务刚刚抛出
Reschedule
异常。这是一个瞬态,不会被永久存储。- cancelled
- resumed
- memory
任务执行完成,或任务已成功从另一个 worker 传输过来,现在保存在
WorkerState.data
或WorkerState.actors
中。- error
任务执行失败。或者,任务执行成功完成,或任务数据通过网络成功传输,但未能序列化或反序列化。完整的异常和回溯存储在任务本身中,以便在客户端上重新抛出。
- forgotten
调度器要求此 worker 遗忘该任务,并且在同一个 worker 上既没有依赖它的任务,也没有它依赖的任务。任务一旦达到此状态,就会立即从
WorkerState
中取消引用,并很快被垃圾回收。这是唯一一种可能出现两个具有相同key
的TaskState
对象同时(瞬态地)存在于同一个解释器中的情况。
获取依赖项¶
当需要计算的任务到达 Worker 时,任何尚不在同一个 worker 内存中的依赖项都会被一个 TaskState
对象包装,并包含一个 worker 列表 (TaskState.who_has
) 以从中收集结果。
这些 TaskState
对象的状体被设置为 fetch
,被放入 data_needed
堆中,并通过网络逐步传输。对于每个依赖项,我们随机选择一个拥有该数据的 worker,并从该 worker 收集依赖项。为了提高带宽,我们会机会性地收集已知位于该 worker 上的其他任务的其他依赖项,最多达 50MB 数据(transfer_message_bytes_limit
,该值从配置键 distributed.worker.transfer.message-bytes-limit
获取)——数据太少会影响带宽,数据太多会影响响应性。我们使用固定的 50 个连接数(transfer_incoming_count_limit
,该值从配置键 distributed.worker.connections.outgoing
获取),以避免过度分散网络带宽。
如果两个 worker 之间的网络通信饱和,依赖任务可能会在 fetch
和 flight
之间循环,直到成功收集。还可能发生对等 worker 回应不再有请求数据的副本的情况;最后,对等 worker 可能无法访问或无响应。发生这种情况时,该对等 worker 会从 who_has
中移除,任务转换回 fetch
,以便 Worker 尝试从不同的对等 worker 获取相同的 key。如果在此过程中 who_has
变空,任务将转换到 missing
状态,Worker 开始定期询问调度器是否有额外的对等 worker 可用。
用于获取依赖项的同一系统也用于活跃内存管理器复制。
注意
对于任何给定的对等 worker,在任何给定时间最多只有一个 gather_dep()
asyncio 任务运行。如果所有持有处于 fetch
状态的任务副本的 worker 都已经在 flight 状态,则该任务将保持在 fetch
状态,直到有 worker 再次可用。
计算任务¶
需要计算的 TaskState
在 Worker 上通过以下管道进行处理。它定义了其 run_spec
,用于指示 worker 如何执行它。
当任务的所有依赖项都在内存中后,任务从 waiting
转换到 ready
或 constrained
,并被添加到 ready
堆中。
一旦有线程可用,我们就从堆顶弹出一个任务,并将任务放入本地线程池中的一个线程中执行。
可选地,在运行时,该任务可能会将自身标识为长时间运行的任务(参见从任务启动任务),此时它会从线程池中退出并更改状态为 long-running。executing
和 long-running
几乎是相同的状态,唯一的区别是后者不计入同时并行运行的最大任务数。
任务可以通过三种方式终止:
成功完成;其返回值存储在
data
或WorkerState.actors
中抛出异常;异常和回溯存储在
TaskState
对象上抛出
Reschedule
;它会立即被遗忘。
在所有情况下,结果都会被发送回调度器。
遗忘任务¶
一旦任务处于 memory
或 error
状态,Worker 将无限期地持有它,直到调度器明确要求 Worker 遗忘它。这发生在没有客户端再持有对 key 的引用,且没有等待任务(即尚未计算的依赖任务)时。此外,活跃内存管理器 可能会要求丢弃任务的额外副本。
在 rescheduled
的情况下,任务会立即转换到 released
,然后转换到 forgotten
,无需等待调度器。
非正常流程¶
上述流程图有一些重要的例外情况:
任务被窃取,在这种情况下,它会直接从
waiting
、ready
或constrained
转换到released
。请注意,对当前正在执行的任务的窃取请求会被拒绝。调度器介入,调度器将之前分配给其他 worker 的任务重新分配给新的 worker。这最常发生在计算过程中worker 死亡时。
客户端介入,客户端显式释放 Future 或超出其范围;或者整个客户端可能关闭或无响应。当没有客户端再持有对 key 或其依赖项的引用时,调度器将释放它。
简而言之
重要
任务可以从任何状态转换到 released
,而不仅仅是上面图示中的状态。
如果没有依赖它的任务,任务会立即转换到 forgotten
并超出作用域。但是,有一个重要的例外,任务取消。
任务取消¶
Worker 可能会在 key 处于 flight
、executing
或 long-running
状态时收到释放该 key 的请求。由于取消 Python 线程的技术限制,以及当前从对等 worker 获取数据的方式,此类事件无法立即中止相关的 asyncio 任务(以及在 executing
/ long-running
的情况下运行用户代码的线程)。相反,处于这三种状态的任务会转换到另一个状态 cancelled
,这意味着 asyncio 任务将继续执行直到完成(结果无关紧要),然后* Dask 任务才会被释放。
cancelled
状态有一个子状态 previous
,它被设置为上述三种状态之一。常用表示法是 <state>(<previous>)
,例如 cancelled(flight)
。
当任务被取消时,会发生以下三种情况之一:
在 asyncio 任务完成之前,什么都不会发生;例如,调度器没有改变主意,仍然希望 Worker 在最后彻底遗忘该任务。发生这种情况时,任务从
cancelled
转换到released
,并且通常是forgotten
。调度器切换回其原始请求
调度器切换到相反的请求,从获取到计算或反之亦然。
为了处理最后一个用例,还有一个特殊的状体 resumed
。任务只能从 cancelled
状态进入 resumed
状态。resumed
保留了来自 cancelled
状态的 previous
属性,并添加了另一个属性 next
,该属性总是:
总结一下,以下是处理已取消任务的所有可能的状态和子状态组合:
状态 |
previous |
next |
---|---|---|
cancelled |
flight |
None |
cancelled |
executing |
None |
cancelled |
long-running |
None |
resumed |
flight |
waiting |
resumed |
executing |
fetch |
resumed |
long-running |
fetch |
如果一个 resumed
任务成功完成,它将转换到 memory
(与 cancelled
任务不同,后者会忽略输出),并且调度器将收到一个伪造的终止消息,如果任务是 resumed(executing->fetch)
或 resumed(long-running->fetch)
,则这是 flight
状态的预期结束消息;如果任务是 resumed(flight->waiting)
,则这是 execute
状态的预期结束消息。
如果任务失败或抛出 Reschedule
,Worker 将静默忽略异常并切换到其预定路径,因此 resumed(executing->fetch)
或 resumed(long-running->fetch)
将转换到 fetch
状态,而 resumed(flight->waiting)
将转换到 waiting
状态。
最后,调度器可以在任务的整个生命周期内多次改变主意,因此一个 resumed(executing->fetch)
或 resumed(long-running->fetch)
任务可能会被要求再次转换到 waiting
状态,此时它将恢复到其 previous
状态并忘记整个事件;同样,一个 resumed(flight->waiting)
任务也可能被要求再次转换到 fetch
状态,因此它将改为转换到 flight
状态。
一个常见的实际用例:
集群中至少有两个 worker,A 和 B。
任务 x 在 worker A 上成功计算。
当任务 x 在 worker A 上转换到 memory 状态时,调度器要求 worker B 计算任务 y,任务 y 依赖于任务 x。
B 开始从 A 获取 key x,这使得任务进入
flight
模式。Worker A 崩溃,并且由于某种原因,调度器在 worker B 之前注意到了。
调度器将释放任务 y(因为它正在等待在内存中不再存在的依赖项),并在集群中的其他地方重新调度任务 x。任务 x 在 worker A 上将转换到
cancelled(flight)
状态。如果调度器随机选择 worker A 来计算任务 X,任务将转换到
resumed(flight->waiting)
状态。当,且仅当,从 A 到 B 的 TCP 套接字崩溃(例如,由于超时)时,任务将转换到
waiting
状态,并最终在 A 上重新计算。
重要
对于任何给定的 key,您始终有最多一个 compute()
或 gather_dep()
asyncio 任务运行;您绝不会同时拥有两者。
调度器与 Worker 之间的任务状态映射¶
调度器和 worker 上的任务状态是不同的,它们的映射关系有些微妙
调度器状态 |
典型的 worker 状态 |
边缘情况 worker 状态 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
除了上述状态之外,worker 可能完全不知道某个特定任务。相反的情况,即 worker 知道某个任务但在调度器上找不到它,仅在任务取消的情况下发生。
还需要考虑竞态条件,即 worker(或某些 worker)在调度器之前知道某些事情,或反之亦然。例如:
任务在 worker 上总是先从
executing
转换到memory
,然后才能在调度器上从processing
转换到memory
任务总是先在调度器上转换到
released
或forgotten
,只有当消息到达 worker 后,它才会在那里被释放。
流程控制¶
worker 状态机涉及几个类:
TaskState
包含与单个任务相关的所有信息;它还包含对依赖任务和被依赖任务的引用。这只是一个数据容器,没有修改方法。请注意,这与 distributed.scheduler.TaskState
是不同的类。
WorkerState
封装了 worker 的整体状态。它在其 tasks
字典和几个其他辅助集合中持有对 TaskState
的引用。至关重要的是,此类对 asyncio、网络、磁盘 I/O、线程等没有任何了解或可见性。请注意,这与 distributed.scheduler.WorkerState
是不同的类。
WorkerState.handle_stimulus()
提供了一个用于改变状态的单一方法:handle_stimulus()
。状态不得以任何其他方式更改。该方法接收一个 StateMachineEvent
,也称为刺激,它是一个数据类,用于确定发生了可能导致 worker 状态改变的事件。刺激可以来自调度器(例如计算任务的请求)或 worker 本身(例如任务已完成计算)。
WorkerState.handle_stimulus()
改变内部状态(例如,它可以将任务从 executing
转换到 memory
),并返回一个 Instruction
对象列表,这些是 worker 需要执行但与状态本身无关的外部操作:
向调度器发送消息
计算任务
从对等 worker 收集任务
WorkerState.handle_stimulus()
由 BaseWorker.handle_stimulus()
包装,后者消费 Instruction
对象。BaseWorker
处理 asyncio 任务的创建、跟踪和清理,但并未实际实现任务的执行或收集;相反,它暴露了抽象的 async 方法 execute()
和 gather_dep()
,这些方法由其子类 Worker
重写,Worker
实际运行任务并执行网络 I/O。当实现的方法完成时,它们必须返回一个 StateMachineEvent
,该事件会被反馈回 BaseWorker.handle_stimulus()
。
注意
这可能会在 worker 内部产生一个(可能很长)的事件链;例如,如果 ready
队列中的任务多于线程数,那么一个任务的终止 StateMachineEvent
将触发执行下一个任务的 Instruction
。
总结如下
WorkerState
与 asyncio、网络、线程和磁盘 I/O 无关;它包含TaskState
对象的集合。BaseWorker
封装了WorkerState
并增加了对 asyncio 的感知Worker
是BaseWorker
的子类,并增加了对网络、线程和磁盘 I/O 的感知。
内部状态转换¶
在内部,WorkerState.handle_stimulus()
的工作方式与调度器端的相同过程非常相似:
WorkerState.handle_stimulus()
调用WorkerState._handle_<stimulus name>()
,它返回一个元组,包含:
转换任务的*建议*:{
TaskState
: <新状态>}Instruction
对象列表
WorkerState.handle_stimulus()
接着将建议传递给WorkerState._transitions()
对于每个建议,
WorkerState._transitions()
调用WorkerState._transition()
,后者又调用
WorkerState._transition_<开始状态>_<结束状态>()
,后者又返回一个额外的元组 (recommendations, instructions)
新的建议被
WorkerState._transitions()
消费,直到不再返回建议。WorkerState.handle_stimulus()
最后返回指令列表,该列表由转换逐步扩展。
API 文档¶
- class distributed.worker_state_machine.TaskState(key: Key, run_id: int = -1, run_spec: T_runspec | None = None, dependencies: set[TaskState] = <factory>, dependents: set[TaskState] = <factory>, waiting_for_data: set[TaskState] = <factory>, waiters: set[TaskState] = <factory>, state: TaskStateState = 'released', previous: Literal['executing', 'long-running', 'flight', None] = None, next: Literal['fetch', 'waiting', None] = None, priority: tuple[int, ...] | None = None, who_has: set[str] = <factory>, coming_from: str | None = None, resource_restrictions: dict[str, float] = <factory>, exception: Serialize | None = None, traceback: Serialize | None = None, exception_text: str = '', traceback_text: str = '', type: type | None = None, suspicious_count: int = 0, startstops: list[StartStop] = <factory>, start_time: float | None = None, stop_time: float | None = None, metadata: dict = <factory>, nbytes: int | None = None, annotations: dict | None = None, span_id: str | None = None, done: bool = False)[source]¶
持有与单个 Dask 任务相关的易失性状态。
不要与在调度器端持有类似信息的
distributed.scheduler.TaskState
混淆。- done: bool¶
如果服务此任务的
execute()
或gather_dep()
协程已完成,则为 True;否则为 False。此标志会改变从executing
、flight
等状态转换的行为。
- key: Key¶
任务 key。必需。
- previous: Literal['executing', 'long-running', 'flight', None]¶
任务的先前状态。仅当
state
在 (cancelled, resumed) 中时才非 None。
- run_spec: T_runspec | None¶
一个元组,包含与此 TaskState 实例相关的
function
、args
、kwargs
和task
。它默认为None
,如果它是此 worker 将从另一个 worker 接收的依赖项,则可以保持为空。
- span_id: str | None¶
唯一的 span id(参见
distributed.spans
)。与distributed.scheduler.TaskState.group.span_id
匹配。
- state: TaskStateState¶
任务的当前状态
- class distributed.worker_state_machine.WorkerState(*, nthreads: int = 1, address: str | None = None, data: MutableMapping[Key, object] | None = None, threads: dict[Key, int] | None = None, plugins: dict[str, WorkerPlugin] | None = None, resources: Mapping[str, float] | None = None, transfer_incoming_count_limit: int = 9999, validate: bool = True, transition_counter_max: int | Literal[False] = False, transfer_incoming_bytes_limit: float = inf, transfer_message_bytes_limit: float = inf)[source]¶
状态机,封装了工作节点上所有任务的生命周期。
不要与
distributed.scheduler.WorkerState
混淆。注意
此类的数据属性是实现细节,可能会在没有弃用周期的情况下进行更改。
- address: str¶
工作节点的 <IP 地址>:<端口>。状态机使用此信息进行决策,例如确定对等工作节点是否在同一主机上运行。WorkerState 初始化时此属性可能未知。但必须在首次调用
handle_stimulus()
之前设置。
- property all_running_tasks: set[distributed.worker_state_machine.TaskState]¶
当前正在占用线程的所有任务。它们可能计入也可能不计入最大线程数。
这些是
ts.status 在 (executing, long-running) 中
ts.status 在 (cancelled, resumed) 中且 ts.previous 在 (executing, long-running) 中
- constrained: HeapSet[TaskState]¶
已准备好运行,但正在等待 GPU 等抽象资源的任务的优先级堆。与
ready
互斥。参见available_resources
和 工作节点资源。
- data_needed: defaultdict[str, HeapSet[TaskState]]¶
仍需要数据才能执行且至少在另一个工作节点内存中的任务,按每个工作节点优先级堆排序。所有且仅有
TaskState.state == 'fetch'
的任务在此集合中。在who_has
中有多个条目的TaskState
将在此处多次出现。
- executed_count: int¶
此工作节点在其生命周期内运行过的任务数量;包括失败和已取消的任务。另请参阅
executing_count()
。
- executing: set[TaskState]¶
当前正在运行的任务集合。
此集合仅包含
state
== ‘executing’ 的任务,以及state
在 (‘cancelled’, ‘resumed’) 中且previous
== ‘executing’ 的任务。另请参阅
executing_count()
和long_running
。
- generation: int¶
调度器每次调用计算任务处理程序时递减的计数器。它被添加到
TaskState.priority
中,并作为调度器上具有相同优先级的任务之间的决胜局,决定它们之间的后进先出顺序。
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) list[distributed.worker_state_machine.Instruction] [source]¶
处理一个或多个外部事件,将相关任务转换为新状态,并返回作为结果要执行的指令列表。
- in_flight_tasks: set[TaskState]¶
在当前对等连接中正在传输给我们的任务。
此集合仅包含
state
== ‘flight’ 的任务,以及state
在 (‘cancelled’, ‘resumed’) 中且previous
== ‘flight’ 的任务。另请参阅
in_flight_tasks_count()
。
- in_flight_workers: dict[str, set[Key]]¶
{工作节点地址: {ts.key, ...}}
我们当前正在从中收集数据的工作节点以及我们期望从这些连接中获得的依赖项。在此 dict 中的工作节点在当前查询返回之前不会被请求额外的依赖项。
- log: deque[tuple]¶
转换日志:
[(..., stimulus_id: str | None, timestamp: float), ...]
记录的刺激数量有限制。另请参阅story()
和stimulus_log
。
- long_running: set[TaskState]¶
当前正在运行且已调用
secede()
的任务集合,因此它们不再计入最大并发任务数 (nthreads)。这些任务不会出现在executing
集合中。此集合仅包含
state
== ‘long-running’ 的任务,以及state
在 (‘cancelled’, ‘resumed’) 中且previous
== ‘long-running’ 的任务。
- nthreads: int¶
可以并行执行的任务数量。在任何给定时间,
executing_count()
<= nthreads。
- plugins: dict[str, WorkerPlugin]¶
{名称: 工作节点插件}
。此集合在Worker
和此类之间按引用共享。Worker 管理插件的添加和移除,而 WorkerState 在WorkerPlugin.transition
方法可用时调用它。
- ready: HeapSet[TaskState]¶
已准备好运行且没有资源限制的任务的优先级堆。与
constrained
互斥。
- rng: random.Random¶
静态种子随机状态,用于在需要伪随机选择时保证确定性。
- running: bool¶
如果状态机应在有可用槽位时开始执行更多任务并获取依赖项,则为 True。此属性必须与 Worker 保持一致:
WorkerState.running == (Worker.status is Status.running)
。
- stimulus_log: deque[StateMachineEvent]¶
handle_stimulus()
接收到的所有刺激的日志。记录的事件数量有限制。另请参阅log
和stimulus_story()
。
- stimulus_story(*keys_or_tasks: Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) list[distributed.worker_state_machine.StateMachineEvent] [source]¶
返回涉及一个或多个任务的所有状态机事件。
- story(*keys_or_tasks_or_stimuli: Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...], distributed.worker_state_machine.TaskState]) list[tuple] [source]¶
返回转换日志中涉及一个或多个任务或 stimulus_id 的所有记录。
- task_counter: TaskCounter¶
当前任务数量以及每个状态中的累计耗时,两者都按
prefix
分解。
- threads: dict[Key, int]¶
{ts.key: 线程 ID}
。此集合在Worker
和此类之间按引用共享。虽然 WorkerState 不关心线程,但在某些情况下仍然需要访问此信息。此集合由distributed.worker.Worker.execute()
填充。 WorkerState 工作并不需要填充此集合。
- transfer_incoming_bytes_throttle_threshold: int¶
只要
transfer_incoming_bytes
小于此值,就忽略transfer_incoming_count_limit
。
- transfer_incoming_count_limit: int¶
从其他工作节点最大并发传入数据传输数量。另请参阅
distributed.worker.Worker.transfer_outgoing_count_limit
。
- transfer_message_bytes_limit: float¶
在单次调用
BaseWorker.gather_dep()
中从同一工作节点收集的字节数。只要其总大小不超过此值,就可以从同一工作节点收集的多个小任务将打包在单个指令中。如果第一个要收集的任务超过此限制,它仍将被收集以确保进度。因此,此限制不是绝对的。
- transition_counter: int¶
迄今为止状态转换的总数。另请参阅
log
和transition_counter_max
。
- transition_counter_max: int | Literal[False] = False¶
如果
transition_counter
达到此值时引发错误。这仅用于调试,以捕获无限递归循环。在生产环境中,应始终设置为 False。
- class distributed.worker_state_machine.BaseWorker(state: distributed.worker_state_machine.WorkerState)[source]¶
围绕
WorkerState
的包装器,用于实现指令处理。这是一个抽象类,包含多个@abc.abstractmethod
方法,供Worker
和单元测试模拟类继承。- abstract batched_send(msg: dict[str, Any]) None [source]¶
通过批量通信向调度器发送即发即弃消息。
- 参数
- msg: dict
msgpack 可序列化的消息,发送给调度器。必须有一个在 Scheduler.stream_handlers 中注册的 ‘op’ 键。
- abstract digest_metric(name: collections.abc.Hashable, value: float) None [source]¶
记录任意数值指标。
- abstract async execute(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [source]¶
执行任务。
- abstract async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [source]¶
从拥有依赖项的工作节点收集任务的依赖项。
- 参数
- workerstr
要从中收集依赖项的工作节点地址。
- to_gatherlist
要从工作节点收集的依赖项的 Key - 这不一定等同于
dep
的完整依赖项列表,因为某些依赖项可能已在此工作节点上存在。- total_nbytesint
to_gather 中所有依赖项的总字节数。
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None [source]¶
将一个或多个外部刺激转发到
WorkerState.handle_stimulus()
并处理返回的指令,调用相关的 Worker 回调(下面的@abc.abstractmethod
方法)。为所有异步指令生成 asyncio 任务并开始跟踪它们。
- abstract async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent [source]¶
等待一段时间,然后将对等工作节点从忙碌状态中移除。
- class distributed.worker_state_machine.StateMachineEvent(*args: Any, **kwargs: Any)[source]¶
可修改工作节点状态的所有刺激的抽象基类。
- static from_dict(d: dict) distributed.worker_state_machine.StateMachineEvent [source]¶
将
recursive_to_dict
的输出转换回原始对象。输出对象对于重建状态机有意义,但不一定与原始对象完全相同。
- to_loggable(*, handled: float) distributed.worker_state_machine.StateMachineEvent [source]¶
生成一个自身的变体版本,该版本足够小,可以在中等时间内存储在内存中,并包含有意义的调试信息。
- class distributed.worker_state_machine.Instruction(stimulus_id: str)[源代码]¶
工作器状态机响应事件后发送给工作器的命令
- classmethod match(**kwargs: Any) distributed.worker_state_machine._InstructionMatch [源代码]¶
生成一个局部匹配,用于与
Instruction
实例进行比较。典型用法是将WorkerState.handle_stimulus()
返回的指令列表或WorkerState.stimulus_log
中的指令列表与期望的匹配列表进行比较。示例
instructions = ws.handle_stimulus(...) assert instructions == [ TaskFinishedMsg.match(key="x"), ... ]
注意
StateMachineEvent
和 Instruction
是抽象类,有许多子类,为简洁起见在此不一一列出。完整列表请参阅实现模块 distributed.worker_state_machine
。