Worker 内存管理

关于集群范围的内存管理,请参阅管理内存

通过命令行关键字 --memory-limit 或 Python 关键字参数 memory_limit= 为 Worker 设置一个目标内存限制,该限制应用于由 dask worker 启动的每个 worker 进程。

$ dask worker tcp://scheduler:port --memory-limit=auto  # TOTAL_MEMORY * min(1, nthreads / total_nthreads)
$ dask worker tcp://scheduler:port --memory-limit="4 GiB"  # four gigabytes per worker process.

Worker 使用几种不同的启发式方法来将内存使用保持在此限制以下

基于托管内存的溢出

每当 worker 完成一个任务时,它会使用 sizeof 函数估算结果保存在内存中所需的字节大小。对于任意对象,此函数默认使用标准 Python __sizeof__ 协议的 sys.getsizeof(),但也为 NumPy 数组和 Pandas Dataframe 等常见数据类型提供了特殊实现。Dask 跟踪的所有数据的 sizeof 总和称为托管内存

当托管内存超过内存限制的 60%(目标阈值)时,worker 将开始将最近最少使用的数据转存到磁盘。默认情况下,它会写入操作系统的临时目录(Linux 中为 /tmp);您可以使用 --local-directory 关键字控制此位置

$ dask worker tcp://scheduler:port --memory-limit="4 GiB" --local-directory /scratch

这些数据仍然可用,并在需要时会从磁盘读回。在诊断仪表板的状态页面上,磁盘 I/O 将在任务流图中显示为橙色块。此外,左上角的内存图将显示一部分条形为灰色。

基于进程内存的溢出

上述方法可能因以下几个原因而失败

  1. 自定义对象可能无法准确报告其内存大小

  2. 用户函数可能占用比预期更多的 RAM

  3. 大量数据可能堆积在网络 I/O 缓冲区中

为了解决这个问题,我们每隔 200 毫秒周期性地监控 worker 的进程内存。如果系统报告的内存使用量超过目标内存使用量的 70%(溢出阈值),则 worker 将开始将未使用的数据转存到磁盘,即使内部 sizeof 记录尚未达到正常的 60% 阈值。这种更积极的溢出将持续进行,直到进程内存降至 60% 以下。

暂停 worker

进程内存负载达到 80% 时,worker 的线程池将停止开始计算 worker 队列中的额外任务。这为写入磁盘功能留出了时间,即使面对快速累积的数据也能生效。当前正在执行的任务会继续运行。此外,与其他 worker 之间的数据传输将被限制在最低限度。

杀死 Worker

进程内存负载达到 95%(终止阈值)时,worker 的 nanny 进程将终止它。任务将在执行中被取消并在其他地方重新调度;worker 上的所有独特数据将丢失并需要重新计算。这是为了避免我们的 worker 作业被外部看门狗(如 Kubernetes, YARN, Mesos, SGE 等)终止。终止后,nanny 将以全新状态重新启动 worker。

阈值配置

可以通过修改 ~/.config/dask/distributed.yaml 文件来配置这些值

distributed:
  worker:
   # Fractions of worker process memory at which we take action to avoid memory
   # blowup. Set any of the values to False to turn off the behavior entirely.
    memory:
      target: 0.60     # fraction of managed memory where we start spilling to disk
      spill: 0.70      # fraction of process memory where we start spilling to disk
      pause: 0.80      # fraction of process memory at which we pause worker threads
      terminate: 0.95  # fraction of process memory at which we terminate the worker

使用仪表板监控内存使用

仪表板(通常在端口 8787 上可用)显示集群上的总体内存使用摘要,以及每个 worker 上的单独使用情况。它提供了不同的内存读数

进程

worker 进程使用的总体内存 (RSS),由操作系统测量

托管

存储在 worker 上的所有 Dask 数据(不包括溢出数据)的 sizeof 总和。

非托管

Dask 未直接感知的内存使用量。它通过从总进程内存中减去托管内存来估算,通常包括

  • Python 解释器代码、加载的模块和全局变量

  • 正在运行的任务临时使用的内存

  • 尚未进行垃圾回收的解引用 Python 对象

  • Python 内存分配器尚未通过 free 返回给 libc 的未使用内存

  • 用户空间 libc free 函数尚未释放回操作系统的未使用内存(见下面的内存分配器)

  • 内存碎片

  • 内存泄漏

非托管近期

过去 30 秒内出现的非托管内存。这不包含在上面的“非托管”内存度量中。理想情况下,这部分内存应主要由任务的堆使用量以及即将进行垃圾回收的对象引起的临时峰值。

非托管内存从“近期”状态过渡所需的时间可以通过修改 ~/.config/dask/distributed.yaml 文件中的 distributed.worker.memory.recent-to-old-time 键进行调整。如果您的任务通常运行时间超过 30 秒,建议您相应地增加此设置。

默认情况下,distributed.Client.rebalance()distributed.scheduler.Scheduler.rebalance()活动内存管理器 会忽略非托管的近期内存。此行为也可以通过 Dask 配置进行调整 - 请参阅特定组件的文档。

溢出

已溢出到磁盘的托管内存。这不包含在上面的“托管”度量中。此度量报告实际溢出到磁盘的字节数,这可能与 sizeof 的输出不同,特别是在压缩的情况下。

根据定义,托管内存 + 非托管内存 + 非托管近期内存的总和等于进程内存。

内存使用量的变化也会改变条形的颜色

蓝色

worker 正常运行

橙色

worker 可能正在将数据溢出到磁盘

红色

worker 已暂停或正在退出

灰色

已溢出到磁盘的数据;这不计入进程内存

内存未释放回操作系统

在许多情况下,worker 上的高非托管内存使用量或“内存泄漏”警告可能具有误导性:worker 可能实际上并没有使用其内存做任何事情,而只是尚未将未使用的内存返回给操作系统,并且保留着它以防万一再次需要内存容量。这既不是您的代码中的错误,也不是 Dask 中的错误——这实际上是 Linux 和 MacOS 上所有进程的正常行为,是低级内存分配器工作方式的结果(详情见下文)。

由于 Dask 根据操作系统报告的 worker 内存使用量做出决策(溢出到磁盘、暂停、终止、活动内存管理器rebalance()),并且不知道这些内存中实际有多少正在使用以及有多少是空的并被“囤积”起来,因此它可能会高估(有时是显著高估)进程正在使用的内存量,并认为 worker 内存不足,而实际上并非如此。

更详细地说:Linux 和 MacOS 内存分配器都尝试通过实现用户空间内存管理系统来避免应用程序每次调用 free 时都执行 brk 内核调用。在调用 free 后,内存可以保留在用户空间中并可能被下一个 malloc 重复使用——这反过来也不需要内核调用。这对于没有自己的内存分配器的 C/C++ 应用程序来说通常非常理想,因为它可以在以更大的内存占用为代价的情况下显著提升性能。然而,CPython 在其之上添加了自己的内存分配器,这减少了对这种额外抽象的需求(但有一些注意事项)。

您可以采取一些步骤来缓解 worker 内存未释放回操作系统的情况。这些步骤将在以下部分讨论。

手动修剪内存

仅限 Linux worker

可以通过以下方式强制释放已分配但未使用的内存

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)

这仅应作为一次性调试实验使用。运行上述代码时观察仪表板。如果在调用 client.run(trim_memory) 后,非托管 worker 内存(在“存储字节”图上)显著减少,则继续下一节。否则,您很可能确实存在内存泄漏。

请注意,仅在使用默认 glibc 内存分配器时才应运行此 malloc_trim。使用自定义分配器(如 jemalloc(见下文))时,这可能会导致意外行为,包括段错误。(如果您不知道这意味着什么,您可能正在使用默认的 glibc 分配器,并且可以安全地运行此命令)。

自动修剪内存

仅限 Linux worker

要在生产环境中积极地自动修剪内存,您应该将环境变量 MALLOC_TRIM_THRESHOLD_(注意末尾的下划线)设置为 0 或较低的值;详细信息请参阅 mallopt 手册页。减小此值会增加系统调用次数,因此可能会降低性能。

注意

必须在启动 dask worker 进程之前设置此变量。

注意

如果使用 Nanny,则对于 nanny 监控的 worker 进程,MALLOC_TRIM_THRESHOLD_ 环境变量将自动设置为 65536。您可以使用 distributed.nanny.environ 配置值修改此行为。

jemalloc

Linux 和 MacOS worker

作为上述方法的替代,您可以尝试使用 jemalloc 内存分配器,如下所示

在 Linux 上

conda install jemalloc
LD_PRELOAD=$CONDA_PREFIX/lib/libjemalloc.so dask worker <...>

在 macOS 上

conda install jemalloc
DYLD_INSERT_LIBRARIES=$CONDA_PREFIX/lib/libjemalloc.dylib dask worker <...>

或者在 macOS 上,使用 homebrew 全局安装

brew install jemalloc
DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib dask worker <...>

jemalloc 提供了丰富的配置设置;请参阅其文档。

忽略进程内存

如果所有其他方法都失败了,您可能希望停止 Dask 在决策中使用来自操作系统的内存指标 (RSS)

distributed:
  scheduler:
    active-memory-manager:
      measure: managed

  worker:
    memory:
      rebalance:
        measure: managed
      spill: false
      pause: false
      terminate: false

当然,如果您确实存在非托管内存问题(例如内存泄漏和/或严重的内存碎片),这将会带来问题。

用户自定义托管内存容器

警告

此功能仅适用于高级用户;内置的托管内存容器应能满足大多数需求。如果您希望将 CUDA 设备内存动态溢出到主机内存,应使用 dask-cuda

上述部分描述的设计将数据存储在 worker 的 RAM 中,当超过 targetspill 阈值时,数据会自动溢出到磁盘。如果需要不同的行为,可以在初始化 WorkerNanny 时传递 data= 参数。此可选参数接受以下任何值

  • MutableMapping[str, Any] 的实例

  • 返回 MutableMapping[str, Any] 的可调用对象

  • 一个元组,包含

    • 返回 MutableMapping[str, Any] 的可调用对象

    • 可调用对象的关键字参数字典

这样做会使 Worker 忽略 targetspill 阈值。但是,如果该对象除了支持 MutableMapping API 外,还支持以下鸭子类型 API,则 spill 阈值将保持激活状态

class distributed.spill.ManualEvictProto(*args, **kwargs)[source]

如果第三方替代 SpillBuffer 的类希望在超出 distributed.worker.memory.spill 阈值时支持溢出,则必须遵守(除了 MutableMapping)此鸭子类型 API。

这是公共 API。在撰写本文时,Dask-CUDA 在 ProxifyHostFile 类中实现了此协议。

evict() int[source]

手动将键/值对从快速内存逐出到慢速内存。返回逐出值在快速内存中的大小。

如果由于某种原因逐出失败,返回 -1。此方法必须保证导致问题的键/值对已保留在快速内存中,并且问题已在内部记录。

此方法永不抛出异常。

property fast: collections.abc.Sized | bool

访问快速内存。这通常是一个 MutableMapping,但对于手动逐出 API 的目的,它只用于测试是否为空,以便知道是否有任何东西可逐出。