客户端
目录
客户端¶
客户端 (Client) 是 dask.distributed
用户的主要入口点。
在 设置集群 后,我们通过指向调度器 (Scheduler) 的地址来初始化一个 Client
。
>>> from distributed import Client
>>> client = Client('127.0.0.1:8786')
有几种不同的方式可以通过客户端与集群交互
客户端满足大多数标准 concurrent.futures - PEP-3148 接口,包括
.submit
、.map
函数和Future
对象,可以直接立即提交任务。客户端将自己注册为默认的 Dask 调度器,因此会运行所有 Dask 集合,如 dask.array、dask.bag、dask.dataframe 和 dask.delayed。
客户端还提供了用于远程操作数据的附加方法。有关完整列表,请参阅完整的 API。
Concurrent.futures¶
我们可以使用 client.submit
方法提交单个函数调用,或使用 client.map
方法提交多个函数调用。
>>> def inc(x):
return x + 1
>>> x = client.submit(inc, 10)
>>> x
<Future - key: inc-e4853cffcc2f51909cdb69d16dacd1a5>
>>> L = client.map(inc, range(1000))
>>> L
[<Future - key: inc-e4853cffcc2f51909cdb69d16dacd1a5>,
<Future - key: inc-...>,
<Future - key: inc-...>,
<Future - key: inc-...>, ...]
这些结果驻留在分布式工作节点上。
我们使用 Future.result
方法(用于单个 future)或 client.gather
方法(用于多个 future)将结果收集回本地进程。
>>> x.result()
11
>>> client.gather(L)
[1, 2, 3, 4, 5, ...]
但是,一如既往,我们希望最大程度地减少将结果通信回本地进程。通常最好将数据留在集群上,并通过 submit
、map
、get
和 compute
等函数进行远程操作。有关分布式高效使用的更多信息,请参阅效率页面。
我们可以提交 future 上的任务,并使用 future 作为输入。函数将发送到存储这些 future 的机器,并在结果完成后在那里运行。
>>> y = client.submit(inc, x) # Submit on x, a Future
>>> total = client.submit(sum, L) # Submit on L, a list of Futures
>>> y.result()
12
Dask¶
父库 Dask 包含 dask.array、dask.dataframe、dask.bag 和 dask.delayed 等对象,它们会自动在大型数据集上生成并行算法。所有 Dask 集合都能与分布式调度器无缝协作。
当我们创建一个 Client
对象时,它会将自己注册为默认的 Dask 调度器。所有 .compute()
方法都会自动开始使用分布式系统。
client = Client('scheduler:8786')
my_dataframe.sum().compute() # Now uses the distributed system by default
在启动客户端时,我们可以通过使用 set_as_default=False
关键字参数来停止此行为。
Dask 的普通 .compute()
方法是*同步*的,这意味着它们会阻塞解释器直到完成。Dask.distributed 允许*异步*计算的新能力,我们可以触发计算在后台发生并保留在内存中,同时我们继续进行其他工作。这通常通过 Client.persist
和 Client.compute
方法处理,它们分别用于更大和更小的结果集。
>>> df = client.persist(df) # trigger all computations, keep df in memory
>>> type(df)
dask.DataFrame
有关更多信息,请参阅 管理计算 页面。
默认纯函数¶
默认情况下,distributed
假定所有函数都是纯函数。纯函数具备以下特性:
对于给定的输入集总是返回相同的输出。
没有副作用,例如修改全局状态或创建文件。
如果情况并非如此,您应该在 Client.map()
和 Client.submit()
等方法中使用 pure=False
关键字参数。
客户端将一个键关联到所有计算。这个键可以在 Future 对象上访问。
>>> from operator import add
>>> x = client.submit(add, 1, 2)
>>> x.key
'add-ebf39f96ad7174656f97097d658f3fa2'
这个键对于具有相同输入的计算和所有机器上的计算都应该相同。如果我们在任何具有相同环境的计算机上运行上述计算,那么我们应该获得完全相同的键。
调度器避免冗余计算。如果结果已经在内存中(来自之前的调用),则将使用该旧结果,而不是重新计算。在常见情况下,提交或 map 的调用是幂等的。
虽然方便,但对于非纯函数(如 random
),此功能可能不受欢迎。在这种情况下,两次调用同一函数并使用相同输入应该产生不同的结果。我们通过 pure=False
关键字参数来实现这一点。在这种情况下,键是随机生成的(通过 uuid4
)。
>>> import numpy as np
>>> client.submit(np.random.random, 1000, pure=False).key
'random_sample-fc814a39-ee00-42f3-8b6f-cac65bcb5556'
>>> client.submit(np.random.random, 1000, pure=False).key
'random_sample-a24e7220-a113-47f2-a030-72209439f093'
Async/await 操作¶
如果我们在异步环境中操作,则上面列出的阻塞函数将变为相应的异步函数。您必须使用 asynchronous=True
关键字启动客户端,并 yield
或 await
阻塞函数。
async def f():
client = await Client(asynchronous=True)
future = client.submit(func, *args)
result = await future
return result
如果您想在异步和同步环境中重用同一个客户端,可以在每次方法调用时应用 asynchronous=True
关键字。
client = Client() # normal blocking client
async def f():
futures = client.map(func, L)
results = await client.gather(futures, asynchronous=True)
return results
有关更多信息,请参阅 异步 文档。