序列化

当我们在计算机之间通信数据时,首先将数据转换为可以通过网络传输的字节序列。在序列化中做出的选择会影响性能和安全性。

Python 标准的解决方案是 Pickle,它通常是正确的选择,但并非总是如此。Dask 在不同情况下使用多种不同的序列化方案。这些方案是可扩展的,允许用户在敏感情况下进行控制,也允许库开发者接入性能更高的序列化解决方案。

本文档首先描述 Dask 的默认序列化解决方案,然后讨论控制和扩展该序列化的方法。

默认设置

通过 Dask 网络传输的消息有三种类型

  1. 小型管理消息,例如“工作节点 A 已完成任务 X”或“我内存不足”。这些消息总是使用 msgpack 进行序列化。

  2. 程序数据的传输,例如 Numpy 数组和 Pandas 数据帧。这使用了 pickle 和自定义序列化器的组合,是下一节的主题

  3. 计算任务,例如 f(x),它们在客户端进程上定义和序列化,然后在工作节点进程上反序列化并运行。这些任务使用由相关库决定的固定方案进行序列化。目前,这结合使用了 pickle 和 cloudpickle。

序列化族

使用

对于程序数据的传输(即上面第 2 项),我们可以使用几种不同的序列化器族。默认情况下,内置了以下族

  1. Pickle 和 cloudpickle

  2. Msgpack

  3. Dask 自带的自定义按类型序列化器,用于对 Numpy 数组等重要数据类进行特殊序列化

创建 Client 时,您可以选择要用于序列化数据和反序列化数据的族

from dask.distributed import Client
client = Client('tcp://scheduler-address:8786',
                serializers=['dask', 'pickle'],
                deserializers=['dask', 'msgpack'])

例如,如果出于安全原因,您对接收 Pickle 序列化的数据很敏感,这将很有用。

Dask 默认使用序列化器 ['dask', 'pickle'],优先尝试使用 Dask 自定义序列化器(如下所述),如果不行则回退到 pickle/cloudpickle。

扩展

可以通过创建两个函数 dumps 和 loads 来扩展这些族,它们分别返回和接收一个 msgpack 可编码的头部以及一个字节序列列表。然后必须将它们以适当的名称包含在 distributed.protocol.serialize 字典中。以下是 pickle_dumpspickle_loads 的定义作为示例。

import pickle

def pickle_dumps(x):
    header = {'serializer': 'pickle'}
    frames = [pickle.dumps(x)]
    return header, frames

def pickle_loads(header, frames):
    if len(frames) > 1:  # this may be cut up for network reasons
        frame = ''.join(frames)
    else:
        frame = frames[0]
    return pickle.loads(frame)

from distributed.protocol.serialize import register_serialization_family
register_serialization_family('pickle', pickle_dumps, pickle_loads)

之后,名称 'pickle' 即可用于 Client 和 Dask 其他部分的 serializers=deserializers= 关键字参数中。

通信上下文

注意

这是一个实验性功能,可能会在未通知的情况下更改

如果提供了 context= 关键字参数,Dask 通信 (Comms) 可以为序列化族函数提供额外的上下文。这使得序列化可以根据其使用方式表现出不同的行为。

def my_dumps(x, context=None):
    if context and 'recipient' in context:
        # check if we're sending to the same host or not

上下文取决于通信的类型。例如,通过 TCP 发送时,发送方(我们)和接收方的地址在一个字典中可用。

>>> context
{'sender': 'tcp://127.0.0.1:1234', 'recipient': 'tcp://127.0.0.1:5678'}

其他通信方式可能会提供其他信息。

Dask 序列化族

使用

Dask 维护着自己的自定义序列化族,它对一些重要类型(如 Numpy 数组)进行了特殊处理。这些序列化器要么比 Pickle 效率更高,要么能够序列化 Pickle 无法处理的类型。

您无需执行任何特殊操作即可使用这个序列化器族。它默认启用(与 pickle 一起)。请注意,Dask 自定义序列化器在某些情况下可能内部使用 pickle。它不应被视为更安全。

扩展

dask_serialize

dask_serialize 的单分派

dask_deserialize

dask_deserialize 的单分派

与一般的序列化族一样,Dask 族也同样可扩展。这是支持单一类型对象自定义序列化的好方法。方法类似,您创建 serialize 和 deserialize 函数,它们负责创建和使用头部和帧,然后将它们注册到 Dask。

class Human:
    def __init__(self, name):
        self.name = name

from distributed.protocol import dask_serialize, dask_deserialize

@dask_serialize.register(Human)
def serialize(human: Human) -> Tuple[Dict, List[bytes]]:
    header = {}
    frames = [human.name.encode()]
    return header, frames

@dask_deserialize.register(Human)
def deserialize(header: Dict, frames: List[bytes]) -> Human:
    return Human(frames[0].decode())

遍历属性

register_generic(cls[, serializer_name, ...])

注册(反)序列化以遍历 __dict__

一个常见的情况是您的对象只是包装了 Numpy 数组或其他 Dask 已能很好地序列化的对象。例如,Scikit-Learn 估计器主要只是在 Numpy 数组周围附加了一些额外的元数据。在这些情况下,您可以使用 register_generic 函数注册您的类以进行自定义 Dask 序列化。

API

serialize(x[, serializers, on_error, ...])

将对象转换为头部和字节串列表

deserialize(header, frames[, deserializers])

将序列化的头部和字节串列表转换回 Python 对象

dask_serialize

dask_serialize 的单分派

dask_deserialize

dask_deserialize 的单分派

register_generic(cls[, serializer_name, ...])

注册(反)序列化以遍历 __dict__

distributed.protocol.serialize.serialize(x: object, serializers=None, on_error: Literal['message' | 'raise'] = 'message', context=None, iterate_collection: bool | None = None) tuple[dict[str, Any], list[bytes | memoryview]][source]

将对象转换为头部和字节串列表

这接受任意 Python 对象,并返回一个 msgpack 可序列化的头部以及一个 bytes 或 memoryview 对象列表。

要使用的序列化协议是可配置的:一个名称列表定义了按顺序使用的序列化器集合。这些名称是 serializer_registry 字典(例如 ‘pickle’、‘msgpack’)中的键,它们映射到反序列化/序列化函数。名称 ‘dask’ 是特殊的,将使用按类定义的序列化方法。None 使用默认列表 ['dask', 'pickle']

关于 iterate_collection 参数的说明(仅当 x 是集合时相关):- iterate_collection=True:单独序列化集合元素。- iterate_collection=False:一起序列化集合元素。- iterate_collection=None(默认):推断最佳设置。

返回
header:包含任何 msgpack 可序列化元数据的字典
frames:bytes 或 memoryviews 列表,通常长度为一

另请参阅

deserialize

将 header 和 frames 转换回对象

to_serialize

标记消息中的数据应被序列化

register_serialization

注册自定义序列化函数

示例

>>> serialize(1)
({}, [b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00K\x01.'])
>>> serialize(b'123')  # some special types get custom treatment
({'type': 'builtins.bytes'}, [b'123'])
>>> deserialize(*serialize(1))
1
distributed.protocol.serialize.deserialize(header, frames, deserializers=None)[source]

将序列化的头部和字节串列表转换回 Python 对象

参数
headerdict
framesbytes 列表
deserializersdict[str, tuple[Callable, Callable, bool]] | None

可选字典,将名称映射到(反)序列化器。详见 dask_serializedask_deserialize

另请参阅

serialize
distributed.protocol.serialize.dask_serialize(arg, *args, **kwargs)

dask_serialize 的单分派

distributed.protocol.serialize.dask_deserialize(arg, *args, **kwargs)

dask_deserialize 的单分派

distributed.protocol.serialize.register_generic(cls, serializer_name='dask', serialize_func=<dask.utils.Dispatch object>, deserialize_func=<dask.utils.Dispatch object>)[source]

注册(反)序列化以遍历 __dict__

通常在为 Dask 的自定义序列化注册新类时,您需要管理 header 和 frames,这可能很繁琐。如果您只想遍历对象并对对象的所有属性应用 serialize,那么此函数可以提供一个更简单的途径。

这会为自定义的 Dask 序列化族注册一个类。它通过遍历其属性的 __dict__ 并递归应用 serializedeserialize 来进行序列化。它收集一组 frames 并将小属性保存在 header 中。反序列化则反转此过程。

如果满足以下条件,这是一个好方法

  1. 您对象的大部分字节由 Dask 自定义序列化已能很好处理的数据类型组成,例如 Numpy 数组。

  2. 您的对象不需要任何特殊的构造函数逻辑,除了 object.__new__(cls)

示例

>>> import sklearn.base
>>> from distributed.protocol import register_generic
>>> register_generic(sklearn.base.BaseEstimator)