通信
目录
通信¶
工作节点、调度器和客户端通过相互发送 Python 对象(例如 协议 消息或用户数据)进行通信。通信层负责在分布式端点之间适当地编码和传输这些 Python 对象。通信层能够根据用户的选择或(可能)内部优化来选择不同的传输实现。
通信层位于 distributed.comm
包中。
地址¶
通信地址通常表示为 URI,例如 tcp://127.0.0.1:1234
。为了兼容现有代码,如果省略 URI 方案,则假定默认方案为 tcp
(因此 127.0.0.1:456
实际上与 tcp://127.0.0.1:456
相同)。默认方案将来可能会更改。
以下方案目前在 distributed
源码树中实现
tcp
是主要的传输方式;它使用 TCP 套接字并支持 IPv4 和 IPv6 地址。tls
是一种安全的传输方式,使用基于 TCP 套接字的著名 TLS 协议。使用它需要在 TLS/SSL 中概述的密钥和证书。inproc
是一种进程内传输方式,使用简单的对象队列;它消除了序列化和 I/O 开销,只要端点位于同一进程中,就可以提供几乎零成本的通信。
某些 URI 可能对于监听有效,但对于连接无效。例如,URI tcp://
将监听所有 IPv4 和 IPv6 地址以及任意端口,但您无法连接到该地址。
distributed
中的更高级别 API 为了方便或兼容性,可能会接受其他地址格式,例如 (host, port)
对。然而,抽象通信层始终处理 URI。
函数¶
distributed.comm
中有许多顶级函数用于处理地址
- distributed.comm.parse_address(addr: str, strict: bool = False) tuple[str, str] [source]¶
将地址拆分为其方案和依赖于方案的位置字符串。
>>> parse_address('tcp://127.0.0.1') ('tcp', '127.0.0.1')
如果 strict 设置为 true,地址必须具有方案。
- distributed.comm.unparse_address(scheme: str, loc: str) str [source]¶
撤销 parse_address() 的操作。
>>> unparse_address('tcp', '127.0.0.1') 'tcp://127.0.0.1'
- distributed.comm.normalize_address(addr: str) str [source]¶
规范化地址,必要时添加默认方案。
>>> normalize_address('tls://[::1]') 'tls://[::1]' >>> normalize_address('[::1]') 'tcp://[::1]'
通信 API¶
处理已建立通信的基本单元是 Comm
对象
- class distributed.comm.Comm(deserialize: bool = True)[source]¶
一个面向消息的通信对象,代表一个已建立的通信通道。一次只能有一个读取器和一个写入器:即使与单个对等方通信,要管理当前的通信,您必须创建不同的
Comm
对象。消息是任意 Python 对象。此类的具体实现可以根据底层传输的特性实现不同的序列化机制。
- property extra_info¶
返回关于通信的特定于后端的信息,作为字典。通常,这是在建立通信时初始化且之后不变的信息。
- static handshake_configuration(local: dict[str, Any], remote: dict[str, Any]) dict[str, Any] [source]¶
找到适用于本地和远程的配置
- 参数
- local
此进程中 handshake_info() 的输出
- remote
远程主机上 handshake_info() 的输出
另请参阅
- handshake_info() dict[str, Any] [source]¶
与对等方共享可能不同的环境信息,例如压缩设置。
注意
到此方法运行时,“auto”压缩设置已更新为实际的压缩算法。如果两个对等方都将压缩设置为“auto”但只有一个安装了 lz4,这很重要。请参阅 distributed.protocol.compression._update_and_check_compression_settings()
您不直接创建 Comm
对象:您可以 listen
传入的通信,或 connect
到监听连接的对等方
- async distributed.comm.connect(addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args)[source]¶
连接到给定地址(URI,例如
tcp://127.0.0.1:1234
)并生成一个Comm
对象。如果连接尝试失败,将重试直到 *timeout* 过期。
- distributed.comm.listen(addr, handle_comm, deserialize=True, **kwargs)[source]¶
使用给定参数创建一个监听器对象。调用其
start()
方法时,监听器将在给定地址(URI,例如tcp://0.0.0.0
)上监听,并为每个传入连接使用Comm
对象调用 *handle_comm*。*handle_comm* 可以是常规函数或协程。
监听器对象暴露以下接口
- class distributed.comm.core.Listener[source]¶
- abstract property contact_address¶
可以联系到此监听器的地址。如果 listen_address 是像 ‘tcp://0.0.0.0:123’ 这样的通配符地址,则此地址可能与 listen_address 不同。
- abstract property listen_address¶
作为 URI 字符串的监听地址。
扩展通信层¶
每种传输方式由一个 URI 方案(例如 tcp
)表示,并由专用的 Backend
实现支持,该实现提供了所有特定于传输的例程的入口点。
外部(Out-of-tree)后端可以在 setuptools 的 entry_points 中的 distributed.comm.backends
组下注册。例如,一个假定的 dask_udp
包会通过在其 setup.py
文件中包含以下内容来注册其 UDP 后端类
setup(name="dask_udp",
entry_points={
"distributed.comm.backends": [
"udp=dask_udp.backend:UDPBackend",
]
},
...
)