基础
目录
基础¶
在阅读本文档之前,您应该先阅读快速入门。
分布式计算之所以困难,原因有二:
分布式系统的一致性协调需要复杂性
并发网络编程复杂且容易出错
dask.distributed
的基础提供了抽象层,以隐藏并发网络编程的一些复杂性(#2)。这些抽象层在一个更安全的环境中简化了复杂并行系统的构建(#1)。然而,与所有分层抽象一样,我们的抽象层也存在缺陷。欢迎提出批判性反馈。
使用 Tornado 协程实现并发¶
工作节点和调度器节点并发运行。它们同时处理多个重叠的请求,并在不阻塞的情况下执行多个重叠的计算。并发编程有多种方法,我们选择使用 Tornado,原因如下:
没有线程开发和调试更舒适
Tornado 的文档非常棒
Stackoverflow 上的覆盖范围很广
性能令人满意
端到端通信¶
各种分布式端点(客户端、调度器、工作节点)通过相互发送任意 Python 对象进行通信。对这些对象进行编码、发送然后解码是通信层的工作。
然而,诸如基于 Bokeh 的 Web 界面等辅助服务有其自身的实现和语义。
协议处理¶
虽然抽象通信层可以传输任意 Python 对象(只要它们可序列化),但 distributed
集群的参与者具体遵守分布式协议,该协议使用定义良好的消息格式指定请求-响应语义。
distributed
中的专用基础设施处理协议的各个方面,例如分派端点支持的各种操作。
服务器¶
工作节点、调度器和 Nanny 对象都继承自 Server
类。
- class distributed.core.Server(handlers, blocked_handlers=None, stream_handlers=None, connection_limit=512, deserialize=True, serializers=None, deserializers=None, connection_args=None, timeout=None, io_loop=None, local_directory=None, needs_workdir=True)[source]¶
Dask 分布式服务器
分布式集群中端点的超类,例如工作节点和调度器对象。
处理函数
服务器使用
handlers
字典定义操作,该字典将操作名称映射到函数。处理函数的第一个参数将是与客户端建立通信的Comm
对象。其他参数将接收来自传入消息(始终是字典)键的值作为输入。>>> def pingpong(comm): ... return b'pong'
>>> def add(comm, x, y): ... return x + y
>>> handlers = {'ping': pingpong, 'add': add} >>> server = Server(handlers) >>> server.listen('tcp://0.0.0.0:8000')
消息格式
服务器期望消息是字典,其中包含一个特殊键 ‘op’,对应于操作的名称,以及函数所需的其他键值对。
因此在上面的示例中,以下是有效的消息。
{'op': 'ping'}
{'op': 'add', 'x': 10, 'y': 20}
RPC¶
为了与远程服务器交互,我们通常使用 rpc
对象,这些对象提供了一种熟悉的方法调用接口来调用远程操作。
- class distributed.core.rpc(arg=None, comm=None, deserialize=True, timeout=None, connection_args=None, serializers=None, deserializers=None)[source]¶
方便地与远程服务器交互
>>> remote = rpc(address) >>> response = await remote.add(x=10, y=20)
一个 rpc 对象可以重复用于多次交互。此外,该对象会根据需要创建和销毁许多 comms,因此在多个重叠通信中安全使用。
完成后,显式关闭 comms。
>>> remote.close_comms()
示例¶
这里有一个使用 distributed.core 创建和与自定义服务器交互的小示例。
服务器端¶
import asyncio
from distributed.core import Server
def add(comm, x=None, y=None): # simple handler, just a function
return x + y
async def stream_data(comm, interval=1): # complex handler, multiple responses
data = 0
while True:
await asyncio.sleep(interval)
data += 1
await comm.write(data)
s = Server({'add': add, 'stream_data': stream_data})
s.listen('tcp://:8888') # listen on TCP port 8888
asyncio.get_event_loop().run_forever()
客户端¶
import asyncio
from distributed.core import connect
async def f():
comm = await connect('tcp://127.0.0.1:8888')
await comm.write({'op': 'add', 'x': 1, 'y': 2})
result = await comm.read()
await comm.close()
print(result)
>>> asyncio.get_event_loop().run_until_complete(f())
3
async def g():
comm = await connect('tcp://127.0.0.1:8888')
await comm.write({'op': 'stream_data', 'interval': 1})
while True:
result = await comm.read()
print(result)
>>> asyncio.get_event_loop().run_until_complete(g())
1
2
3
...
使用 rpc
的客户端¶
RPC 提供了一种更 Pythonic 的接口。它还提供了其他好处,例如在并发情况下使用多个流。大多数 distributed 代码使用 rpc
。例外情况是当我们需要执行多次读写时,就像上面的流数据示例一样。
import asyncio
from distributed.core import rpc
async def f():
# comm = await connect('tcp://127.0.0.1', 8888)
# await comm.write({'op': 'add', 'x': 1, 'y': 2})
# result = await comm.read()
async with rpc('tcp://127.0.0.1:8888') as r:
result = await r.add(x=1, y=2)
print(result)
>>> asyncio.get_event_loop().run_until_complete(f())
3