工作窃取

有些任务倾向于在特定的工作节点上运行。这可能是因为该工作节点持有任务的数据依赖项,或者用户已表达了希望任务在特定位置运行的松散意愿。偶尔,这会导致少数工作节点非常繁忙,而其他工作节点处于空闲状态。在这种情况下,空闲的工作节点可能会选择从繁忙的工作节点窃取工作,即使窃取工作需要昂贵的数据移动。

这是一种性能优化,并非正确性所必需。工作窃取在许多临时情况下提供了鲁棒性,但也可能在我们窃取了错误的任务并降低性能时适得其反。

窃取的标准

计算通信比

当任务的计算时间远远长于任务依赖项的通信时间时,窃取工作是盈利的。

坏例子

如果计算速度快,我们不想窃取需要将一个大型依赖数据块通过网络从受害者工作节点移动到窃取者工作节点的任务。最终,我们花费在通信上的时间远远多于仅仅等待更长时间并放弃并行化。

[data] = client.scatter([np.arange(1000000000)])
x = client.submit(np.sum, data)

好例子

我们确实想要窃取只需要移动依赖数据块的任务,特别是当计算时间昂贵时(此处为 100 秒)。

[data] = client.scatter([100])
x = client.submit(sleep, data)

幸运的是,我们通常既知道依赖项的字节数(通过在工作节点上调用 sys.getsizeof 报告),也知道先前见过函数的运行时成本,后者以指数加权移动平均的形式维护。

饱和工作节点负担

即使计算时间与通信时间的比率很差,窃取也可能是有益的。这发生在饱和工作节点有很长的任务积压,并且有大量的空闲工作节点时。我们通过判断如果窃取该任务,饱和工作节点要运行的最后一个任务是否能更快完成,或者如果它留在原始/受害者工作节点上是否能更快完成,来确定是否可以窃取该任务。

可窃取任务的积压越长,以及我们拥有的活跃工作节点数量越少,都增加了我们进行窃取的意愿。这与计算通信成本比率相平衡。

从富者那里窃取

我们希望从特别负担过重的工作节点窃取任务,而不是从只有少量冗余任务的工作节点窃取。

限制

如果任务被明确限制在特定的工作节点上运行(例如需要特殊硬件的情况),那么我们不会进行窃取。

选择要窃取的任务

我们维护一个可窃取任务集合的列表,按计算通信时间比率分组到桶中。第一个桶包含所有计算通信比率大于或等于 8 的任务(被认为足够高,总是窃取),下一个桶比率为 4,再下一个桶比率为 2,依此类推,一直到比率为 1/256,这种任务我们绝不窃取。

这种数据结构提供了一种对所有可窃取任务的某种程度有序的视图,我们可以在常数时间添加和移除,而不是像堆等更传统的数据结构那样需要 log(n) 的时间。

在向工作节点提交任务的任何阶段,我们检查是否有空闲和饱和的工作节点,如果存在,我们会快速遍历这个集合列表,首先从最好的桶中选择任务,然后向下遍历到那些不那么理想的可窃取任务桶。当没有更多可窃取任务、没有更多空闲工作节点时停止,或者当待窃取的任务质量不足以匹配当前的积压量时停止。

这种方法快速,优先窃取计算通信成本比率最优的任务(最大相差两倍),并且倾向于从积压量最大的工作节点窃取,这仅仅是因为随机选择倾向于从最大的总体中抽取。

事务性工作窃取

为了避免同一任务运行两次,Dask 实现了事务性工作窃取。当调度器识别出应该移动的任务时,它首先向繁忙的工作节点发送一个请求。工作节点检查任务的当前状态,并向调度器发送响应。

  1. 如果任务尚未运行,则工作节点取消该任务,并通知调度器可以将该任务重新路由到其他地方。

  2. 如果任务已经在运行或已完成,则工作节点告诉调度器不应在其他地方复制该任务。

这避免了冗余工作,也避免了对于更特殊任务的副作用重复。然而,在工作节点死亡或网络连接中断的情况下,并发或重复执行同一任务仍然可能发生

禁用工作窃取

工作窃取是 Dask 调度器上的一个可切换设置;要禁用工作窃取,您可以通过设置 DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING="False" 或通过您的 Dask 配置文件将调度器 work-stealing 配置选项切换为 "False"惠灵">工作窃取配置选项切换为 "False"