Actor
目录
Actor¶
Actor 使得在 Dask 工作流程中进行有状态计算成为可能。它们对于一些需要额外性能并且愿意牺牲弹性的罕见算法很有用。
Actor 是指向驻留在远程工作节点上的用户定义对象的指针。任何拥有该 actor 的人都可以调用该远程对象上的方法。
示例¶
在这里,我们创建一个简单的 Counter
类,在某个工作节点上实例化该类,然后远程调用该类上的方法。
class Counter:
""" A simple class to manage an incrementing counter """
n = 0
def __init__(self):
self.n = 0
def increment(self):
self.n += 1
return self.n
def add(self, x):
self.n += x
return self.n
from dask.distributed import Client # Start a Dask Client
client = Client()
future = client.submit(Counter, actor=True) # Create a Counter on a worker
counter = future.result() # Get back a pointer to that object
counter
# <Actor: Counter, key=Counter-1234abcd>
future = counter.increment() # Call remote method
future.result() # Get back result
# 1
future = counter.add(10) # Call remote method
future.result() # Get back result
# 11
动机¶
引入 Actor 的动机源于使用纯任务图的一些挑战。
正常的 Dask 计算由一个函数图组成。这种方法有一些局限性,这些局限性有利于弹性,但可能会对性能产生负面影响
状态:函数不应就地修改其输入或依赖全局状态。它们应该以纯函数方式运行,消耗输入并产生独立的输出。
中心开销:执行位置和顺序由集中式调度器确定。因为调度器参与每个决策,所以有时会产生中心瓶颈。
有些工作负载可能需要直接更新状态,或者涉及比调度器能处理的更多微小任务(调度器每秒可协调约 4000 个任务)。
Actor 绕开了这两个局限性
状态:Actor 可以保持和修改状态。它们被允许就地更新其状态。
开销:Actor 上的操作不通知中心调度器,因此不增加每秒 4000 个任务的开销。它们还避免了额外的网络跳跃,因此具有较低的延迟。
创建 Actor¶
通过使用正常的 Dask 计算函数(如 submit
、map
、compute
或 persist
)并在调用时使用 actors=
关键字(或在 submit
上使用 actor=
)将类提交到工作节点上运行来创建 actor。
future = client.submit(Counter, actors=True)
你可以使用这些函数的所有其他关键字,如 workers=
、resources=
等,来控制这个 actor 最终在哪里运行。
这会创建一个正常的 Dask future,你可以在其上调用 .result()
以便在 actor 成功在工作节点上运行后获取它。
>>> counter = future.result()
>>> counter
<Actor: Counter, key=...>
一个 Counter
对象已在一个工作节点上实例化,而这个 Actor
对象充当我们访问该远程对象的代理。它具有相同的方法和属性。
>>> dir(counter)
['add', 'increment', 'n']
调用远程方法¶
然而,访问属性或调用方法将触发与远程工作节点的通信,在远程工作节点上的独立线程池中运行该方法,然后将结果通信回调用方。对于属性访问,这些操作会阻塞并完成后返回;对于方法调用,它们会立即返回一个 BaseActorFuture
。
>>> future = counter.increment() # Immediately returns a BaseActorFuture
>>> future.result() # Block until finished and result arrives
1
BaseActorFuture
类似于正常的 Dask Future
对象,但功能不那么齐全。它们目前*只*支持 result
方法,不支持其他。它们目前不适用于任何期望 future 的其他 Dask 函数,例如 as_completed
、wait
或 client.gather
。它们不能被放入额外的 submit 或 map 调用中以形成依赖关系。它们立即通信结果(而不是等待调用 result),并将结果缓存在 future 本身。
访问属性¶
如果在类级别定义属性,那么该属性将对 actor 可访问。
class Counter:
n = 0 # Recall that we defined our class with `n` as a class variable
...
>>> counter.n # Blocks until finished
1
属性访问会自动阻塞。这就像你调用了 .result()
一样。
在工作节点上执行¶
当你在 actor 上调用方法时,你的参数会被序列化并发送到拥有 actor 对象的那个工作节点。如果从工作节点执行此操作,通信是直接的。如果从客户端执行此操作,则如果客户端可以直接访问工作节点(如果可能直接连接,使用 Client(..., direct_to_workers=True)
创建客户端),则通信是直接的;如果客户端无法直接连接工作节点,则通过调度器进行代理。
然后,在独立的线程中调用 Actor 对象的相应方法,捕获结果,然后将其发送回调用方。目前,工作节点只有用于 actor 的单个线程,但这将来可能会改变。
结果立即发送回调用方,并且不与 actor 一起存储在工作节点上。它被缓存在 BaseActorFuture
对象上。
从协程和 async/await 调用¶
如果在协程或 async/await 函数中使用 actor,则 actor 方法和属性访问将返回 Tornado future
async def f():
counter = await client.submit(Counter, actor=True)
await counter.increment()
n = await counter.n
Actor 上的协程和 async/await¶
如果在 actor 类上定义一个 async def
函数,那么该方法将在工作节点的事件循环线程上运行,而不是在独立的线程上运行。
def Waiter:
def __init__(self):
self.event = asyncio.Event()
async def set(self):
self.event.set()
async def wait(self):
await self.event.wait()
waiter = client.submit(Waiter, actor=True).result()
waiter.wait().result() # waits until set, without consuming a worker thread
性能¶
工作节点操作目前约有 1 毫秒的延迟,这还不包括可能存在的任何网络延迟。然而,如果存在足够多的其他活动,工作节点中的其他活动可能会轻易增加这些延迟。
局限性¶
Actor 提供了高级功能,但伴随一些代价
无弹性:未采取措施使 actor 工作负载对工作节点故障具有弹性。如果持有 actor 的工作节点崩溃,则该 actor 将永远丢失。
无诊断:由于调度器不了解 actor 计算,因此无法获得有关这些计算的诊断信息。
无负载均衡:Actor 平均分配到工作节点上,没有认真考虑避免通信。
无动态集群:Actor 无法迁移到其他工作节点。持有 actor 的工作节点无法退役,无论是通过
retire_workers()
还是通过Adaptive
。