工作器资源
目录
工作器资源¶
对稀缺资源(如内存、GPU 或特殊硬件)的访问可能会限制在特定机器上运行某些任务的数量。
例如,我们可能有一个拥有十台计算机的集群,其中四台各有两个 GPU。我们可能有上千个任务,其中一百个需要一个 GPU,十个需要同时使用两个 GPU。在这种情况下,我们希望在考虑这些资源约束的同时在集群中平衡任务,将受 GPU 约束的任务分配给启用 GPU 的工作器。此外,我们需要确保限制在任何给定工作器上并行运行的 GPU 任务数量,以确保我们遵守提供的限制。
这种情况不仅发生在 GPU 上,也适用于许多资源,例如在运行时需要大量内存、特殊磁盘访问或访问特殊硬件的任务。Dask 允许您指定抽象的任意资源,以限制您的任务如何在您的工作器上运行。Dask 不会以任何特定方式建模这些资源(Dask 不知道什么是 GPU),并且由用户指定工作器上的资源可用性和任务的资源需求。
示例¶
我们考虑一个计算,它从许多文件加载数据,使用需要 GPU 的函数处理每个文件,然后使用一个需要 70GB 内存的任务聚合所有中间结果。
我们在一个三节点集群上操作,其中两台机器各有两个 GPU,一台机器有 100GB 内存。
当我们设置集群时,我们定义每个工作器的资源
dask worker scheduler:8786 --resources "GPU=2"
dask worker scheduler:8786 --resources "GPU=2"
dask worker scheduler:8786 --resources "MEMORY=100e9"
当我们向集群提交任务时,我们为每个任务指定约束
from distributed import Client
client = Client('scheduler:8786')
data = [client.submit(load, fn) for fn in filenames]
processed = [client.submit(process, d, resources={'GPU': 1}) for d in data]
final = client.submit(aggregate, processed, resources={'MEMORY': 70e9})
同样,我们可以使用 dask 注解机制指定资源约束
with dask.annotate(resources={'GPU': 1}):
processed = [client.submit(process, d) for d in data]
with dask.annotate(resources={'MEMORY': 70e9}):
final = client.submit(aggregate, processed)
指定资源¶
可以通过多种方式指定资源。最简单的方式取决于您集群的创建方式。
从命令行
如上所示,可以在启动工作进程时提供资源
dask worker scheduler:8786 --resources "GPU=2"
键用作资源名称,值解析为数值。
从 Dask 的配置系统
或者,可以使用 Dask 的配置系统指定资源。
from distributed import LocalCluster
with dask.config.set({"distributed.worker.resources.GPU": 2}):
cluster = LocalCluster()
配置需要设置在实际生成工作器的进程中。通过将资源指定为环境变量(下一节所示)来实现此目的可能最容易。
从环境变量
与其他任何 Dask 配置值一样,可以在启动进程之前将资源指定为环境变量。使用 Bash 语法如下
$ DASK_DISTRIBUTED__WORKER__RESOURCES__GPU=2 dask worker
...
如果您无法向 distributed.Worker
类传递选项,这可能是最简单的解决方案。
资源独立应用于每个工作进程¶
如果您使用 dask worker --nworkers <nworkers>
,资源将独立应用于每个 nworkers
工作进程。假设您的机器上有 2 个 GPU,如果您想使用两个工作进程,那么每个工作进程有 1 个 GPU,因此您需要这样做
dask worker scheduler:8786 --nworkers 2 --resources "GPU=1"
这是一个示例,说明如何使用资源来确保每个任务都在一个单独的进程内运行,这对于执行非线程安全的任务或内部使用多线程的任务非常有用
dask worker scheduler:8786 --nworkers 3 --nthreads 2 --resources "process=1"
使用下面的代码,最多会有 3 个任务并行运行,并且每个任务都将在一个单独的进程中运行
from distributed import Client
client = Client('scheduler:8786')
futures = [client.submit(non_thread_safe_function, arg,
resources={'process': 1}) for arg in args]
资源是抽象的¶
以此方式列出的资源只是抽象的数量。我们在上面同样可以使用术语“mem”、“memory”、“bytes”等,因为从 Dask 的角度来看,这只是一个抽象的术语。您可以选择任何术语,只要在工作器和客户端之间保持一致即可。
值得注意的是,Dask 单独跟踪核心数量和可用内存作为实际资源,并在正常调度操作中使用它们。
集合(collections)与资源¶
您也可以将资源与 Dask 集合(如数组和延迟对象)一起使用。您可以使用 dask 注解机制为集合上的操作添加注解,指定执行计算所需的特定资源。
# Read note below!
dask.config.set({"optimization.fuse.active": False})
x = da.read_zarr(...)
with dask.annotate(resources={'GPU': 1}):
y = x.map_blocks(func1)
z = y.map_blocks(func2)
z.compute()
注意
目前,此功能仅在 with dask.annotate(...):
包装 compute() 或 persist() 调用时支持数据帧;在这种情况下,注解应用于整个计算图,从(但不包括)任何先前持久化的集合开始。
对于其他集合,如数组和延迟对象,注解可能在优化阶段丢失。为避免此问题,您必须设置
>>> dask.config.set({"optimization.fuse.active": False})
或者在 dask.yaml 中
optimization:
fuse:
active: false
一个可能的解决方法(也适用于数据帧)是执行对 persist() 的中间调用。但请注意,这会显著影响优化并降低整体性能。
x = dd.read_parquet(...)
with dask.annotate(resources={'GPU': 1}):
y = x.map_partitions(func1).persist()
z = y.map_partitions(func2)
del y # Release distributed memory for y as soon as possible
z.compute()