活动内存管理器

活动内存管理器(Active Memory Manager,简称 AMM)是一个实验性守护进程,用于优化 Dask 集群中工作节点的内存使用。默认情况下它是启用的,但可以禁用或配置。有关详细信息,请参阅启用活动内存管理器

内存不平衡和重复

每当 Dask 任务返回数据时,它就会存储在执行该任务的工作节点上,只要它是其他任务的依赖项,或者通过 ClientFuture 引用,或者属于已发布数据集的一部分。

Dask 根据 CPU 占用率、工作节点资源和本地性等标准将任务分配给工作节点。在任务彼此不相关、计算时间相同、返回数据大小相同且没有资源约束的简单用例中,您会发现工作节点之间的内存占用完美平衡。然而,在所有其他用例中,随着计算的进行,可能会导致内存使用不平衡。

当任务在工作节点上运行并需要另一个工作节点的任务输出作为输入时,Dask 会透明地在工作节点之间传输数据,最终在不同工作节点上出现相同数据的多个副本。这通常是期望的行为,因为它避免了以后再次需要时重复传输数据。但是,这也会导致整个集群的总体内存使用量增加。

启用活动内存管理器

AMM 默认情况下是启用的。可以通过Dask 配置文件禁用或调整它

distributed:
  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      measure: optimistic

上述是推荐的设置,它将每两秒运行一次所有启用的 AMM 策略(参见下文)。或者,您也可以从Client手动启动/停止 AMM 或触发一次性迭代

>>> client.amm.start()  # Start running every 2 seconds
>>> client.amm.stop()  # Stop running periodically
>>> client.amm.running()
False
>>> client.amm.run_once()

策略

AMM 本身不会执行任何操作。用户必须启用策略,这些策略会就 Dask 数据提出建议。AMM 运行这些策略并执行它们的建议,前提是这些建议不会损害数据完整性。这些建议可以是两种类型

  • 将内存中的 Dask 任务数据从一个工作节点复制到另一个工作节点。这不应与任务依赖项引起的复制混淆。

  • 删除内存中任务的一个或多个副本。即使策略要求删除,AMM 也绝不会删除任务的最后一个副本。

没有“移动”操作。移动分两次进行:首先一个策略创建一个副本;在下一次 AMM 迭代中,相同或另一个策略删除原始副本(如果复制成功)。

除非策略对应影响哪些工作节点施加限制,否则 AMM 将自动优先在内存使用率最低的工作节点上创建副本,并优先从内存使用率最高的工作节点上删除副本。

单个策略可以通过 Dask 配置启用、禁用和配置

distributed:
  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      measure: optimistic
      policies:
      - class: distributed.active_memory_manager.ReduceReplicas
      - class: my_package.MyPolicy
        arg1: foo
        arg2: bar

有关像上面示例中那样的自定义策略,请参见下文。

默认的 Dask 配置文件包含一组合理的内置策略选择,这些策略通常是可取的。在调整单个策略之前,您应该首先在 Dask 配置中尝试仅设置 start: true,看看它是否适合您的目的。

内置策略

ReduceReplicas

class

distributed.active_memory_manager.ReduceReplicas

参数

None

此策略在默认的 Dask 配置中已启用。每当 Dask 任务在多个工作节点上复制,并且额外的副本似乎并未服务于正在进行的计算时,此策略会丢弃所有多余的副本。

注意

此策略与 replicate()scatter()broadcast=True 参数不兼容。如果您调用 replicate() 创建额外的副本,然后稍后运行此策略,它将删除除一个以外的所有副本(但不一定是新创建的副本)。

RetireWorker

class

distributed.active_memory_manager.RetireWorker

参数
addressstr

正在退役的工作节点的地址。

这是一个特殊策略,不应出现在 Dask 配置文件中。

它由 distributed.Client.retire_workers() 和自适应集群缩容时动态注入。此策略负责将正在退役的工作节点上独占内存的所有任务移动到不同的工作节点。一旦工作节点不再独占任何任务的数据,此策略就会自动从活动内存管理器中卸载自身,并且工作节点被关闭。

如果同时有多个工作节点正在退役,AMM 中将安装此策略的多个实例。

如果活动内存管理器在 Dask 配置中被禁用,则 distributed.Client.retire_workers() 和自适应缩容将启动一个临时管理器,将此策略安装到其中,然后在完成后将其关闭。

自定义策略

高级用户可以通过继承 ActiveMemoryManagerPolicy 类来编写自己的策略。该类应定义两个方法

__init__

自定义策略可以通过 __init__ 参数从 Dask 配置加载参数。如果您不需要配置,则无需实现此方法。

run

此方法不接受参数,由 AMM 每 2 秒(或 AMM 间隔设置的时间)调用一次。它必须 yield 零个或多个以下 Suggestion 命名元组

yield Suggestion("replicate", <TaskState>)

在尚未持有副本的内存使用率最低的工作节点上为目标任务创建一个副本。要创建多个副本,需要多次 yield 相同的命令。

yield Suggestion("replicate", <TaskState>, {<WorkerState>, <WorkerState>, ...})

在列出的候选项中,内存最低的工作节点上为目标任务创建一个副本。

yield Suggestion("drop", <TaskState>)

在整个集群中内存使用率最高的工作节点上删除目标任务的一个副本。

yield Suggestion("drop", <TaskState>, {<WorkerState>, <WorkerState>, ...})

在列出的候选项中,内存最高的工作节点上删除目标任务的一个副本。

AMM 将默默地拒绝不可接受的建议,例如

  • 删除任务的最后一个副本

  • 从不持有副本的工作节点子集中删除副本

  • 从当前计算需要此副本的工作节点上删除副本

  • 复制尚未加载到内存中的任务

  • 创建的任务副本数多于工作节点数

  • 在已经持有任务副本的工作节点上创建副本

  • 在暂停或正在退役的工作节点上创建副本

通常最好设计策略时尽量简单,让 AMM 通过忽略某些建议来处理上述边缘情况。

可选地,run 方法可以检索 AMM 刚刚选择的工作节点,如下所示

ws = (yield Suggestion("drop", ts))

run 方法可以访问以下属性

self.manager

策略附加到的 ActiveMemoryManagerExtension

self.manager.scheduler

将应用建议的 Scheduler。从那里您可以访问各种属性,例如 tasksworkers

self.manager.workers_memory

只读映射 {WorkerState: bytes}。bytes 是当前 AMM 迭代中所有策略接受的所有建议生效后,工作节点的预期内存使用量。请注意,如果您乐意始终分别在内存使用率最低和最高的工作节点上创建/删除副本,则无需访问此属性 - AMM 会为您处理。

self.manager.pending

只读映射 {TaskState: ({<WorkerState>, ...}, {<WorkerState>, ...})。第一个集合包含根据目前接受的建议将接收任务新副本的工作节点;第二个集合包含将失去副本的工作节点。

self.manager.policies

在 AMM 中注册的策略集合。策略可以如下所示注销自身

def run(self):
    self.manager.policies.drop(self)

示例

以下自定义策略确保键“foo”和“bar”始终在所有工作节点上复制。新工作节点连接到调度器后不久就会收到副本。如果目标键不在内存中或所有工作节点已持有副本,则策略将不执行任何操作。请注意,此示例与 ReduceReplicas 内置策略不兼容。

在 mymodule.py 中(它必须对调度器可访问)

from distributed.active_memory_manager import ActiveMemoryManagerPolicy, Suggestion


class EnsureBroadcast(ActiveMemoryManagerPolicy):
    def __init__(self, key):
        self.key = key

    def run(self):
        ts = self.manager.scheduler.tasks.get(self.key)
        if not ts:
            return
        for _ in range(len(self.manager.scheduler.workers) - len(ts.who_has)):
            yield Suggestion("replicate", ts)

请注意,该策略无需测试暂停的工作节点或其他策略也请求副本等边缘情况;AMM 会处理这些情况。理论上,您可以将最后两行重写如下(代价是一些浪费的 CPU 周期)

for _ in range(1000):
    yield Suggestion("replicate", ts)

在 distributed.yaml 中

distributed:
  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      policies:
      - class: mymodule.EnsureBroadcast
        key: foo
      - class: mymodule.EnsureBroadcast
        key: bar

或者,我们可以使用一个带有键列表的策略实例 - 上述设计仅说明您可以同时运行同一策略的多个实例。

API 参考

class distributed.active_memory_manager.ActiveMemoryManagerExtension(scheduler: distributed.scheduler.Scheduler, policies: set[distributed.active_memory_manager.ActiveMemoryManagerPolicy] | None = None, *, measure: str | None = None, register: bool = True, start: bool | None = None, interval: float | None = None)[源代码]

调度器扩展,用于优化整个集群的内存使用。它可以手动触发,也可以每隔几秒自动触发;在每次迭代中,它执行以下一项或两项操作

  • 创建内存中任务的新副本

  • 销毁内存中任务的副本;这绝不会销毁最后一个可用副本。

没有‘移动’操作。移动分两步执行:首先创建一个副本,然后在下一次迭代中删除原始副本(如果复制成功)。

此扩展通过 dask 配置的 distributed.scheduler.active-memory-manager 部分进行配置。

amm_handler(method: str) Any[源代码]

调度器处理器,由 Client 通过 AMMClientProxy 调用

interval: float

每隔这么多秒自动运行

measure: str

要使用的内存度量。必须是 distributed.scheduler.MemoryState 的属性或特性之一。

pending: dict[distributed.scheduler.TaskState, tuple[set[distributed.scheduler.WorkerState], set[distributed.scheduler.WorkerState]]]

每个任务的待处理复制和删除。此属性仅存在于 self.run() 的范围内。

policies: set[distributed.active_memory_manager.ActiveMemoryManagerPolicy]

所有活动策略

run_once() None[源代码]

运行所有策略一次并异步(即发即忘)执行其复制/丢弃任务的建议

property running: bool

如果 AMM 定期触发,则返回 True;否则返回 False

scheduler: distributed.scheduler.Scheduler

指向持有此扩展的调度器的后向引用

start() None[源代码]

每隔 self.interval 秒开始执行,直到调度器关闭

stop() None[源代码]

停止周期性执行

workers_memory: dict[distributed.scheduler.WorkerState, int]

每个工作节点上当前分配的内存(字节),加上/减去待处理操作。此属性仅存在于 self.run() 的范围内。

class distributed.active_memory_manager.ActiveMemoryManagerPolicy[源代码]

抽象父类

abstract run() collections.abc.Generator[distributed.active_memory_manager.Suggestion, Optional[distributed.scheduler.WorkerState], None][源代码]

此方法由 ActiveMemoryManager 每隔几秒调用一次,或在用户调用 client.amm.run_once 时调用。

它是一个迭代器,必须发出 Suggestion 对象

  • Suggestion("replicate", <TaskState>)

  • Suggestion("replicate", <TaskState>, {潜在接收副本的工作节点子集})

  • Suggeston("drop", <TaskState>)

  • Suggestion("drop", <TaskState>, {潜在丢弃副本的工作节点子集})

每次 yield 的元素表示创建或销毁键的单个副本的意图。如果未提供工作节点子集,则默认为集群上的所有工作节点。活动内存管理器或工作节点之后可能会决定忽略该请求,例如,因为它会删除键的最后一个副本,或者因为该键当前在该工作节点上被需要。

您可以选择性地检索决定将键复制到或从哪个工作节点删除,如下所示

choice = (yield Suggestion("replicate", ts))

choice 要么是 WorkerState,要么是 None;如果 ActiveMemoryManager 选择忽略请求,则返回 None。

可以在 self.manager.pending 上查看当前待处理(已接受)的建议;这包括此方法之前 yield 的建议。

可以在 self.manager.workers_memory 上查看每个工作节点上的当前内存使用量,这是所有待处理建议生效后的状态。

class distributed.active_memory_manager.Suggestion(op, ts, candidates)[源代码]
candidates: set[distributed.scheduler.WorkerState] | None

字段 2 的别名

op: Literal['replicate', 'drop']

字段 0 的别名

ts: distributed.scheduler.TaskState

字段 1 的别名

class distributed.active_memory_manager.AMMClientProxy(client: distributed.client.Client)[源代码]

从 dask 客户端操作 AMM 的便捷访问器

用法:client.amm.start() 等。

如果客户端是异步的,则所有方法都是异步的;如果客户端是同步的,则所有方法都是同步的。

run_once() Any[源代码]
running() Any[源代码]
start() Any[源代码]
stop() Any[源代码]
class distributed.active_memory_manager.ReduceReplicas[源代码]

确保内存中任务不会在超出需要的工作节点上复制;丢弃多余的副本。

class distributed.active_memory_manager.RetireWorker(address: str)[源代码]

将工作节点上所有唯一的内存中任务复制到其他地方,为工作节点关闭做准备。

在任何给定时间,AMM 可能注册了此策略的多个实例,每个正在退役的工作节点一个实例 - 这意味着大多数时候根本不会注册任何实例。因此,此策略不会出现在 dask 配置 (distributed.yaml) 中。实例由 distributed.Scheduler.retire_workers() 添加,并在工作节点退役后自动移除自身。如果 AMM 在 dask 配置中被禁用,retire_workers() 将启动一个临时的临时管理器。

失败条件

可能没有合适的工作节点来接收来自正在退役的工作节点的任务。这发生在两种用例中

  1. 这是集群中唯一的工作节点,或者

  2. 所有工作节点同时处于暂停状态或正在退役

在任何一种情况下,此策略都将无法移出所有键,并将 no_recipients 布尔值设置为 True。distributed.Scheduler.retire_workers() 将中止退役。

还有第三种用例,即任务因某种原因无法复制出去,例如接收方无响应但调度器尚不知道。在这种情况下,我们将等待下一次 AMM 迭代并再次尝试(可能使用不同的接收工作节点,例如,如果接收工作节点挂起但尚未被声明死亡)。

退役有溢出任务的工作节点

在第一次迭代中,此策略建议其他工作节点应获取正在退役的工作节点的所有唯一的内存中任务。通常,这意味着在接下来的几分钟内,退役工作节点将受到来自集群其余部分的 distributed.worker.Worker.get_data() 调用轰炸。如果工作节点的大部分托管内存已溢出,这可能会导致问题,因为它可能使工作节点超出其终止阈值。为了防止这种情况,采取了两项措施

  • 在每次迭代中,此策略会丢弃正在退役的工作节点上已在其他地方复制的所有任务。这为将更多任务从溢出文件中移出以复制到另一个工作节点腾出了空间。

  • 一旦工作节点超过 pause 阈值,get_data() 会将传出连接数限制为 1。

参数
address: str

待退役工作节点的 URI

done() bool[源代码]

如果可以安全关闭工作节点,则返回 True;否则返回 False