管理内存

Dask.distributed 将任务结果存储在工作节点的分布式内存中。中心调度器跟踪集群上的所有数据,并确定何时释放数据。已完成的结果通常会尽快从内存中清除,以便为更多计算腾出空间。如果满足以下任一条件,任务的结果将保留在内存中

  1. 客户端持有指向此任务的 Future 对象。数据应保留在 RAM 中,以便客户端按需收集数据。

  2. 此任务是正在进行的计算所必需的,这些计算旨在生成由 Future 对象指向的最终结果。一旦没有正在进行的任务需要它们,这些任务将被移除。

当用户持有 Future 对象或持久化集合(其 Dask 图中包含许多此类 Future 对象,通常可通过其 .dask 属性访问)时,他们将这些结果固定到活动内存中。当用户从其本地 Python 进程中删除 Future 对象或集合时,调度器会从分布式 RAM 中删除相关数据。由于这种关系,分布式内存反映了本地内存的状态。用户可以通过删除本地会话中的持久化集合来释放集群上的分布式内存。

创建 Future 对象

以下函数生成 Future 对象

Client.submit(func, *args[, key, workers, ...])

向调度器提交函数应用

Client.map(func, *iterables[, key, workers, ...])

将函数应用于参数序列

Client.compute(collections[, sync, ...])

在集群上计算 dask 集合

Client.persist(collections[, ...])

在集群上持久化 dask 集合

Client.scatter(data[, workers, broadcast, ...])

将数据分散到分布式内存中

submitmap 方法处理原始 Python 函数。computepersist 方法处理 Dask 集合,如数组、bag、延迟值和 Dataframe。scatter 方法直接从本地进程发送数据。

持久化集合

调用 Client.computeClient.persist 会向集群提交任务图,并返回指向特定输出任务的 Future 对象。

Compute 对每个输入返回一个 Future 对象;persist 返回集合的副本,其中每个块或分区都由一个 Future 对象替换。简而言之,当您想将少量结果作为单个 Future 对象时,使用 compute;当您想在集群上保留完整的集合时,使用 persist

Persist 更常见,并且经常按如下方式与集合一起使用

>>> # Construct dataframe; no work happens
>>> df = dd.read_csv(...)
>>> df = df[df.x > 0]
>>> df = df.assign(z = df.x + df.y)

>>> # Pin data in distributed RAM; this triggers computation
>>> df = client.persist(df)

>>> # continue operating on df

Spark 用户请注意:这与您习惯的不同。Persist 是一个即时操作。但是,由于计算在后台进行,您将立即获得控制权。

在这个例子中,我们通过解析 CSV 数据、过滤行,然后添加新列来构建计算。到目前为止,所有工作都是惰性的;我们只是在 df 对象中构建了一个以图的形式执行工作的“食谱”。

当我们调用 df = client.persist(df) 时,我们将图从 df 对象中“切断”,将其发送给调度器,收到 Future 对象作为返回,并创建一个新的 dataframe,其图非常浅,直接指向这些 Future 对象。这或多或少是即时发生的(取决于序列化和发送图所需的时间),我们可以在集群在后台评估图时继续处理我们的新 df 对象。

与 dask.compute 的区别

如果将 Client 设置为默认调度器,则 dask.computedask.persist 以及所有 dask 集合的 .compute.persist 方法将在底层调用 Client.computeClient.persist,除非明确指定了不同的调度器。除非用户明确传递 set_as_default=False 参数,否则创建新的 Client 时会默认发生这种情况。

然而,存在一个区别:client.compute(df) 操作是异步的,因此与传统的 df.compute() 方法或 dask.compute 函数不同,后者会阻塞直到结果可用,不会在集群上持久化任何数据,并将整个结果带回本地机器,因此不建议在大型数据集上使用它们,但对于较小的结果来说非常方便,特别是由于它们以大多数其他工具期望的方式返回具体结果。

换句话说,df.compute() 等同于 client.compute(df).result()

通常我们使用像 client.persist 这样的异步方法来设置大型集合,然后使用 df.compute() 进行快速分析。

>>> # df.compute()  # This is bad and would likely flood local memory
>>> df = client.persist(df)    # This is good and asynchronously pins df
>>> df.x.sum().compute()  # This is good because the result is small
>>> future = client.compute(df.x.sum())  # This is also good but less intuitive

清除数据

我们通过从本地进程中移除集合来从分布式 RAM 中移除数据。一旦所有指向该数据的 Future 对象从所有客户端机器中移除,远程数据也会被移除。

>>> del df  # Deleting local data often deletes remote data

如果这是唯一一份副本,那么很可能会触发集群也删除该数据。

但是,如果我们有多个副本或基于此的其他集合,那么我们就必须将它们全部删除。

>>> df2 = df[df.x < 10]
>>> del df  # would not delete data, because df2 still tracks the futures

强制清除数据

要彻底移除一个计算以及所有依赖于它的计算,您总是可以 cancel Future 对象/集合。

>>> client.cancel(df)  # kills df, df2, and every other dependent computation

或者,如果您想要一个干净的环境,可以重新启动集群。这会清除所有状态并对所有工作进程进行硬重启。这通常在几秒钟内完成。

>>> client.restart()

客户端引用

只要至少有一个 Client 持有对它们的引用,Future 对象就会存在于集群上。当最后一个持有引用的 Client 关闭或崩溃时,则仅由其引用的所有内容都将从集群中移除。这通常是理想的,可以防止不干净的客户端关闭污染长时间运行的集群的内存。

为了提高弹性或仅仅是为了能够在计算通宵运行时关闭笔记本电脑,可以阻止这种行为。请参阅

distributed.Client.publish_dataset(*args, ...)

向调度器发布命名数据集

distributed.fire_and_forget(obj)

至少运行一次任务,即使我们释放了 Future 对象

弹性

除非其他工作节点上的计算需要,否则结果不会被有意复制。弹性通过重新计算来实现,即维护任何结果的来源。如果一个工作节点宕机,调度器能够重新计算其所有结果。任何所需 Future 对象的完整图都会一直维护,直到不再存在对该 Future 对象的引用。

更多信息请参阅 弹性

高级技巧

最初,任务的结果不会被有意复制,而仅保留在最初计算或分散它的节点上。但是,如果另一个任务需要该结果,并且该任务打算由不同的工作节点运行,则结果可能会在正常计算过程中复制到另一个工作节点。这种情况发生在任务需要在不同机器上的两部分数据时(至少需要移动一部分),或者通过工作窃取。在这些情况下,策略是让第二台机器维护其冗余的数据副本。这有助于自然地传播需求量大的数据。

然而,高级用户可能希望更直接地控制整个集群中数据的位置、复制和平衡。他们可能事先知道某些数据应该在整个网络中广播,或者他们的数据已经变得特别不平衡,或者他们希望某些数据保留在网络的特定部分。这些考虑通常不是必需的。

Client.rebalance([futures, workers])

重新平衡网络内的数据

Client.replicate(futures[, n, workers, ...])

设置网络内 Future 对象的复制数量

Client.scatter(data[, workers, broadcast, ...])

将数据分散到分布式内存中

工作节点内存管理

可以通过配置工作节点侧的 工作节点内存管理 来优化内存使用。