调度策略

本文档描述了 Dask 分布式调度器用于选择任务偏好和工作节点偏好的策略。有关这些策略如何高效执行的更多信息,请参见调度状态

选择工作节点

当一个任务从等待状态转换为处理状态时,我们会为该任务决定一个合适的工作节点。如果任务具有重要的数据依赖关系,或者工作节点负载较高,那么对工作节点的选择会强烈影响整体性能。类似地,根任务的放置方式会影响下游计算的性能,因为它能决定未来需要在工作节点之间传输多少数据。对于这些不同的场景,会使用不同的启发式方法。

初始任务调度 - 启用队列

队列启用时(默认设置),每个初始任务会被简单地调度到当前最不繁忙的工作节点上。如果一个初始任务想要运行,但所有工作节点线程都已占用,那么该任务将进入(或留在)队列中,而不会被发送到任何工作节点。

初始任务调度 - 禁用队列

目前,这种调度策略仅在队列被禁用时使用(即配置值 distributed.scheduler.worker-saturation 设置为 inf)。

我们希望相邻的根任务在同一个工作节点上运行,因为这些相邻任务很有可能在下游操作中被组合处理。

  i       j
 / \     / \
e   f   g   h
|   |   |   |
a   b   c   d
\   \  /   /
     X

在上述情况下,我们希望 ab 在同一个工作节点上运行,cd 在同一个工作节点上运行,以减少将来的数据传输。我们也可以忽略 X 的位置,因为假设我们将 a b c d 组分布到所有工作节点上以最大化并行度,那么 X 最终会被传输到所有地方。(请注意,即使 X 不存在,希望将 a bc d 共存的愿望仍然适用。)

直接通过遍历图来计算这些关联任务会非常耗时。相反,我们使用任务的 TaskGroup,它是由所有具有相同键前缀的任务组成的集合。((random-a1b2c3, 0)(random-a1b2c3, 1)(random-a1b2c3, 2) 都属于 TaskGroup random-a1b2c3。)

为了识别根任务(或类似根的任务),我们使用以下启发式方法:

  1. TaskGroup 中的任务数量是集群中线程数量的 2 倍以上

  2. TaskGroup 中 *所有* 任务的唯一依赖项少于 5 个。

    我们不只是说“任务没有依赖项”,因为像 dask.array.from_zarrdask.array.from_array 这样的实际用例会生成如上所示的图,其中数据创建任务(a b c d)都共享一个依赖项(X)——例如 Zarr 数据集。虽然 a b c d 在技术上不是根任务,但我们希望将它们视为根任务,因此允许所有任务共享少量琐碎的依赖项。

然后,我们使用 根据子任务数量和深度打破僵局 中描述的相同优先级来确定哪些任务是相关的。这种带有子任务权重的深度优先指标通常可以用来将图的叶节点适当地分割成相互之间连接度较低且分离较好的子图。

按照此优先级顺序遍历任务时,我们将一批后续任务分配给一个工作节点,然后选择一个新的工作节点(最不繁忙的那个)并重复此过程。

尽管这不能提供完美的初始任务分配(少数同级任务可能会被分配到不同的工作节点上),但在大多数情况下效果良好,同时只增加了极少的调度开销。

初始任务调度是一项前瞻性决策。通过将相关的根任务共存放置,我们确保其下游任务能够顺利执行。

下游任务调度

当初始任务被妥善放置后,后续任务的放置则更具回顾性:考虑到数据传输和工作节点忙碌程度,任务能在哪里最快开始运行?

不符合上述类似根任务标准的任务按如下方式选择工作节点:

首先,我们确定可用的工作节点池。

  1. 如果任务没有依赖项也没有限制,我们就找到最不繁忙的工作节点。

  2. 否则,如果任务有用户提供的限制(例如必须在带有 GPU 的机器上运行),则我们将可用工作节点池限制为该集合。否则,我们考虑所有工作节点。

  3. 我们将上述集合限制为仅包含持有该任务至少一个依赖项的工作节点。

从此工作节点池中,我们然后使用 Scheduler.worker_objective() 函数来确定我们认为任务将在其上最快开始运行的工作节点。对于每个工作节点:

  1. 我们考虑该工作节点上已排队的其他任务的预估运行时间。然后,我们加上将该工作节点尚不拥有的任何依赖项传输到该工作节点所需的时间,该时间基于依赖项的字节大小以及测量的工作节点之间的网络带宽。请注意,这考虑(反)序列化时间、如果数据溢出到磁盘则从磁盘检索数据的时间,或内存大小与序列化大小之间的潜在差异。实际上,队列等待时间(称为占用率)通常占主导地位,因此如果将数据传输到不同的工作节点意味着任务可以更快开始,数据通常会被传输到那里。

  2. “最快开始”指标可能会出现并列情况,尽管在所有工作节点都繁忙时这种情况不常见。我们通过选择存储 Dask 数据字节数(包括溢出数据)最少的工作节点来打破僵局。请注意,这等同于 托管内存 加上 溢出内存,而不是 进程内存

此过程易于修改(并且本文档可能确实已过时)。我们鼓励读者查看 scheduler.py 文件中的 decide_workerworker_objective 函数。

decide_worker(ts, all_workers, ...)

决定哪个工作节点应该执行任务 ts

Scheduler.decide_worker_non_rootish(ts)

为可运行的非根任务选择一个工作节点,同时考虑依赖项和限制。

Scheduler.decide_worker_rootish_queuing_disabled(ts)

为可运行的类似根任务选择一个工作节点,不使用队列。

Scheduler.decide_worker_rootish_queuing_enabled()

如果并非所有工作节点都繁忙,则为可运行的类似根任务选择一个工作节点。

Scheduler.worker_objective(ts, ws)

用于确定哪个工作节点应获得任务的目标函数

选择任务

我们经常需要在许多有效任务中进行选择。有几个相互竞争的关注点可能会影响我们的选择:

  1. 按先到先得的原则运行任务,以确保多个客户端之间的公平性

  2. 运行属于关键路径的任务,以减少总运行时间并最大程度地减少拖后腿的工作负载

  3. 运行那些能让我们释放大量依赖项的任务,以努力保持较小的内存占用

  4. 运行相关的任务,以便在运行新的工作块之前,可以完全完成大块工作

  5. 优先运行使用现有结果的任务,而非创建新结果的任务

同时实现所有这些目标是不可能的。完美地优化任何一个目标都可能导致高昂的开销。调度器的启发式方法在快速优化所有这些目标(它们都在重要工作负载中出现)方面做得不错,但并不完美。

后进先出 (LIFO)

当一个工作节点完成一个任务后,该任务的直接依赖项将获得最高优先级。这鼓励了一种行为:在开始新任务之前立即完成正在进行的工作(深度优先图遍历)。这通常与先到先得的目标冲突,但通常会导致显著减少内存占用,并且由于避免数据溢出到磁盘,总体运行时间更佳。

根据子任务数量和深度打破僵局

任务通常有多个依赖项,我们需要使用其他目标来打破它们之间的僵局。打破这些僵局对性能和内存占用有惊人的强大影响。

当客户端提交一个图时,我们对图进行几次线性扫描,以确定类似每个节点的后代数量(不完全是,因为它是 DAG 而不是树,但这近似)。这个数量可以用来打破僵局,帮助我们优先处理关键路径更长的节点和有许多子节点的节点。实际使用的算法更为复杂,并在 dask/order.py 中详细描述。

粗粒度先到先得 (FCFS)

工作节点使用的后进先出行为旨在最小化内存占用,但这可能会扭曲客户端提供的任务顺序。最近提交的任务可能比很久以前提交的任务运行得更快,因为它们在当前内存数据的情况下恰好更方便。这种行为可能不公平,但能提升整体运行时间和系统效率,有时提升非常显著。

然而,工作节点不可避免地会耗尽与刚完成任务相关的任务,后进先出策略最终也会失效。在这种情况下,工作节点通常会从公共任务池中拉取任务。此池中的任务按先到先得的原则排序的,因此工作节点在调度行为上对多个提交在粗粒度层面是公平的,尽管不是细粒度层面。

Dask 的调度策略是短期高效且长期对多个客户端公平的。

避免工作节点过度饱和

当有许多初始任务要运行时,工作节点无需预先了解所有任务

 o   o   o   o   o   o   o   o   o   o
/ \ / \ / \ / \ / \ / \ / \ / \ / \ / \
o o o o o o o o o o o o o o o o o o o o
| | | | | | | | | | | | | | | | | | | |
* * * * * * * * * * * * * * * * * * * *  <-- initial tasks

调度器仅将初始任务(上图中 * 任务)提交给工作节点,直到所有工作节点线程被占满 1。剩余的初始任务按照优先级顺序放入调度器的队列中。

每当工作节点上有一个线程空闲下来,并且没有其他可以运行的更高优先级任务(此图中的 o 任务)时,就会从队列中取出任务并进行调度。

这确保我们在开始新工作之前完成现有工作流。这能最大限度地减少内存使用,并且与一次性提交所有初始任务相比,通常能提供更稳定的执行。

这种队列机制有两个缺点:

  1. 初始任务不会被共同分配。这意味着工作节点可能需要进行本可避免的数据传输。这可能会导致某些工作负载相比禁用队列时出现中等程度的减慢。然而,在许多情况下,禁用队列可能会导致工作节点内存不足,因此适度减慢通常是一个更好的权衡。

  2. 对于像 client.map 这样的易于并行的工作负载,每个任务的开销可能会略有增加,因为每次任务完成时,在下一个任务开始之前,都需要调度器与工作节点之间进行一次往返消息交换。在大多数情况下,这种开销甚至无法测量,无需担心。

    这种情况仅在任务非常快或网络非常慢时才重要——也就是说,如果你的任务运行时间与网络延迟处于同一量级。例如,如果每个任务仅耗时 1ms,而调度器与工作节点之间的往返消息需要 10ms,那么这些往返消息将主导整个运行时间。

    这意味着你应该让你的任务更大(通过更大的块大小,或者将更多工作批量处理到单个 Dask 任务中)。总的来说,任务运行时间应远大于网络延迟,Dask 才能良好地运行。

1

默认情况下,它实际上会比每个工作节点的线程数提交稍微多一点的任务(例如,对于拥有 <= 10 个线程的工作节点,会多提交 1 个任务)。这种轻微的缓冲在任务非常快时能保持更好的性能。详情请参阅下一节。

调整或禁用队列

很少需要调整队列。默认值适用于几乎所有情况。*只有希望在特殊情况下调整性能的高级用户才可能考虑调整此参数。*

队列行为由配置值 distributed.scheduler.worker-saturation 控制。这通过 Dask 配置系统 设置。该配置值必须在调度器启动之前在调度器上设置。

该值控制工作节点一次在内存中拥有的初始数据块数量。这基本上是执行图的“广度”。具体来说,每次最多将 ceil(worker-saturation * nthreads) 个初始任务发送给一个工作节点。

默认情况下,worker-saturation 的值为 1.1。选择此值是为了保持工作节点内存相对较低(拥有 <= 10 个线程的工作节点每个只会额外获得 1 个初始数据块),同时减轻在非常慢的网络上运行的用户所面临的额外延迟的影响。

  • 如果工作节点出现内存不足,请考虑将 worker-saturation 的值设置为 1.0 而非 1.1

  • 如果你的网络非常慢,或者你的任务极快,并且你想减少运行时间,请考虑增加 worker-saturation 的值。这可能会稍微加快速度,但代价是增加了内存使用。值超过 2.0 通常收效甚微。

  • 如果你的图能够从共同分配中获益,并且集群内存充足,请考虑通过将 worker-saturation 设置为 inf 来禁用队列,以加快运行时间。

这些决策的制定位置

上述目标主要通过客户端、调度器和工作节点在计算过程中的各个点做出的微小决策来实现。

  1. 当我们将图从客户端提交给调度器时,会为该图中的每个任务分配一个数字优先级。此优先级侧重于先深度计算后广度计算,优先处理关键路径,优先处理有许多依赖项的节点等。这与单机调度器使用的逻辑相同,位于 dask/order.py 文件中。

  2. 当图到达调度器时,调度器会将每个数字优先级转换为一个包含两个数字的元组,其中第一个数字是一个递增计数器,第二个数字是客户端生成的上述优先级。这个按图进行的计数器鼓励了计算之间的先入先出策略。来自前一次 compute 调用的所有任务都比来自后续 compute 调用(或 submit、persist、map 或任何生成 Future 的操作)的所有任务具有更高的优先级。

  3. 每当任务准备好运行(其依赖项(如果有)已完成)时,调度器就会将其分配给一个工作节点。当多个任务同时准备就绪时,它们会按优先级顺序提交给工作节点。如果调度器端的队列处于活动状态,则任务会提交到所有工作节点满载为止,然后将剩余的可运行任务放入调度器队列中。如果禁用队列,则所有可运行任务会一次性提交。

  4. 然而,当工作节点接收到这些任务时,它会根据它们的优先级来决定哪些任务优先获取数据或优先计算。工作节点维护一个按此优先级排序的所有就绪任务的堆。

  5. 如果调度器端的队列处于活动状态:当工作节点上任何任务完成时,如果没有其他更高优先级的任务可运行,调度器就会从队列中取出下一个任务并在该工作节点上运行它。