任务的生命周期

我们将跟随一个任务,贯穿用户界面、调度器、工作节点,再回到用户。希望这能帮助说明系统的内部工作原理。

用户代码

用户计算集群上已有的两个变量之和,然后将结果拉回本地进程。

client = Client('host:port')
x = client.submit(...)
y = client.submit(...)

z = client.submit(add, x, y)  # we follow z

print(z.result())

步骤 1:客户端

Client.submit 函数向 Scheduler 发送以下消息时,z 开始其生命周期

{'op': 'update-graph',
 'tasks': {'z': (add, x, y)},
 'keys': ['z']}

客户端随后创建一个键为 'z'Future 对象,并将该对象返回给用户。这甚至发生在消息被调度器接收之前。Future 的状态显示为 'pending'

步骤 2:抵达调度器

几毫秒后,调度器在开放的套接字上接收到此消息。

调度器用这个小图更新其状态,该图显示了如何计算 z

scheduler.update_graph(tasks=msg['tasks'], keys=msg['keys'])

调度器还更新了大量其他状态。值得注意的是,它必须识别 xy 本身是变量,并连接所有这些依赖关系。这是一个漫长且注重细节的过程,涉及更新大约 10 个集合和字典。感兴趣的读者可以研究 distributed/scheduler.py::update_graph()。虽然描述起来相当复杂和繁琐,但请放心,这一切都发生在常数时间内,大约只需要一毫秒。

步骤 3:选择工作节点

一旦 xy 中的后者完成,调度器注意到 z 的所有依赖项都在内存中,并且 z 本身现在可以运行了。 z 应该选择哪个工作节点?我们考虑一系列标准

  1. 首先,我们快速缩小范围,只选择那些本地内存中拥有 xy 的工作节点。

  2. 然后,我们选择需要收集最少字节才能在本地获得 xy 的工作节点。例如,如果两个不同的工作节点拥有 xy,并且如果 y 占用的字节数多于 x,那么我们选择持有 y 的机器,这样就不必进行太多通信。

  3. 如果多个工作节点需要最少通信字节,那么我们选择最不繁忙的工作节点

z 考虑工作节点并根据上述标准选择一个。在通常情况下,步骤 1 后选择哪个工作节点是相当明显的。z 在与选定的工作节点关联的堆栈上等待。然而,该工作节点可能仍然很忙,因此 z 可能需要等待一段时间。

注意:此策略正在调整中,文档的这部分内容很可能已过时。

步骤 4:传输至工作节点

最终,工作节点完成一个任务,拥有一个空闲核心,并且 z 发现自己位于堆栈顶部(请注意,如果在此期间其他任务将自己放置在工作节点堆栈的顶部,则此过程可能发生在上一节之后的一段时间)。

我们将 z 放入与该工作节点关联的 worker_queue 中,然后一个 worker_core 协程将其取出。z 的函数、与其参数关联的键以及持有这些键的工作节点的位置被打包成一条消息,消息格式如下

{'op': 'compute',
 'function': execute_task,
 'args': ((add, 'x', 'y'),),
 'who_has': {'x': {(worker_host, port)},
             'y': {(worker_host, port), (worker_host, port)}},
 'key': 'z'}

此消息被序列化并通过 TCP 套接字发送到工作节点。

步骤 5:在工作节点上执行

工作节点解包消息,并注意到它需要同时拥有 xy。如果工作节点尚未拥有这两个值,它将从消息中 who_has 字典中列出的工作节点处收集它们。对于每个它没有的键,它会从 who_has 中随机选择一个有效的工作节点并从中收集数据。

经过这次交换后,工作节点同时拥有 x 的值和 y 的值。因此,它在本地的 ThreadPoolExecutor 中启动计算 add(x, y) 并等待结果。

同时,工作节点并行地重复此过程处理其他任务。没有任何东西会被阻塞。

最终计算完成。工作节点将结果存储在其本地内存中

data['z'] = ...

并传回成功消息以及结果的字节数

Worker: Hey Scheduler, 'z' worked great.
        I'm holding onto it.
        It takes up 64 bytes.

工作节点不会传回 z 的实际值。

步骤 6:调度器善后处理

调度器收到此消息后执行以下几项操作

  1. 它注意到工作节点有一个空闲核心,并在可用时发送另一个任务

  2. 如果不再需要 xy,则向相关工作节点发送消息,要求它们从本地内存中删除这些值。

  3. 它向所有客户端发送消息,告知 z 已准备就绪,因此所有当前等待的客户端 Future 对象都应唤醒。特别是,这将唤醒用户最初执行的 z.result() 命令。

步骤 7:收集

当用户调用 z.result() 时,他们既等待计算完成,也等待计算结果通过网络发送回本地进程。通常这并非必要,通常您不会希望将数据移回本地进程,而是希望将其保留在集群上。

但也许用户确实想知道这个具体值,所以他们调用了 z.result()

调度器检查哪个工作节点拥有 z,并向其发送消息请求结果。此消息不会在队列中等待,也不会等待其他作业完成,它会立即开始。该值会被序列化,通过 TCP 发送,然后被反序列化并返回给用户(途中会经过一个或两个队列)。

步骤 8:垃圾回收

用户离开代码的这部分,局部变量 z 超出范围。Python 垃圾回收器会对其进行清理。这会在客户端上触发引用计数减一(我们之前没有提到这一点,但当我们创建 Future 时,我们也开始了一个引用计数)。如果这是唯一指向 z 的 Future 实例,那么我们会向上发送一条消息给调度器,告知可以释放 z 了。用户不再需要它持久存在。

调度器收到此消息后,如果在近期没有可能依赖于 z 的计算,则从本地调度器状态中移除此键的元素,并将该键添加到定期删除的键列表中。每隔 500 毫秒,就会向相关工作节点发送消息,告知它们可以从本地内存中删除哪些键。创建 z 结果的图/配方将永久保存在调度器中。

开销

用户在约 10 毫秒内体验到这个过程,具体取决于网络延迟。