数据本地性
目录
数据本地性¶
数据移动通常不必要地限制了性能。
对于分析计算来说尤其如此。Dask.distributed 会尽可能地最小化数据移动,并在必要时允许用户进行控制。本文档描述了围绕数据本地性的当前调度策略和用户 API。
当前策略¶
任务提交¶
在常见情况下,分布式系统会在已经持有依赖数据的工作节点上运行任务。如果您的任务 f(x)
需要数据 x
,那么该任务很可能将在已持有 x
的工作节点上运行。
如果一个任务需要分散在多个工作节点上的数据,那么调度器会选择将任务运行在需要最少数据传输的工作节点上。每个数据元素的大小由工作节点使用 sys.getsizeof
函数测量,该函数依赖于在大多数相关 Python 对象上普遍可用的 __sizeof__
协议。
数据分散¶
当用户将数据从本地进程分散到分布式网络时,这些数据会以轮询(round-robin)方式按核数分组分发。例如,如果我们有两个工作节点 Alice
和 Bob
,每个都有两个核,并且我们按如下方式分散列表 range(10)
futures = client.scatter(range(10))
那么 Alice 和 Bob 将收到以下数据
Alice:
[0, 1, 4, 5, 8, 9]
Bob:
[2, 3, 6, 7]
用户控制¶
复杂的算法可能需要更多的用户控制。
例如,特殊硬件(如 GPU 或数据库连接)的存在可能会限制某个特定任务的有效工作节点集合。
在这些情况下,使用 workers=
关键字参数给 submit
、map
或 scatter
函数,提供主机名、IP 地址或别名,如下所示
future = client.submit(func, *args, workers=['Alice'])
Alice:
[0, 1, 4, 5, 8, 9, new_result]
Bob:
[2, 3, 6, 7]
所需数据将始终移动到这些工作节点,即使数据量很大。如果此限制仅是偏好而非严格要求,则添加 allow_other_workers
关键字参数,表示在极端情况下,例如没有有效工作节点时,可以使用其他工作节点。
future = client.submit(func, *args, workers=['Alice'],
allow_other_workers=True)
此外,scatter
函数支持 broadcast=
关键字参数,强制将所有数据发送到所有工作节点,而不是轮询分发。如果有新的工作节点加入,它们不会自动接收这些数据。
futures = client.scatter([1, 2, 3], broadcast=True) # send data to all workers
Alice:
[1, 2, 3]
Bob:
[1, 2, 3]
workers=
的有效参数包括以下内容
单个 IP 地址、IP/端口对或主机名,如下所示
192.168.1.100, 192.168.1.100:8989, alice, alice:8989
上述列表或集合
['alice'], ['192.168.1.100', '192.168.1.101:9999']
如果仅提供主机名或 IP,则该机器上的任何工作节点都将被视为有效。此外,您可以在创建工作节点时为其提供别名。
$ dask worker scheduler_address:8786 --name worker_1
然后在指定工作节点时使用此名称。
client.map(func, sequence, workers='worker_1')
使用 Compute/Persist 指定工作节点¶
在 scatter
、submit
和 map
中的 workers=
关键字参数相当直接,接受工作节点主机名、主机:端口对或它们的序列作为有效输入。
client.submit(f, x, workers='127.0.0.1')
client.submit(f, x, workers='127.0.0.1:55852')
client.submit(f, x, workers=['192.168.1.101', '192.168.1.100'])
对于更复杂的计算,例如使用 dask 集合(如 dask.dataframe 或 dask.delayed)时发生的计算,我们有时需要指定计算的某些部分在特定工作节点上运行,而其他部分在其他工作节点上运行。
x = delayed(f)(1)
y = delayed(f)(2)
z = delayed(g)(x, y)
future = client.compute(z, workers={z: '127.0.0.1',
x: '192.168.0.1'})
这里字典的值与之前形式相同:主机、主机:端口对或这些的列表。这里的键可以是 dask 集合或 dask 集合的元组。这些集合的所有最终键都将在指定的机器上运行;依赖项可以在任何地方运行,除非它们也列在 workers=
中。我们将通过一系列示例来探讨这一点。
计算 z = f(x, y)
在主机 127.0.0.1
上运行。计算 x
和 y
的另外两个计算可以在任何地方运行。
future = client.compute(z, workers={z: '127.0.0.1'})
计算 z
和 x
都必须在 127.0.0.1
上运行。
future = client.compute(z, workers={z: '127.0.0.1',
x: '127.0.0.1'})
使用元组对集合进行分组。这是上述方法的简写。
future = client.compute(z, workers={(x, y): '127.0.0.1'})
回想一下,scatter/submit/map
中 workers=
的所有选项在这里也适用。
future = client.compute(z, workers={(x, y): ['192.168.1.100', '192.168.1.101:9999']})
将 allow_other_workers=True
设置为 True 使这些成为宽松限制而非硬性要求。
future = client.compute(z, workers={(x, y): '127.0.0.1'},
allow_other_workers=True)
提供一个集合给 allow_other_workers=[...]
,表示只有某些集合的键是宽松的。在下面的示例中,z
*必须*在 127.0.0.1
上运行,而 x
*应该*在 127.0.0.1
上运行,但在必要时可以在其他地方运行。
future = client.compute(z, workers={(x, y): '127.0.0.1'},
allow_other_workers=[x])
这对于 persist
以及任何 dask 集合(任何具有 .__dask_graph__()
方法的对象)都适用。
df = dd.read_csv('s3://...')
df = client.persist(df, workers={df: ...})
请参阅效率页面了解最佳实践。