快速入门

安装

$ python -m pip install dask distributed --upgrade

更多信息请参阅安装文档。

简单方式设置 Dask.distributed

如果您创建客户端时未提供地址,它将为您启动一个本地调度器和工作节点。

>>> from dask.distributed import Client
>>> client = Client()  # set up local cluster on your laptop
>>> client
<Client: scheduler="127.0.0.1:8786" processes=8 cores=8>

复杂方式设置 Dask.distributed

这使得 dask.distributed 可以使用多台机器作为工作节点。

在您的本地计算机上设置调度器和工作节点进程

$ dask scheduler
Scheduler started at 127.0.0.1:8786

$ dask worker 127.0.0.1:8786
$ dask worker 127.0.0.1:8786
$ dask worker 127.0.0.1:8786

注意

启动调度器后,必须至少运行一个 dask worker

启动一个客户端并将其指向调度器的 IP/端口。

>>> from dask.distributed import Client
>>> client = Client('127.0.0.1:8786')

高级用法请参阅setup 文档

映射和提交函数

使用 mapsubmit 方法在集群上启动计算。map/submit 函数将函数和参数发送到远程工作节点进行处理。它们返回指向集群上远程数据的 Future 对象。Future 会立即返回,而计算在后台远程运行。

>>> def square(x):
        return x ** 2

>>> def neg(x):
        return -x

>>> A = client.map(square, range(10))
>>> B = client.map(neg, A)
>>> total = client.submit(sum, B)
>>> total.result()
-285

收集

map/submit 函数返回 Future 对象,这些是指向集群上结果的轻量级令牌。默认情况下,计算结果保留在集群上。

>>> total  # Function hasn't yet completed
<Future: status: waiting, key: sum-58999c52e0fa35c7d7346c098f5085c7>

>>> total  # Function completed, result ready on remote worker
<Future: status: finished, key: sum-58999c52e0fa35c7d7346c098f5085c7>

使用单个 future 的 Future.result 方法,或使用同时处理多个 futures 的 Client.gather 方法,将结果收集到您的本地计算机。

>>> total.result()   # result for single future
-285
>>> client.gather(A) # gather for many futures
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

重启

当出现问题或您想重置集群状态时,调用 restart 方法。

>>> client.restart()

高级用法请参阅客户端