数据本地性

数据移动通常不必要地限制了性能。

对于分析计算来说尤其如此。Dask.distributed 会尽可能地最小化数据移动,并在必要时允许用户进行控制。本文档描述了围绕数据本地性的当前调度策略和用户 API。

当前策略

任务提交

在常见情况下,分布式系统会在已经持有依赖数据的工作节点上运行任务。如果您的任务 f(x) 需要数据 x,那么该任务很可能将在已持有 x 的工作节点上运行。

如果一个任务需要分散在多个工作节点上的数据,那么调度器会选择将任务运行在需要最少数据传输的工作节点上。每个数据元素的大小由工作节点使用 sys.getsizeof 函数测量,该函数依赖于在大多数相关 Python 对象上普遍可用的 __sizeof__ 协议。

数据分散

当用户将数据从本地进程分散到分布式网络时,这些数据会以轮询(round-robin)方式按核数分组分发。例如,如果我们有两个工作节点 AliceBob,每个都有两个核,并且我们按如下方式分散列表 range(10)

futures = client.scatter(range(10))

那么 Alice 和 Bob 将收到以下数据

  • Alice: [0, 1, 4, 5, 8, 9]

  • Bob: [2, 3, 6, 7]

用户控制

复杂的算法可能需要更多的用户控制。

例如,特殊硬件(如 GPU 或数据库连接)的存在可能会限制某个特定任务的有效工作节点集合。

在这些情况下,使用 workers= 关键字参数给 submitmapscatter 函数,提供主机名、IP 地址或别名,如下所示

future = client.submit(func, *args, workers=['Alice'])
  • Alice: [0, 1, 4, 5, 8, 9, new_result]

  • Bob: [2, 3, 6, 7]

所需数据将始终移动到这些工作节点,即使数据量很大。如果此限制仅是偏好而非严格要求,则添加 allow_other_workers 关键字参数,表示在极端情况下,例如没有有效工作节点时,可以使用其他工作节点。

future = client.submit(func, *args, workers=['Alice'],
                       allow_other_workers=True)

此外,scatter 函数支持 broadcast= 关键字参数,强制将所有数据发送到所有工作节点,而不是轮询分发。如果有新的工作节点加入,它们不会自动接收这些数据。

futures = client.scatter([1, 2, 3], broadcast=True)  # send data to all workers
  • Alice: [1, 2, 3]

  • Bob: [1, 2, 3]

workers= 的有效参数包括以下内容

  • 单个 IP 地址、IP/端口对或主机名,如下所示

    192.168.1.100, 192.168.1.100:8989, alice, alice:8989
    
  • 上述列表或集合

    ['alice'], ['192.168.1.100', '192.168.1.101:9999']
    

如果仅提供主机名或 IP,则该机器上的任何工作节点都将被视为有效。此外,您可以在创建工作节点时为其提供别名。

$ dask worker scheduler_address:8786 --name worker_1

然后在指定工作节点时使用此名称。

client.map(func, sequence, workers='worker_1')

使用 Compute/Persist 指定工作节点

scattersubmitmap 中的 workers= 关键字参数相当直接,接受工作节点主机名、主机:端口对或它们的序列作为有效输入。

client.submit(f, x, workers='127.0.0.1')
client.submit(f, x, workers='127.0.0.1:55852')
client.submit(f, x, workers=['192.168.1.101', '192.168.1.100'])

对于更复杂的计算,例如使用 dask 集合(如 dask.dataframe 或 dask.delayed)时发生的计算,我们有时需要指定计算的某些部分在特定工作节点上运行,而其他部分在其他工作节点上运行。

x = delayed(f)(1)
y = delayed(f)(2)
z = delayed(g)(x, y)

future = client.compute(z, workers={z: '127.0.0.1',
                                    x: '192.168.0.1'})

这里字典的值与之前形式相同:主机、主机:端口对或这些的列表。这里的键可以是 dask 集合或 dask 集合的元组。这些集合的所有最终键都将在指定的机器上运行;依赖项可以在任何地方运行,除非它们也列在 workers= 中。我们将通过一系列示例来探讨这一点。

计算 z = f(x, y) 在主机 127.0.0.1 上运行。计算 xy 的另外两个计算可以在任何地方运行。

future = client.compute(z, workers={z: '127.0.0.1'})

计算 zx 都必须在 127.0.0.1 上运行。

future = client.compute(z, workers={z: '127.0.0.1',
                                    x: '127.0.0.1'})

使用元组对集合进行分组。这是上述方法的简写。

future = client.compute(z, workers={(x, y): '127.0.0.1'})

回想一下,scatter/submit/mapworkers= 的所有选项在这里也适用。

future = client.compute(z, workers={(x, y): ['192.168.1.100', '192.168.1.101:9999']})

allow_other_workers=True 设置为 True 使这些成为宽松限制而非硬性要求。

future = client.compute(z, workers={(x, y): '127.0.0.1'},
                        allow_other_workers=True)

提供一个集合给 allow_other_workers=[...],表示只有某些集合的键是宽松的。在下面的示例中,z *必须*在 127.0.0.1 上运行,而 x *应该*在 127.0.0.1 上运行,但在必要时可以在其他地方运行。

future = client.compute(z, workers={(x, y): '127.0.0.1'},
                        allow_other_workers=[x])

这对于 persist 以及任何 dask 集合(任何具有 .__dask_graph__() 方法的对象)都适用。

df = dd.read_csv('s3://...')
df = client.persist(df, workers={df: ...})

请参阅效率页面了解最佳实践。