序列化
目录
序列化¶
当我们在计算机之间通信数据时,首先将数据转换为可以通过网络传输的字节序列。在序列化中做出的选择会影响性能和安全性。
Python 标准的解决方案是 Pickle,它通常是正确的选择,但并非总是如此。Dask 在不同情况下使用多种不同的序列化方案。这些方案是可扩展的,允许用户在敏感情况下进行控制,也允许库开发者接入性能更高的序列化解决方案。
本文档首先描述 Dask 的默认序列化解决方案,然后讨论控制和扩展该序列化的方法。
默认设置¶
通过 Dask 网络传输的消息有三种类型
小型管理消息,例如“工作节点 A 已完成任务 X”或“我内存不足”。这些消息总是使用 msgpack 进行序列化。
程序数据的传输,例如 Numpy 数组和 Pandas 数据帧。这使用了 pickle 和自定义序列化器的组合,是下一节的主题
计算任务,例如
f(x)
,它们在客户端进程上定义和序列化,然后在工作节点进程上反序列化并运行。这些任务使用由相关库决定的固定方案进行序列化。目前,这结合使用了 pickle 和 cloudpickle。
序列化族¶
使用¶
对于程序数据的传输(即上面第 2 项),我们可以使用几种不同的序列化器族。默认情况下,内置了以下族
Pickle 和 cloudpickle
Msgpack
Dask 自带的自定义按类型序列化器,用于对 Numpy 数组等重要数据类进行特殊序列化
创建 Client 时,您可以选择要用于序列化数据和反序列化数据的族
from dask.distributed import Client
client = Client('tcp://scheduler-address:8786',
serializers=['dask', 'pickle'],
deserializers=['dask', 'msgpack'])
例如,如果出于安全原因,您对接收 Pickle 序列化的数据很敏感,这将很有用。
Dask 默认使用序列化器 ['dask', 'pickle']
,优先尝试使用 Dask 自定义序列化器(如下所述),如果不行则回退到 pickle/cloudpickle。
扩展¶
可以通过创建两个函数 dumps 和 loads 来扩展这些族,它们分别返回和接收一个 msgpack 可编码的头部以及一个字节序列列表。然后必须将它们以适当的名称包含在 distributed.protocol.serialize
字典中。以下是 pickle_dumps
和 pickle_loads
的定义作为示例。
import pickle
def pickle_dumps(x):
header = {'serializer': 'pickle'}
frames = [pickle.dumps(x)]
return header, frames
def pickle_loads(header, frames):
if len(frames) > 1: # this may be cut up for network reasons
frame = ''.join(frames)
else:
frame = frames[0]
return pickle.loads(frame)
from distributed.protocol.serialize import register_serialization_family
register_serialization_family('pickle', pickle_dumps, pickle_loads)
之后,名称 'pickle'
即可用于 Client
和 Dask 其他部分的 serializers=
和 deserializers=
关键字参数中。
通信上下文¶
注意
这是一个实验性功能,可能会在未通知的情况下更改
如果提供了 context=
关键字参数,Dask 通信 (Comms) 可以为序列化族函数提供额外的上下文。这使得序列化可以根据其使用方式表现出不同的行为。
def my_dumps(x, context=None):
if context and 'recipient' in context:
# check if we're sending to the same host or not
上下文取决于通信的类型。例如,通过 TCP 发送时,发送方(我们)和接收方的地址在一个字典中可用。
>>> context
{'sender': 'tcp://127.0.0.1:1234', 'recipient': 'tcp://127.0.0.1:5678'}
其他通信方式可能会提供其他信息。
Dask 序列化族¶
使用¶
Dask 维护着自己的自定义序列化族,它对一些重要类型(如 Numpy 数组)进行了特殊处理。这些序列化器要么比 Pickle 效率更高,要么能够序列化 Pickle 无法处理的类型。
您无需执行任何特殊操作即可使用这个序列化器族。它默认启用(与 pickle 一起)。请注意,Dask 自定义序列化器在某些情况下可能内部使用 pickle。它不应被视为更安全。
扩展¶
dask_serialize 的单分派 |
|
dask_deserialize 的单分派 |
与一般的序列化族一样,Dask 族也同样可扩展。这是支持单一类型对象自定义序列化的好方法。方法类似,您创建 serialize 和 deserialize 函数,它们负责创建和使用头部和帧,然后将它们注册到 Dask。
class Human:
def __init__(self, name):
self.name = name
from distributed.protocol import dask_serialize, dask_deserialize
@dask_serialize.register(Human)
def serialize(human: Human) -> Tuple[Dict, List[bytes]]:
header = {}
frames = [human.name.encode()]
return header, frames
@dask_deserialize.register(Human)
def deserialize(header: Dict, frames: List[bytes]) -> Human:
return Human(frames[0].decode())
遍历属性¶
|
注册(反)序列化以遍历 __dict__ |
一个常见的情况是您的对象只是包装了 Numpy 数组或其他 Dask 已能很好地序列化的对象。例如,Scikit-Learn 估计器主要只是在 Numpy 数组周围附加了一些额外的元数据。在这些情况下,您可以使用 register_generic
函数注册您的类以进行自定义 Dask 序列化。
API¶
|
将对象转换为头部和字节串列表 |
|
将序列化的头部和字节串列表转换回 Python 对象 |
dask_serialize 的单分派 |
|
dask_deserialize 的单分派 |
|
|
注册(反)序列化以遍历 __dict__ |
- distributed.protocol.serialize.serialize(x: object, serializers=None, on_error: Literal['message' | 'raise'] = 'message', context=None, iterate_collection: bool | None = None) tuple[dict[str, Any], list[bytes | memoryview]] [source]¶
将对象转换为头部和字节串列表
这接受任意 Python 对象,并返回一个 msgpack 可序列化的头部以及一个 bytes 或 memoryview 对象列表。
要使用的序列化协议是可配置的:一个名称列表定义了按顺序使用的序列化器集合。这些名称是
serializer_registry
字典(例如 ‘pickle’、‘msgpack’)中的键,它们映射到反序列化/序列化函数。名称 ‘dask’ 是特殊的,将使用按类定义的序列化方法。None
使用默认列表['dask', 'pickle']
。关于
iterate_collection
参数的说明(仅当x
是集合时相关):-iterate_collection=True
:单独序列化集合元素。-iterate_collection=False
:一起序列化集合元素。-iterate_collection=None
(默认):推断最佳设置。- 返回
- header:包含任何 msgpack 可序列化元数据的字典
- frames:bytes 或 memoryviews 列表,通常长度为一
另请参阅
deserialize
将 header 和 frames 转换回对象
to_serialize
标记消息中的数据应被序列化
register_serialization
注册自定义序列化函数
示例
>>> serialize(1) ({}, [b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00K\x01.'])
>>> serialize(b'123') # some special types get custom treatment ({'type': 'builtins.bytes'}, [b'123'])
>>> deserialize(*serialize(1)) 1
- distributed.protocol.serialize.deserialize(header, frames, deserializers=None)[source]¶
将序列化的头部和字节串列表转换回 Python 对象
- 参数
- headerdict
- framesbytes 列表
- deserializersdict[str, tuple[Callable, Callable, bool]] | None
可选字典,将名称映射到(反)序列化器。详见 dask_serialize 和 dask_deserialize。
另请参阅
- distributed.protocol.serialize.dask_serialize(arg, *args, **kwargs)¶
dask_serialize 的单分派
- distributed.protocol.serialize.dask_deserialize(arg, *args, **kwargs)¶
dask_deserialize 的单分派
- distributed.protocol.serialize.register_generic(cls, serializer_name='dask', serialize_func=<dask.utils.Dispatch object>, deserialize_func=<dask.utils.Dispatch object>)[source]¶
注册(反)序列化以遍历 __dict__
通常在为 Dask 的自定义序列化注册新类时,您需要管理 header 和 frames,这可能很繁琐。如果您只想遍历对象并对对象的所有属性应用 serialize,那么此函数可以提供一个更简单的途径。
这会为自定义的 Dask 序列化族注册一个类。它通过遍历其属性的 __dict__ 并递归应用
serialize
和deserialize
来进行序列化。它收集一组 frames 并将小属性保存在 header 中。反序列化则反转此过程。如果满足以下条件,这是一个好方法
您对象的大部分字节由 Dask 自定义序列化已能很好处理的数据类型组成,例如 Numpy 数组。
您的对象不需要任何特殊的构造函数逻辑,除了 object.__new__(cls)
示例
>>> import sklearn.base >>> from distributed.protocol import register_generic >>> register_generic(sklearn.base.BaseEstimator)