Worker

概述

Worker 提供两个功能

  1. 根据调度器指示计算任务

  2. 存储计算结果并提供给其他 Worker 或客户端

每个 Worker 都包含一个线程池,用于根据调度器的请求评估任务。它将这些任务的结果存储在本地,并按需将其提供给其他 Worker 或客户端。如果 Worker 被要求评估一个任务,而它没有所有必需的数据,那么它会联系它的对等 Worker 来收集必需的依赖项。

调度器与两个 Worker(Alice 和 Bob)之间的典型对话可能如下所示

Scheduler -> Alice:  Compute ``x <- add(1, 2)``!
Alice -> Scheduler:  I've computed x and am holding on to it!

Scheduler -> Bob:    Compute ``y <- add(x, 10)``!
                     You will need x.  Alice has x.
Bob -> Alice:        Please send me x.
Alice -> Bob:        Sure.  x is 3!
Bob -> Scheduler:    I've computed y and am holding on to it!

数据存储

数据本地存储在 .data 属性中的字典中,该字典将键映射到函数调用的结果。

>>> worker.data
{'x': 3,
 'y': 13,
 ...
 '(df, 0)': pd.DataFrame(...),
 ...
 }

这个 .data 属性是一个 MutableMapping,通常是内存和磁盘存储的组合,并采用 LRU 策略在这两者之间移动数据。

了解更多:Worker 内存管理

线程池

每个 Worker 将计算发送到 concurrent.futures.ThreadPoolExecutor 中的一个线程进行计算。这些计算与 Worker 通信服务器在同一个进程中进行,以便它们之间可以高效地访问和共享数据。出于数据本地性的目的,Worker 中的所有线程都被视为同一个 Worker。

如果您的计算主要是数值性质的(例如 NumPy 和 Pandas 计算),并且完全释放了 GIL,那么建议运行具有许多线程和一个进程的 dask worker 进程。这可以降低通信成本并通常简化部署。

如果您的计算主要是 Python 代码且不释放 GIL,那么建议运行具有许多进程和每个进程一个线程的 dask worker 进程

$ dask worker scheduler:8786 --nworkers 8 --nthreads 1

这将启动 8 个 Worker 进程,每个进程都有一个大小为 1 的 ThreadPoolExecutor

如果您的计算是 Python 外部的,长时间运行且不释放 GIL,那么请注意,在计算运行时,Worker 进程将无法与其他 Worker 或调度器通信。应避免这种情况。如果您不链接自己的自定义 C/Fortran 代码,则此主题可能不适用。

命令行工具

使用 dask worker 命令行工具启动单个 Worker。有关命令行选项的更多详细信息,请参阅命令行工具文档

内部调度

参见专用页面:Worker 状态机

API 文档

class distributed.worker.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: WorkerDataParameter = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, scheduler_sni: str | None = None, WorkerStateClass: type = <class 'distributed.worker_state_machine.WorkerState'>, **kwargs)[source]

Dask 分布式集群中的 Worker 节点

Worker 执行两个功能

  1. 从本地字典提供数据服务

  2. 对该数据和来自对等 Worker 的数据执行计算

Worker 向调度器报告其数据信息,并在执行计算需要时,通过调度器从其他 Worker 收集数据。

您可以使用 dask worker 命令行应用程序启动 Worker

$ dask worker scheduler-ip:port

使用 --help 标志查看更多选项

$ dask worker --help

此文档字符串的其余部分介绍了 Worker 用于管理和跟踪内部计算的内部状态。

状态

信息状态

这些属性在执行期间不会发生显著变化。

  • nthreads: int

    此 Worker 进程使用的线程数 (nthreads)

  • executors: dict[str, concurrent.futures.Executor]

    用于执行计算的执行器。始终包含默认执行器。

  • local_directory: path

    本地机器上用于存储临时文件的路径

  • scheduler: PooledRPCCall

    调度器的位置。参见 .ip/.port 属性。

  • name: string

    别名

  • services: {str: Server}

    在此 Worker 上运行的辅助 Web 服务器

  • service_ports: {str: port}

  • transfer_outgoing_count_limit: int

    并发传出数据传输的最大数量。另请参见 distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit

  • batched_stream: BatchedSend

    用于与调度器通信的批处理流

  • log: [(message)]

    结构化且可查询的日志。参见 Worker.story

易变状态

这些属性跟踪此 Worker 正在尝试完成的任务的进度。在下面的描述中,key 是我们要计算的任务的名称,dep 是我们要从其他地方收集的依赖数据的名称。

  • threads: {key: int}

    任务在其上运行的线程 ID

  • active_threads: {int: key}

    当前在活跃线程上运行的键

  • state: WorkerState

    封装的状态机。参见 BaseWorkerWorkerState

参数
scheduler_ip: str, 可选
scheduler_port: int, 可选
scheduler_file: str, 可选
host: str, 可选
data: MutableMapping, type, None

用于存储的对象,默认构建一个基于磁盘的 LRU 字典。

如果提供了用于构造存储对象的可调用对象,则如果调用签名中有一个名为 worker_local_directory 的参数,它将接收 Worker 的属性 local_directory 作为参数。

nthreads: int, 可选
local_directory: str, 可选

放置本地资源的目录

name: str, 可选
memory_limit: int, float, string

此 Worker 应使用的内存字节数。设置为零表示无限制。设置为 'auto' 以计算为 system.MEMORY_LIMIT * min(1, nthreads / total_cores) 使用字符串或数字,例如 5GB 或 5e9

memory_target_fraction: float 或 False

尝试保持在内存阈值下方的百分比(默认值:从配置键 distributed.worker.memory.target 读取)

memory_spill_fraction: float 或 False

开始溢出到磁盘时的内存百分比(默认值:从配置键 distributed.worker.memory.spill 读取)

memory_pause_fraction: float 或 False

停止运行新任务时的内存百分比(默认值:从配置键 distributed.worker.memory.pause 读取)

max_spill: int, string 或 False

溢出到磁盘的字节数限制(默认值:从配置键 distributed.worker.memory.max-spill 读取)

executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
要使用的执行器。根据类型,它具有以下含义
  • Executor 实例:默认执行器。

  • Dict[str, Executor]:将名称映射到 Executor 实例。如果字典中没有 “default” 键,将使用 ThreadPoolExecutor(nthreads) 创建一个 “default” 执行器。

  • 字符串:字符串 “offload”,它指的是用于卸载通信的同一个线程池。这导致同一个线程用于反序列化和计算。

resources: dict

此 Worker 拥有的资源,例如 {'GPU': 2}

nanny: str

如果 Nanny 存在,用于联系 Nanny 的地址

lifetime: str

在此时间(例如 “1 小时”)后,我们将优雅地关闭 Worker。默认为 None,表示没有明确的关闭时间。

lifetime_stagger: str

错开生命周期值的时间(例如 “5 分钟”)。实际生命周期将在 lifetime +/- lifetime_stagger 之间随机均匀选择。

lifetime_restart: bool

Worker 达到其生命周期后是否重新启动。默认为 False

kwargs: 可选

传递给 ServerNode 构造器的附加参数

示例

使用命令行启动 Worker

$ dask scheduler
Start scheduler at 127.0.0.1:8786

$ dask worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786
batched_send(msg: dict[str, Any]) None[source]

实现了 BaseWorker 抽象方法。

通过批量通信向调度器发送一个即发即弃的消息。

如果我们当前未连接到调度器,消息将被静默丢弃!

async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True, reason: str = 'worker-close') str | None[source]

关闭 Worker

关闭 Worker 上运行的异步操作,停止所有执行器和通信。如果请求,这也关闭 Nanny。

参数
timeout

关闭单个指令的超时时间(秒)

executor_wait

如果为 True,同步关闭执行器,否则异步关闭

nanny

如果为 True,关闭 Nanny

reason

关闭 Worker 的原因

返回值
str | None

如果 Worker 已处于关闭状态或失败,则为 None,否则为 “OK”

async close_gracefully(restart=None, reason: str = 'worker-close-gracefully')[source]

优雅地关闭 Worker

这首先通知调度器我们要关闭,并要求它将我们的数据移到其他地方。之后,我们像往常一样关闭。

property data: collections.abc.MutableMapping[Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object]

所有已完成任务的 {任务键: 任务负载},无论它们是在此 Worker 上计算的还是在其他地方计算然后通过网络传输到此处的。

使用默认配置时,这是一个 zict 缓冲区,当超过目标阈值时会自动溢出到磁盘。如果禁用溢出,则是一个普通的字典。它也可以是在初始化 Worker 或 Nanny 时传递的用户定义的任意类似字典的对象。Worker 逻辑应将其视为不透明对象,并遵循 MutableMapping API。

注意

此集合也可通过 self.state.dataself.memory_manager.data 访问。

digest_metric(name: collections.abc.Hashable, value: float) None[source]

通过调用 Server.digest_metric 实现 BaseWorker.digest_metric

async execute(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[source]

执行任务。实现了 BaseWorker 抽象方法。

async gather(who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][source]

Scheduler.rebalance() 和 Scheduler.replicate() 使用的端点

async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[source]

实现了 BaseWorker 抽象方法

get_current_task() Union[str, int, float, tuple[ForwardRef('Key'), ...]][source]

获取当前正在运行的任务的键

这仅在任务内部运行时才有意义

另请参见

get_worker

示例

>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()
>>> future = client.submit(f)  
>>> future.result()  
'f-1234'
handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None[source]

覆盖 BaseWorker 方法以增加验证

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[source]

在给定主题下记录事件

参数
topicstr, list[str]

记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。

msg

要记录的事件消息。注意,这必须是 msgpack 可序列化的。

另请参见

Client.log_event
async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent[source]

等待一段时间,然后将对等 Worker 从忙碌状态中移除。实现了 BaseWorker 抽象方法。

async start_unsafe()[source]

尝试启动服务器。这是非幂等的,并且不防止并发启动尝试。

这旨在由子类覆盖或调用。对于安全的启动,请使用 Server.start

如果配置了 death_timeout,我们将要求此协程在此超时时间到达之前完成。如果达到超时时间,我们将关闭实例并引发 asyncio.TimeoutError

transfer_outgoing_bytes: int

当前向其他 Worker 进行的开放数据传输的总大小

transfer_outgoing_bytes_total: int

向其他 Worker 进行的数据传输总大小(包括进行中和失败的传输)

transfer_outgoing_count: int

当前向其他 Worker 进行的开放数据传输数量

transfer_outgoing_count_total: int

自 Worker 启动以来向其他 Worker 进行的数据传输总数

trigger_profile() None[source]

从所有活跃计算线程获取帧

将这些帧合并到现有的性能分析计数中

property worker_address

为了与 Nanny 的 API 兼容性

Nanny

Dask Worker 默认由一个小的 Nanny 进程启动、监控和管理。

class distributed.nanny.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | collections.abc.Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | collections.abc.Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[source]

管理 Worker 进程的进程

Nanny 启动 Worker 进程,监视它们,并在必要时杀死或重新启动它们。如果您想使用 Client.restart 方法,或者如果 Worker 达到其内存限制的终止比例时自动重新启动 Worker,则 Nanny 是必需的。

Nanny 的参数与 Worker 的参数基本相同,但以下列出的例外情况除外。

参数
env: dict, 可选

在 Nanny 初始化时设置的环境变量也将确保在 Worker 进程中设置。此参数允许覆盖或以其他方式为 Worker 设置环境变量。也可以使用选项 distributed.nanny.environ 设置环境变量。优先级如下:

  1. Nanny 参数

  2. 现有环境变量

  3. Dask 配置

注意

一些环境变量,如 OMP_NUM_THREADS,必须在导入 numpy 之前设置才能生效。其他变量,如 MALLOC_TRIM_THRESHOLD_(参见内存未释放回操作系统),必须在启动 Linux 进程之前设置。如果在这里或 distributed.nanny.environ 中设置这些变量将无效;它们必须在 distributed.nanny.pre-spawn-environ 中设置,以便在派生子进程之前设置它们,即使这意味着污染运行 Nanny 的进程。

出于同样的原因,请注意将 distributed.worker.multiprocessing-methodspawn 更改为 forkforkserver 可能会抑制某些环境变量;如果这样做,您应该在启动 dask-worker 之前在 shell 中自己设置这些变量。

另请参见

Worker
async close(timeout: float = 5, reason: str = 'nanny-close') Literal['OK'][source]

关闭 Worker 进程,停止所有通信。

close_gracefully(reason: str = 'nanny-close-gracefully') None[source]

一个信号,表示当 Worker 消失时,我们不应尝试重新启动它们

这在集群关闭过程中使用。

async instantiate() distributed.core.Status[source]

启动本地 Worker 进程

阻塞直到进程启动并且调度器被正确告知

async kill(timeout: float = 5, reason: str = 'nanny-kill') None[source]

杀死本地 Worker 进程

阻塞直到进程关闭并且调度器被正确告知

log_event(topic, msg)[source]

在给定主题下记录事件

参数
topicstr, list[str]

记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。

msg

要记录的事件消息。注意,这必须是 msgpack 可序列化的。

另请参见

Client.log_event
async start_unsafe()[source]

启动 Nanny,启动本地进程,开始监视