协议

调度器、工作节点和客户端之间相互传递消息。从语义上讲,这些消息编码了命令、状态更新和数据,如下所示

  • 请在数据 x 上计算函数 sum 并存储到 y

  • 计算 y 已完成

  • 请注意,名为 alice 的新工作节点已可用

  • 这是键 'x''y' 的数据

在实践中,我们使用字典/映射来表示这些消息

{'op': 'compute',
 'function': ...
 'args': ['x']}

{'op': 'task-complete',
 'key': 'y',
 'nbytes': 26}

{'op': 'register-worker',
 'address': '192.168.1.42',
 'name': 'alice',
 'nthreads': 4}

{'x': b'...',
 'y': b'...'}

当我们在节点之间通信这些消息时,需要将这些消息序列化为字节字符串,然后在另一端反序列化为内存中的字典形式。对于简单情况,存在多种选项,如 JSON、MsgPack、Protobuffers 和 Thrift。序列化 Python 函数和 Python 对象、可选压缩、跨语言支持、大消息和效率等问题使得情况更加复杂。

本文档描述了 dask.distributed 当前使用的协议。请注意,随着我们持续优化性能,此协议可能会快速更改。

概述

我们可能将单个消息分割成多个消息部分以适应不同的协议。通常,小块数据使用 MsgPack 编码,而大型字节字符串和复杂数据类型由自定义格式处理。每个消息部分都有自己的头部,总是以 msgpack 编码。在序列化所有消息部分后,我们得到一个字节字符串序列或 ,我们将这些帧通过网络发送,并在前面附加长度信息。

应用程序对此一无所知,它只向我们发送包含各种数据类型的 Python 字典,而我们生成一个字节字符串列表,并将其写入套接字。这种格式对于许多频繁的消息和大型消息都很快。

用于消息的 MsgPack

大部分消息使用 MsgPack 编码,MsgPack 是一种自描述的半结构化序列化格式,与 JSON 非常相似,但更小、更快、不可读,并且支持字节字符串和(即将支持)时间戳。我们选择 MsgPack 作为基础序列化格式的原因如下

  • 它不需要单独的头部,因此易于使用且灵活,这对于像 dask.distributed 这样的早期项目尤为重要

  • 它非常快,比 JSON 快得多,并且有经过良好优化的实现。除了少数例外(稍后描述),即使在大量使用下,MsgPack 也绝不会成为瓶颈。

  • 与 JSON 不同,它支持字节字符串

  • 它涵盖了编码大多数信息所需的标准类型集

  • 它被广泛应用于多种语言(参见下面的跨语言部分)

然而,MsgPack 在以下方面存在(正确地)不足

  • 它不提供任何方法来编码 Python 函数或用户自定义数据类型

  • 它不支持大于 4GB 的字节字符串,并且对于非常大的消息通常效率不高。

由于这些不足,我们通过特定语言协议和针对大型字节字符串的特殊情况来补充它。

用于函数和某些数据的 CloudPickle

Pickle 和 CloudPickle 是用于序列化几乎任何 Python 对象(包括函数)的 Python 库。在将用户的函数和数据包含在我们传递给 msgpack 的字典/映射中之前,我们使用这些库将其转换为字节。在入门示例中,您可能注意到我们跳过了为函数参数提供示例

{'op': 'compute',
 'function': ...
 'args': ['x']}

这是因为值 ... 实际上将是调用 cloudpickle.dumps(myfunction) 的结果。然后,这些字节将包含在我们发送给 msgpack 的字典中,msgpack 只需处理字节,而无需处理晦涩的 Python 函数。

注意:根据具体情况,我们实际上调用了 pickle 和 cloudpickle 的某种组合。这是出于性能原因。

CloudPickle 可以通过引用(通过模块名和对象名引用)或通过值(序列化对象的实际代码)来序列化对象。默认情况下,如果可能,它会通过引用进行序列化,但从 CloudPickle 2.0 开始,您可以注册一个模块以通过值进行序列化。如果您想发送一个在接收端不存在的模块中的对象,这可能很有用

import mymodule
cloudpickle.register_pickle_by_value(mymodule)

跨语言专门化

客户端和工作节点必须就特定的语言序列化格式达成一致。在标准的 dask.distributed 客户端和工作节点对象中,最终使用的是以下格式

bytes = cloudpickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
obj = cloudpickle.loads(bytes)

这在 Python 2 和 3 之间有所不同,因此您的客户端和工作节点必须匹配其 Python 版本和软件环境。

然而,调度器从不使用特定语言的序列化,而只处理 MsgPack。如果客户端将一个 pickled 的函数发送给调度器,调度器不会解包该函数,而是将其保留为字节。最终,这些字节将被发送到一个工作节点,工作节点随后将字节解包为正常的 Python 函数。由于调度器从不解包特定语言的序列化字节,因此它可以是不同的语言。

客户端和工作节点必须共享相同的语言和软件环境,调度器可以不同。

这有几个优点

  1. 调度器受到保护,不会解包不安全的代码

  2. 我们可以设想为其他语言(如 R 或 Julia)实现工作节点和客户端,并重用 Python 调度器。工作节点和客户端代码相当简单,比复杂的调度器更容易重新实现。

  3. 调度器有一天可能会用经过更优化的 C 或 Go 重写

压缩

LZ4 或 Snappy 等快速压缩库可以通过在发送前压缩数据并在接收后解压缩来提高有效带宽。这在低带宽网络上尤其有价值。

如果这些库中的任何一个可用(我们更偏爱 LZ4 而不是 Snappy),那么对于每个大于 1kB 的消息,我们尝试压缩消息,如果压缩至少提高 10%,我们就发送压缩后的字节而不是原始负载。我们在头部中记录使用的压缩类型,例如字符串 'lz4''snappy'

为了避免压缩大量不可压缩的数据,我们首先尝试对样本进行压缩。我们从数据集的五个位置取 10kB 的块,将它们组合在一起,然后尝试压缩结果。如果这没有产生显著的压缩效果,那么我们就不尝试压缩整个结果。

序列化数据

对于更新状态等管理消息,msgpack 就足够了。但是对于大型结果或 Python 特定数据,如 NumPy 数组或 Pandas Dataframes,或者更大的结果,我们需要使用其他方法将 Python 对象转换为字节字符串。具体如何操作在序列化文档中进行了更详细的描述。

应用程序代码使用 to_serialize 函数标记 Python 特定结果

>>> import numpy as np
>>> x = np.ones(5)

>>> from distributed.protocol import to_serialize
>>> msg = {'status': 'OK', 'data': to_serialize(x)}
>>> msg
{'data': <Serialize: [ 1.  1.  1.  1.  1.]>, 'status': 'OK'}

我们将消息分成两部分,一部分编码所有需要序列化的数据,另一部分编码其余内容

{'key': 'x', 'address': 'alice'}
{'data': <Serialize: [ 1.  1.  1.  1.  1.]>}

第一部分消息我们通常使用 msgpack 传递。第二部分我们分成多个部分传递,一部分用于每个序列化的数据(参见序列化),一部分是包含用于每个值的类型、压缩等信息的头部

{'keys': ['data'],
 'compression': ['lz4']}
b'...'
b'...'

在管道的末端,我们得到一个字节字符串序列或帧。我们需要告诉接收端有多少帧以及每个帧有多长。我们按照以下顺序排列帧和帧的长度

  1. 帧的数量,存储为 8 字节无符号整数

  2. 每个帧的长度,每个存储为 8 字节无符号整数

  3. 每个帧本身

在以下部分中,我们将描述如何创建这些帧。

技术版本

消息被分解为以下组成部分

  1. 8 字节编码消息中帧的数量 (N),作为 uint64

  2. 8 * N 帧编码每个帧的长度,作为 uint64

  3. 管理消息的头部

  4. 管理消息,msgpack 编码,可能已压缩

  5. 所有负载消息的头部

  6. 负载消息

管理消息的头部

管理消息是任意的 msgpack 编码数据。通常是一个字典。它可以选择进行压缩。如果进行了压缩,压缩类型将包含在头部中。

负载帧和头部

这些帧是可选的。

负载帧用于发送大型或特定语言的数据。这些值在解码后将被插入到管理消息中。头部是 msgpack 编码的,包含所有后续负载消息的编码和压缩信息。

一个负载可以分散在许多帧中。每个帧可以单独压缩。

简单示例

这个简单示例展示了一个最小的消息。只有一个空的头部和一个小的 msgpack 消息。没有额外的负载帧

消息: {'status': 'OK'}

  • 头部: {}

  • 管理消息: {'status': 'OK'}

自定义数据的示例

此示例包含一个由单个帧组成的负载消息。它对 NumPy 数组使用了特殊的序列化。

消息: {'op': 'get-data', 'data': np.ones(5)}

  • 头部: {}

  • 管理消息: {'op': 'get-data'}

  • 负载头部

    {'headers': [{'type': 'numpy.ndarray',
                  'compression': 'lz4',
                  'count': 1,
                  'lengths': [40],
                  'dtype': '<f8',
                  'strides': (8,),
                  'shape': (5,)}],
                 'keys': [('data',)]}
    
  • 负载帧: b'(\x00\x00\x00\x11\x00\x01\x00!\xf0?\x07\x00\x0f\x08\x00\x03P\x00\x00\x00\xf0?'