异步操作
目录
异步操作¶
Dask 可以完全异步运行,因此可以与其他高并发应用程序互操作。Dask 内部构建于 Tornado 协程之上,但也提供了对 asyncio 的兼容层(参见下文)。
基本操作¶
启动客户端时,提供 asynchronous=True
关键字来告诉 Dask 您打算在异步上下文中使用此客户端,例如使用 async/await
语法定义的函数。
async def f():
client = await Client(asynchronous=True)
以前会阻塞的操作现在提供了 Tornado 协程,您可以在其上 await
。
只提交工作的快速函数仍然很快,不需要等待。这包括所有向集群提交工作的函数,例如 submit
、map
、compute
和 persist
。
future = client.submit(lambda x: x + 1, 10)
您可以直接 await 期物(futures)
result = await future
>>> print(result)
11
或者您可以使用正常的客户端方法。任何等待直到从调度器收到信息的操作现在都应该 await
。
result = await client.gather(future)
如果您想在同步 Client
(未带 asynchronous=True
关键字创建的)中使用异步函数,那么您可以在每个方法调用中应用 asynchronous=True
关键字,并使用 Client.sync
函数来运行异步函数。
from dask.distributed import Client
client = Client() # normal blocking client
async def f():
future = client.submit(lambda x: x + 1, 10)
result = await client.gather(future, asynchronous=True)
return result
client.sync(f)
async with Client(asynchronous=True) as client:
arr = da.random.random((1000, 1000), chunks=(1000, 100))
await client.compute(arr.mean())
示例¶
这个独立的示例启动一个异步客户端,提交一个简单的作业,等待结果,然后关闭客户端。您可以看到 Asyncio 和 Tornado 的实现。
使用 Tornado 或 Asyncio 的 Python 3¶
from dask.distributed import Client
async def f():
client = await Client(asynchronous=True)
future = client.submit(lambda x: x + 1, 10)
result = await future
await client.close()
return result
# Either use Tornado
from tornado.ioloop import IOLoop
IOLoop().run_sync(f)
# Or use asyncio
import asyncio
asyncio.get_event_loop().run_until_complete(f())
用例¶
在历史上,这被用于几种类型的应用程序中
将 Dask 集成到其他异步服务中(例如 Web 后端),提供一个类似于 Celery 的计算引擎,同时仍然保持高度并发且避免不必要的阻塞。
用于状态变化或更新非常迅速的计算,这在某些高级机器学习工作负载中很常见。
用于开发 Dask 分布式基础设施的内部,其完全以这种风格编写。
用于高级应用程序中复杂的控制和数据结构。