开发指南
目录
开发指南¶
此仓库是 Dask 项目的一部分。包括寻求帮助的途径、仓库布局、测试实践以及文档和风格标准在内的一般开发指南,可在主文档中的Dask 开发者指南中找到。
安装¶
使用 git 克隆此仓库
git clone git@github.com:dask/distributed.git cd distributed
安装 anaconda 或 miniconda(取决于操作系统)
conda env create --file continuous_integration/environment-3.11.yaml conda activate dask-distributed python -m pip install -e .
使分支与上游源保持同步
cd distributed
git remote add upstream git@github.com:dask/distributed.git
git remote -v
git fetch -a upstream
git checkout main
git pull upstream main
git push origin main
Tornado¶
Dask.distributed 是一个 Tornado TCP 应用。Tornado 为我们提供了基于 sockets 的通信层,以及一种类似于 asyncio 的编写异步协程的语法。在不深入了解 Tornado 的情况下,你可以对库中的策略进行适度修改,但进行中等程度的修改可能需要你了解 Tornado IOLoops、协程以及一些非阻塞通信的知识。Tornado API 文档非常出色,我们建议你阅读以下资源:
此外,如果你想在低层次上与 worker 和 scheduler 之间的通信进行交互,那么你应该了解这里提供的 Tornado TCPServer
和 IOStream
:
Dask.distributed 在 Tornado 周围封装了一些逻辑。更多信息请参阅基础知识。
编写测试¶
测试分布式系统通常相当困难,因为当出现问题时难以检查所有组件的状态。幸运的是,Tornado 中的非阻塞异步模型允许我们在单个线程中运行 scheduler、多个 worker 和多个 client。这为我们带来了可预测的性能、干净的关闭以及在执行期间进入代码任何位置的能力。同时,有时我们希望所有组件都在不同的进程中运行,以模拟更真实的设置。
测试套件包含三种类型的测试:
@gen_cluster
: 完全异步测试,所有组件都位于主线程的同一事件循环中。这些测试适用于测试复杂逻辑和直接检查系统状态。它们也更容易调试,并且在关闭时引起的问题最少。def test_foo(client)
: 从主进程 fork 出多个进程进行的测试。这些测试适用于测试同步(普通用户)API 以及触发硬故障以进行弹性测试。popen
: 调用命令行启动系统的测试。这些测试很少使用,主要用于测试命令行界面。
如果你熟悉 Tornado 接口,那么你会最乐于使用 @gen_cluster
风格的测试,例如:
# tests/test_submit.py
from distributed.utils_test import gen_cluster, inc
from distributed import Client, Future, Scheduler, Worker
@gen_cluster(client=True)
async def test_submit(c, s, a, b):
assert isinstance(c, Client)
assert isinstance(s, Scheduler)
assert isinstance(a, Worker)
assert isinstance(b, Worker)
future = c.submit(inc, 1)
assert isinstance(future, Future)
assert future.key in c.futures
# result = future.result() # This synchronous API call would block
result = await future
assert result == 2
assert future.key in s.tasks
assert future.key in a.data or future.key in b.data
@gen_cluster
装饰器为你设置 scheduler、client 和 workers,并在测试后清理它们。它还允许你直接检查集群中每个元素的直接状态。但是,你不能使用普通的同步 API(这样做会导致测试永远等待),而需要使用协程 API,其中所有阻塞函数都以一个下划线 (_
) 开头,并通过 await
等待。请注意,在这些测试中使用阻塞接口是一个常见错误。
如果你想测试普通的同步 API,你可以使用 client
pytest fixture 风格的测试,它在不同的 fork 进程中为你设置 scheduler 和 workers:
from distributed.utils_test import client
def test_submit(client):
future = client.submit(inc, 10)
assert future.result() == 11
此外,如果你想访问 scheduler 和 worker 进程,你也可以添加 s, a, b
fixture。
from distributed.utils_test import client
def test_submit(client, s, a, b):
future = client.submit(inc, 10)
assert future.result() == 11 # use the synchronous/blocking API here
a['proc'].terminate() # kill one of the workers
result = future.result() # test that future remains valid
assert result == 2
在这种测试风格中,你无法访问 scheduler 或 worker。变量 s, a, b
现在是包含 multiprocessing.Process
对象和端口整数的字典。但是,你现在可以使用正常的同步 API(在这种测试风格中切勿使用 await
),并且可以通过终止进程轻松关闭它们。
通常对于大多数面向用户的函数,你会发现这两种类型的测试。@gen_cluster
测试用于测试特定逻辑,而 client
pytest fixture 测试用于测试基本接口和弹性。
除非绝对必要,例如需要测试命令行界面,否则应避免使用 popen
风格的测试。
代码格式化¶
Dask.distributed 使用多种代码检查工具(flake8, black, isort, pyupgrade, mypy),这些工具由 CI 强制执行。开发者在提交 PR 之前应在本地运行它们,通过单个命令 pre-commit run --all-files
。这确保了所有开发者的检查工具版本和选项保持一致。
或者,你可能希望设置 pre-commit 钩子,以便在进行 git commit 时自动运行。这可以通过运行以下命令完成:
pre-commit install
在 distributed 仓库的根目录下运行。现在,每次提交更改时都会运行代码检查工具。你可以使用 git commit --no-verify
或其简写 git commit -n
跳过这些检查。