基础

在阅读本文档之前,您应该先阅读快速入门

分布式计算之所以困难,原因有二:

  1. 分布式系统的一致性协调需要复杂性

  2. 并发网络编程复杂且容易出错

dask.distributed 的基础提供了抽象层,以隐藏并发网络编程的一些复杂性(#2)。这些抽象层在一个更安全的环境中简化了复杂并行系统的构建(#1)。然而,与所有分层抽象一样,我们的抽象层也存在缺陷。欢迎提出批判性反馈。

使用 Tornado 协程实现并发

工作节点和调度器节点并发运行。它们同时处理多个重叠的请求,并在不阻塞的情况下执行多个重叠的计算。并发编程有多种方法,我们选择使用 Tornado,原因如下:

  1. 没有线程开发和调试更舒适

  2. Tornado 的文档非常棒

  3. Stackoverflow 上的覆盖范围很广

  4. 性能令人满意

端到端通信

各种分布式端点(客户端、调度器、工作节点)通过相互发送任意 Python 对象进行通信。对这些对象进行编码、发送然后解码是通信层的工作。

然而,诸如基于 Bokeh 的 Web 界面等辅助服务有其自身的实现和语义。

协议处理

虽然抽象通信层可以传输任意 Python 对象(只要它们可序列化),但 distributed 集群的参与者具体遵守分布式协议,该协议使用定义良好的消息格式指定请求-响应语义。

distributed 中的专用基础设施处理协议的各个方面,例如分派端点支持的各种操作。

服务器

工作节点、调度器和 Nanny 对象都继承自 Server 类。

class distributed.core.Server(handlers, blocked_handlers=None, stream_handlers=None, connection_limit=512, deserialize=True, serializers=None, deserializers=None, connection_args=None, timeout=None, io_loop=None, local_directory=None, needs_workdir=True)[source]

Dask 分布式服务器

分布式集群中端点的超类,例如工作节点和调度器对象。

处理函数

服务器使用 handlers 字典定义操作,该字典将操作名称映射到函数。处理函数的第一个参数将是与客户端建立通信的 Comm 对象。其他参数将接收来自传入消息(始终是字典)键的值作为输入。

>>> def pingpong(comm):
...     return b'pong'
>>> def add(comm, x, y):
...     return x + y
>>> handlers = {'ping': pingpong, 'add': add}
>>> server = Server(handlers)  
>>> server.listen('tcp://0.0.0.0:8000')  

消息格式

服务器期望消息是字典,其中包含一个特殊键 ‘op’,对应于操作的名称,以及函数所需的其他键值对。

因此在上面的示例中,以下是有效的消息。

  • {'op': 'ping'}

  • {'op': 'add', 'x': 10, 'y': 20}

RPC

为了与远程服务器交互,我们通常使用 rpc 对象,这些对象提供了一种熟悉的方法调用接口来调用远程操作。

class distributed.core.rpc(arg=None, comm=None, deserialize=True, timeout=None, connection_args=None, serializers=None, deserializers=None)[source]

方便地与远程服务器交互

>>> remote = rpc(address)  
>>> response = await remote.add(x=10, y=20)  

一个 rpc 对象可以重复用于多次交互。此外,该对象会根据需要创建和销毁许多 comms,因此在多个重叠通信中安全使用。

完成后,显式关闭 comms。

>>> remote.close_comms()  

示例

这里有一个使用 distributed.core 创建和与自定义服务器交互的小示例。

服务器端

import asyncio
from distributed.core import Server

def add(comm, x=None, y=None):  # simple handler, just a function
    return x + y

async def stream_data(comm, interval=1):  # complex handler, multiple responses
    data = 0
    while True:
        await asyncio.sleep(interval)
        data += 1
        await comm.write(data)

s = Server({'add': add, 'stream_data': stream_data})
s.listen('tcp://:8888')   # listen on TCP port 8888

asyncio.get_event_loop().run_forever()

客户端

import asyncio
from distributed.core import connect

async def f():
    comm = await connect('tcp://127.0.0.1:8888')
    await comm.write({'op': 'add', 'x': 1, 'y': 2})
    result = await comm.read()
    await comm.close()
    print(result)

>>> asyncio.get_event_loop().run_until_complete(f())
3

async def g():
    comm = await connect('tcp://127.0.0.1:8888')
    await comm.write({'op': 'stream_data', 'interval': 1})
    while True:
        result = await comm.read()
        print(result)

>>> asyncio.get_event_loop().run_until_complete(g())
1
2
3
...

使用 rpc 的客户端

RPC 提供了一种更 Pythonic 的接口。它还提供了其他好处,例如在并发情况下使用多个流。大多数 distributed 代码使用 rpc。例外情况是当我们需要执行多次读写时,就像上面的流数据示例一样。

import asyncio
from distributed.core import rpc

async def f():
    # comm = await connect('tcp://127.0.0.1', 8888)
    # await comm.write({'op': 'add', 'x': 1, 'y': 2})
    # result = await comm.read()
    async with rpc('tcp://127.0.0.1:8888') as r:
        result = await r.add(x=1, y=2)

    print(result)

>>> asyncio.get_event_loop().run_until_complete(f())
3