异步操作

Dask 可以完全异步运行,因此可以与其他高并发应用程序互操作。Dask 内部构建于 Tornado 协程之上,但也提供了对 asyncio 的兼容层(参见下文)。

基本操作

启动客户端时,提供 asynchronous=True 关键字来告诉 Dask 您打算在异步上下文中使用此客户端,例如使用 async/await 语法定义的函数。

async def f():
    client = await Client(asynchronous=True)

以前会阻塞的操作现在提供了 Tornado 协程,您可以在其上 await

只提交工作的快速函数仍然很快,不需要等待。这包括所有向集群提交工作的函数,例如 submitmapcomputepersist

future = client.submit(lambda x: x + 1, 10)

您可以直接 await 期物(futures)

result = await future

>>> print(result)
11

或者您可以使用正常的客户端方法。任何等待直到从调度器收到信息的操作现在都应该 await

result = await client.gather(future)

如果您想在同步 Client(未带 asynchronous=True 关键字创建的)中使用异步函数,那么您可以在每个方法调用中应用 asynchronous=True 关键字,并使用 Client.sync 函数来运行异步函数。

from dask.distributed import Client

client = Client()  # normal blocking client

async def f():
    future = client.submit(lambda x: x + 1, 10)
    result = await client.gather(future, asynchronous=True)
    return result

client.sync(f)
async with Client(asynchronous=True) as client:
    arr = da.random.random((1000, 1000), chunks=(1000, 100))
    await client.compute(arr.mean())

示例

这个独立的示例启动一个异步客户端,提交一个简单的作业,等待结果,然后关闭客户端。您可以看到 Asyncio 和 Tornado 的实现。

使用 Tornado 或 Asyncio 的 Python 3

from dask.distributed import Client

async def f():
    client = await Client(asynchronous=True)
    future = client.submit(lambda x: x + 1, 10)
    result = await future
    await client.close()
    return result

# Either use Tornado
from tornado.ioloop import IOLoop
IOLoop().run_sync(f)

# Or use asyncio
import asyncio
asyncio.get_event_loop().run_until_complete(f())

用例

在历史上,这被用于几种类型的应用程序中

  1. 将 Dask 集成到其他异步服务中(例如 Web 后端),提供一个类似于 Celery 的计算引擎,同时仍然保持高度并发且避免不必要的阻塞。

  2. 用于状态变化或更新非常迅速的计算,这在某些高级机器学习工作负载中很常见。

  3. 用于开发 Dask 分布式基础设施的内部,其完全以这种风格编写。

  4. 用于高级应用程序中复杂的控制和数据结构。