开发指南

此仓库是 Dask 项目的一部分。包括寻求帮助的途径、仓库布局、测试实践以及文档和风格标准在内的一般开发指南,可在主文档中的Dask 开发者指南中找到。

安装

  1. 使用 git 克隆此仓库

    git clone git@github.com:dask/distributed.git
    cd distributed
    
  2. 安装 anaconda 或 miniconda(取决于操作系统)

  3. conda env create --file continuous_integration/environment-3.11.yaml
    conda activate dask-distributed
    python -m pip install -e .
    

使分支与上游源保持同步

cd distributed
git remote add upstream git@github.com:dask/distributed.git
git remote -v
git fetch -a upstream
git checkout main
git pull upstream main
git push origin main

测试

使用 py.test 进行测试

py.test distributed --verbose

Tornado

Dask.distributed 是一个 Tornado TCP 应用。Tornado 为我们提供了基于 sockets 的通信层,以及一种类似于 asyncio 的编写异步协程的语法。在不深入了解 Tornado 的情况下,你可以对库中的策略进行适度修改,但进行中等程度的修改可能需要你了解 Tornado IOLoops、协程以及一些非阻塞通信的知识。Tornado API 文档非常出色,我们建议你阅读以下资源:

此外,如果你想在低层次上与 worker 和 scheduler 之间的通信进行交互,那么你应该了解这里提供的 Tornado TCPServerIOStream

Dask.distributed 在 Tornado 周围封装了一些逻辑。更多信息请参阅基础知识

编写测试

测试分布式系统通常相当困难,因为当出现问题时难以检查所有组件的状态。幸运的是,Tornado 中的非阻塞异步模型允许我们在单个线程中运行 scheduler、多个 worker 和多个 client。这为我们带来了可预测的性能、干净的关闭以及在执行期间进入代码任何位置的能力。同时,有时我们希望所有组件都在不同的进程中运行,以模拟更真实的设置。

测试套件包含三种类型的测试:

  1. @gen_cluster: 完全异步测试,所有组件都位于主线程的同一事件循环中。这些测试适用于测试复杂逻辑和直接检查系统状态。它们也更容易调试,并且在关闭时引起的问题最少。

  2. def test_foo(client): 从主进程 fork 出多个进程进行的测试。这些测试适用于测试同步(普通用户)API 以及触发硬故障以进行弹性测试。

  3. popen: 调用命令行启动系统的测试。这些测试很少使用,主要用于测试命令行界面。

如果你熟悉 Tornado 接口,那么你会最乐于使用 @gen_cluster 风格的测试,例如:

# tests/test_submit.py

from distributed.utils_test import gen_cluster, inc
from distributed import Client, Future, Scheduler, Worker

@gen_cluster(client=True)
async def test_submit(c, s, a, b):
    assert isinstance(c, Client)
    assert isinstance(s, Scheduler)
    assert isinstance(a, Worker)
    assert isinstance(b, Worker)

    future = c.submit(inc, 1)
    assert isinstance(future, Future)
    assert future.key in c.futures

    # result = future.result()  # This synchronous API call would block
    result = await future
    assert result == 2

    assert future.key in s.tasks
    assert future.key in a.data or future.key in b.data

@gen_cluster 装饰器为你设置 scheduler、client 和 workers,并在测试后清理它们。它还允许你直接检查集群中每个元素的直接状态。但是,你不能使用普通的同步 API(这样做会导致测试永远等待),而需要使用协程 API,其中所有阻塞函数都以一个下划线 (_) 开头,并通过 await 等待。请注意,在这些测试中使用阻塞接口是一个常见错误。

如果你想测试普通的同步 API,你可以使用 client pytest fixture 风格的测试,它在不同的 fork 进程中为你设置 scheduler 和 workers:

from distributed.utils_test import client

def test_submit(client):
    future = client.submit(inc, 10)
    assert future.result() == 11

此外,如果你想访问 scheduler 和 worker 进程,你也可以添加 s, a, b fixture。

from distributed.utils_test import client

def test_submit(client, s, a, b):
    future = client.submit(inc, 10)
    assert future.result() == 11  # use the synchronous/blocking API here

    a['proc'].terminate()  # kill one of the workers

    result = future.result()  # test that future remains valid
    assert result == 2

在这种测试风格中,你无法访问 scheduler 或 worker。变量 s, a, b 现在是包含 multiprocessing.Process 对象和端口整数的字典。但是,你现在可以使用正常的同步 API(在这种测试风格中切勿使用 await),并且可以通过终止进程轻松关闭它们。

通常对于大多数面向用户的函数,你会发现这两种类型的测试。@gen_cluster 测试用于测试特定逻辑,而 client pytest fixture 测试用于测试基本接口和弹性。

除非绝对必要,例如需要测试命令行界面,否则应避免使用 popen 风格的测试。

代码格式化

Dask.distributed 使用多种代码检查工具(flake8, black, isort, pyupgrade, mypy),这些工具由 CI 强制执行。开发者在提交 PR 之前应在本地运行它们,通过单个命令 pre-commit run --all-files。这确保了所有开发者的检查工具版本和选项保持一致。

或者,你可能希望设置 pre-commit 钩子,以便在进行 git commit 时自动运行。这可以通过运行以下命令完成:

pre-commit install

在 distributed 仓库的根目录下运行。现在,每次提交更改时都会运行代码检查工具。你可以使用 git commit --no-verify 或其简写 git commit -n 跳过这些检查。