从任务中启动任务

有时从其他任务中启动任务会很方便。例如,您可能直到获得某些初始计算的结果后才知道要运行哪些计算。

动机示例

我们想要下载一块数据并将其转换为一个列表。然后我们想为该列表的每个元素提交一个任务。在获得数据之前,我们不知道列表会有多长。

因此,我们发送原始的 download_and_convert_to_list 函数,该函数将在我们的某个工作节点机器上下载数据并将其转换为列表

future = client.submit(download_and_convert_to_list, uri)

但是现在我们需要为这块数据的各个部分提交新的任务。我们有三种选择。

  1. 将数据收集回本地进程,然后从本地进程提交新的作业

  2. 只将数据的足够信息收集回本地进程,然后从本地进程提交作业

  3. 向集群提交一个任务,该任务将直接从该工作节点提交其他任务

在本地收集数据

如果数据不大,我们可以将其取回客户端,在本地机器上执行必要的逻辑

>>> data = future.result()                  # gather data to local process
>>> data                                    # data is a list
[...]

>>> futures = e.map(process_element, data)  # submit new tasks on data
>>> analysis = e.submit(aggregate, futures) # submit final aggregation task

这很简单,而且如果 data 很小,那么这可能是最简单且因此是正确的选择。然而,如果 data 很大,那么我们就必须选择另一种方案。

从客户端提交任务

我们可以在远程数据上运行小型函数,以确定足够的信息来提交正确类型的任务。在下面的示例中,我们远程计算 len 函数作用于 data,然后将数据分解为各个元素。

>>> n = client.submit(len, data)            # compute number of elements
>>> n = n.result()                          # gather n (small) locally

>>> from operator import getitem
>>> elements = [client.submit(getitem, data, i) for i in range(n)]  # split data

>>> futures = client.map(process_element, elements)
>>> analysis = client.submit(aggregate, futures)

我们远程计算长度,取回这个非常小的结果,然后用它来提交更多任务,以便在集群上分解和处理数据。这更复杂,因为我们需要在集群和本地进程之间往返几次,但移动的数据量非常小,因此这只增加了我们总处理时间的几毫秒。

扩展示例

计算斐波那契数列涉及递归函数。当函数运行时,它会使用其计算出来的值调用自身。我们将以此为例贯穿本文档,以说明从任务中提交任务的不同技术。

def fib(n):
    if n < 2:
        return n
    a = fib(n - 1)
    b = fib(n - 2)
    return a + b

print(fib(10))  # prints "55"

我们将使用此示例展示不同的接口。

从工作节点提交任务

注意:此接口是新增的实验性功能。未来版本中可能会在不发出警告的情况下进行更改。

我们可以从其他任务中提交任务。这使得我们可以在工作节点上进行决策。

要从工作节点提交新任务,该工作节点必须首先创建一个连接到调度器的新客户端对象。有三种方法可以做到这一点

  1. dask.delayeddask.compute

  2. get_client 结合 secederejoin

  3. worker_client

dask.delayed

Dask delayed 的行为正常:它将函数提交到图,优化以减少带宽/计算,并收集结果。更多详细信息,请参阅dask.delayed

from distributed import Client
from dask import delayed, compute


@delayed
def fib(n):
    if n < 2:
        return n
    # We can use dask.delayed and dask.compute to launch
    # computation from within tasks
    a = fib(n - 1)  # these calls are delayed
    b = fib(n - 2)
    a, b = compute(a, b)  # execute both in parallel
    return a + b

if __name__ == "__main__":
    # these features require the dask.distributed scheduler
    client = Client()

    result = fib(10).compute()
    print(result)  # prints "55"

在工作节点上获取客户端

get_client 函数提供一个普通的客户端对象,该对象提供对 dask 集群的完全访问权限,包括提交、scatter 和 gather 结果的能力。

from distributed import Client, get_client, secede, rejoin

def fib(n):
    if n < 2:
        return n
    client = get_client()
    a_future = client.submit(fib, n - 1)
    b_future = client.submit(fib, n - 2)
    a, b = client.gather([a_future, b_future])
    return a + b

if __name__ == "__main__":
    client = Client()
    future = client.submit(fib, 10)
    result = future.result()
    print(result)  # prints "55"

然而,如果太多任务同时请求作业,这可能会导致调度器死锁。每个任务都不会向调度器表明它们正在等待结果并可以计算其他任务。如果每个调度槽位都在运行一个任务,并且它们都请求更多任务,这可能会导致集群死锁。

为了避免这种死锁问题,我们可以使用secederejoin。这些函数将分别从集群中移除并重新加入当前任务。

def fib(n):
    if n < 2:
        return n
    client = get_client()
    a_future = client.submit(fib, n - 1)
    b_future = client.submit(fib, n - 2)
    secede()
    a, b = client.gather([a_future, b_future])
    rejoin()
    return a + b

使用上下文管理器连接

worker_client 函数执行与 get_client 相同的任务,但被实现为一个上下文管理器。使用 worker_client 作为上下文管理器可确保在工作节点上进行适当的清理。

from dask.distributed import Client, worker_client


def fib(n):
    if n < 2:
        return n
    with worker_client() as client:
        a_future = client.submit(fib, n - 1)
        b_future = client.submit(fib, n - 2)
        a, b = client.gather([a_future, b_future])
    return a + b

if __name__ == "__main__":
    client = Client()
    future = client.submit(fib, 10)
    result = future.result()
    print(result)  # prints "55"

调用 worker_client 的任务被保守地假定为是 长时间运行 的。它们可能需要很长时间,等待其他任务完成,收集结果等。为了避免它们占用处理槽,每当任务调用 worker_client 时,会发生以下操作。

  1. 在工作节点上运行此函数的线程从线程池中 脱离(secede),独立运行。这允许线程池用一个新线程填充该槽位,并继续处理其他任务,而不会将这个长时间运行的任务计入其正常配额。

  2. 工作节点向调度器发送一条消息,暂时将其允许的任务数量增加一。同样,这使得调度器可以为该工作节点分配更多任务,而不会将这个长时间运行的任务计算在内。

与调度器建立连接需要几毫秒的时间,因此对于使用此功能的计算,其持续时间最好比这长几倍。