通信

工作节点、调度器和客户端通过相互发送 Python 对象(例如 协议 消息或用户数据)进行通信。通信层负责在分布式端点之间适当地编码和传输这些 Python 对象。通信层能够根据用户的选择或(可能)内部优化来选择不同的传输实现。

通信层位于 distributed.comm 包中。

地址

通信地址通常表示为 URI,例如 tcp://127.0.0.1:1234。为了兼容现有代码,如果省略 URI 方案,则假定默认方案为 tcp(因此 127.0.0.1:456 实际上与 tcp://127.0.0.1:456 相同)。默认方案将来可能会更改。

以下方案目前在 distributed 源码树中实现

  • tcp 是主要的传输方式;它使用 TCP 套接字并支持 IPv4 和 IPv6 地址。

  • tls 是一种安全的传输方式,使用基于 TCP 套接字的著名 TLS 协议。使用它需要在 TLS/SSL 中概述的密钥和证书。

  • inproc 是一种进程内传输方式,使用简单的对象队列;它消除了序列化和 I/O 开销,只要端点位于同一进程中,就可以提供几乎零成本的通信。

某些 URI 可能对于监听有效,但对于连接无效。例如,URI tcp:// 将监听所有 IPv4 和 IPv6 地址以及任意端口,但您无法连接到该地址。

distributed 中的更高级别 API 为了方便或兼容性,可能会接受其他地址格式,例如 (host, port) 对。然而,抽象通信层始终处理 URI。

函数

distributed.comm 中有许多顶级函数用于处理地址

distributed.comm.parse_address(addr: str, strict: bool = False) tuple[str, str][source]

将地址拆分为其方案和依赖于方案的位置字符串。

>>> parse_address('tcp://127.0.0.1')
('tcp', '127.0.0.1')

如果 strict 设置为 true,地址必须具有方案。

distributed.comm.unparse_address(scheme: str, loc: str) str[source]

撤销 parse_address() 的操作。

>>> unparse_address('tcp', '127.0.0.1')
'tcp://127.0.0.1'
distributed.comm.normalize_address(addr: str) str[source]

规范化地址,必要时添加默认方案。

>>> normalize_address('tls://[::1]')
'tls://[::1]'
>>> normalize_address('[::1]')
'tcp://[::1]'
distributed.comm.resolve_address(addr: str) str[source]

addr 应用特定于方案的地址解析,用具体位置指定符替换所有符号引用。

实际上,这意味着主机名被解析为 IP 地址。

>>> resolve_address('tcp://localhost:8786')
'tcp://127.0.0.1:8786'
distributed.comm.get_address_host(addr: str) str[source]

返回标识此地址所在机器的主机名 / IP 地址。

与 get_address_host_port() 不同,此函数对于格式正确的地址应始终成功。

>>> get_address_host('tcp://1.2.3.4:80')
'1.2.3.4'

通信 API

处理已建立通信的基本单元是 Comm 对象

class distributed.comm.Comm(deserialize: bool = True)[source]

一个面向消息的通信对象,代表一个已建立的通信通道。一次只能有一个读取器和一个写入器:即使与单个对等方通信,要管理当前的通信,您必须创建不同的 Comm 对象。

消息是任意 Python 对象。此类的具体实现可以根据底层传输的特性实现不同的序列化机制。

abstract abort()[source]

立即突然关闭通信。在析构函数或生成器的 finally 块中很有用。

abstract async close()[source]

干净地关闭通信。这将尝试在实际关闭底层传输之前刷新传出缓冲区。

此方法返回一个协程。

abstract closed()[source]

返回流是否已关闭。

property extra_info

返回关于通信的特定于后端的信息,作为字典。通常,这是在建立通信时初始化且之后不变的信息。

static handshake_configuration(local: dict[str, Any], remote: dict[str, Any]) dict[str, Any][source]

找到适用于本地和远程的配置

参数
local

此进程中 handshake_info() 的输出

remote

远程主机上 handshake_info() 的输出

另请参阅

handshake_info
handshake_info() dict[str, Any][source]

与对等方共享可能不同的环境信息,例如压缩设置。

注意

到此方法运行时,“auto”压缩设置已更新为实际的压缩算法。如果两个对等方都将压缩设置为“auto”但只有一个安装了 lz4,这很重要。请参阅 distributed.protocol.compression._update_and_check_compression_settings()

abstract property local_address: str

本地地址

abstract property peer_address: str

对等方地址

abstract async read(deserializers=None)[source]

读取并返回一条消息(一个 Python 对象)。

此方法返回一个协程。

参数
deserializersdict[str, tuple[Callable, Callable, bool]] | None

一个适用于 distributed.protocol.deserialize 的可选字典。有关更多信息,请参阅 序列化

property same_host: bool

如果对等方在 localhost 上,则返回 True;否则返回 False

abstract async write(msg, serializers=None, on_error=None)[source]

写入一条消息(一个 Python 对象)。

此方法返回一个协程。

参数
msg
on_errorstr | None

序列化失败时的行为。有关有效值,请参阅 distributed.protocol.core.dumps

您不直接创建 Comm 对象:您可以 listen 传入的通信,或 connect 到监听连接的对等方

async distributed.comm.connect(addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args)[source]

连接到给定地址(URI,例如 tcp://127.0.0.1:1234)并生成一个 Comm 对象。如果连接尝试失败,将重试直到 *timeout* 过期。

distributed.comm.listen(addr, handle_comm, deserialize=True, **kwargs)[source]

使用给定参数创建一个监听器对象。调用其 start() 方法时,监听器将在给定地址(URI,例如 tcp://0.0.0.0)上监听,并为每个传入连接使用 Comm 对象调用 *handle_comm*。

*handle_comm* 可以是常规函数或协程。

监听器对象暴露以下接口

class distributed.comm.core.Listener[source]
abstract property contact_address

可以联系到此监听器的地址。如果 listen_address 是像 ‘tcp://0.0.0.0:123’ 这样的通配符地址,则此地址可能与 listen_address 不同。

abstract property listen_address

作为 URI 字符串的监听地址。

abstract async start()[source]

开始监听传入连接。

abstract stop()[source]

停止监听。这不会关闭已建立的通信,但会阻止接受新的连接。

扩展通信层

每种传输方式由一个 URI 方案(例如 tcp)表示,并由专用的 Backend 实现支持,该实现提供了所有特定于传输的例程的入口点。

外部(Out-of-tree)后端可以在 setuptools 的 entry_points 中的 distributed.comm.backends 组下注册。例如,一个假定的 dask_udp 包会通过在其 setup.py 文件中包含以下内容来注册其 UDP 后端类

setup(name="dask_udp",
      entry_points={
        "distributed.comm.backends": [
            "udp=dask_udp.backend:UDPBackend",
        ]
      },
      ...
)
class distributed.comm.registry.Backend[source]

一个通信后端,由给定的 URI 方案(例如 ‘tcp’)选择。

abstract get_address_host(loc)[source]

获取标识地址所在主机的名称(通常是 IP 地址)。loc 是一个没有方案的地址。

get_address_host_port(loc)[source]

获取没有方案的地址 loc 的 (host, port) 元组。这仅应由基于 IP 的传输实现。

abstract get_connector()[source]

获取一个可用于连接到地址的连接器对象。

abstract get_listener(loc, handle_comm, deserialize, **connection_args)[source]

获取没有方案的地址 loc 的监听器对象。

abstract get_local_address_for(loc)[source]

获取适合连接到 loc 的本地监听地址。

abstract resolve_address(loc)[source]

将地址解析为规范形式。loc 是一个没有方案的地址。

简单的实现可能会保持 loc 不变。