效率

并行计算做得好是响应迅速且有益的。然而,可能会遇到一些障碍。本节描述了确保性能的常见方法。

将数据留在集群上

尽可能长时间地等待在本地收集数据。如果你想对集群上的大数据块进行查询,通常将一个函数提交到该数据上比将数据下载到你的本地计算机上要快得多。

例如,如果我们在集群上有一个 numpy 数组,并且想知道它的形状,我们可以选择以下任一选项

  1. 慢: 将 numpy 数组收集到本地进程,访问 .shape 属性

  2. 快: 将 lambda 函数发送到集群以计算形状

>>> x = client.submit(np.random.random, (1000, 1000))
>>> type(x)
Future

>>> x.result().shape  # Slow from lots of data transfer
(1000, 1000)

>>> client.submit(lambda a: a.shape, x).result()  # fast
(1000, 1000)

使用更大的任务

调度器为每个任务或 Future 对象增加大约 一毫秒 的开销。虽然听起来很快,但如果你运行十亿个任务,这将非常慢。如果你的函数运行速度快于 100 毫秒左右,那么使用分布式计算可能不会看到任何加速。

一个常见的解决方案是将输入分批处理成更大的块。

>>> futures = client.map(f, seq)
>>> len(futures)  # avoid large numbers of futures
1000000000

>>> def f_many(chunk):
...     return [f(x) for x in chunk]

>>> from tlz import partition_all
>>> chunks = partition_all(1000000, seq)  # Collect into groups of size 1000

>>> futures = client.map(f_many, chunks)
>>> len(futures)  # Compute on larger pieces of your data at once
1000

调整线程与进程

默认情况下,单个 Worker 使用计算节点拥有的核心数并行运行多个计算。在使用纯 Python 函数时,这可能不是最优的,你可能反而希望在每个节点上运行多个独立的工作进程,每个进程使用一个线程。在配置集群时,你可能需要使用 dask worker 可执行文件中的选项,如下所示

$ dask worker ip:port --nworkers 8 --nthreads 1

请注意,如果你主要使用 NumPy、Pandas、SciPy、Scikit Learn、Numba 或其他 C/Fortran/LLVM/Cython 加速库,那么这不会是你的问题。你的代码可能已经针对多线程使用进行了优化。

不要使用分布式

考虑使用 daskconcurrent.futures 模块,它们与分布式有类似的 API,但在单台机器上运行。可能你的问题在笔记本电脑或大型工作站上已经表现得足够好。

考虑通过并行之外的其他方式加速你的代码。更好的算法、数据结构、存储格式,或者只是一点点 C/Fortran/Numba 代码可能就足以让你获得所需的 10 倍速度提升。并行和分布式计算是加速应用程序的昂贵方式。