API
目录
API¶
Client
客户端连接并提交计算到 Dask 集群(例如 distributed.LocalCluster
)
|
连接并提交计算到 Dask 集群 |
线程局部、任务局部上下文管理器,使 Client.current 类方法返回 self。 |
|
在 Worker 上运行内存、磁盘和网络带宽基准测试 |
|
|
所有相关 key 的活动调用栈 |
|
取消正在运行的 future 这会阻止尚未运行的 future 任务被调度,如果它们已经运行则删除它们。 |
|
关闭此客户端 |
|
在集群上计算 dask 集合 |
|
在 as_client 的上下文内运行时,返回上下文局部的当前客户端。 |
|
提取整个集群状态的转储并持久化到磁盘或 URL。 |
|
开始将给定的 logger(默认为 root)及其下的所有 loggers 从 worker 任务转发到客户端进程。 |
|
futures_of 的封装方法 |
|
从分布式内存中收集 future |
|
计算 dask 图 |
|
如果存在,从调度器获取命名数据集。 |
|
检索结构化的主题日志 |
|
返回一个 concurrent.futures Executor,用于在此 Client 上提交任务 |
|
从调度器获取任意元数据 |
从调度器获取日志 |
|
|
从调度器获取任务流数据 |
|
返回调度器、所有 worker 和自身的版本信息 |
|
从 workers 获取日志 |
|
哪些 key 由哪些 worker 持有 |
|
列出调度器上可用的命名数据集 |
|
在给定主题下记录一个事件 |
|
将函数映射到一系列参数上 |
|
集群上每个 key 占用的字节数 |
|
每个 worker 节点上可用的线程/核心数 |
|
如果 collections 的任务已存在 futures,则替换它们 |
|
每个 worker 节点上可用的线程/核心数 |
|
在集群上持久化 dask 集合 |
|
当前在每个 worker 上运行的任务 |
|
收集有关近期工作的统计分析信息 |
|
向调度器发布命名数据集 |
|
在网络内重新平衡数据 |
|
注册一个插件。 |
|
注册一个调度器插件。 |
|
为所有当前和未来的 worker 注册一个 setup 回调函数。 |
|
为所有当前和未来的 worker 注册一个生命周期 worker 插件。 |
|
设置网络中 future 的复制 |
|
重启所有 worker。 |
|
重启指定的 worker 集合 |
|
在调度器上淘汰某些 worker |
|
重试失败的 future |
|
在任务调度系统之外在所有 worker 上运行函数 |
|
在调度器进程上运行函数 |
|
将数据分散到分布式内存中 |
|
关于集群中 worker 的基本信息 |
|
在调度器中设置任意元数据 |
关闭连接的调度器和 worker |
|
|
在单独线程中启动调度器运行 |
|
返回给定 keys 或 stimulus_id 的集群范围故事 |
|
向调度器提交函数应用 |
|
订阅主题并为每个接收到的事件执行处理程序 |
|
停止将给定 logger(默认为 root)从 worker 任务转发到客户端进程。 |
|
从调度器移除命名数据集 |
注销一个调度器插件 |
|
|
注销一个生命周期 worker 插件 |
|
取消订阅主题并移除事件处理程序 |
|
将本地包上传到调度器和 worker |
|
阻塞调用,等待 n 个 worker 后继续 |
|
存储每个 future 数据的 worker |
|
将调度器信息写入 json 文件。 |
|
获取此线程的客户端 |
获取当前运行此任务的 worker |
|
|
在任务中获取客户端。 |
|
使此任务脱离 worker 的线程池 |
|
使此线程重新加入 ThreadPoolExecutor |
|
内置 |
|
内置 |
重新调度此任务 |
对于任何计算,无论成功或失败,都在本地执行任务以进行调试。 |
|
对于失败的计算,在本地执行导致错误的任务以进行调试。 |
Future
|
远程运行的计算 |
当 future 完成时,在 future 上调用回调 |
|
|
|
|
取消运行此 future 的请求 |
如果 future 已被取消,则返回 True |
|
返回计算是否完成。 |
|
|
返回失败任务的异常 |
注意事项 |
|
|
等待计算完成,将结果收集到本地进程。 |
|
如果此 future 失败则重试 |
|
返回失败任务的回溯 |
同步
|
分布式集中式 Event,相当于 asyncio.Event |
|
分布式集中式 Lock |
|
分布式集中式 Lock |
|
此 semaphore 将跟踪调度器上的租约,这些租约可由此类实例获取和释放。 |
|
分布式 Queue |
|
分布式全局 Variable |
其他
|
按 future 完成的顺序返回它们 |
跟踪 future 的进度 |
|
|
等待所有/任何 future 完成 |
|
即使我们释放 future,也要至少运行任务一次 |
|
集合中的 Future 对象 |
|
在上下文块内收集任务流 |
在上下文块内收集任务元数据 |
|
|
收集性能报告 |
工具
用于存储按换行符分隔的日志条目字符串的容器 |
|
用于存储将名称映射到日志条目字符串的字典的容器 |
|
在调度器上生成 Memray 分析报告并下载生成的报告。 |
|
在 worker 上生成 Memray 分析报告并下载生成的报告。 |
异步方法¶
大多数方法和函数都可以在阻塞或异步环境中使用 Tornado 协程很好地工作。如果在 Tornado IOLoop 中使用,则应适当地 yield 或 await 阻塞操作。
必须通过传递 asynchronous=True
关键字来告知客户端您打算在异步环境中使用它
# blocking
client = Client()
future = client.submit(func, *args) # immediate, no blocking/async difference
result = client.gather(future) # blocking
# asynchronous Python 2/3
client = yield Client(asynchronous=True)
future = client.submit(func, *args) # immediate, no blocking/async difference
result = yield client.gather(future) # non-blocking/asynchronous
# asynchronous Python 3
client = await Client(asynchronous=True)
future = client.submit(func, *args) # immediate, no blocking/async difference
result = await client.gather(future) # non-blocking/asynchronous
异步变体必须在 Tornado 协程中运行。有关更多信息,请参阅 Asynchronous 文档。
Client¶
- class distributed.Client(address=None, loop=None, timeout=_NoDefault.no_default, set_as_default=True, scheduler_file=None, security=None, asynchronous=False, name=None, heartbeat_interval=None, serializers=None, deserializers=None, extensions={}, direct_to_workers=None, connection_limit=512, **kwargs)[source]¶
连接并提交计算到 Dask 集群
Client 连接用户到 Dask 集群。它提供了一个围绕函数和 future 的异步用户界面。此类类似于
concurrent.futures
中的 executors,但也允许在submit/map
调用中使用Future
对象。当 Client 被实例化时,默认接管所有dask.compute
和dask.persist
调用。通常也可以不指定调度器地址来创建 Client,例如
Client()
。在这种情况下,Client 会在后台创建一个LocalCluster
并连接到它。在这种情况下,任何额外的关键字参数都会从 Client 传递到 LocalCluster。有关更多信息,请参阅 LocalCluster 文档。- 参数
- address: string, 或 Cluster
这可以是
Scheduler
服务器的地址,例如字符串'127.0.0.1:8786'
或集群对象,例如LocalCluster()
- loop
事件循环
- timeout: int (默认为配置 ``distributed.comm.timeouts.connect``)
与调度器建立初始连接的超时时长
- set_as_default: bool (True)
使用此 Client 作为全局 dask 调度器
- scheduler_file: string (可选)
包含调度器信息文件的路径(如果可用)
- security: Security 或 bool (可选)
可选的安全信息。如果创建本地集群,也可以传入
True
,在这种情况下将自动创建临时自签名凭证。- asynchronous: bool (默认为 False)
如果在 async/await 函数或 Tornado gen.coroutines 中使用此客户端,则设置为 True。否则,正常使用应保持 False。
- name: string (可选)
为客户端指定一个名称,该名称将包含在调度器上生成的与此客户端相关的日志中
- heartbeat_interval: int (可选)
向调度器发送心跳的间隔时间(毫秒)
- serializers
序列化对象时使用的可迭代方法。有关更多信息,请参阅 Serialization。
- deserializers
反序列化对象时使用的可迭代方法。有关更多信息,请参阅 Serialization。
- extensionslist
扩展
- direct_to_workers: bool (可选)
是否直接连接到 worker,或者请求调度器充当中介。
- connection_limitint
连接池中一次维持的开放通信数量
- **kwargs
如果不传递调度器地址,Client 将创建一个
LocalCluster
对象,并传递任何额外的关键字参数。
示例
初始化时提供集群调度器节点的地址
>>> client = Client('127.0.0.1:8786')
使用
submit
方法将单个计算发送到集群>>> a = client.submit(add, 1, 2) >>> b = client.submit(add, 10, 20)
继续对结果使用 submit 或 map 以构建更大的计算
>>> c = client.submit(add, a, b)
使用
gather
方法收集结果。>>> client.gather(c) 33
您也可以不带参数调用 Client 以创建您自己的本地集群。
>>> client = Client() # makes your own local "cluster"
额外的关键字将直接传递给 LocalCluster
>>> client = Client(n_workers=2, threads_per_worker=4)
- property amm¶
- as_current()[source]¶
线程局部、任务局部上下文管理器,使 Client.current 类方法返回 self。在此上下文管理器内反序列化的任何 Future 对象将自动附加到此 Client。
- benchmark_hardware() dict [source]¶
在 Worker 上运行内存、磁盘和网络带宽基准测试
- 返回值
- result: dict
一个字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是跨集群运行计算的许多 worker 的平均值。
- call_stack(futures=None, keys=None)[source]¶
所有相关 key 的活动调用栈
可以通过在
futures=
关键字中提供 futures 或集合,或在keys=
关键字中提供明确的 key 列表来指定感兴趣的数据。如果两者都未提供,则返回所有调用栈。- 参数
- futureslist (可选)
futures 列表,默认为所有数据
- keyslist (可选)
key 名称列表,默认为所有数据
示例
>>> df = dd.read_parquet(...).persist() >>> client.call_stack(df) # call on collections
>>> client.call_stack() # Or call with no arguments for all activity
- cancel(futures, asynchronous=None, force=False, reason=None, msg=None)[source]¶
取消正在运行的 future 这会阻止尚未运行的 future 任务被调度,如果它们已经运行则删除它们。调用后,此结果和所有依赖结果将不再可访问
- 参数
- futuresList[Future]
Future 列表
- asynchronous: bool
如果为 True,客户端处于异步模式
- forceboolean (False)
即使其他客户端需要此 future,也取消它
- reason: str
取消 future 的原因
- msgstr
将附加到已取消 future 的消息
- close(timeout=_NoDefault.no_default)[source]¶
关闭此客户端
Python 会话结束时,客户端也会自动关闭
如果像
Client()
这样不带参数启动客户端,那么这也会关闭同时启动的本地集群。- 参数
- timeoutnumber
超时秒数,之后将引发
dask.distributed.TimeoutError
另请参见
- compute(collections, sync=False, optimize_graph=True, workers=None, allow_other_workers=False, resources=None, retries=0, priority=0, fifo_timeout='60s', actors=None, traverse=True, **kwargs)[source]¶
在集群上计算 dask 集合
- 参数
- collections可迭代的 dask 对象或单个 dask 对象
例如 dask.array 或 dataframe 或 dask.value 对象之类的集合
- syncbool (可选)
如果为 False(默认),则返回 Futures;如果为 True,则返回具体值
- optimize_graphbool
是否优化底层图
- workersstring 或 string 的可迭代对象
可以在其上执行计算的 worker 主机名集合。留空默认为所有 worker(常见情况)
- allow_other_workersbool (默认为 False)
与 workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。
- retriesint (默认为 0)
如果计算结果失败,允许的自动重试次数
- priorityNumber
可选的任务优先级。零是默认值。更高的优先级具有优先权
- fifo_timeouttimedelta str (默认为 ’60s’)
允许在调用之间考虑相同优先级的时间量
- traversebool (默认为 True)
默认情况下,dask 遍历内置的 python 集合,查找传递给
compute
的 dask 对象。对于大型集合,这可能会很昂贵。如果参数都不包含任何 dask 对象,请设置traverse=False
以避免此遍历。- resourcesdict (默认为 {})
定义每个映射任务实例在 worker 上所需的 resources;例如
{'GPU': 2}
。有关定义资源的详细信息,请参见 worker resources。- actorsbool 或 dict (默认为 None)
这些任务是否应作为有状态 actor 存在于 worker 上。指定为全局(True/False)或按任务(
{'x': True, 'y': False}
)进行。有关其他详细信息,请参见 Actors。- **kwargs
传递给图优化调用的选项
- 返回值
- 如果输入是序列,则为 Futures 列表;否则为单个 future
另请参见
Client.get
正常的同步 dask.get 函数
示例
>>> from dask import delayed >>> from operator import add >>> x = delayed(add)(1, 2) >>> y = delayed(add)(x, x) >>> xx, yy = client.compute([x, y]) >>> xx <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e> >>> xx.result() 3 >>> yy.result() 6
也支持单个参数
>>> xx = client.compute(x)
- classmethod current(allow_global=True)[source]¶
在 as_client 上下文内运行时,返回上下文局部的当前客户端。否则,返回最新初始化的 Client。如果不存在 Client 实例,则引发 ValueError。如果 allow_global 设置为 False,则在 as_client 上下文管理器之外运行时引发 ValueError。
- 参数
- allow_globalbool
如果为 True,返回默认客户端
- 返回值
- Client
当前客户端
- 引发
- ValueError
如果未设置客户端,则引发 ValueError
另请参见
default_client
- property dashboard_link¶
调度器仪表盘链接。
- 返回值
- str
仪表盘 URL。
示例
在您的默认网络浏览器中打开仪表盘
>>> import webbrowser >>> from distributed import Client >>> client = Client() >>> webbrowser.open(client.dashboard_link)
- dump_cluster_state(filename: str = 'dask-cluster-dump', write_from_scheduler: bool | None = None, exclude: collections.abc.Collection[str] = (), format: Literal['msgpack', 'yaml'] = 'msgpack', **storage_options)[source]¶
提取整个集群状态的转储并持久化到磁盘或 URL。这仅用于调试目的。
警告:调度器(以及客户端,如果将转储写入本地)上的内存使用量可能很大。在大型或长时间运行的集群上,这可能需要几分钟。在处理转储时,调度器可能无响应。
结果将存储在 dict 中
{ "scheduler": {...}, # scheduler state "workers": { worker_addr: {...}, # worker state ... } "versions": { "scheduler": {...}, "workers": { worker_addr: {...}, ... } } }
- 参数
- filename
要写入的路径或 URL。适当的文件后缀(
.msgpack.gz
或.yaml
)将自动附加。必须是
fsspec.open()
支持的路径(例如s3://my-bucket/cluster-dump
或cluster-dumps/dump
)。请参阅write_from_scheduler
来控制转储是直接从调度器写入filename
,还是通过网络发送回客户端然后在本地写入。- write_from_scheduler
如果为 None(默认),则根据
filename
看起来像 URL 还是本地路径进行推断:如果 filename 包含://
(例如s3://my-bucket/cluster-dump
)则为 True,否则(例如local_dir/cluster-dump
)为 False。如果为 True,则直接从调度器将集群状态写入
filename
。如果filename
是本地路径,转储将写入调度器文件系统上的该路径,因此如果调度器在临时硬件上运行,请小心。当调度器连接到网络文件系统或持久磁盘时,或用于写入 bucket 时很有用。如果为 False,则通过网络将集群状态从调度器传输回客户端,然后写入
filename
。对于大型转储,这效率较低,但当调度器无法访问任何持久存储时很有用。- exclude
应该从转储中排除的属性名称集合,例如排除代码、回溯、日志等。
默认为排除
run_spec
,它是序列化的用户代码。调试通常不需要此项。要允许序列化此项,请传递一个空元组。- format
或者
"msgpack"
或"yaml"
。如果使用 msgpack(默认),输出将作为 msgpack 存储在 gzip 文件中。读取方法
import gzip, msgpack with gzip.open("filename") as fd: state = msgpack.unpack(fd)
或
import yaml try: from yaml import CLoader as Loader except ImportError: from yaml import Loader with open("filename") as fd: state = yaml.load(fd, Loader=Loader)
- **storage_options
写入 URL 时传递给
fsspec.open()
的任何附加参数。
- forward_logging(logger_name=None, level=0)[source]¶
开始将给定的 logger(默认为 root)及其下的所有 loggers 从 worker 任务转发到客户端进程。每当命名 logger 在 worker 端处理 LogRecord 时,该记录将被序列化,发送到客户端,并由客户端具有相同名称的 logger 处理。
请注意,worker 端的 logger 仅在其级别设置得当时处理 LogRecords,而客户端 logger 仅在其自身级别也设置得当时发出转发的 LogRecord。例如,如果您的提交任务向 logger “foo” 记录 DEBUG 消息,那么要使
forward_logging()
导致该消息在您的客户端会话中发出,您必须确保 worker 进程和客户端进程中的 logger “foo” 的级别设置为 DEBUG(或更低)。- 参数
- logger_namestr (可选)
要开始转发的 logger 的名称。
logging
模块分层命名系统的常规规则适用。例如,如果name
是"foo"
,那么不仅"foo"
,而且"foo.bar"
、"foo.baz"
等也将被转发。如果name
是None
,则表示 root logger,因此所有 logger 都将被转发。请注意,logger 仅在其级别足以处理 LogRecord 时才会转发给定的 LogRecord。
- levelstr | int (可选)
可选地限制转发到此级别或更高级别的 LogRecords,即使转发的 logger 的自身级别较低。
示例
为了示例起见,假设我们按照用户的方式配置客户端日志记录:将单个 StreamHandler 附加到 root logger,输出级别为 INFO,输出格式简单
import logging import distributed import io, yaml TYPICAL_LOGGING_CONFIG = ''' version: 1 handlers: console: class : logging.StreamHandler formatter: default level : INFO formatters: default: format: '%(asctime)s %(levelname)-8s [worker %(worker)s] %(name)-15s %(message)s' datefmt: '%Y-%m-%d %H:%M:%S' root: handlers: - console ''' config = yaml.safe_load(io.StringIO(TYPICAL_LOGGING_CONFIG)) logging.config.dictConfig(config)
现在创建一个客户端,并开始将 root logger 从 worker 转发回我们的本地客户端进程。
>>> client = distributed.Client() >>> client.forward_logging() # forward the root logger at any handled level
然后提交一个任务,该任务在 worker 上进行一些错误日志记录。我们看到了客户端 StreamHandler 的输出。
>>> def do_error(): ... logging.getLogger("user.module").error("Hello error") ... return 42 >>> client.submit(do_error).result() 2022-11-09 03:43:25 ERROR [worker tcp://127.0.0.1:34783] user.module Hello error 42
注意 dask 如何也向转发的 LogRecord 添加了属性
"worker"
,我们的自定义 formatter 使用了该属性。这对于准确识别哪个 worker 记录了错误很有用。一个值得强调的细微之处:尽管我们的客户端 root logger 配置为 INFO 级别,但 worker 端的 root logger 仍然是默认的 ERROR 级别,因为我们没有在 worker 上进行任何显式日志配置。因此,worker 端的 INFO 日志将不会被转发,因为它们根本没有被处理。
>>> def do_info_1(): ... # no output on the client side ... logging.getLogger("user.module").info("Hello info the first time") ... return 84 >>> client.submit(do_info_1).result() 84
需要将客户端 logger 的级别设置为 INFO,info 消息才会被处理并转发到客户端。换句话说,客户端转发日志记录的“有效”级别是每个 logger 的客户端级别和 worker 级别中的最大值。
>>> def do_info_2(): ... logger = logging.getLogger("user.module") ... logger.setLevel(logging.INFO) ... # now produces output on the client side ... logger.info("Hello info the second time") ... return 84 >>> client.submit(do_info_2).result() 2022-11-09 03:57:39 INFO [worker tcp://127.0.0.1:42815] user.module Hello info the second time 84
- gather(futures, errors='raise', direct=None, asynchronous=None)[source]¶
从分布式内存中收集 future
接受 future、嵌套的 future 容器、迭代器或队列。返回类型将与输入类型匹配。
- 参数
- futuresFuture 集合
这可以是一个可能嵌套的 Future 对象集合。集合可以是列表、集合或字典
- errorsstring
如果 future 出错,我们应该引发异常,还是跳过它在输出集合中的包含,此处为“raise”或“skip”
- directboolean
是否直接连接到 worker,或请求调度器充当中介。这也可以在创建 Client 时设置。
- asynchronous: bool
如果为 True,客户端处于异步模式
- 返回值
- results: 与输入类型相同的集合,但现在包含
- 收集到的结果而非 future
另请参见
Client.scatter
将数据发送到集群
示例
>>> from operator import add >>> c = Client('127.0.0.1:8787') >>> x = c.submit(add, 1, 2) >>> c.gather(x) 3 >>> c.gather([x, [x], x]) # support lists and dicts [3, [3], 3]
- get(dsk, keys, workers=None, allow_other_workers=None, resources=None, sync=True, asynchronous=None, direct=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[source]¶
计算 dask 图
- 参数
- dskdict
- keysobject,或嵌套的对象列表
- workersstring 或 string 的可迭代对象
可以在其上执行计算的 worker 地址或主机名集合。留空默认为所有 worker(常见情况)
- allow_other_workersbool (默认为 False)
与
workers
一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。- resourcesdict (默认为 {})
定义每个映射任务实例在 worker 上所需的
resources
;例如{'GPU': 2}
。有关定义资源的详细信息,请参见 worker resources。- syncbool (可选)
如果为 False,则返回 Futures;如果为 True(默认),则返回具体值。
- asynchronous: bool
如果为 True,客户端处于异步模式
- directbool
是否直接连接到 worker,或请求调度器充当中介。这也可以在创建 Client 时设置。
- retriesint (默认为 0)
如果计算结果失败,允许的自动重试次数
- priorityNumber
可选的任务优先级。零是默认值。更高的优先级具有优先权
- fifo_timeouttimedelta str (默认为 ’60s’)
允许在调用之间考虑相同优先级的时间量
- actorsbool 或 dict (默认为 None)
这些任务是否应作为有状态 actor 存在于 worker 上。指定为全局(True/False)或按任务(
{'x': True, 'y': False}
)进行。有关其他详细信息,请参见 Actors。
- 返回值
- 结果
如果 ‘sync’ 为 True,返回结果。否则,返回打包的已知数据。如果 ‘sync’ 为 False,返回已知数据。否则,返回结果。
另请参见
Client.compute
计算异步集合
示例
>>> from operator import add >>> c = Client('127.0.0.1:8787') >>> c.get({'x': (add, 1, 2)}, 'x') 3
- get_dataset(name, default=_NoDefault.no_default, **kwargs)[source]¶
如果存在,从调度器获取命名数据集。如果不存在,返回默认值或引发 KeyError。
- 参数
- namestr
要检索的数据集的名称
- defaultstr
可选,默认不设置。如果设置,则如果名称不存在,不引发 KeyError,而是返回此默认值
- kwargsdict
传递给 _get_dataset 的附加关键字参数
- 返回值
- 调度器中的数据集(如果存在)
- get_events(topic: str | None = None)[source]¶
检索结构化的主题日志
- 参数
- topicstr (可选)
要检索事件的主题日志名称。如果未提供
topic
,则返回所有主题的日志。
- get_executor(**kwargs)[source]¶
返回一个 concurrent.futures Executor,用于在此 Client 上提交任务
- 参数
- **kwargs
任何与 submit() 或 map() 兼容的参数,例如 workers 或 resources。
- 返回值
- ClientExecutor
一个与 concurrent.futures API 完全兼容的 Executor 对象。
- get_metadata(keys, default=_NoDefault.no_default)[source]¶
从调度器获取任意元数据
有关包含示例的完整 docstring,请参阅 set_metadata
- 参数
- keyskey 或 list
要访问的 key。如果为列表,则在嵌套集合内获取
- default可选
如果 key 不存在,则返回此值。如果未提供,则如果 key 不存在,将引发 KeyError
另请参见
- get_scheduler_logs(n=None)[source]¶
从调度器获取日志
- 参数
- nint
要检索的日志数量。默认最多 10000 条,可通过
distributed.admin.log-length
配置值进行配置。
- 返回值
- 日志按逆序排列(最新在前)
- get_task_stream(start=None, stop=None, count=None, plot=False, filename='task-stream.html', bokeh_resources=None)[source]¶
从调度器获取任务流数据
这会收集仪表盘上诊断“Task Stream”图中存在的数据。它包括特定时长内每个任务的启动、停止、传输和反序列化时间。
请注意,任务流诊断默认不运行。您可能希望在开始工作之前调用此函数一次,以确保开始记录,然后在完成工作后再次调用。
- 参数
- start数字或字符串
您何时想开始记录。如果是一个数字,应该是 time() 调用的结果。如果是一个字符串,则应该是一个与当前时间的差值,例如 '60s' 或 '500 ms'
- stop数字或字符串
您何时想停止记录
- countint
所需的记录数量,如果同时指定了 start 和 stop 则忽略
- plotboolean, str
如果为 true,则也返回一个 Bokeh 图;如果 plot == ‘save’,则将图保存到文件
- filenamestr (可选)
如果您设置
plot='save'
,要保存到的文件名。- bokeh_resourcesbokeh.resources.Resources (可选)
指定资源组件是 INLINE 还是 CDN
- 返回值
- L: List[Dict]
另请参见
get_task_stream
此方法的上下文管理器版本
示例
>>> client.get_task_stream() # prime plugin if not already connected >>> x.compute() # do some work >>> client.get_task_stream() [{'task': ..., 'type': ..., 'thread': ..., ...}]
传递
plot=True
或plot='save'
关键字以获取 Bokeh 图。>>> data, figure = client.get_task_stream(plot='save', filename='myfile.html')
或者考虑使用上下文管理器
>>> from dask.distributed import get_task_stream >>> with get_task_stream() as ts: ... x.compute() >>> ts.data [...]
- get_versions(check: bool = False, packages: collections.abc.Sequence[str] | None = None) distributed.client.VersionsDict | collections.abc.Coroutine[Any, Any, distributed.client.VersionsDict] [source]¶
返回调度器、所有 worker 和自身的版本信息
- 参数
- check
如果所有必需和可选的包不匹配,则抛出 ValueError
- packages
要检查的额外包名称
示例
>>> c.get_versions()
>>> c.get_versions(packages=['sklearn', 'geopandas'])
- get_worker_logs(n=None, workers=None, nanny=False)[source]¶
从 workers 获取日志
- 参数
- nint
要检索的日志数量。默认最多 10000 条,可通过
distributed.admin.log-length
配置值进行配置。- workersiterable
要检索的工作节点地址列表。默认获取所有工作节点。
- nannybool, 默认 False
是否从工作节点(False)或伴随进程(True)获取日志。如果指定此项,workers 中的地址仍应是工作节点地址,而不是伴随进程地址。
- 返回值
- 将工作节点地址映射到日志的字典。
- 日志按倒序返回(最新的在前)
- has_what(workers=None, **kwargs)[source]¶
哪些 key 由哪些 worker 持有
这会返回每个工作节点内存中持有的数据的键。
- 参数
- workerslist (可选)
工作节点地址列表,默认为所有工作节点
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> wait([x, y, z]) >>> c.has_what() {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea', 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b', 'inc-1e297fc27658d7b67b3a758f16bcf47a']}
- log_event(topic: str | collections.abc.Collection[str], msg: Any)[source]¶
在给定主题下记录一个事件
- 参数
- topicstr, list[str]
记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。
- msg
要记录的事件消息。注意,此消息必须是可 msgpack 序列化的。
示例
>>> from time import time >>> client.log_event("current-time", time())
- map(func: Callable, *iterables: collections.abc.Collection, key: str | list | None = None, workers: str | collections.abc.Iterable[str] | None = None, retries: int | None = None, resources: dict[str, Any] | None = None, priority: int = 0, allow_other_workers: bool = False, fifo_timeout: str = '100 ms', actor: bool =False, actors: bool =False, pure: bool =True, batch_size=None, **kwargs)[source]¶
将函数映射到一系列参数上
参数可以是普通对象或 Futures
- 参数
- funccallable
要调度执行的可调用对象。如果
func
返回一个协程,它将在工作节点的主事件循环上运行。否则,func
将在工作节点的任务执行器池中运行(更多信息请参见Worker.executors
)。- iterablesIterables
要映射的类似列表的对象。它们的长度应相同。
- keystr, list
如果为字符串,则作为任务名称的前缀。如果为列表,则为显式名称。
- workersstring 或 string 的可迭代对象
可以在其上执行计算的 worker 主机名集合。留空默认为所有 worker(常见情况)
- retriesint (默认为 0)
如果任务失败,允许的自动重试次数
- resourcesdict (默认为 {})
定义每个映射任务实例在 worker 上所需的 resources;例如
{'GPU': 2}
。有关定义资源的详细信息,请参见 worker resources。- priorityNumber
可选的任务优先级。零是默认值。更高的优先级具有优先权
- allow_other_workersbool (默认为 False)
与 workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。
- fifo_timeoutstr timedelta (默认 ‘100ms’)
允许在调用之间考虑相同优先级的时间量
- actorbool (默认 False)
这些任务是否应作为有状态的 actor 存在于工作节点上。更多详细信息请参见Actors。
- actorsbool (默认 False)
actor 的别名
- purebool (默认为 True)
函数是否是纯函数。对于
np.random.random
等不纯函数,请设置pure=False
。注意,如果actor
和pure
kwargs 都设置为 True,则pure
的值将恢复为 False,因为 actor 是有状态的。更多详细信息请参见Pure Functions by Default。- batch_sizeint, 可选 (默认值:只有一个批次,其大小为整个可迭代对象)
将任务以(最多)
batch_size
的批量提交给调度器。批量大小的权衡在于,大批量可以避免更多的每批量开销,但过大的批量可能需要很长时间提交,不合理地延迟集群开始处理。- **kwargsdict
发送给函数的额外关键字参数。大值将明确包含在任务图中。
- 返回值
- Future 列表、迭代器或队列,取决于输入的类型。
- 输入。
另请参见
Client.submit
提交单个函数
注意事项
当前的任务图解析实现会搜索
key
的出现并将其替换为相应的Future
结果。如果传递给任务的字符串参数与集群上已存在的某个key
匹配,这可能导致意外替换。为避免这种情况,如果手动设置key
,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪解决此问题的进展。示例
>>> L = client.map(func, sequence)
- nbytes(keys=None, summary=True, **kwargs)[source]¶
集群上每个 key 占用的字节数
这是通过
sys.getsizeof
衡量的,可能无法准确反映真实成本。- 参数
- keyslist (可选)
键列表,默认为所有键
- summaryboolean, (可选)
将键汇总为键类型
- **kwargsdict
远程函数的可选关键字参数
另请参见
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> c.nbytes(summary=False) {'inc-1c8dd6be1c21646c71f76c16d09304ea': 28, 'inc-1e297fc27658d7b67b3a758f16bcf47a': 28, 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': 28}
>>> c.nbytes(summary=True) {'inc': 84}
- ncores(workers=None, **kwargs)¶
每个 worker 节点上可用的线程/核心数
- 参数
- workerslist (可选)
我们特别关心的工作节点列表。留空以接收所有工作节点的信息。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.nthreads() {'192.168.1.141:46784': 8, '192.167.1.142:47548': 8, '192.167.1.143:47329': 8, '192.167.1.144:37297': 8}
- normalize_collection(collection)[source]¶
如果集合的任务已经存在 Future,则替换它们
这会将集合的任务图中的任务与调度器中已知的 Futures 进行规范化。它返回集合的副本,其任务图包含重叠的 Futures。
- 参数
- collectiondask object
类似 dask.array、dataframe 或 dask.value 对象的集合
- 返回值
- collectiondask object
集合,其任务已替换为任何现有 Futures。
另请参见
Client.persist
触发集合任务的计算
示例
>>> len(x.__dask_graph__()) # x is a dask collection with 100 tasks 100 >>> set(client.futures).intersection(x.__dask_graph__()) # some overlap exists 10
>>> x = client.normalize_collection(x) >>> len(x.__dask_graph__()) # smaller computational graph 20
- nthreads(workers=None, **kwargs)[source]¶
每个 worker 节点上可用的线程/核心数
- 参数
- workerslist (可选)
我们特别关心的工作节点列表。留空以接收所有工作节点的信息。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.nthreads() {'192.168.1.141:46784': 8, '192.167.1.142:47548': 8, '192.167.1.143:47329': 8, '192.167.1.144:37297': 8}
- persist(collections, optimize_graph=True, workers=None, allow_other_workers=None, resources=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[source]¶
在集群上持久化 dask 集合
开始在集群后台计算集合。提供一个新的 dask 集合,它在语义上与之前的集合相同,但现在基于当前正在执行的 Futures。
- 参数
- collections序列或单个 dask 对象
例如 dask.array 或 dataframe 或 dask.value 对象之类的集合
- optimize_graphbool
是否优化底层图
- workersstring 或 string 的可迭代对象
可以在其上执行计算的 worker 主机名集合。留空默认为所有 worker(常见情况)
- allow_other_workersbool (默认为 False)
与 workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。
- retriesint (默认为 0)
如果计算结果失败,允许的自动重试次数
- priorityNumber
可选的任务优先级。零是默认值。更高的优先级具有优先权
- fifo_timeouttimedelta str (默认为 ’60s’)
允许在调用之间考虑相同优先级的时间量
- resourcesdict (默认为 {})
定义每个映射任务实例在 worker 上所需的 resources;例如
{'GPU': 2}
。有关定义资源的详细信息,请参见 worker resources。- actorsbool 或 dict (默认为 None)
这些任务是否应作为有状态 actor 存在于 worker 上。指定为全局(True/False)或按任务(
{'x': True, 'y': False}
)进行。有关其他详细信息,请参见 Actors。
- 返回值
- 集合列表,或单个集合,取决于输入的类型。
另请参见
示例
>>> xx = client.persist(x) >>> xx, yy = client.persist([x, y])
- processing(workers=None)[source]¶
当前在每个 worker 上运行的任务
- 参数
- workerslist (可选)
工作节点地址列表,默认为所有工作节点
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> c.processing() {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea', 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b', 'inc-1e297fc27658d7b67b3a758f16bcf47a']}
- profile(key=None, start=None, stop=None, workers=None, merge_workers=True, plot=False, filename=None, server=False, scheduler=False)[source]¶
收集有关近期工作的统计分析信息
- 参数
- keystr
要选择的键前缀,这通常是函数名,如 ‘inc’。留空 None 以收集所有数据
- start时间
- stop时间
- workerslist
限制性能分析信息的工作节点列表
- serverbool
如果为 true,则返回工作节点的管理线程的性能分析,而不是工作线程的。这对于分析 Dask 本身而非用户代码非常有用。
- schedulerbool
如果为 true,则返回调度器的管理线程的性能分析信息,而不是工作节点的。这对于分析 Dask 的调度本身非常有用。
- plotboolean or string
是否返回绘图对象
- filenamestr
保存绘图的文件名
示例
>>> client.profile() # call on collections >>> client.profile(filename='dask-profile.html') # save to html file
- publish_dataset(*args, **kwargs)[source]¶
向调度器发布命名数据集
这会在调度器上存储 dask 集合或 Future 列表的命名引用。其他客户端可以使用
get_dataset
下载这些集合或 Future。数据集不会立即计算。您可能希望在发布数据集之前调用
Client.persist
。- 参数
- args要发布为名称的对象列表
- kwargsdict
要在调度器上发布的命名集合
- 返回值
- None
示例
发布客户端
>>> df = dd.read_csv('s3://...') >>> df = c.persist(df) >>> c.publish_dataset(my_dataset=df)
另一种调用方式 >>> c.publish_dataset(df, name=’my_dataset’)
接收客户端
>>> c.list_datasets() ['my_dataset'] >>> df2 = c.get_dataset('my_dataset')
- rebalance(futures=None, workers=None, **kwargs)[source]¶
在网络内重新平衡数据
在工作节点之间移动数据以大致平衡内存负担。这会影响键/工作节点的一个子集或整个网络,具体取决于关键字参数。
有关算法和配置选项的详细信息,请参阅对应的调度器端方法
rebalance()
。警告
此操作通常未针对调度器的正常运行进行充分测试。不建议在等待计算时使用它。
- 参数
- futureslist, 可选
要平衡的 Future 列表,默认为所有数据
- workerslist, 可选
要在其上平衡的工作节点列表,默认为所有工作节点
- **kwargsdict
函数的额外关键字参数
- register_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.SchedulerPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, idempotent: bool | None = None)[source]¶
注册一个插件。
请参阅 https://distributed.dask.org.cn/en/stable/plugins.html
- 参数
- plugin
要注册的伴随进程、调度器或工作节点插件。
- name
插件的名称;如果为 None,则从插件实例获取名称,如果不存在则自动生成。
- idempotent
如果给定名称的插件已存在,则不再注册。如果为 None,则在定义时使用
plugin.idempotent
,否则使用 False。
- register_scheduler_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None)[source]¶
注册一个调度器插件。
从版本 2023.9.2 开始已弃用: 请改用
Client.register_plugin()
。请参阅 https://distributed.dask.org.cn/en/stable/plugins.html#scheduler-plugins
- 参数
- pluginSchedulerPlugin
要传递给调度器的 SchedulerPlugin 实例。
- namestr
插件的名称;如果为 None,则从插件实例获取名称,如果不存在则自动生成。
- idempotentbool
如果给定名称的插件已存在,则不再注册。
- register_worker_callbacks(setup=None)[source]¶
为所有当前和未来的 worker 注册一个 setup 回调函数。
这会为集群中的工作节点注册新的设置函数。该函数将立即在所有当前连接的工作节点上运行。它也将在未来添加的任何工作节点连接时运行。可以注册多个设置函数 - 这些函数将按添加顺序调用。
如果函数接受名为
dask_worker
的输入参数,则该变量将填充工作节点本身。- 参数
- setupcallable(dask_worker: Worker) -> None
要在所有工作节点上注册和运行的函数
- register_worker_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, nanny: bool | None = None)[source]¶
为所有当前和未来的 worker 注册一个生命周期 worker 插件。
从版本 2023.9.2 开始已弃用: 请改用
Client.register_plugin()
。这会注册一个新对象,用于处理此集群中工作节点的设置、任务状态转换和拆卸。插件将在所有当前连接的工作节点上自行实例化。它也将在未来连接的任何工作节点上运行。
插件可以包含
setup
、teardown
、transition
和release_key
方法。有关接口和文档字符串,请参阅dask.distributed.WorkerPlugin
类或下面的示例。它必须可以使用 pickle 或 cloudpickle 模块序列化。如果插件具有
name
属性,或者使用了name=
关键字,则这将控制幂等性。如果具有该名称的插件已注册,则会将其移除并替换为新插件。对于插件的替代方案,您可能还希望查看预加载脚本。
- 参数
- pluginWorkerPlugin or NannyPlugin
要注册的 WorkerPlugin 或 NannyPlugin 实例。
- namestr, 可选
插件的名称。注册具有相同名称的插件将无效。如果插件没有名称属性,则使用随机名称。
- nannybool, 可选
是否向工作节点或伴随进程注册插件。
另请参见
distributed.WorkerPlugin
unregister_worker_plugin
示例
>>> class MyPlugin(WorkerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... def setup(self, worker: dask.distributed.Worker): ... pass ... def teardown(self, worker: dask.distributed.Worker): ... pass ... def transition(self, key: str, start: str, finish: str, ... **kwargs): ... pass ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin)
您可以使用
get_worker
函数访问插件>>> client.register_plugin(other_plugin, name='my-plugin') >>> def f(): ... worker = get_worker() ... plugin = worker.plugins['my-plugin'] ... return plugin.my_state
>>> future = client.run(f)
- replicate(futures, n=None, workers=None, branching_factor=2, **kwargs)[source]¶
设置网络中 future 的复制
将数据复制到多个工作节点上。这有助于广播频繁访问的数据,并提高弹性。
这会在整个网络中对每块数据单独执行树状复制。此操作会阻塞直到完成。它不保证将数据复制到将来的工作节点。
注意
此方法与活动内存管理器 (Active Memory Manager) 的 ReduceReplicas 策略不兼容。如果您希望使用它,必须首先禁用该策略或完全禁用 AMM。
- 参数
- futuresFuture 列表
我们要复制的 Futures
- nint, 可选
要在集群上复制数据的进程数。默认为所有进程。
- workers工作节点地址列表
我们希望限制复制到的工作节点。默认为所有工作节点。
- branching_factorint, 可选
每一代可以复制数据的工作节点数
- **kwargsdict
远程函数的可选关键字参数
另请参见
示例
>>> x = c.submit(func, *args) >>> c.replicate([x]) # send to all workers >>> c.replicate([x], n=3) # send to three workers >>> c.replicate([x], workers=['alice', 'bob']) # send to specific >>> c.replicate([x], n=1, workers=['alice', 'bob']) # send to one of specific workers >>> c.replicate([x], n=1) # reduce replications
- restart(timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, wait_for_workers: bool = True)[source]¶
重启所有工作节点。重置本地状态。可选地等待工作节点返回。
没有伴随进程的工作节点会被关闭,期望外部部署系统会重启它们。因此,如果不使用伴随进程且您的部署系统不会自动重启工作节点,则
restart
只会关闭所有工作节点,然后超时!在
restart
后,所有连接的工作节点都是新的,无论是否抛出TimeoutError
。任何未及时关闭的工作节点都会被移除,并且将来可能或可能不会自行关闭。- 参数
- timeout
如果
wait_for_workers
为 True,则等待工作节点关闭并重新连接的时长,否则仅等待工作节点关闭的时长。如果超出此时间,则抛出asyncio.TimeoutError
。- wait_for_workers
是否等待所有工作节点重新连接,或者仅等待它们关闭(默认 True)。结合
Client.wait_for_workers()
使用restart(wait_for_workers=False)
可以更精细地控制要等待的工作节点数量。
另请参见
Scheduler.restart
Client.restart_workers
- restart_workers(workers: list[str], timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, raise_for_error: bool = True)[source]¶
重启指定的 worker 集合
注意
只有由
distributed.Nanny
监控的工作节点才能重启。更多详细信息请参阅Nanny.restart
。- 参数
- workerslist[str]
要重启的工作节点。这可以是工作节点地址列表、名称列表或两者皆有。
- timeoutint | float | None
等待的秒数
- raise_for_error: bool (默认 True)
如果在
timeout
内未能完成重启工作节点,或者因重启工作节点导致了其他异常时,是否抛出TimeoutError
。
- 返回值
- dict[str, “OK” | “removed” | “timed out”]
工作节点和重启状态的映射,键将与通过
workers
传入的原始值匹配。
另请参见
注意事项
此方法与
Client.restart()
的区别在于,此方法仅重启指定的工作节点集合,而Client.restart
将重启所有工作节点并重置集群上的本地状态(例如释放所有键)。此外,此方法不会优雅地处理在工作节点重启时正在执行的任务。这些任务可能会失败或其可疑计数会增加。
示例
您可以使用以下方法获取活动工作节点的信息
>>> workers = client.scheduler_info()['workers']
从该列表中,您可能需要选择一些工作节点进行重启
>>> client.restart_workers(workers=['tcp://address:port', ...])
- retire_workers(workers: list[str] | None = None, close_workers: bool = True, **kwargs)[source]¶
在调度器上淘汰某些 worker
有关完整文档字符串,请参阅
distributed.Scheduler.retire_workers()
。- 参数
- workers
- close_workers
- **kwargsdict
远程函数的可选关键字参数
另请参见
dask.distributed.Scheduler.retire_workers
示例
您可以使用以下方法获取活动工作节点的信息
>>> workers = client.scheduler_info()['workers']
从该列表中,您可能需要选择一些工作节点进行关闭
>>> client.retire_workers(workers=['tcp://address:port', ...])
- retry(futures, asynchronous=None)[source]¶
重试失败的 future
- 参数
- futuresFuture 列表
Future 列表
- asynchronous: bool
如果为 True,客户端处于异步模式
- run(function, *args, workers: list[str] | None = None, wait: bool = True, nanny: bool = False, on_error: Literal['raise', 'return', 'ignore'] = 'raise', **kwargs)[source]¶
在任务调度系统之外在所有 worker 上运行函数
这会立即在所有当前已知的工作节点上调用函数,阻塞直到结果返回,并将结果作为以工作节点地址为键的字典异步返回。此方法通常用于副作用,例如收集诊断信息或安装库。
如果您的函数接受名为
dask_worker
的输入参数,则该变量将填充工作节点本身。- 参数
- functioncallable
要运行的函数
- *argstuple
远程函数的可选参数
- **kwargsdict
远程函数的可选关键字参数
- workerslist
要在其上运行函数的工作节点。默认为所有已知工作节点。
- waitboolean (可选)
如果函数是异步的,是否等待该函数完成。
- nannybool, 默认 False
是否在伴随进程上运行
function
。默认情况下,函数在工作节点进程上运行。如果指定此项,workers
中的地址仍应是工作节点地址,而不是伴随进程地址。- on_error: “raise” | “return” | “ignore”
如果函数在工作节点上抛出错误
- raise
(默认) 在客户端上重新抛出异常。其他工作节点的输出将丢失。
- return
返回 Exception 对象而不是工作节点的函数输出
- ignore
忽略异常并从结果字典中移除工作节点
示例
>>> c.run(os.getpid) {'192.168.0.100:9000': 1234, '192.168.0.101:9000': 4321, '192.168.0.102:9000': 5555}
使用
workers=
关键字参数限制计算到特定的工作节点。>>> c.run(os.getpid, workers=['192.168.0.100:9000', ... '192.168.0.101:9000']) {'192.168.0.100:9000': 1234, '192.168.0.101:9000': 4321}
>>> def get_status(dask_worker): ... return dask_worker.status
>>> c.run(get_status) {'192.168.0.100:9000': 'running', '192.168.0.101:9000': 'running}
在后台运行异步函数
>>> async def print_state(dask_worker): ... while True: ... print(dask_worker.status) ... await asyncio.sleep(1)
>>> c.run(print_state, wait=False)
- run_on_scheduler(function, *args, **kwargs)[source]¶
在调度器进程上运行函数
这通常用于实时调试。函数应接受关键字参数
dask_scheduler=
,该参数将填充调度器对象本身。- 参数
- functioncallable
要在调度器进程上运行的函数
- *argstuple
函数的可选参数
- **kwargsdict
函数的额外关键字参数
另请参见
Client.run
在所有工作节点上运行函数
示例
>>> def get_number_of_tasks(dask_scheduler=None): ... return len(dask_scheduler.tasks)
>>> client.run_on_scheduler(get_number_of_tasks) 100
在后台运行异步函数
>>> async def print_state(dask_scheduler): ... while True: ... print(dask_scheduler.status) ... await asyncio.sleep(1)
>>> c.run(print_state, wait=False)
- scatter(data, workers=None, broadcast=False, direct=None, hash=True, timeout=_NoDefault.no_default, asynchronous=None)[source]¶
将数据分散到分布式内存中
这将数据从本地客户端进程移动到分布式调度器的工作节点中。请注意,通常最好提交作业到您的工作节点以让它们加载数据,而不是在本地加载数据然后再分发给它们。
- 参数
- datalist, dict, or object
要分发到工作节点的数据。输出类型与输入类型匹配。
- workerstuple 列表 (可选)
可选地限制数据的位置。将工作节点指定为主机名/端口对,例如
('127.0.0.1', 8787)
。- broadcastbool (默认为 False)
是否将每个数据元素发送到所有工作节点。默认情况下,我们根据核心数进行轮询。
注意
将此标志设置为 True 与活动内存管理器 (Active Memory Manager) 的 ReduceReplicas 策略不兼容。如果您希望使用它,必须首先禁用该策略或完全禁用 AMM。
- directbool (默认为自动检查)
是否直接连接到 worker,或请求调度器充当中介。这也可以在创建 Client 时设置。
- hashbool (可选)
是否对数据进行哈希以确定键。如果为 False,则使用随机键
- timeoutnumber, 可选
超时秒数,之后将引发
dask.distributed.TimeoutError
- asynchronous: bool
如果为 True,客户端处于异步模式
- 返回值
- Future 列表、字典、迭代器或队列,类型与输入匹配。
另请参见
Client.gather
将数据收集回本地进程
注意事项
分发字典使用
dict
键创建Future
键。当前的任务图解析实现会搜索key
的出现并将其替换为相应的Future
结果。如果传递给任务的字符串参数与集群上已存在的某个key
匹配,这可能导致意外替换。为避免这种情况,如果手动设置key
,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪解决此问题的进展。示例
>>> c = Client('127.0.0.1:8787') >>> c.scatter(1) <Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>
>>> c.scatter([1, 2, 3]) [<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>, <Future: status: finished, key: 58e78e1b34eb49a68c65b54815d1b158>, <Future: status: finished, key: d3395e15f605bc35ab1bac6341a285e2>]
>>> c.scatter({'x': 1, 'y': 2, 'z': 3}) {'x': <Future: status: finished, key: x>, 'y': <Future: status: finished, key: y>, 'z': <Future: status: finished, key: z>}
将数据位置限制到工作节点的子集
>>> c.scatter([1, 2, 3], workers=[('hostname', 8788)])
将数据广播到所有工作节点
>>> [future] = c.scatter([element], broadcast=True)
使用客户端 Futures 接口将分发的数据发送到并行化函数
>>> data = c.scatter(data, broadcast=True) >>> res = [c.submit(func, data, i) for i in range(100)]
- scheduler_info(n_workers: int = 5, **kwargs: Any) distributed.objects.SchedulerInfo [source]¶
关于集群中 worker 的基本信息
- 参数
- n_workers: int
要获取信息的工作节点数量。要获取所有,请使用 -1。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.scheduler_info() {'id': '2de2b6da-69ee-11e6-ab6a-e82aea155996', 'services': {}, 'type': 'Scheduler', 'workers': {'127.0.0.1:40575': {'active': 0, 'last-seen': 1472038237.4845693, 'name': '127.0.0.1:40575', 'services': {}, 'stored': 0, 'time-delay': 0.0061032772064208984}}}
- set_metadata(key, value)[source]¶
在调度器中设置任意元数据
这允许您在中央调度器进程上存储少量数据用于管理目的。数据应是可 msgpack 序列化的(整数、字符串、列表、字典)。
如果键对应于某个任务,则当调度器忘记该任务时,该键将被清理。
如果键是列表,则假定您希望使用这些键索引到嵌套的字典结构中。例如,如果您调用以下内容
>>> client.set_metadata(['a', 'b', 'c'], 123)
则这与设置以下内容相同
>>> scheduler.task_metadata['a']['b']['c'] = 123
较低级别的字典将按需创建。
另请参见
示例
>>> client.set_metadata('x', 123) >>> client.get_metadata('x') 123
>>> client.set_metadata(['x', 'y'], 123) >>> client.get_metadata('x') {'y': 123}
>>> client.set_metadata(['x', 'w', 'z'], 456) >>> client.get_metadata('x') {'y': 123, 'w': {'z': 456}}
>>> client.get_metadata(['x', 'w']) {'z': 456}
- shutdown()[source]¶
关闭连接的调度器和 worker
请注意,这可能会中断使用相同调度器和工作节点的其他客户端。
另请参见
Client.close
仅关闭此客户端
- submit(func, *args, key=None, workers=None, resources=None, retries=None, priority=0, fifo_timeout='100 ms', allow_other_workers=False, actor=False, actors=False, pure=True, **kwargs)[source]¶
向调度器提交函数应用
- 参数
- funccallable
要调度为
func(*args **kwargs)
的可调用对象。如果func
返回协程,它将在工作节点的主事件循环上运行。否则,func
将在工作节点的任务执行器池中运行(更多信息请参见Worker.executors
)。- *argstuple
可选位置参数
- keystr
任务的唯一标识符。默认为函数名和哈希值
- workersstring 或 string 的可迭代对象
可以在其上执行计算的 worker 地址或主机名集合。留空默认为所有 worker(常见情况)
- resourcesdict (默认为 {})
定义每个映射任务实例在 worker 上所需的
resources
;例如{'GPU': 2}
。有关定义资源的详细信息,请参见 worker resources。- retriesint (默认为 0)
如果任务失败,允许的自动重试次数
- priorityNumber
可选的任务优先级。零是默认值。更高的优先级具有优先权
- fifo_timeoutstr timedelta (默认 ‘100ms’)
允许在调用之间考虑相同优先级的时间量
- allow_other_workersbool (默认为 False)
与
workers
一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。- actorbool (默认 False)
此任务是否应作为有状态的 actor 存在于工作节点上。更多详细信息请参见Actors。
- actorsbool (默认 False)
actor 的别名
- purebool (默认为 True)
函数是否是纯函数。对于
np.random.random
等不纯函数,请设置pure=False
。注意,如果actor
和pure
kwargs 都设置为 True,则pure
的值将恢复为 False,因为 actor 是有状态的。更多详细信息请参见Pure Functions by Default。- **kwargs
- 返回值
- Future
如果在异步模式下运行,则返回 Future。否则返回具体值
- 引发
- TypeError
如果 ‘func’ 不可调用,则抛出 TypeError
- ValueError
如果 ‘allow_other_workers’ 为 True 且 ‘workers’ 为 None,则抛出 ValueError
另请参见
Client.map
一次提交多个参数
注意事项
当前的任务图解析实现会搜索
key
的出现并将其替换为相应的Future
结果。如果传递给任务的字符串参数与集群上已存在的某个key
匹配,这可能导致意外替换。为避免这种情况,如果手动设置key
,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪解决此问题的进展。示例
>>> c = client.submit(add, a, b)
- subscribe_topic(topic, handler)[source]¶
订阅主题并为每个接收到的事件执行处理程序
- 参数
- topic: str
主题名称
- handler: callable or coroutine function
每次收到事件时调用的处理函数。处理函数必须接受一个参数 event,它是一个元组 (timestamp, msg),其中 timestamp 指的是调度器上的时钟。
另请参见
dask.distributed.Client.unsubscribe_topic
dask.distributed.Client.get_events
dask.distributed.Client.log_event
示例
>>> import logging >>> logger = logging.getLogger("myLogger") # Log config not shown >>> client.subscribe_topic("topic-name", lambda: logger.info)
- unpublish_dataset(name, **kwargs)[source]¶
从调度器移除命名数据集
- 参数
- namestr
要取消发布的数据集的名称
示例
>>> c.list_datasets() ['my_dataset'] >>> c.unpublish_dataset('my_dataset') >>> c.list_datasets() []
- unregister_scheduler_plugin(name)[source]¶
注销一个调度器插件
请参阅 https://distributed.dask.org.cn/en/stable/plugins.html#scheduler-plugins
- 参数
- namestr
要取消注册的插件名称。更多信息请参见
Client.register_scheduler_plugin()
文档字符串。
示例
>>> class MyPlugin(SchedulerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... async def start(self, scheduler: Scheduler) -> None: ... pass ... async def before_close(self) -> None: ... pass ... async def close(self) -> None: ... pass ... def restart(self, scheduler: Scheduler) -> None: ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin, name='foo') >>> client.unregister_scheduler_plugin(name='foo')
- unregister_worker_plugin(name, nanny=None)[source]¶
注销一个生命周期 worker 插件
这会取消注册现有的工作节点插件。作为取消注册过程的一部分,将调用插件的
teardown
方法。- 参数
- namestr
要取消注册的插件名称。更多信息请参见
Client.register_plugin()
文档字符串。
另请参见
示例
>>> class MyPlugin(WorkerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... def setup(self, worker: dask.distributed.Worker): ... pass ... def teardown(self, worker: dask.distributed.Worker): ... pass ... def transition(self, key: str, start: str, finish: str, **kwargs): ... pass ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin, name='foo') >>> client.unregister_worker_plugin(name='foo')
- unsubscribe_topic(topic)[source]¶
取消订阅主题并移除事件处理程序
另请参见
dask.distributed.Client.subscribe_topic
dask.distributed.Client.get_events
dask.distributed.Client.log_event
- upload_file(filename, load: bool = True)[source]¶
将本地包上传到调度器和 worker
这将本地文件发送到调度器和所有工作节点。此文件会放置在每个节点的工作目录中,请参阅配置选项
temporary-directory
(默认为tempfile.gettempdir()
)。此目录将添加到 Python 的系统路径中,因此任何
.py
、.egg
或.zip
文件都将可导入。- 参数
- filenamestring
要发送到工作节点的
.py
、.egg
或.zip
文件的文件名- loadbool, 可选
是否将模块作为上传过程的一部分导入。默认为
True
。
示例
>>> client.upload_file('mylibrary.egg') >>> from mylibrary import myfunc >>> L = client.map(myfunc, seq) >>> >>> # Where did that file go? Use `dask_worker.local_directory`. >>> def where_is_mylibrary(dask_worker): >>> path = pathlib.Path(dask_worker.local_directory) / 'mylibrary.egg' >>> assert path.exists() >>> return str(path) >>> >>> client.run(where_is_mylibrary)
- wait_for_workers(n_workers: int, timeout: float | None = None) None [source]¶
阻塞调用,等待 n 个 worker 后继续
- 参数
- n_workersint
工作节点的数量。
- timeoutnumber, 可选
超时秒数,之后将引发
dask.distributed.TimeoutError
- who_has(futures=None, **kwargs)[source]¶
存储每个 Future 数据的那些工作节点。
- 参数
- futureslist (可选)
一个 Future 列表,默认为所有数据。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> wait([x, y, z]) >>> c.who_has() {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'], 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784'], 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': ['192.168.1.141:46784']}
>>> c.who_has([x, y]) {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'], 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']}
- write_scheduler_file(scheduler_file)[source]¶
将调度器信息写入 json 文件。
这有助于使用文件系统轻松共享调度器信息。调度器文件可用于实例化使用同一调度器的第二个客户端。
- 参数
- scheduler_filestr
写入调度器文件的路径。
示例
>>> client = Client() >>> client.write_scheduler_file('scheduler.json') # connect to previous client's scheduler >>> client2 = Client(scheduler_file='scheduler.json')
- class distributed.recreate_tasks.ReplayTaskClient(client)[source]¶
客户端插件,允许在本地重放远程任务。
为给定的客户端添加以下方法。
recreate_error_locally
:用于重放失败任务的主要用户方法。recreate_task_locally
:用于重放任何任务的主要用户方法。
- recreate_error_locally(future)[source]¶
对于失败的计算,在本地执行导致错误的任务以进行调试。
如果堆栈跟踪信息不足以诊断问题,则在 future(
gather
、compute
等函数的结果)返回“error”状态后应执行此操作。负责错误的特定任务(图中指向 future 的部分)将连同其输入值一起从调度器中获取。然后将执行该函数,以便可以使用pdb
进行调试。- 参数
- future失败的 Future 或集合。
与传递给
gather
的内容相同,但返回异常/堆栈跟踪。也可以是包含任何错误 Future 的(持久化)dask 集合。
- 返回值
- 无;函数运行并应引发异常,以便
- 调试器运行。
示例
>>> future = c.submit(div, 1, 0) >>> future.status 'error' >>> c.recreate_error_locally(future) ZeroDivisionError: division by zero
如果您在 IPython 中,可以趁此机会使用 pdb
>>> %pdb Automatic pdb calling has been turned ON
>>> c.recreate_error_locally(future) ZeroDivisionError: division by zero 1 def div(x, y): ----> 2 return x / y ipdb>
- recreate_task_locally(future)[source]¶
对于任何计算,无论成功或失败,都在本地执行任务以进行调试。
在 future(
gather
、compute
等函数的结果)返回“pending”以外的状态后,应执行此操作。需要调试成功完成的 future 的情况可能包括返回意外结果的计算。常见的调试过程可能包括在调试模式下使用 pdb.runcall 在本地运行任务。- 参数
- futurefuture
与传递给
gather
的内容相同。
- 返回值
- 任何;将返回任务 Future 的结果。
示例
>>> import pdb >>> future = c.submit(div, 1, 1) >>> future.status 'finished' >>> pdb.runcall(c.recreate_task_locally, future)
Future¶
- class distributed.Future(key, client=None, state=None, _id=None)[source]¶
远程运行的计算
Future 是远程工作节点上运行的结果的本地代理。用户在本地 Python 进程中管理 Future 对象,以决定大型集群中发生的事情。
注意
用户不应手动实例化 Future。这可能导致状态损坏和集群死锁。
- 参数
- key: str 或 tuple
此 Future 引用的远程数据的键。
- client: Client
应拥有此 Future 的客户端。默认为 _get_global_client()。
- inform: bool
我们是否通知调度器需要更新此 Future 的状态。
- state: FutureState
Future 的状态。
另请参见
Client
创建 Future。
示例
Future 通常由客户端计算产生。
>>> my_future = client.submit(add, 1, 2)
我们可以跟踪 Future 的进度和结果。
>>> my_future <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
我们可以从 Future 中获取结果或异常和堆栈跟踪。
>>> my_future.result()
- add_done_callback(fn)[source]¶
当 future 完成时,在 future 上调用回调
回调函数
fn
应将 Future 作为其唯一参数。无论 Future 是成功完成、出错还是被取消,都将调用此函数。回调函数在单独的线程中执行。
- 参数
- fn可调用对象。
要调用的方法或函数。
- exception(timeout=None, **kwargs)[source]¶
返回失败任务的异常
- 参数
- timeoutnumber, 可选
超时秒数,之后将引发
dask.distributed.TimeoutError
- **kwargsdict
函数的额外关键字参数
- 返回值
- 异常
抛出的异常。如果在返回前经过了 *timeout* 秒,则抛出
dask.distributed.TimeoutError
。
另请参见
- property executor¶
返回执行器,即客户端。
- 返回值
- Client
执行器。
- result(timeout=None)[source]¶
等待计算完成,将结果收集到本地进程。
- 参数
- timeoutnumber, 可选
超时秒数,之后将引发
dask.distributed.TimeoutError
- 返回值
- 结果。
计算结果。如果客户端是异步的,则为协程。
- 引发
- dask.distributed.TimeoutError
如果在返回前经过了 *timeout* 秒,则抛出
dask.distributed.TimeoutError
。
- property status¶
返回状态。
- 返回值
- str
状态。
- traceback(timeout=None, **kwargs)[source]¶
返回失败任务的回溯
这将返回一个堆栈跟踪对象。您可以使用
traceback
模块检查此对象。或者,如果您调用future.result()
,此堆栈跟踪将伴随抛出的异常。- 参数
- timeoutnumber, 可选
在经过指定秒数后抛出
dask.distributed.TimeoutError
。如果在返回前经过了 *timeout* 秒,则抛出dask.distributed.TimeoutError
。
- 返回值
- 堆栈跟踪。
堆栈跟踪对象。如果客户端是异步的,则为协程。
另请参见
示例
>>> import traceback >>> tb = future.traceback() >>> traceback.format_tb(tb) [...]
- property type¶
返回类型。
同步¶
- class distributed.Event(name=None, client=None)[source]¶
分布式集中式 Event,相当于 asyncio.Event
事件存储一个标志,该标志在开始时设置为 False。可以使用 set() 调用将其设置为 True,或使用 clear() 调用将其设置回 False。每次调用 wait() 都会阻塞,直到事件标志设置为 True。
- 参数
- name: string (可选)
事件的名称。选择相同的名称允许两个不相关的进程协调一个事件。如果未给出,将生成一个随机名称。
- client: Client(可选)
用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。
示例
>>> event_1 = Event('a') >>> event_1.wait(timeout=1) >>> # in another process >>> event_2 = Event('a') >>> event_2.set() >>> # now event_1 will stop waiting
- class distributed.Lock(name=None, client=<object object>, scheduler_rpc=None, loop=None)[source]¶
分布式集中式 Lock
警告
此实现使用
distributed.Semaphore
作为后端,该后端容易受到租约超额预订的影响。对于锁而言,这意味着如果租约超时,则两个或多个实例可能同时获取锁。要禁用租约超时,请将distributed.scheduler.locks.lease-timeout
设置为 inf,例如。with dask.config.set({"distributed.scheduler.locks.lease-timeout": "inf"}): lock = Lock("x") ...
请注意,如果没有租约超时,在集群缩容或工作节点失败的情况下,锁可能会死锁。
- 参数
- name: string (可选)
要获取的锁的名称。选择相同的名称允许两个不相关的进程协调一个锁。如果未给出,将生成一个随机名称。
- client: Client(可选)
用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。
示例
>>> lock = Lock('x') >>> lock.acquire(timeout=1) >>> # do things with protected resource >>> lock.release()
- acquire(blocking=True, timeout=None)[source]¶
获取锁。
- 参数
- blocking布尔值,可选。
如果为 False,则完全不在调度器中等待锁。
- timeout字符串、数字或 timedelta,可选。
在调度器中等待锁的秒数。这不包括本地协程时间、网络传输时间等。当 blocking 为 False 时禁止指定超时。除了秒数之外,还可以指定字符串格式的 timedelta,例如“200ms”。
- 返回值
- 是否成功获取锁的布尔值(True 或 False)。
示例
>>> lock = Lock('x') >>> lock.acquire(timeout="1s")
- class distributed.MultiLock(names: list[str] | None = None, client: distributed.client.Client | None = None)[source]¶
分布式集中式 Lock
- 参数
- names
要获取的锁的名称。选择相同的名称允许两个不相关的进程协调一个锁。
- client
用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。
示例
>>> lock = MultiLock(['x', 'y']) >>> lock.acquire(timeout=1) >>> # do things with protected resource 'x' and 'y' >>> lock.release()
- acquire(blocking=True, timeout=None, num_locks=None)[source]¶
获取锁。
- 参数
- blocking布尔值,可选。
如果为 False,则完全不在调度器中等待锁。
- timeout字符串、数字或 timedelta,可选。
在调度器中等待锁的秒数。这不包括本地协程时间、网络传输时间等。当 blocking 为 False 时禁止指定超时。除了秒数之外,还可以指定字符串格式的 timedelta,例如“200ms”。
- num_locks整数,可选。
所需锁的数量。如果为 None,则需要所有锁。
- 返回值
- 是否成功获取锁的布尔值(True 或 False)。
示例
>>> lock = MultiLock(['x', 'y']) >>> lock.acquire(timeout="1s")
- class distributed.Semaphore(max_leases=1, name=None, scheduler_rpc=None, loop=None)[source]¶
此信号量将在调度器上跟踪租约,此类实例可以获取和释放这些租约。如果已获取最大数量的租约,则无法获取更多,调用者将等待直到另一个租约被释放。
租约的生命周期通过超时进行控制。此超时由该实例的
Client
定期刷新,并在工作节点失败的情况下提供死锁或资源饥饿保护。超时可以使用配置选项distributed.scheduler.locks.lease-timeout
进行控制,调度器验证超时的间隔使用选项distributed.scheduler.locks.lease-validation-interval
进行设置。与 Python 标准库的信号量的一个显著区别是,此实现不允许释放次数多于获取次数。如果发生这种情况,将发出警告,但内部状态不会被修改。
警告
在发生租约超时的情况下,此实现容易受到租约超额预订的影响。建议监控日志信息,并根据用户应用程序调整上述配置选项为合适的值。
- 参数
- max_leases: int(可选)
可以同时授予的最大租约数量。这有效地设置了对特定资源并行访问数量的上限。默认为 1。
- name: string (可选)
要获取的信号量的名称。选择相同的名称允许两个不相关的进程进行协调。如果未给出,将生成一个随机名称。
- register: bool
如果为 True,则向调度器注册信号量。在获取任何租约之前必须完成此操作。如果初始化时未完成,也可以通过调用此类的 register 方法来完成。注册时,需要等待。
- scheduler_rpc: ConnectionPool
用于连接调度器的 ConnectionPool。如果提供 None,则使用工作节点或客户端连接池。此参数主要用于测试。
- loop: IOLoop
此实例正在使用的事件循环。如果提供 None,则重用活动工作节点或客户端的循环。
注意事项
如果客户端尝试释放信号量但未获取租约,将引发异常。
dask 默认假定函数是纯函数来执行它们,当在此类函数内部使用信号量的获取/释放时,必须注意实际上 *存在* 副作用,因此,该函数不能再被认为是纯函数。如果不考虑这一点,可能会导致意外行为。
示例
>>> from distributed import Semaphore ... sem = Semaphore(max_leases=2, name='my_database') ... ... def access_resource(s, sem): ... # This automatically acquires a lease from the semaphore (if available) which will be ... # released when leaving the context manager. ... with sem: ... pass ... ... futures = client.map(access_resource, range(10), sem=sem) ... client.gather(futures) ... # Once done, close the semaphore to clean up the state on scheduler side. ... sem.close()
- class distributed.Queue(name=None, client=None, maxsize=0)[source]¶
分布式 Queue
这允许多个客户端使用多生产者/多消费者队列相互共享 Future 或少量数据。所有元数据都通过调度器进行序列化。
队列的元素必须是 Future 或 msgpack 可编码数据(整数、字符串、列表、字典)。所有数据都通过调度器发送,因此发送大型对象是不明智的。要共享大型对象,请分散数据并改为共享 Future。
警告
此对象是实验性的。
- 参数
- name: string (可选)
其他客户端和调度器用于标识队列的名称。如果未给出,将生成一个随机名称。
- client: Client(可选)
用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。
- maxsize: int(可选)
队列中允许的最大项目数。如果为 0(默认值),则队列大小不受限制。
另请参见
Variable
客户端之间共享的变量。
示例
>>> from dask.distributed import Client, Queue >>> client = Client() >>> queue = Queue('x') >>> future = client.submit(f, x) >>> queue.put(future)
- get(timeout=None, batch=False, **kwargs)[source]¶
从队列中获取数据。
- 参数
- timeout数字、字符串或 timedelta,可选。
超时前等待的秒数。除了秒数之外,还可以指定字符串格式的 timedelta,例如“200ms”。
- batch布尔值,整数(可选)。
如果为 True,则返回当前在队列中等待的所有元素。如果是整数,则返回队列中的该数量元素。如果为 False(默认值),则一次返回一个项目。
- class distributed.Variable(name=None, client=None)[source]¶
分布式全局 Variable
这允许多个客户端使用单个可变变量相互共享 Future 和数据。所有元数据都通过调度器进行序列化。可能发生竞态条件。
值必须是 Future 或 msgpack 可编码数据(整数、列表、字符串等)。所有数据都将保存在调度器并通过其发送,因此不宜发送太多数据。如果要共享大量数据,请
scatter
它并改为共享 Future。- 参数
- name: string (可选)
其他客户端和调度器用于标识变量的名称。如果未给出,将生成一个随机名称。
- client: Client(可选)
用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。
另请参见
Queue
客户端之间共享的多生产者/多消费者队列。
示例
>>> from dask.distributed import Client, Variable >>> client = Client() >>> x = Variable('x') >>> x.set(123) # docttest: +SKIP >>> x.get() # docttest: +SKIP 123 >>> future = client.submit(f, x) >>> x.set(future)
集群¶
与集群创建和管理相关的类。其他库(如 dask-jobqueue、dask-gateway、dask-kubernetes、dask-yarn 等)提供额外的集群对象。
|
创建本地调度器和工作节点。 |
|
需要工作节点完整规格说明的集群。 |
- class distributed.LocalCluster(name=None, n_workers=None, threads_per_worker=None, processes=None, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, scheduler_kwargs=None, scheduler_sync_interval=1, **worker_kwargs)[source]¶
创建本地调度器和工作节点。
这将在本地机器上创建一个由调度器和工作节点组成的“集群”。
- 参数
- n_workers: int
要启动的工作节点数量。
- memory_limit: str, float, int, 或 None, 默认为 “auto”
设置*每个工作节点*的内存限制。
参数数据类型注意事项
如果为 None 或 0,则不应用限制。
如果为“auto”,则总系统内存将在工作节点之间平均分配。
如果为浮点数,则该比例的系统内存用于*每个工作节点*。
如果是表示字节数的字符串(如
"1GiB"
),则该数量用于*每个工作节点*。如果为整数,则该字节数用于*每个工作节点*。
请注意,仅当
processes=True
时才会强制执行此限制,且此限制仅以尽力而为的方式强制执行——工作节点仍然可能超过此限制。- processes: bool
是使用进程(True)还是线程(False)。默认为 True,除非 worker_class=Worker,此时默认为 False。
- threads_per_worker: int
每个工作节点的线程数量。
- scheduler_port: int
调度器的端口。使用 0 选择随机端口(默认)。8786 是常用选择。
- silence_logs: 日志级别
打印到标准输出的日志级别。默认为
logging.WARN
。使用 False 或 None 等假值表示不更改。- host: string
调度器将监听的主机地址,默认为仅 localhost。
- ip: string
已弃用。请参见上面的
host
。- dashboard_address: str
Bokeh 诊断服务器监听的地址,例如“localhost:8787”或“0.0.0.0:8787”。默认为“:8787”。设置为
None
以禁用仪表板。使用“:0”表示随机端口。仅指定端口(如“:8787”)时,仪表板将绑定到host
参数指定的接口。如果host
为空,则将在所有接口“0.0.0.0”上绑定。为避免在本地部署时的防火墙问题,请将host
设置为“localhost”。- worker_dashboard_address: str
Bokeh 工作节点诊断服务器监听的地址,例如“localhost:8787”或“0.0.0.0:8787”。默认为 None,禁用仪表板。使用“:0”表示随机端口。
- diagnostics_port: int
已弃用。请参见 dashboard_address。
- asynchronous: bool (默认为 False)
asynchronous: 如果在 async/await 函数或 Tornado gen.coroutines 中使用此集群,则设置为 True。正常使用时应保持为 False。
- blocked_handlers: List[str]
一个字符串列表,指定调度器上不允许的处理程序黑名单,例如
['feed', 'run_function']
。- service_kwargs: Dict[str, Dict]
传递给正在运行服务的额外关键字参数。
- securitySecurity 或布尔值,可选。
配置此集群中的通信安全。可以是 security 对象或 True。如果为 True,则会自动创建临时的自签名凭据。
- protocol: str(可选)
要使用的协议,如
tcp://
、tls://
、inproc://
。根据processes
和security
等其他关键字参数,此参数默认为合理的选择。- interface: str(可选)
要使用的网络接口。默认为 lo/localhost。
- worker_class: Worker
用于实例化工作节点的工作节点类。如果 processes=False,默认为 Worker;如果 processes=True 或省略,默认为 Nanny。
- **worker_kwargs
额外的工作节点参数。任何额外的关键字参数都将传递给
Worker
类构造函数。
示例
>>> cluster = LocalCluster() # Create a local cluster >>> cluster LocalCluster("127.0.0.1:8786", workers=8, threads=8)
>>> c = Client(cluster) # connect to local cluster
将集群扩展到三个工作节点。
>>> cluster.scale(3)
将额外关键字参数传递给 Bokeh。
>>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})
- class distributed.SpecCluster(workers=None, scheduler=None, worker=None, asynchronous=False, loop=None, security=None, silence_logs=False, name=None, shutdown_on_close=True, scheduler_sync_interval=1, shutdown_scheduler=True)[source]¶
需要工作节点完整规格说明的集群。
SpecCluster 类需要使用的调度器和工作节点的完整规格说明。它移除了对用户输入(如线程 vs 进程、核心数量等)的任何处理,以及对集群资源管理器(如 pod、作业等)的任何处理。相反,它期望在调度器和工作节点规格说明中传递这些信息。此类负责处理在适当时间异步地干净地设置和拆除的所有逻辑。希望它可以成为其他更以用户为中心的类的基础。
- 参数
- workers: dict
将名称映射到工作节点类及其规格说明的字典。参见下面的示例。
- scheduler: dict,可选。
调度器的类似映射。
- worker: dict
单个工作节点的规格说明。这用于创建的任何新工作节点。
- asynchronous: bool
asynchronous: 如果在 async/await 函数或 Tornado gen.coroutines 中使用此集群,则设置为 True。正常使用时应保持为 False。
- silence_logs: bool
设置集群时是否应静默日志记录。
- name: str,可选。
打印集群时使用的名称,默认为类型名称。
- shutdown_on_close: bool
程序退出时是否关闭集群。
- shutdown_scheduler: bool
关闭集群时是否关停调度器。
示例
要创建 SpecCluster,您需要指定如何设置调度器和工作节点。
>>> from dask.distributed import Scheduler, Worker, Nanny >>> scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}} >>> workers = { ... 'my-worker': {"cls": Worker, "options": {"nthreads": 1}}, ... 'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}}, ... } >>> cluster = SpecCluster(scheduler=scheduler, workers=workers)
工作节点规格说明存储为
.worker_spec
属性。>>> cluster.worker_spec { 'my-worker': {"cls": Worker, "options": {"nthreads": 1}}, 'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}}, }
而此规格说明的实例化存储在
.workers
属性中。>>> cluster.workers { 'my-worker': <Worker ...> 'my-nanny': <Nanny ...> }
如果规格说明发生变化,我们可以 await 集群或调用
._correct_state
方法使实际状态与指定状态对齐。我们也可以对集群进行
.scale(...)
操作,这将添加给定形式的新工作节点。>>> worker = {'cls': Worker, 'options': {}} >>> cluster = SpecCluster(scheduler=scheduler, worker=worker) >>> cluster.worker_spec {}
>>> cluster.scale(3) >>> cluster.worker_spec { 0: {'cls': Worker, 'options': {}}, 1: {'cls': Worker, 'options': {}}, 2: {'cls': Worker, 'options': {}}, }
注意,上面我们使用的是标准的
Worker
和Nanny
类,但在实践中可以使用处理资源管理的其他类,如KubernetesPod
或SLURMJob
。规格说明不需要符合标准 Dask Worker 类的预期。它只需要使用提供的选项调用,支持__await__
和close
方法以及worker_address
属性。另请注意,规格说明无需统一。可以在外部(子类中)添加其他 API,将不同规格说明的工作节点添加到同一个字典中。
如果规格说明中的单个条目将生成多个 dask 工作节点,请向规格说明提供一个 “group” 元素,其中包含将添加到每个名称的后缀(这应由您的工作节点类处理)。
>>> cluster.worker_spec { 0: {"cls": MultiWorker, "options": {"processes": 3}, "group": ["-0", "-1", -2"]} 1: {"cls": MultiWorker, "options": {"processes": 2}, "group": ["-0", "-1"]} }
这些后缀应与工作节点部署时使用的名称相对应。
>>> [ws.name for ws in cluster.scheduler.workers.values()] ["0-0", "0-1", "0-2", "1-0", "1-1"]
- adapt(Adaptive: type[distributed.deploy.adaptive.Adaptive] = <class 'distributed.deploy.adaptive.Adaptive'>, minimum: float = 0, maximum: float = inf, minimum_cores: int | None = None, maximum_cores: int | None = None, minimum_memory: str | None = None, maximum_memory: str | None = None, **kwargs: typing.Any) distributed.deploy.adaptive.Adaptive [source]¶
开启适应性。
这会根据调度器活动自动缩放 Dask 集群。
- 参数
- minimum整数。
最小工作节点数量。
- maximum整数。
最大工作节点数量。
- minimum_cores整数。
集群中保留的最小核心/线程数量。
- maximum_cores整数。
集群中保留的最大核心/线程数量。
- minimum_memory字符串。
集群中保留的最小内存量,表示为字符串,例如“100 GiB”。
- maximum_memory字符串。
集群中保留的最大内存量,表示为字符串,例如“100 GiB”。
另请参见
dask.distributed.Adaptive
更多关键字参数请参见。
示例
>>> cluster.adapt(minimum=0, maximum_memory="100 GiB", interval='500ms')
- classmethod from_name(name: str) distributed.deploy.spec.ProcessInterface [source]¶
创建此类的实例以按名称表示现有集群。
- scale(n=0, memory=None, cores=None)[source]¶
将集群扩展到 n 个工作节点。
- 参数
- nint
目标工作节点数量。
示例
>>> cluster.scale(10) # scale cluster to ten workers
- scale_up(n=0, memory=None, cores=None)¶
将集群扩展到 n 个工作节点。
- 参数
- nint
目标工作节点数量。
示例
>>> cluster.scale(10) # scale cluster to ten workers
其他¶
- class distributed.as_completed(futures=None, loop=None, with_results=False, raise_errors=True, *, timeout=None)[source]¶
按 future 完成的顺序返回它们
这返回一个迭代器,该迭代器按照输入 Future 对象完成的顺序产生这些对象。对迭代器调用
next
将阻塞直到下一个 Future 完成,无论顺序如何。此外,您还可以在计算过程中使用
.add
方法向此对象添加更多 Future。- 参数
- futures: Future 集合。
一个 Future 对象列表,将按照它们完成的顺序进行迭代。
- with_results: bool (False)
是否等待并包含 Future 的结果;在这种情况下,
as_completed
会产生一个 (future, result) 元组。- raise_errors: bool (True)
当 Future 的结果引发异常时是否应抛出异常;仅在
with_results=True
时影响行为。- timeout: int(可选)
如果调用
__next__()
或__anext__()
,并且从最初调用as_completed()
起经过 timeout 秒后结果仍不可用,则返回的迭代器会抛出dask.distributed.TimeoutError
。如果未指定 timeout 或为None
,则等待时间没有限制。
示例
>>> x, y, z = client.map(inc, [1, 2, 3]) >>> for future in as_completed([x, y, z]): ... print(future.result()) 3 2 4
在计算过程中添加更多 Future。
>>> x, y, z = client.map(inc, [1, 2, 3]) >>> ac = as_completed([x, y, z]) >>> for future in ac: ... print(future.result()) ... if random.random() < 0.5: ... ac.add(c.submit(double, future)) 4 2 8 3 6 12 24
选择性地等待直到结果也已收集。
>>> ac = as_completed([x, y, z], with_results=True) >>> for future, result in ac: ... print(result) 2 4 3
- batches()[source]¶
一次产生所有已完成的 Future,而不是一个接一个地产生。
这返回一个 Future 列表或 (Future, 结果) 元组列表的迭代器,而不是单个 Future 或单个 (Future, 结果) 元组。它将尽快产生这些,无需等待。
示例
>>> for batch in as_completed(futures).batches(): ... results = client.gather(batch) ... print(results) [4, 2] [1, 3, 7] [5] [6]
- distributed.diagnostics.progressbar.progress(*futures, notebook=None, multi=True, complete=True, group_by='prefix', **kwargs)[source]¶
跟踪 future 的进度
这在 Notebook 和控制台中操作方式不同。
Notebook: 它立即返回,在屏幕上留下一个 IPython 小部件。
控制台: 它会阻塞,直到计算完成。
- 参数
- futuresFuture 对象
要跟踪的 Future 对象列表或键列表。
- notebook布尔值 (可选)
是否在 Notebook 中运行 (默认为自动检测)。
- multi布尔值 (可选)
独立跟踪不同的函数 (默认为 True)。
- complete布尔值 (可选)
跟踪所有键 (True) 或仅跟踪尚未运行的键 (False) (默认为 True)。
- group_by可调用对象 | Literal[“spans”] | Literal[“prefix”]
使用 spans 而不是任务键名称对任务进行分组 (默认为“prefix”)。
注意事项
在 Notebook 中,progress 的输出必须是单元格中的最后一条语句。通常,这意味着在单元格的末尾调用 progress。
示例
>>> progress(futures) [########################################] | 100% Completed | 1.7s
- distributed.wait(fs, timeout=None, return_when='ALL_COMPLETED')[source]¶
等待所有/任何 future 完成
- 参数
- fsFuture 对象列表
- timeout数字,字符串,可选
在此时间后引发
dask.distributed.TimeoutError
。可以是像"10 minutes"
这样的字符串或要等待的秒数。- return_when字符串,可选
ALL_COMPLETED 或 FIRST_COMPLETED 中的一个。
- 返回值
- 包含已完成和未完成项的命名元组。
- distributed.fire_and_forget(obj)[source]¶
即使我们释放 future,也要至少运行任务一次
正常操作下,Dask 不会运行任何没有活跃 Future 对象的任务(这在许多情况下避免了不必要的工作)。然而,有时您可能只想触发一个任务,而不跟踪其 Future 对象,并期望它最终完成。您可以在一个或一组 Future 对象上使用此函数,要求 Dask 完成任务,即使没有活跃的客户端正在跟踪它。
任务完成后,结果不会保留在内存中(除非有活跃的 Future 对象),因此这仅对依赖副作用的任务有用。
- 参数
- objFuture 对象,列表,字典,dask 集合
您希望至少运行一次的 Future 对象。
示例
>>> fire_and_forget(client.submit(func, *args))
- distributed.futures_of(o, client=None)[source]¶
集合中的 Future 对象
- 参数
- o集合
Dask 对象的可能嵌套集合。
- clientClient 对象,可选
客户端对象。
- 返回值
- futuresList[Future]
这些集合持有的 Future 对象列表。
- 引发
- CancelledError
如果其中一个 Future 对象被取消,则会引发 CancelledError。
示例
>>> futures_of(my_dask_dataframe) [<Future: finished key: ...>, <Future: pending key: ...>]
- distributed.worker_client(timeout=None, separate_thread=True)[source]¶
获取此线程的客户端
此上下文管理器旨在在我们在 Worker 上运行的函数中调用。作为上下文管理器运行时,它提供一个客户端
Client
对象,该对象可以直接从该 Worker 提交其他任务。- 参数
- timeout数字或字符串
超时时间,在此时间后会出错。默认为
distributed.comm.timeouts.connect
配置值。- separate_thread布尔值,可选
是否在普通线程池之外运行此函数,默认为 True。
另请参见
示例
>>> def func(x): ... with worker_client() as c: # connect from worker back to scheduler ... a = c.submit(inc, x) # this task can submit more tasks ... b = c.submit(dec, x) ... result = c.gather([a, b]) # and gather results ... return result
>>> future = client.submit(func, 1) # submit func(1) on cluster
- distributed.get_worker() distributed.worker.Worker [source]¶
获取当前运行此任务的 worker
另请参见
示例
>>> def f(): ... worker = get_worker() # The worker on which this task is running ... return worker.address
>>> future = client.submit(f) >>> future.result() 'tcp://127.0.0.1:47373'
- distributed.get_client(address=None, timeout=None, resolve_address=True) Client [source]¶
在任务中获取客户端。
此客户端连接到与 Worker 连接的同一个调度器。
- 参数
- address字符串,可选
要连接的调度器地址。默认为 Worker 连接的调度器地址。
- timeout整数或字符串
获取客户端的超时时间(以秒为单位)。默认为
distributed.comm.timeouts.connect
配置值。- resolve_address布尔值,默认为 True
是否将 address 解析为其规范形式。
- 返回值
- Client
示例
>>> def f(): ... client = get_client(timeout="10s") ... futures = client.map(lambda x: x + 1, range(10)) # spawn many tasks ... results = client.gather(futures) ... return sum(results)
>>> future = client.submit(f) >>> future.result() 55
- distributed.secede()[source]¶
让此任务脱离 Worker 的线程池。
这会为新任务腾出一个新的调度槽和一个新的线程。这使得客户端能够在此节点上调度任务,在等待其他作业完成时(例如,使用
client.gather
)尤其有用。另请参见
示例
>>> def mytask(x): ... # do some work ... client = get_client() ... futures = client.map(...) # do some remote work ... secede() # while that work happens, remove ourself from the pool ... return client.gather(futures) # return gathered results
- distributed.rejoin()[source]¶
使此线程重新加入 ThreadPoolExecutor
这将阻塞,直到执行器中出现一个新的槽。下一个完成任务的线程将离开池,以允许此线程加入。
另请参见
secede
离开线程池。
- distributed.print(*args, sep: str | None = ' ', end: str | None = '\n', file: TextIO | None = None, flush: bool = False) None [source]¶
这是内置
print
函数的替代品,用于从 Worker 向客户端进行远程打印。如果在 Dask Worker 外部调用,其参数将直接传递给builtins.print()
。如果由在 Worker 上运行的代码调用,则除了本地打印外,连接到管理此 Worker 的调度器的任何客户端(可能远程)将接收一个事件,指示它们将相同的输出打印到自己的标准输出或标准错误流。例如,用户可以通过在提交的代码中包含对此print
函数的调用,并在本地 Jupyter Notebook 或解释器会话中检查输出来对远程计算进行简单调试。所有参数的行为与
builtins.print()
的参数相同,但有一个例外:如果指定了关键字参数file
,则它必须是sys.stdout
或sys.stderr
;不允许任意的文件类对象。所有非关键字参数都使用
str()
转换为字符串,并写入流中,由sep
分隔,后跟end
。sep
和end
都必须是字符串;它们也可以是None
,表示使用默认值。如果没有给出对象,print()
将只写入end
。- 参数
- sep字符串,可选
插入在值之间的字符串,默认为一个空格。
- end字符串,可选
附加在最后一个值之后的字符串,默认为换行符。
- file
sys.stdout
或sys.stderr
,可选 默认为当前的 sys.stdout。
- flush布尔值,默认为 False
是否强制刷新流。
示例
>>> from dask.distributed import Client, print >>> client = distributed.Client(...) >>> def worker_function(): ... print("Hello from worker!") >>> client.submit(worker_function) <Future: finished, type: NoneType, key: worker_function-...> Hello from worker!
- distributed.warn(message: str | Warning, category: type[Warning] | None = <class 'UserWarning'>, stacklevel: int = 1, source: typing.Any = None) None [source]¶
内置
warnings.warn()
函数的替代品,用于从 worker 远程发出警告到客户端。如果在 Dask Worker 外部调用,其参数将直接传递给
warnings.warn()
。如果由在 Worker 上运行的代码调用,则除了本地发出警告外,连接到管理此 Worker 的调度器的任何客户端(可能远程)将接收一个事件,指示它们发出相同的警告(取决于其本地过滤器等)。在实现可能在 Worker 上运行的计算时,用户可以调用此warn
函数,以确保任何远程客户端会话将看到其警告,例如在 Jupyter 输出单元格中。虽然本地发出的警告会尊重所有参数(含义与
warnings.warn()
中的相同),但客户端会忽略stacklevel
和source
,因为它们在客户端线程中没有意义。示例
>>> from dask.distributed import Client, warn >>> client = Client() >>> def do_warn(): ... warn("A warning from a worker.") >>> client.submit(do_warn).result() /path/to/distributed/client.py:678: UserWarning: A warning from a worker.
- class distributed.Reschedule[source]¶
重新调度此任务
引发此异常将停止当前任务的执行,并请求调度器重新调度此任务,可能在不同的机器上。
这不保证任务会迁移到不同的机器。调度器将遵循其正常的启发式算法来确定接受此任务的最佳机器。如果自从首次调度任务以来集群的负载发生显著变化,机器可能会改变。
- class distributed.get_task_stream(client=None, plot=False, filename='task-stream.html')[source]¶
在上下文块内收集任务流
这提供了在此块处于活动状态期间运行的每个任务的诊断信息。
这必须用作上下文管理器。
- 参数
- plot: 布尔值, 字符串
如果为 true,则也返回一个 Bokeh 图;如果 plot == ‘save’,则将图保存到文件
- filename: 字符串 (可选)
如果您设置
plot='save'
,要保存到的文件名。
另请参见
Client.get_task_stream
此上下文管理器对应的函数版本。
示例
>>> with get_task_stream() as ts: ... x.compute() >>> ts.data [...]
获取 Bokeh 图并可选择保存到文件。
>>> with get_task_stream(plot='save', filename='task-stream.html') as ts: ... x.compute() >>> ts.figure <Bokeh Figure>
要与他人共享此文件,您可能希望将其上传并在线提供服务。一种常见的方法是将其作为 gist 上传,然后在 https://raw.githack.com 上提供服务。
$ python -m pip install gist $ gist task-stream.html https://gist.github.com/8a5b3c74b10b413f612bb5e250856ceb
然后您可以导航到该网站,点击
task-stream.html
文件右侧的“Raw”按钮,然后将该 URL 提供给 https://raw.githack.com。此过程应提供一个可共享的链接,他人可以使用该链接查看您的任务流图。
- class distributed.get_task_metadata[source]¶
在上下文块内收集任务元数据
这会从调度器收集在此上下文管理器范围内提交和完成的任务的
TaskState
元数据和最终状态。示例
>>> with get_task_metadata() as tasks: ... x.compute() >>> tasks.metadata {...} >>> tasks.state {...}
- class distributed.performance_report(filename='dask-report.html', stacklevel=1, mode=None, storage_options=None)[source]¶
收集性能报告
这会创建一个静态 HTML 文件,其中包含许多与仪表板相同的图,供以后查看。
生成的文件使用 JavaScript,因此必须用网页浏览器查看。在本地,我们建议使用
python -m http.server
或在线托管文件。- 参数
- filename: 字符串,可选
本地保存性能报告的文件名。
- stacklevel: 整数,可选
用于填充报告中“Calling Code”部分的程序执行帧。默认为 1,即调用
performance_report
的帧。- mode: 字符串,可选
传递给
bokeh.io.output.output_file()
的模式参数。默认为None
。- storage_options: 字典,可选
写入 URL 时传递给
fsspec.open()
的任何附加参数。
示例
>>> with performance_report(filename="myfile.html", stacklevel=1): ... x.compute()
工具函数¶
- distributed.diagnostics.memray.memray_scheduler(directory: str | pathlib.Path = 'memray-profiles', report_args: Union[collections.abc.Sequence[str], Literal[False]] = ('flamegraph', '--temporal', '--leaks'), **memray_kwargs: Any) collections.abc.Iterator[None] [source]¶
在调度器上生成 Memray 分析报告并下载生成的报告。
示例
with memray_scheduler(): client.submit(my_function).result() # Or even while the computation is already running fut = client.submit(my_function) with memray_scheduler(): time.sleep(10) fut.result()
- 参数
- directory字符串
保存报告的目录。
- report_argstuple[str]
特别是对于 native_traces=True,报告必须在生成配置文件的同一主机上使用相同的 Python 解释器生成。否则,native traces 将产生不可用的结果。因此,我们在调度器上生成报告,然后下载它们。您可以通过提供额外的参数来修改报告生成过程,我们将按以下方式生成报告:
memray *report_args -f <filename> -o <filename>.html
如果应抓取原始数据而不是报告,请将其设置为 False。
- **memray_kwargs
要传递给 memray.Tracker 的关键字参数,例如 {“native_traces”: True}。
- distributed.diagnostics.memray.memray_workers(directory: str | pathlib.Path = 'memray-profiles', workers: int | None | list[str] = None, report_args: Union[collections.abc.Sequence[str], Literal[False]] = ('flamegraph', '--temporal', '--leaks'), fetch_reports_parallel: bool | int = True, **memray_kwargs: Any) collections.abc.Iterator[None] [source]¶
在 worker 上生成 Memray 分析报告并下载生成的报告。
示例
with memray_workers(): client.submit(my_function).result() # Or even while the computation is already running fut = client.submit(my_function) with memray_workers(): time.sleep(10) fut.result()
- 参数
- directory字符串
保存报告的目录。
- workersint | None | list[str]
要分析性能的 Worker。如果为整数,则使用前 n 个 Worker。如果为 None,则使用所有 Worker。如果为 list[str],则使用具有给定地址的 Worker。
- report_argstuple[str]
特别是对于 native_traces=True,报告必须在生成配置文件的同一主机上使用相同的 Python 解释器生成。否则,native traces 将产生不可用的结果。因此,我们在 Worker 上生成报告,然后下载它们。您可以通过提供额外的参数来修改报告生成过程,我们将按以下方式生成报告:
memray *report_args -f <filename> -o <filename>.html
如果应抓取原始数据而不是报告,请将其设置为 False。
- fetch_reports_parallelbool | int
抓取结果有时很慢,有时也不希望等待所有 Worker 完成后才接收第一批报告。这控制了同时抓取的 Worker 数量。
int: 同时抓取的 Worker 数量 True: 所有 Worker 同时抓取 False: 一次一个 Worker 抓取
- **memray_kwargs
要传递给 memray.Tracker 的关键字参数,例如 {“native_traces”: True}。
自适应¶
- class distributed.deploy.Adaptive(cluster: Cluster, interval: str | float | timedelta | None = None, minimum: int | None = None, maximum: int | float | None = None, wait_count: int | None = None, target_duration: str | float | timedelta | None = None, worker_key: Callable[[distributed.scheduler.WorkerState], Hashable] | None = None, **kwargs: Any)[source]¶
根据调度器负载自适应地分配 Worker。一个超类。
包含根据当前使用情况动态调整 Dask 集群大小的逻辑。此类需要与一个能够使用集群资源管理器创建和销毁 Dask Worker 的系统配对。通常它被内置到现有解决方案中,而不是由用户直接使用。它最常用于各种 Dask 集群类的
.adapt(...)
方法。- 参数
- cluster: 对象
必须具有 scale 和 scale_down 方法/协程。
- intervaltimedelta 或字符串,默认为 “1000 ms”
两次检查之间的毫秒数。
- wait_count: 整数,默认为 3
在移除 Worker 之前,连续建议移除该 Worker 的次数。
- target_duration: timedelta 或字符串,默认为 “5s”
我们希望计算花费的时间。这会影响我们扩容的积极程度。
- worker_key: 可调用对象[WorkerState]
缩容时将 Worker 分组的函数。有关更多信息,请参阅 Scheduler.workers_to_close。
- minimum: 整数
要保留的最小 Worker 数量。
- maximum: 整数
要保留的最大 Worker 数量。
- **kwargs
传递给 Scheduler.workers_to_close 的额外参数。
注意事项
子类可以覆盖
Adaptive.target()
和Adaptive.workers_to_close()
来控制何时调整集群大小。默认实现会检查每个 Worker 上的任务是否过多或可用内存是否过少(请参阅distributed.Scheduler.adaptive_target()
)。interval、min、max、wait_count 和 target_duration 的值可以在 dask 配置中,在 distributed.adaptive 键下指定。示例
这通常在现有的 Dask 类中使用,例如 KubeCluster。
>>> from dask_kubernetes import KubeCluster >>> cluster = KubeCluster() >>> cluster.adapt(minimum=10, maximum=100)
或者,您可以通过继承 Dask 的 Cluster 超类,从您自己的 Cluster 类中使用它。
>>> from distributed.deploy import Cluster >>> class MyCluster(Cluster): ... def scale_up(self, n): ... """ Bring worker count up to n """ ... def scale_down(self, workers): ... """ Remove worker addresses from cluster """
>>> cluster = MyCluster() >>> cluster.adapt(minimum=10, maximum=100)
- property loop: tornado.ioloop.IOLoop¶
覆盖 Adaptive.loop。
- state: AdaptiveStateState¶
此自适应策略是否定期进行自适应。