管理计算

Dask.distributed 中的数据和计算总是处于以下三种状态之一

  1. 本地内存中的具体值。例如,本地进程中的整数 1 或 numpy 数组。

  2. Dask 图中的惰性计算,可能存储在 dask.delayeddask.dataframe 对象中。

  3. 正在运行的计算或远程数据,由指向当前进行中计算的 Future 对象表示。

这三种形式都很重要,并且存在可以在这三种状态之间进行转换的函数。

Dask 集合到具体值

您可以通过调用 .compute() 方法或 dask.compute(...) 函数将任何 Dask 集合转换为具体值。此函数将阻塞直到计算完成,直接从惰性 Dask 集合转换为本地内存中的具体值。

这种方法最为熟悉和直接,特别是对于有标准单机 Dask 经验或仅有普通编程经验的人来说。当您内存中已经有数据并希望将小型快速结果直接获取到本地进程时,这种方法非常棒。

>>> df = dd.read_csv('s3://...')
>>> df.value.sum().compute()
100000000

然而,如果您尝试将整个数据集带回本地 RAM,这种方法通常会失效

>>> df.compute()
MemoryError(...)

它还会强制您等待计算完成后才将解释器的控制权交还给您。

Dask 集合到 Future

您可以使用 client.computeclient.persist 方法将惰性 Dask 图异步提交到集群上运行。这些函数会立即返回 Future 对象。然后可以查询这些 Future 以确定计算的状态。

client.compute

.compute 方法接受一个集合并返回一个单独的 Future。

>>> df = dd.read_csv('s3://...')
>>> total = client.compute(df.sum())  # Return a single future
>>> total
Future(..., status='pending')

>>> total.result()               # Block until finished
100000000

因为这是一个单独的 Future,结果必须适合单个工作节点机器。与上面的 dask.compute 一样,client.compute 方法仅适用于结果较小且应能放入内存的情况。以下操作可能会失败

>>> future = client.compute(df)       # Blows up memory

相反,您应该使用 client.persist

client.persist

.persist 方法将 Dask 集合背后的任务图提交给调度器,为所有顶层任务获取 Future(例如,Dask DataFrame 中每个 Pandas DataFrame 的一个 Future)。然后它返回一个指向这些 Future 而不是先前图的集合的副本。这个新集合在语义上是等效的,但现在指向活动运行的数据而不是惰性图。如果您查看集合中的 Dask 图,您将直接看到 Future 对象

>>> df = dd.read_csv('s3://...')
>>> df.dask                          # Recipe to compute df in chunks
{('read', 0): (load_s3_bytes, ...),
 ('parse', 0): (pd.read_csv, ('read', 0)),
 ('read', 1): (load_s3_bytes, ...),
 ('parse', 1): (pd.read_csv, ('read', 1)),
 ...
}

>>> df = client.persist(df)               # Start computation
>>> df.dask                          # Now points to running futures
{('parse', 0): Future(..., status='finished'),
 ('parse', 1): Future(..., status='pending'),
 ...
}

该集合会立即返回,计算在集群的后台进行。最终,此集合的所有 Future 都将完成,此时对此集合的进一步查询可能会非常快。

通常的工作流程是使用 dask.dataframedask.delayed 等工具定义计算,直到您获得一个很好的数据集,然后将该集合持久化到集群,然后对结果集合执行许多快速查询。

具体值到 Future

我们通过几种不同的方式获取 Future。一种是上述机制,通过将 Future 包装在 Dask 集合中。另一种是通过使用 client.scatterclient.submitclient.map 将数据或任务直接提交到集群。

futures = client.scatter(args)                        # Send data
future = client.submit(function, *args, **kwargs)     # Send single task
futures = client.map(function, sequence, **kwargs)    # Send many tasks

在这种情况下,*args**kwargs 可以是普通的 Python 对象,如 1'hello',或者如果您想通过依赖关系将任务链接在一起,它们也可以是其他 Future 对象。

与像 dask.delayed 这样的 Dask 集合不同,这些任务提交会立即发生。concurrent.futures 接口与 dask.delayed 非常相似,只是执行是立即的而不是惰性的。

Future 到具体值

您可以通过调用 Future.result() 方法将单个 Future 转换为本地进程中的具体值。您可以通过调用 client.gather 方法将 Future 集合转换为具体值。

>>> future.result()
1

>>> client.gather(futures)
[1, 2, 3, 4, ...]

Future 到 Dask 集合

如“集合到 Future”部分所示,Dask 图中通常包含当前正在计算的 Future 对象。这使我们能够在当前正在运行的计算之上构建进一步的计算。这通常在使用 dask.delayed 工作流进行自定义计算时完成。

>>> x = delayed(sum)(futures)
>>> y = delayed(product)(futures)
>>> future = client.compute(x + y)

混合这两种形式允许您分阶段构建和提交计算,例如 sum(...) + product(...)。如果您想在确定如何继续之前等待查看计算某些部分的值,这通常很有价值。一次提交许多计算可以让调度器在确定运行什么时稍微更智能。

如果您对本页面感兴趣,那么您可能还会想查看关于 管理内存 的文档页面