诊断性能

理解分布式计算的性能可能很困难。这部分是由于分布式计算机的许多组件可能影响性能

  1. 计算时间

  2. 内存带宽

  3. 网络带宽

  4. 磁盘带宽

  5. 调度器开销

  6. 序列化成本

这种困难因这些成本信息分散在许多机器上而加剧,因此没有一个中心位置可以收集数据来识别性能问题。

幸运的是,Dask 在执行期间收集了各种诊断信息。这样做既是为了向用户提供性能反馈,也是为了其自身的内部调度决策。查看这些反馈的主要地方是诊断仪表盘。本文档介绍了可用的各种性能信息以及如何访问它们。

任务开始和停止时间

工作节点捕获与任务相关的持续时间。对于通过工作节点的每个任务,我们记录以下各项的开始时间和停止时间:

  1. 序列化 (灰色)

  2. 从对等节点收集依赖项 (红色)

  3. 用于收集本地数据的磁盘 I/O (橙色)

  4. 执行时间 (按任务着色)

查看这些时间的主要方式是查看调度器的 /status 页面上的任务流图,其中柱状图的颜色对应于上面列出的颜色。

Dask task stream

或者,如果您想对每个任务事件进行自己的诊断,您可能需要创建一个 调度器插件。当任务从处理状态转换为内存状态或出错时,所有这些信息都将可用。

统计性能分析

对于单线程性能分析,Python 用户通常依赖标准库中的 CProfile 模块(Dask 开发者推荐使用 snakeviz 工具进行单线程性能分析)。遗憾的是,标准的 CProfile 模块不适用于多线程或分布式计算。

为了解决这个问题,Dask 实现了自己的分布式统计性能分析器。每 10 毫秒,每个工作节点进程都会检查其每个工作节点线程正在做什么。它捕获调用堆栈并将此堆栈添加到计数数据结构中。此计数数据结构每秒记录和清除一次,以建立性能随时间变化的记录。

用户通常通过工作节点或调度器诊断仪表盘上的 /profile 图查看此数据。在调度器页面上,他们可以看到所有工作节点所有线程的总聚合配置文件。点击配置文件中的任何柱状图都会将用户放大到该部分,这与大多数性能分析工具一样。页面底部有一个时间线,允许用户选择不同的时间段。

Dask profiler

配置文件还按当时正在运行的任务进行分组。您可以从页面顶部的选择菜单中选择任务名称。您也可以点击 /status 页面上主任务流图中与任务对应的矩形。

用户还可以直接使用 Client.profile 函数查询此数据。这将返回用于生成这些图的原始数据结构。他们还可以传递文件名以将图直接保存为 HTML 文件。请注意,此文件需要通过 Web 服务器(例如 python -m http.server)提供服务才能可见。

10 毫秒和 1 秒的参数可以通过 config.yaml 文件中的 profile-intervalprofile-cycle-interval 条目进行控制。

带宽

Dask 工作节点通过 Worker.transfer_outgoing_logWorker.transfer_incoming_log 属性跟踪每次传入和传出传输,包括:

  1. 传输的总字节数

  2. 传输的压缩字节数

  3. 开始/停止时间

  4. 移动的键

  5. 对等节点

这些信息通过工作节点的诊断仪表盘上的 /status 页面向用户提供。您可以通过在工作节点上运行命令来明确捕获其状态:

client.run(lambda dask_worker: dask_worker.transfer_outgoing_log)
client.run(lambda dask_worker: dask_worker.transfer_incoming_log)

性能报告

通常在基准测试和/或性能分析时,用户可能希望记录特定的计算或完整的流程。Dask 可以将 bokeh 仪表盘保存为静态 HTML 图,包括任务流、工作节点配置文件、带宽等。这可以通过使用 distributed.performance_report 上下文管理器包装计算来实现。

from dask.distributed import performance_report

with performance_report(filename="dask-report.html"):
    ## some dask computation

以下视频更详细地演示了 performance_report 上下文管理器

关于时间的一点说明

不同的计算机维护着不同的时钟,这些时钟可能无法完全匹配。为了解决这个问题,Dask 调度器在响应每次工作节点心跳时会发送其当前时间。工作节点将其本地时间与此时间进行比较,以获取差异的估计。工作节点中记录的所有时间都考虑了此估计延迟。这有所帮助,但仍然可能存在不精确的测量。

所有时间均旨在从调度器的角度表示。

分析随时间变化的内存使用情况

您可能想了解随着计算的进行,整个集群的内存使用量如何随时间演变,或者同一算法的两种不同实现方式在内存方面的比较。

这可以通过使用 distributed.diagnostics.MemorySampler 上下文管理器包装计算来实现。

from distributed import Client
from distributed.diagnostics import MemorySampler

client = Client(...)
ms = MemorySampler()
with ms.sample("collection 1"):
    collection1.compute()
with ms.sample("collection 2"):
    collection2.compute()
...
ms.plot(align=True)

示例输出

Sample output of the MemorySampler
distributed.diagnostics.MemorySampler[源代码]

秒对整个集群的内存使用情况进行采样。

用法

client = Client()
ms = MemorySampler()

with ms.sample("run 1"):
    <run first workflow>
with ms.sample("run 2"):
    <run second workflow>
...
ms.plot()

或使用异步客户端

client = await Client(asynchronous=True)
ms = MemorySampler()

async with ms.sample("run 1"):
    <run first workflow>
async with ms.sample("run 2"):
    <run second workflow>
...
ms.plot()
plot(*, align: bool = False, **kwargs: Any) Any[源代码]

绘制目前已收集的数据系列

参数
alignbool (可选的)

参见 to_pandas()

kwargs

原样传递给 pandas.DataFrame.plot()

返回值
pandas.DataFrame.plot() 的输出
sample(label: str | None = None, *, client: Client | None = None, measure: str = 'process', interval: float = 0.5) Any[源代码]

记录集群内存使用情况的上下文管理器。如果客户端是同步的,则此为同步操作;如果客户端是异步的,则此为异步操作。

样本记录在 self.samples[<label>] 中。

参数
label: str, 可选的

在 self.samples 字典中记录样本的标签。默认值:自动生成随机标签

client: Client, 可选的

用于连接到调度器的客户端。默认值:使用全局客户端

measure: str, 可选的

来自 distributed.scheduler.MemoryState 的度量之一。默认值:采样进程内存

interval: float, 可选的

采样间隔,单位为秒。默认值:0.5

to_pandas(*, align: bool = False) pd.DataFrame[源代码]

将数据系列作为 pandas.Dataframe 返回。

参数
alignbool, 可选的

如果为 True,则将绝对时间戳更改为相对于每个系列第一个样本的时间差,以便可以将不同系列并排可视化。如果为 False(默认值),则使用绝对时间戳。