Actor

Actor 使得在 Dask 工作流程中进行有状态计算成为可能。它们对于一些需要额外性能并且愿意牺牲弹性的罕见算法很有用。

Actor 是指向驻留在远程工作节点上的用户定义对象的指针。任何拥有该 actor 的人都可以调用该远程对象上的方法。

示例

在这里,我们创建一个简单的 Counter 类,在某个工作节点上实例化该类,然后远程调用该类上的方法。

class Counter:
    """ A simple class to manage an incrementing counter """
    n = 0

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

    def add(self, x):
        self.n += x
        return self.n

from dask.distributed import Client          # Start a Dask Client
client = Client()

future = client.submit(Counter, actor=True)  # Create a Counter on a worker
counter = future.result()                    # Get back a pointer to that object

counter
# <Actor: Counter, key=Counter-1234abcd>

future = counter.increment()                 # Call remote method
future.result()                              # Get back result
# 1

future = counter.add(10)                     # Call remote method
future.result()                              # Get back result
# 11

动机

引入 Actor 的动机源于使用纯任务图的一些挑战。

正常的 Dask 计算由一个函数图组成。这种方法有一些局限性,这些局限性有利于弹性,但可能会对性能产生负面影响

  1. 状态:函数不应就地修改其输入或依赖全局状态。它们应该以纯函数方式运行,消耗输入并产生独立的输出。

  2. 中心开销:执行位置和顺序由集中式调度器确定。因为调度器参与每个决策,所以有时会产生中心瓶颈。

有些工作负载可能需要直接更新状态,或者涉及比调度器能处理的更多微小任务(调度器每秒可协调约 4000 个任务)。

Actor 绕开了这两个局限性

  1. 状态:Actor 可以保持和修改状态。它们被允许就地更新其状态。

  2. 开销:Actor 上的操作不通知中心调度器,因此不增加每秒 4000 个任务的开销。它们还避免了额外的网络跳跃,因此具有较低的延迟。

创建 Actor

通过使用正常的 Dask 计算函数(如 submitmapcomputepersist)并在调用时使用 actors= 关键字(或在 submit 上使用 actor=)将类提交到工作节点上运行来创建 actor。

future = client.submit(Counter, actors=True)

你可以使用这些函数的所有其他关键字,如 workers=resources= 等,来控制这个 actor 最终在哪里运行。

这会创建一个正常的 Dask future,你可以在其上调用 .result() 以便在 actor 成功在工作节点上运行后获取它。

>>> counter = future.result()
>>> counter
<Actor: Counter, key=...>

一个 Counter 对象已在一个工作节点上实例化,而这个 Actor 对象充当我们访问该远程对象的代理。它具有相同的方法和属性。

>>> dir(counter)
['add', 'increment', 'n']

调用远程方法

然而,访问属性或调用方法将触发与远程工作节点的通信,在远程工作节点上的独立线程池中运行该方法,然后将结果通信回调用方。对于属性访问,这些操作会阻塞并完成后返回;对于方法调用,它们会立即返回一个 BaseActorFuture

>>> future = counter.increment()  # Immediately returns a BaseActorFuture
>>> future.result()               # Block until finished and result arrives
1

BaseActorFuture 类似于正常的 Dask Future 对象,但功能不那么齐全。它们目前*只*支持 result 方法,不支持其他。它们目前不适用于任何期望 future 的其他 Dask 函数,例如 as_completedwaitclient.gather。它们不能被放入额外的 submit 或 map 调用中以形成依赖关系。它们立即通信结果(而不是等待调用 result),并将结果缓存在 future 本身。

访问属性

如果在类级别定义属性,那么该属性将对 actor 可访问。

class Counter:
    n = 0   # Recall that we defined our class with `n` as a class variable

    ...

>>> counter.n                     # Blocks until finished
1

属性访问会自动阻塞。这就像你调用了 .result() 一样。

在工作节点上执行

当你在 actor 上调用方法时,你的参数会被序列化并发送到拥有 actor 对象的那个工作节点。如果从工作节点执行此操作,通信是直接的。如果从客户端执行此操作,则如果客户端可以直接访问工作节点(如果可能直接连接,使用 Client(..., direct_to_workers=True) 创建客户端),则通信是直接的;如果客户端无法直接连接工作节点,则通过调度器进行代理。

然后,在独立的线程中调用 Actor 对象的相应方法,捕获结果,然后将其发送回调用方。目前,工作节点只有用于 actor 的单个线程,但这将来可能会改变。

结果立即发送回调用方,并且不与 actor 一起存储在工作节点上。它被缓存在 BaseActorFuture 对象上。

从协程和 async/await 调用

如果在协程或 async/await 函数中使用 actor,则 actor 方法和属性访问将返回 Tornado future

async def f():
    counter = await client.submit(Counter, actor=True)

    await counter.increment()
    n = await counter.n

Actor 上的协程和 async/await

如果在 actor 类上定义一个 async def 函数,那么该方法将在工作节点的事件循环线程上运行,而不是在独立的线程上运行。

def Waiter:
    def __init__(self):
        self.event = asyncio.Event()

    async def set(self):
        self.event.set()

    async def wait(self):
        await self.event.wait()

waiter = client.submit(Waiter, actor=True).result()
waiter.wait().result()  # waits until set, without consuming a worker thread

性能

工作节点操作目前约有 1 毫秒的延迟,这还不包括可能存在的任何网络延迟。然而,如果存在足够多的其他活动,工作节点中的其他活动可能会轻易增加这些延迟。

局限性

Actor 提供了高级功能,但伴随一些代价

  1. 无弹性:未采取措施使 actor 工作负载对工作节点故障具有弹性。如果持有 actor 的工作节点崩溃,则该 actor 将永远丢失。

  2. 无诊断:由于调度器不了解 actor 计算,因此无法获得有关这些计算的诊断信息。

  3. 无负载均衡:Actor 平均分配到工作节点上,没有认真考虑避免通信。

  4. 无动态集群:Actor 无法迁移到其他工作节点。持有 actor 的工作节点无法退役,无论是通过 retire_workers() 还是通过 Adaptive