Worker
目录
Worker¶
概述¶
Worker 提供两个功能
根据调度器指示计算任务
存储计算结果并提供给其他 Worker 或客户端
每个 Worker 都包含一个线程池,用于根据调度器的请求评估任务。它将这些任务的结果存储在本地,并按需将其提供给其他 Worker 或客户端。如果 Worker 被要求评估一个任务,而它没有所有必需的数据,那么它会联系它的对等 Worker 来收集必需的依赖项。
调度器与两个 Worker(Alice 和 Bob)之间的典型对话可能如下所示
Scheduler -> Alice: Compute ``x <- add(1, 2)``!
Alice -> Scheduler: I've computed x and am holding on to it!
Scheduler -> Bob: Compute ``y <- add(x, 10)``!
You will need x. Alice has x.
Bob -> Alice: Please send me x.
Alice -> Bob: Sure. x is 3!
Bob -> Scheduler: I've computed y and am holding on to it!
数据存储¶
数据本地存储在 .data
属性中的字典中,该字典将键映射到函数调用的结果。
>>> worker.data
{'x': 3,
'y': 13,
...
'(df, 0)': pd.DataFrame(...),
...
}
这个 .data
属性是一个 MutableMapping
,通常是内存和磁盘存储的组合,并采用 LRU 策略在这两者之间移动数据。
了解更多:Worker 内存管理
线程池¶
每个 Worker 将计算发送到 concurrent.futures.ThreadPoolExecutor
中的一个线程进行计算。这些计算与 Worker 通信服务器在同一个进程中进行,以便它们之间可以高效地访问和共享数据。出于数据本地性的目的,Worker 中的所有线程都被视为同一个 Worker。
如果您的计算主要是数值性质的(例如 NumPy 和 Pandas 计算),并且完全释放了 GIL,那么建议运行具有许多线程和一个进程的 dask worker
进程。这可以降低通信成本并通常简化部署。
如果您的计算主要是 Python 代码且不释放 GIL,那么建议运行具有许多进程和每个进程一个线程的 dask worker
进程
$ dask worker scheduler:8786 --nworkers 8 --nthreads 1
这将启动 8 个 Worker 进程,每个进程都有一个大小为 1 的 ThreadPoolExecutor
。
如果您的计算是 Python 外部的,长时间运行且不释放 GIL,那么请注意,在计算运行时,Worker 进程将无法与其他 Worker 或调度器通信。应避免这种情况。如果您不链接自己的自定义 C/Fortran 代码,则此主题可能不适用。
内部调度¶
参见专用页面:Worker 状态机
API 文档¶
- class distributed.worker.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: WorkerDataParameter = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, scheduler_sni: str | None = None, WorkerStateClass: type = <class 'distributed.worker_state_machine.WorkerState'>, **kwargs)[source]¶
Dask 分布式集群中的 Worker 节点
Worker 执行两个功能
从本地字典提供数据服务
对该数据和来自对等 Worker 的数据执行计算
Worker 向调度器报告其数据信息,并在执行计算需要时,通过调度器从其他 Worker 收集数据。
您可以使用
dask worker
命令行应用程序启动 Worker$ dask worker scheduler-ip:port
使用
--help
标志查看更多选项$ dask worker --help
此文档字符串的其余部分介绍了 Worker 用于管理和跟踪内部计算的内部状态。
状态
信息状态
这些属性在执行期间不会发生显著变化。
nthreads
:int
此 Worker 进程使用的线程数 (nthreads)
executors
:dict[str, concurrent.futures.Executor]
用于执行计算的执行器。始终包含默认执行器。
local_directory
:path
本地机器上用于存储临时文件的路径
scheduler
:PooledRPCCall
调度器的位置。参见
.ip/.port
属性。
name
:string
别名
services
:{str: Server}
在此 Worker 上运行的辅助 Web 服务器
service_ports
:{str: port}
transfer_outgoing_count_limit
:int
并发传出数据传输的最大数量。另请参见
distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit
。
batched_stream
:BatchedSend
用于与调度器通信的批处理流
log
:[(message)]
结构化且可查询的日志。参见
Worker.story
易变状态
这些属性跟踪此 Worker 正在尝试完成的任务的进度。在下面的描述中,
key
是我们要计算的任务的名称,dep
是我们要从其他地方收集的依赖数据的名称。threads
:{key: int}
任务在其上运行的线程 ID
active_threads
:{int: key}
当前在活跃线程上运行的键
state
:WorkerState
封装的状态机。参见
BaseWorker
和WorkerState
- 参数
scheduler_ip
: str, 可选scheduler_port
: int, 可选scheduler_file
: str, 可选host
: str, 可选data
: MutableMapping, type, None用于存储的对象,默认构建一个基于磁盘的 LRU 字典。
如果提供了用于构造存储对象的可调用对象,则如果调用签名中有一个名为
worker_local_directory
的参数,它将接收 Worker 的属性local_directory
作为参数。nthreads
: int, 可选local_directory
: str, 可选放置本地资源的目录
name
: str, 可选memory_limit
: int, float, string此 Worker 应使用的内存字节数。设置为零表示无限制。设置为 'auto' 以计算为 system.MEMORY_LIMIT * min(1, nthreads / total_cores) 使用字符串或数字,例如 5GB 或 5e9
memory_target_fraction
: float 或 False尝试保持在内存阈值下方的百分比(默认值:从配置键
distributed.worker.memory.target
读取)memory_spill_fraction
: float 或 False开始溢出到磁盘时的内存百分比(默认值:从配置键
distributed.worker.memory.spill
读取)memory_pause_fraction
: float 或 False停止运行新任务时的内存百分比(默认值:从配置键
distributed.worker.memory.pause
读取)max_spill
: int, string 或 False溢出到磁盘的字节数限制(默认值:从配置键
distributed.worker.memory.max-spill
读取)executor
: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”- 要使用的执行器。根据类型,它具有以下含义
Executor
实例:默认执行器。Dict[str, Executor]
:将名称映射到Executor
实例。如果字典中没有 “default” 键,将使用ThreadPoolExecutor(nthreads)
创建一个 “default” 执行器。字符串:字符串 “offload”,它指的是用于卸载通信的同一个线程池。这导致同一个线程用于反序列化和计算。
resources
: dict此 Worker 拥有的资源,例如
{'GPU': 2}
nanny
: str如果 Nanny 存在,用于联系 Nanny 的地址
lifetime
: str在此时间(例如 “1 小时”)后,我们将优雅地关闭 Worker。默认为 None,表示没有明确的关闭时间。
lifetime_stagger
: str错开生命周期值的时间(例如 “5 分钟”)。实际生命周期将在 lifetime +/- lifetime_stagger 之间随机均匀选择。
lifetime_restart
: boolWorker 达到其生命周期后是否重新启动。默认为 False
kwargs
: 可选传递给 ServerNode 构造器的附加参数
示例
使用命令行启动 Worker
$ dask scheduler Start scheduler at 127.0.0.1:8786 $ dask worker 127.0.0.1:8786 Start worker at: 127.0.0.1:1234 Registered with scheduler at: 127.0.0.1:8786
- batched_send(msg: dict[str, Any]) None [source]¶
实现了 BaseWorker 抽象方法。
通过批量通信向调度器发送一个即发即弃的消息。
如果我们当前未连接到调度器,消息将被静默丢弃!
- async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True, reason: str = 'worker-close') str | None [source]¶
关闭 Worker
关闭 Worker 上运行的异步操作,停止所有执行器和通信。如果请求,这也关闭 Nanny。
- 参数
- timeout
关闭单个指令的超时时间(秒)
- executor_wait
如果为 True,同步关闭执行器,否则异步关闭
- nanny
如果为 True,关闭 Nanny
- reason
关闭 Worker 的原因
- 返回值
- str | None
如果 Worker 已处于关闭状态或失败,则为 None,否则为 “OK”
- async close_gracefully(restart=None, reason: str = 'worker-close-gracefully')[source]¶
优雅地关闭 Worker
这首先通知调度器我们要关闭,并要求它将我们的数据移到其他地方。之后,我们像往常一样关闭。
- property data: collections.abc.MutableMapping[Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object]¶
所有已完成任务的 {任务键: 任务负载},无论它们是在此 Worker 上计算的还是在其他地方计算然后通过网络传输到此处的。
使用默认配置时,这是一个 zict 缓冲区,当超过目标阈值时会自动溢出到磁盘。如果禁用溢出,则是一个普通的字典。它也可以是在初始化 Worker 或 Nanny 时传递的用户定义的任意类似字典的对象。Worker 逻辑应将其视为不透明对象,并遵循 MutableMapping API。
注意
此集合也可通过
self.state.data
和self.memory_manager.data
访问。
- digest_metric(name: collections.abc.Hashable, value: float) None [source]¶
通过调用 Server.digest_metric 实现 BaseWorker.digest_metric
- async execute(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [source]¶
执行任务。实现了 BaseWorker 抽象方法。
- async gather(who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object] [source]¶
Scheduler.rebalance() 和 Scheduler.replicate() 使用的端点
- async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [source]¶
实现了 BaseWorker 抽象方法
- get_current_task() Union[str, int, float, tuple[ForwardRef('Key'), ...]] [source]¶
获取当前正在运行的任务的键
这仅在任务内部运行时才有意义
另请参见
get_worker
示例
>>> from dask.distributed import get_worker >>> def f(): ... return get_worker().get_current_task()
>>> future = client.submit(f) >>> future.result() 'f-1234'
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None [source]¶
覆盖 BaseWorker 方法以增加验证
- log_event(topic: str | collections.abc.Collection[str], msg: Any) None [source]¶
在给定主题下记录事件
- 参数
topic
str, list[str]记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。
- msg
要记录的事件消息。注意,这必须是 msgpack 可序列化的。
另请参见
Client.log_event
- async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent [source]¶
等待一段时间,然后将对等 Worker 从忙碌状态中移除。实现了 BaseWorker 抽象方法。
- async start_unsafe()[source]¶
尝试启动服务器。这是非幂等的,并且不防止并发启动尝试。
这旨在由子类覆盖或调用。对于安全的启动,请使用
Server.start
。如果配置了
death_timeout
,我们将要求此协程在此超时时间到达之前完成。如果达到超时时间,我们将关闭实例并引发asyncio.TimeoutError
。
- property worker_address¶
为了与 Nanny 的 API 兼容性
Nanny¶
Dask Worker 默认由一个小的 Nanny 进程启动、监控和管理。
- class distributed.nanny.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | collections.abc.Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | collections.abc.Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[source]¶
管理 Worker 进程的进程
Nanny 启动 Worker 进程,监视它们,并在必要时杀死或重新启动它们。如果您想使用
Client.restart
方法,或者如果 Worker 达到其内存限制的终止比例时自动重新启动 Worker,则 Nanny 是必需的。Nanny 的参数与 Worker 的参数基本相同,但以下列出的例外情况除外。
- 参数
env
: dict, 可选在 Nanny 初始化时设置的环境变量也将确保在 Worker 进程中设置。此参数允许覆盖或以其他方式为 Worker 设置环境变量。也可以使用选项
distributed.nanny.environ
设置环境变量。优先级如下:Nanny 参数
现有环境变量
Dask 配置
注意
一些环境变量,如
OMP_NUM_THREADS
,必须在导入 numpy 之前设置才能生效。其他变量,如MALLOC_TRIM_THRESHOLD_
(参见内存未释放回操作系统),必须在启动 Linux 进程之前设置。如果在这里或distributed.nanny.environ
中设置这些变量将无效;它们必须在distributed.nanny.pre-spawn-environ
中设置,以便在派生子进程之前设置它们,即使这意味着污染运行 Nanny 的进程。出于同样的原因,请注意将
distributed.worker.multiprocessing-method
从spawn
更改为fork
或forkserver
可能会抑制某些环境变量;如果这样做,您应该在启动dask-worker
之前在 shell 中自己设置这些变量。
另请参见
Worker
- async close(timeout: float = 5, reason: str = 'nanny-close') Literal['OK'] [source]¶
关闭 Worker 进程,停止所有通信。
- close_gracefully(reason: str = 'nanny-close-gracefully') None [source]¶
一个信号,表示当 Worker 消失时,我们不应尝试重新启动它们
这在集群关闭过程中使用。
- async kill(timeout: float = 5, reason: str = 'nanny-kill') None [source]¶
杀死本地 Worker 进程
阻塞直到进程关闭并且调度器被正确告知