API

Client

客户端连接并提交计算到 Dask 集群(例如 distributed.LocalCluster

Client([address, loop, timeout, ...])

连接并提交计算到 Dask 集群

Client.as_current()

线程局部、任务局部上下文管理器,使 Client.current 类方法返回 self。

Client.benchmark_hardware()

在 Worker 上运行内存、磁盘和网络带宽基准测试

Client.call_stack([futures, keys])

所有相关 key 的活动调用栈

Client.cancel(futures[, asynchronous, ...])

取消正在运行的 future 这会阻止尚未运行的 future 任务被调度,如果它们已经运行则删除它们。

Client.close([timeout])

关闭此客户端

Client.compute(collections[, sync, ...])

在集群上计算 dask 集合

Client.current([allow_global])

as_client 的上下文内运行时,返回上下文局部的当前客户端。

Client.dump_cluster_state([filename, ...])

提取整个集群状态的转储并持久化到磁盘或 URL。

Client.forward_logging([logger_name, level])

开始将给定的 logger(默认为 root)及其下的所有 loggers 从 worker 任务转发到客户端进程。

Client.futures_of(futures)

futures_of 的封装方法

Client.gather(futures[, errors, direct, ...])

从分布式内存中收集 future

Client.get(dsk, keys[, workers, ...])

计算 dask 图

Client.get_dataset(name[, default])

如果存在,从调度器获取命名数据集。

Client.get_events([topic])

检索结构化的主题日志

Client.get_executor(**kwargs)

返回一个 concurrent.futures Executor,用于在此 Client 上提交任务

Client.get_metadata(keys[, default])

从调度器获取任意元数据

Client.get_scheduler_logs([n])

从调度器获取日志

Client.get_task_stream([start, stop, count, ...])

从调度器获取任务流数据

Client.get_versions([check, packages])

返回调度器、所有 worker 和自身的版本信息

Client.get_worker_logs([n, workers, nanny])

从 workers 获取日志

Client.has_what([workers])

哪些 key 由哪些 worker 持有

Client.list_datasets(**kwargs)

列出调度器上可用的命名数据集

Client.log_event(topic, msg)

在给定主题下记录一个事件

Client.map(func, *iterables[, key, workers, ...])

将函数映射到一系列参数上

Client.nbytes([keys, summary])

集群上每个 key 占用的字节数

Client.ncores([workers])

每个 worker 节点上可用的线程/核心数

Client.normalize_collection(collection)

如果 collections 的任务已存在 futures,则替换它们

Client.nthreads([workers])

每个 worker 节点上可用的线程/核心数

Client.persist(collections[, ...])

在集群上持久化 dask 集合

Client.processing([workers])

当前在每个 worker 上运行的任务

Client.profile([key, start, stop, workers, ...])

收集有关近期工作的统计分析信息

Client.publish_dataset(*args, **kwargs)

向调度器发布命名数据集

Client.rebalance([futures, workers])

在网络内重新平衡数据

Client.register_plugin(plugin[, name, ...])

注册一个插件。

Client.register_scheduler_plugin(plugin[, ...])

注册一个调度器插件。

Client.register_worker_callbacks([setup])

为所有当前和未来的 worker 注册一个 setup 回调函数。

Client.register_worker_plugin(plugin[, ...])

为所有当前和未来的 worker 注册一个生命周期 worker 插件。

Client.replicate(futures[, n, workers, ...])

设置网络中 future 的复制

Client.restart([timeout, wait_for_workers])

重启所有 worker。

Client.restart_workers(workers[, timeout, ...])

重启指定的 worker 集合

Client.retire_workers([workers, close_workers])

在调度器上淘汰某些 worker

Client.retry(futures[, asynchronous])

重试失败的 future

Client.run(function, *args[, workers, wait, ...])

在任务调度系统之外在所有 worker 上运行函数

Client.run_on_scheduler(function, *args, ...)

在调度器进程上运行函数

Client.scatter(data[, workers, broadcast, ...])

将数据分散到分布式内存中

Client.scheduler_info([n_workers])

关于集群中 worker 的基本信息

Client.set_metadata(key, value)

在调度器中设置任意元数据

Client.shutdown()

关闭连接的调度器和 worker

Client.start(**kwargs)

在单独线程中启动调度器运行

Client.story(*keys_or_stimuli[, on_error])

返回给定 keys 或 stimulus_id 的集群范围故事

Client.submit(func, *args[, key, workers, ...])

向调度器提交函数应用

Client.subscribe_topic(topic, handler)

订阅主题并为每个接收到的事件执行处理程序

Client.unforward_logging([logger_name])

停止将给定 logger(默认为 root)从 worker 任务转发到客户端进程。

Client.unpublish_dataset(name, **kwargs)

从调度器移除命名数据集

Client.unregister_scheduler_plugin(name)

注销一个调度器插件

Client.unregister_worker_plugin(name[, nanny])

注销一个生命周期 worker 插件

Client.unsubscribe_topic(topic)

取消订阅主题并移除事件处理程序

Client.upload_file(filename[, load])

将本地包上传到调度器和 worker

Client.wait_for_workers(n_workers[, timeout])

阻塞调用,等待 n 个 worker 后继续

Client.who_has([futures])

存储每个 future 数据的 worker

Client.write_scheduler_file(scheduler_file)

将调度器信息写入 json 文件。

worker_client([timeout, separate_thread])

获取此线程的客户端

get_worker()

获取当前运行此任务的 worker

get_client([address, timeout, resolve_address])

在任务中获取客户端。

secede()

使此任务脱离 worker 的线程池

rejoin()

使此线程重新加入 ThreadPoolExecutor

print(*args[, sep, end, file, flush])

内置 print 函数的替代品,用于从 worker 远程打印到客户端。

warn(message[, category, stacklevel, source])

内置 warnings.warn() 函数的替代品,用于从 worker 远程发出警告到客户端。

重新调度

重新调度此任务

ReplayTaskClient.recreate_task_locally(future)

对于任何计算,无论成功或失败,都在本地执行任务以进行调试。

ReplayTaskClient.recreate_error_locally(future)

对于失败的计算,在本地执行导致错误的任务以进行调试。

Future

Future(key[, client, state, _id])

远程运行的计算

Future.add_done_callback(fn)

当 future 完成时,在 future 上调用回调

Future.bind_client(client)

Future.cancel([reason, msg])

取消运行此 future 的请求

Future.cancelled()

如果 future 已被取消,则返回 True

Future.done()

返回计算是否完成。

Future.exception([timeout])

返回失败任务的异常

Future.release()

注意事项

Future.result([timeout])

等待计算完成,将结果收集到本地进程。

Future.retry(**kwargs)

如果此 future 失败则重试

Future.traceback([timeout])

返回失败任务的回溯

同步

Event([name, client])

分布式集中式 Event,相当于 asyncio.Event

Lock([name, client, scheduler_rpc, loop])

分布式集中式 Lock

MultiLock([names, client])

分布式集中式 Lock

Semaphore([max_leases, name, scheduler_rpc, ...])

semaphore 将跟踪调度器上的租约,这些租约可由此类实例获取和释放。

Queue([name, client, maxsize])

分布式 Queue

Variable([name, client])

分布式全局 Variable

其他

as_completed([futures, loop, with_results, ...])

按 future 完成的顺序返回它们

distributed.diagnostics.progressbar.progress(...)

跟踪 future 的进度

wait(fs[, timeout, return_when])

等待所有/任何 future 完成

fire_and_forget(obj)

即使我们释放 future,也要至少运行任务一次

futures_of(o[, client])

集合中的 Future 对象

get_task_stream([client, plot, filename])

在上下文块内收集任务流

get_task_metadata()

在上下文块内收集任务元数据

performance_report([filename, stacklevel, ...])

收集性能报告

工具

distributed.utils.Log

用于存储按换行符分隔的日志条目字符串的容器

distributed.utils.Logs

用于存储将名称映射到日志条目字符串的字典的容器

distributed.diagnostics.memray.memray_scheduler([...])

在调度器上生成 Memray 分析报告并下载生成的报告。

distributed.diagnostics.memray.memray_workers([...])

在 worker 上生成 Memray 分析报告并下载生成的报告。

异步方法

大多数方法和函数都可以在阻塞或异步环境中使用 Tornado 协程很好地工作。如果在 Tornado IOLoop 中使用,则应适当地 yield 或 await 阻塞操作。

必须通过传递 asynchronous=True 关键字来告知客户端您打算在异步环境中使用它

# blocking
client = Client()
future = client.submit(func, *args)  # immediate, no blocking/async difference
result = client.gather(future)  # blocking

# asynchronous Python 2/3
client = yield Client(asynchronous=True)
future = client.submit(func, *args)  # immediate, no blocking/async difference
result = yield client.gather(future)  # non-blocking/asynchronous

# asynchronous Python 3
client = await Client(asynchronous=True)
future = client.submit(func, *args)  # immediate, no blocking/async difference
result = await client.gather(future)  # non-blocking/asynchronous

异步变体必须在 Tornado 协程中运行。有关更多信息,请参阅 Asynchronous 文档。

Client

class distributed.Client(address=None, loop=None, timeout=_NoDefault.no_default, set_as_default=True, scheduler_file=None, security=None, asynchronous=False, name=None, heartbeat_interval=None, serializers=None, deserializers=None, extensions={}, direct_to_workers=None, connection_limit=512, **kwargs)[source]

连接并提交计算到 Dask 集群

Client 连接用户到 Dask 集群。它提供了一个围绕函数和 future 的异步用户界面。此类类似于 concurrent.futures 中的 executors,但也允许在 submit/map 调用中使用 Future 对象。当 Client 被实例化时,默认接管所有 dask.computedask.persist 调用。

通常也可以不指定调度器地址来创建 Client,例如 Client()。在这种情况下,Client 会在后台创建一个 LocalCluster 并连接到它。在这种情况下,任何额外的关键字参数都会从 Client 传递到 LocalCluster。有关更多信息,请参阅 LocalCluster 文档。

参数
address: string, 或 Cluster

这可以是 Scheduler 服务器的地址,例如字符串 '127.0.0.1:8786' 或集群对象,例如 LocalCluster()

loop

事件循环

timeout: int (默认为配置 ``distributed.comm.timeouts.connect``)

与调度器建立初始连接的超时时长

set_as_default: bool (True)

使用此 Client 作为全局 dask 调度器

scheduler_file: string (可选)

包含调度器信息文件的路径(如果可用)

security: Security 或 bool (可选)

可选的安全信息。如果创建本地集群,也可以传入 True,在这种情况下将自动创建临时自签名凭证。

asynchronous: bool (默认为 False)

如果在 async/await 函数或 Tornado gen.coroutines 中使用此客户端,则设置为 True。否则,正常使用应保持 False。

name: string (可选)

为客户端指定一个名称,该名称将包含在调度器上生成的与此客户端相关的日志中

heartbeat_interval: int (可选)

向调度器发送心跳的间隔时间(毫秒)

serializers

序列化对象时使用的可迭代方法。有关更多信息,请参阅 Serialization

deserializers

反序列化对象时使用的可迭代方法。有关更多信息,请参阅 Serialization

extensionslist

扩展

direct_to_workers: bool (可选)

是否直接连接到 worker,或者请求调度器充当中介。

connection_limitint

连接池中一次维持的开放通信数量

**kwargs

如果不传递调度器地址,Client 将创建一个 LocalCluster 对象,并传递任何额外的关键字参数。

示例

初始化时提供集群调度器节点的地址

>>> client = Client('127.0.0.1:8786')  

使用 submit 方法将单个计算发送到集群

>>> a = client.submit(add, 1, 2)  
>>> b = client.submit(add, 10, 20)  

继续对结果使用 submit 或 map 以构建更大的计算

>>> c = client.submit(add, a, b)  

使用 gather 方法收集结果。

>>> client.gather(c)  
33

您也可以不带参数调用 Client 以创建您自己的本地集群。

>>> client = Client()  # makes your own local "cluster" 

额外的关键字将直接传递给 LocalCluster

>>> client = Client(n_workers=2, threads_per_worker=4)  
property amm

方便访问 Active Memory Manager

as_current()[source]

线程局部、任务局部上下文管理器,使 Client.current 类方法返回 self。在此上下文管理器内反序列化的任何 Future 对象将自动附加到此 Client。

benchmark_hardware() dict[source]

在 Worker 上运行内存、磁盘和网络带宽基准测试

返回值
result: dict

一个字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是跨集群运行计算的许多 worker 的平均值。

call_stack(futures=None, keys=None)[source]

所有相关 key 的活动调用栈

可以通过在 futures= 关键字中提供 futures 或集合,或在 keys= 关键字中提供明确的 key 列表来指定感兴趣的数据。如果两者都未提供,则返回所有调用栈。

参数
futureslist (可选)

futures 列表,默认为所有数据

keyslist (可选)

key 名称列表,默认为所有数据

示例

>>> df = dd.read_parquet(...).persist()  
>>> client.call_stack(df)  # call on collections
>>> client.call_stack()  # Or call with no arguments for all activity  
cancel(futures, asynchronous=None, force=False, reason=None, msg=None)[source]

取消正在运行的 future 这会阻止尚未运行的 future 任务被调度,如果它们已经运行则删除它们。调用后,此结果和所有依赖结果将不再可访问

参数
futuresList[Future]

Future 列表

asynchronous: bool

如果为 True,客户端处于异步模式

forceboolean (False)

即使其他客户端需要此 future,也取消它

reason: str

取消 future 的原因

msgstr

将附加到已取消 future 的消息

close(timeout=_NoDefault.no_default)[source]

关闭此客户端

Python 会话结束时,客户端也会自动关闭

如果像 Client() 这样不带参数启动客户端,那么这也会关闭同时启动的本地集群。

参数
timeoutnumber

超时秒数,之后将引发 dask.distributed.TimeoutError

另请参见

Client.restart
compute(collections, sync=False, optimize_graph=True, workers=None, allow_other_workers=False, resources=None, retries=0, priority=0, fifo_timeout='60s', actors=None, traverse=True, **kwargs)[source]

在集群上计算 dask 集合

参数
collections可迭代的 dask 对象或单个 dask 对象

例如 dask.array 或 dataframe 或 dask.value 对象之类的集合

syncbool (可选)

如果为 False(默认),则返回 Futures;如果为 True,则返回具体值

optimize_graphbool

是否优化底层图

workersstring 或 string 的可迭代对象

可以在其上执行计算的 worker 主机名集合。留空默认为所有 worker(常见情况)

allow_other_workersbool (默认为 False)

workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。

retriesint (默认为 0)

如果计算结果失败,允许的自动重试次数

priorityNumber

可选的任务优先级。零是默认值。更高的优先级具有优先权

fifo_timeouttimedelta str (默认为 ’60s’)

允许在调用之间考虑相同优先级的时间量

traversebool (默认为 True)

默认情况下,dask 遍历内置的 python 集合,查找传递给 compute 的 dask 对象。对于大型集合,这可能会很昂贵。如果参数都不包含任何 dask 对象,请设置 traverse=False 以避免此遍历。

resourcesdict (默认为 {})

定义每个映射任务实例在 worker 上所需的 resources;例如 {'GPU': 2}。有关定义资源的详细信息,请参见 worker resources

actorsbool 或 dict (默认为 None)

这些任务是否应作为有状态 actor 存在于 worker 上。指定为全局(True/False)或按任务({'x': True, 'y': False})进行。有关其他详细信息,请参见 Actors

**kwargs

传递给图优化调用的选项

返回值
如果输入是序列,则为 Futures 列表;否则为单个 future

另请参见

Client.get

正常的同步 dask.get 函数

示例

>>> from dask import delayed
>>> from operator import add
>>> x = delayed(add)(1, 2)
>>> y = delayed(add)(x, x)
>>> xx, yy = client.compute([x, y])  
>>> xx  
<Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
>>> xx.result()  
3
>>> yy.result()  
6

也支持单个参数

>>> xx = client.compute(x)  
classmethod current(allow_global=True)[source]

as_client 上下文内运行时,返回上下文局部的当前客户端。否则,返回最新初始化的 Client。如果不存在 Client 实例,则引发 ValueError。如果 allow_global 设置为 False,则在 as_client 上下文管理器之外运行时引发 ValueError。

参数
allow_globalbool

如果为 True,返回默认客户端

返回值
Client

当前客户端

引发
ValueError

如果未设置客户端,则引发 ValueError

另请参见

default_client

调度器仪表盘链接。

返回值
str

仪表盘 URL。

示例

在您的默认网络浏览器中打开仪表盘

>>> import webbrowser
>>> from distributed import Client
>>> client = Client()
>>> webbrowser.open(client.dashboard_link)
dump_cluster_state(filename: str = 'dask-cluster-dump', write_from_scheduler: bool | None = None, exclude: collections.abc.Collection[str] = (), format: Literal['msgpack', 'yaml'] = 'msgpack', **storage_options)[source]

提取整个集群状态的转储并持久化到磁盘或 URL。这仅用于调试目的。

警告:调度器(以及客户端,如果将转储写入本地)上的内存使用量可能很大。在大型或长时间运行的集群上,这可能需要几分钟。在处理转储时,调度器可能无响应。

结果将存储在 dict 中

{
    "scheduler": {...},  # scheduler state
    "workers": {
        worker_addr: {...},  # worker state
        ...
    }
    "versions": {
        "scheduler": {...},
        "workers": {
            worker_addr: {...},
            ...
        }
    }
}
参数
filename

要写入的路径或 URL。适当的文件后缀(.msgpack.gz.yaml)将自动附加。

必须是 fsspec.open() 支持的路径(例如 s3://my-bucket/cluster-dumpcluster-dumps/dump)。请参阅 write_from_scheduler 来控制转储是直接从调度器写入 filename,还是通过网络发送回客户端然后在本地写入。

write_from_scheduler

如果为 None(默认),则根据 filename 看起来像 URL 还是本地路径进行推断:如果 filename 包含 ://(例如 s3://my-bucket/cluster-dump)则为 True,否则(例如 local_dir/cluster-dump)为 False。

如果为 True,则直接从调度器将集群状态写入 filename。如果 filename 是本地路径,转储将写入调度器文件系统上的该路径,因此如果调度器在临时硬件上运行,请小心。当调度器连接到网络文件系统或持久磁盘时,或用于写入 bucket 时很有用。

如果为 False,则通过网络将集群状态从调度器传输回客户端,然后写入 filename。对于大型转储,这效率较低,但当调度器无法访问任何持久存储时很有用。

exclude

应该从转储中排除的属性名称集合,例如排除代码、回溯、日志等。

默认为排除 run_spec,它是序列化的用户代码。调试通常不需要此项。要允许序列化此项,请传递一个空元组。

format

或者 "msgpack""yaml"。如果使用 msgpack(默认),输出将作为 msgpack 存储在 gzip 文件中。

读取方法

import gzip, msgpack
with gzip.open("filename") as fd:
    state = msgpack.unpack(fd)

import yaml
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader
with open("filename") as fd:
    state = yaml.load(fd, Loader=Loader)
**storage_options

写入 URL 时传递给 fsspec.open() 的任何附加参数。

forward_logging(logger_name=None, level=0)[source]

开始将给定的 logger(默认为 root)及其下的所有 loggers 从 worker 任务转发到客户端进程。每当命名 logger 在 worker 端处理 LogRecord 时,该记录将被序列化,发送到客户端,并由客户端具有相同名称的 logger 处理。

请注意,worker 端的 logger 仅在其级别设置得当时处理 LogRecords,而客户端 logger 仅在其自身级别也设置得当时发出转发的 LogRecord。例如,如果您的提交任务向 logger “foo” 记录 DEBUG 消息,那么要使 forward_logging() 导致该消息在您的客户端会话中发出,您必须确保 worker 进程客户端进程中的 logger “foo” 的级别设置为 DEBUG(或更低)。

参数
logger_namestr (可选)

要开始转发的 logger 的名称。logging 模块分层命名系统的常规规则适用。例如,如果 name"foo",那么不仅 "foo",而且 "foo.bar""foo.baz" 等也将被转发。如果 nameNone,则表示 root logger,因此所有 logger 都将被转发。

请注意,logger 仅在其级别足以处理 LogRecord 时才会转发给定的 LogRecord。

levelstr | int (可选)

可选地限制转发到此级别或更高级别的 LogRecords,即使转发的 logger 的自身级别较低。

示例

为了示例起见,假设我们按照用户的方式配置客户端日志记录:将单个 StreamHandler 附加到 root logger,输出级别为 INFO,输出格式简单

import logging
import distributed
import io, yaml

TYPICAL_LOGGING_CONFIG = '''
version: 1
handlers:
  console:
    class : logging.StreamHandler
    formatter: default
    level   : INFO
formatters:
  default:
    format: '%(asctime)s %(levelname)-8s [worker %(worker)s] %(name)-15s %(message)s'
    datefmt: '%Y-%m-%d %H:%M:%S'
root:
  handlers:
    - console
'''
config = yaml.safe_load(io.StringIO(TYPICAL_LOGGING_CONFIG))
logging.config.dictConfig(config)

现在创建一个客户端,并开始将 root logger 从 worker 转发回我们的本地客户端进程。

>>> client = distributed.Client()
>>> client.forward_logging()  # forward the root logger at any handled level

然后提交一个任务,该任务在 worker 上进行一些错误日志记录。我们看到了客户端 StreamHandler 的输出。

>>> def do_error():
...     logging.getLogger("user.module").error("Hello error")
...     return 42
>>> client.submit(do_error).result()
2022-11-09 03:43:25 ERROR    [worker tcp://127.0.0.1:34783] user.module     Hello error
42

注意 dask 如何也向转发的 LogRecord 添加了属性 "worker",我们的自定义 formatter 使用了该属性。这对于准确识别哪个 worker 记录了错误很有用。

一个值得强调的细微之处:尽管我们的客户端 root logger 配置为 INFO 级别,但 worker 端的 root logger 仍然是默认的 ERROR 级别,因为我们没有在 worker 上进行任何显式日志配置。因此,worker 端的 INFO 日志将不会被转发,因为它们根本没有被处理。

>>> def do_info_1():
...     # no output on the client side
...     logging.getLogger("user.module").info("Hello info the first time")
...     return 84
>>> client.submit(do_info_1).result()
84

需要将客户端 logger 的级别设置为 INFO,info 消息才会被处理并转发到客户端。换句话说,客户端转发日志记录的“有效”级别是每个 logger 的客户端级别和 worker 级别中的最大值。

>>> def do_info_2():
...     logger = logging.getLogger("user.module")
...     logger.setLevel(logging.INFO)
...     # now produces output on the client side
...     logger.info("Hello info the second time")
...     return 84
>>> client.submit(do_info_2).result()
2022-11-09 03:57:39 INFO     [worker tcp://127.0.0.1:42815] user.module     Hello info the second time
84
futures_of(futures)[source]

futures_of 的封装方法

参数
futurestuple

Future

gather(futures, errors='raise', direct=None, asynchronous=None)[source]

从分布式内存中收集 future

接受 future、嵌套的 future 容器、迭代器或队列。返回类型将与输入类型匹配。

参数
futuresFuture 集合

这可以是一个可能嵌套的 Future 对象集合。集合可以是列表、集合或字典

errorsstring

如果 future 出错,我们应该引发异常,还是跳过它在输出集合中的包含,此处为“raise”或“skip”

directboolean

是否直接连接到 worker,或请求调度器充当中介。这也可以在创建 Client 时设置。

asynchronous: bool

如果为 True,客户端处于异步模式

返回值
results: 与输入类型相同的集合,但现在包含
收集到的结果而非 future

另请参见

Client.scatter

将数据发送到集群

示例

>>> from operator import add  
>>> c = Client('127.0.0.1:8787')  
>>> x = c.submit(add, 1, 2)  
>>> c.gather(x)  
3
>>> c.gather([x, [x], x])  # support lists and dicts 
[3, [3], 3]
get(dsk, keys, workers=None, allow_other_workers=None, resources=None, sync=True, asynchronous=None, direct=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[source]

计算 dask 图

参数
dskdict
keysobject,或嵌套的对象列表
workersstring 或 string 的可迭代对象

可以在其上执行计算的 worker 地址或主机名集合。留空默认为所有 worker(常见情况)

allow_other_workersbool (默认为 False)

workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。

resourcesdict (默认为 {})

定义每个映射任务实例在 worker 上所需的 resources;例如 {'GPU': 2}。有关定义资源的详细信息,请参见 worker resources

syncbool (可选)

如果为 False,则返回 Futures;如果为 True(默认),则返回具体值。

asynchronous: bool

如果为 True,客户端处于异步模式

directbool

是否直接连接到 worker,或请求调度器充当中介。这也可以在创建 Client 时设置。

retriesint (默认为 0)

如果计算结果失败,允许的自动重试次数

priorityNumber

可选的任务优先级。零是默认值。更高的优先级具有优先权

fifo_timeouttimedelta str (默认为 ’60s’)

允许在调用之间考虑相同优先级的时间量

actorsbool 或 dict (默认为 None)

这些任务是否应作为有状态 actor 存在于 worker 上。指定为全局(True/False)或按任务({'x': True, 'y': False})进行。有关其他详细信息,请参见 Actors

返回值
结果

如果 ‘sync’ 为 True,返回结果。否则,返回打包的已知数据。如果 ‘sync’ 为 False,返回已知数据。否则,返回结果。

另请参见

Client.compute

计算异步集合

示例

>>> from operator import add  
>>> c = Client('127.0.0.1:8787')  
>>> c.get({'x': (add, 1, 2)}, 'x')  
3
get_dataset(name, default=_NoDefault.no_default, **kwargs)[source]

如果存在,从调度器获取命名数据集。如果不存在,返回默认值或引发 KeyError。

参数
namestr

要检索的数据集的名称

defaultstr

可选,默认不设置。如果设置,则如果名称不存在,不引发 KeyError,而是返回此默认值

kwargsdict

传递给 _get_dataset 的附加关键字参数

返回值
调度器中的数据集(如果存在)
get_events(topic: str | None = None)[source]

检索结构化的主题日志

参数
topicstr (可选)

要检索事件的主题日志名称。如果未提供 topic,则返回所有主题的日志。

get_executor(**kwargs)[source]

返回一个 concurrent.futures Executor,用于在此 Client 上提交任务

参数
**kwargs

任何与 submit() 或 map() 兼容的参数,例如 workersresources

返回值
ClientExecutor

一个与 concurrent.futures API 完全兼容的 Executor 对象。

get_metadata(keys, default=_NoDefault.no_default)[source]

从调度器获取任意元数据

有关包含示例的完整 docstring,请参阅 set_metadata

参数
keyskey 或 list

要访问的 key。如果为列表,则在嵌套集合内获取

default可选

如果 key 不存在,则返回此值。如果未提供,则如果 key 不存在,将引发 KeyError

另请参见

Client.set_metadata
get_scheduler_logs(n=None)[source]

从调度器获取日志

参数
nint

要检索的日志数量。默认最多 10000 条,可通过 distributed.admin.log-length 配置值进行配置。

返回值
日志按逆序排列(最新在前)
get_task_stream(start=None, stop=None, count=None, plot=False, filename='task-stream.html', bokeh_resources=None)[source]

从调度器获取任务流数据

这会收集仪表盘上诊断“Task Stream”图中存在的数据。它包括特定时长内每个任务的启动、停止、传输和反序列化时间。

请注意,任务流诊断默认不运行。您可能希望在开始工作之前调用此函数一次,以确保开始记录,然后在完成工作后再次调用。

参数
start数字或字符串

您何时想开始记录。如果是一个数字,应该是 time() 调用的结果。如果是一个字符串,则应该是一个与当前时间的差值,例如 '60s' 或 '500 ms'

stop数字或字符串

您何时想停止记录

countint

所需的记录数量,如果同时指定了 start 和 stop 则忽略

plotboolean, str

如果为 true,则也返回一个 Bokeh 图;如果 plot == ‘save’,则将图保存到文件

filenamestr (可选)

如果您设置 plot='save',要保存到的文件名。

bokeh_resourcesbokeh.resources.Resources (可选)

指定资源组件是 INLINE 还是 CDN

返回值
L: List[Dict]

另请参见

get_task_stream

此方法的上下文管理器版本

示例

>>> client.get_task_stream()  # prime plugin if not already connected
>>> x.compute()  # do some work
>>> client.get_task_stream()
[{'task': ...,
  'type': ...,
  'thread': ...,
  ...}]

传递 plot=Trueplot='save' 关键字以获取 Bokeh 图。

>>> data, figure = client.get_task_stream(plot='save', filename='myfile.html')

或者考虑使用上下文管理器

>>> from dask.distributed import get_task_stream
>>> with get_task_stream() as ts:
...     x.compute()
>>> ts.data
[...]
get_versions(check: bool = False, packages: collections.abc.Sequence[str] | None = None) distributed.client.VersionsDict | collections.abc.Coroutine[Any, Any, distributed.client.VersionsDict][source]

返回调度器、所有 worker 和自身的版本信息

参数
check

如果所有必需和可选的包不匹配,则抛出 ValueError

packages

要检查的额外包名称

示例

>>> c.get_versions()  
>>> c.get_versions(packages=['sklearn', 'geopandas'])  
get_worker_logs(n=None, workers=None, nanny=False)[source]

从 workers 获取日志

参数
nint

要检索的日志数量。默认最多 10000 条,可通过 distributed.admin.log-length 配置值进行配置。

workersiterable

要检索的工作节点地址列表。默认获取所有工作节点。

nannybool, 默认 False

是否从工作节点(False)或伴随进程(True)获取日志。如果指定此项,workers 中的地址仍应是工作节点地址,而不是伴随进程地址。

返回值
将工作节点地址映射到日志的字典。
日志按倒序返回(最新的在前)
has_what(workers=None, **kwargs)[source]

哪些 key 由哪些 worker 持有

这会返回每个工作节点内存中持有的数据的键。

参数
workerslist (可选)

工作节点地址列表,默认为所有工作节点

**kwargsdict

远程函数的可选关键字参数

示例

>>> x, y, z = c.map(inc, [1, 2, 3])  
>>> wait([x, y, z])  
>>> c.has_what()  
{'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea',
                         'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b',
                         'inc-1e297fc27658d7b67b3a758f16bcf47a']}
list_datasets(**kwargs)[source]

列出调度器上可用的命名数据集

log_event(topic: str | collections.abc.Collection[str], msg: Any)[source]

在给定主题下记录一个事件

参数
topicstr, list[str]

记录事件的主题名称。要在多个主题下记录同一事件,请传递主题名称列表。

msg

要记录的事件消息。注意,此消息必须是可 msgpack 序列化的。

示例

>>> from time import time
>>> client.log_event("current-time", time())
map(func: Callable, *iterables: collections.abc.Collection, key: str | list | None = None, workers: str | collections.abc.Iterable[str] | None = None, retries: int | None = None, resources: dict[str, Any] | None = None, priority: int = 0, allow_other_workers: bool = False, fifo_timeout: str = '100 ms', actor: bool =False, actors: bool =False, pure: bool =True, batch_size=None, **kwargs)[source]

将函数映射到一系列参数上

参数可以是普通对象或 Futures

参数
funccallable

要调度执行的可调用对象。如果 func 返回一个协程,它将在工作节点的主事件循环上运行。否则,func 将在工作节点的任务执行器池中运行(更多信息请参见 Worker.executors)。

iterablesIterables

要映射的类似列表的对象。它们的长度应相同。

keystr, list

如果为字符串,则作为任务名称的前缀。如果为列表,则为显式名称。

workersstring 或 string 的可迭代对象

可以在其上执行计算的 worker 主机名集合。留空默认为所有 worker(常见情况)

retriesint (默认为 0)

如果任务失败,允许的自动重试次数

resourcesdict (默认为 {})

定义每个映射任务实例在 worker 上所需的 resources;例如 {'GPU': 2}。有关定义资源的详细信息,请参见 worker resources

priorityNumber

可选的任务优先级。零是默认值。更高的优先级具有优先权

allow_other_workersbool (默认为 False)

workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。

fifo_timeoutstr timedelta (默认 ‘100ms’)

允许在调用之间考虑相同优先级的时间量

actorbool (默认 False)

这些任务是否应作为有状态的 actor 存在于工作节点上。更多详细信息请参见Actors

actorsbool (默认 False)

actor 的别名

purebool (默认为 True)

函数是否是纯函数。对于 np.random.random 等不纯函数,请设置 pure=False。注意,如果 actorpure kwargs 都设置为 True,则 pure 的值将恢复为 False,因为 actor 是有状态的。更多详细信息请参见Pure Functions by Default

batch_sizeint, 可选 (默认值:只有一个批次,其大小为整个可迭代对象)

将任务以(最多)batch_size 的批量提交给调度器。批量大小的权衡在于,大批量可以避免更多的每批量开销,但过大的批量可能需要很长时间提交,不合理地延迟集群开始处理。

**kwargsdict

发送给函数的额外关键字参数。大值将明确包含在任务图中。

返回值
Future 列表、迭代器或队列,取决于输入的类型。
输入。

另请参见

Client.submit

提交单个函数

注意事项

当前的任务图解析实现会搜索 key 的出现并将其替换为相应的 Future 结果。如果传递给任务的字符串参数与集群上已存在的某个 key 匹配,这可能导致意外替换。为避免这种情况,如果手动设置 key,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪解决此问题的进展。

示例

>>> L = client.map(func, sequence)  
nbytes(keys=None, summary=True, **kwargs)[source]

集群上每个 key 占用的字节数

这是通过 sys.getsizeof 衡量的,可能无法准确反映真实成本。

参数
keyslist (可选)

键列表,默认为所有键

summaryboolean, (可选)

将键汇总为键类型

**kwargsdict

远程函数的可选关键字参数

另请参见

Client.who_has

示例

>>> x, y, z = c.map(inc, [1, 2, 3])  
>>> c.nbytes(summary=False)  
{'inc-1c8dd6be1c21646c71f76c16d09304ea': 28,
 'inc-1e297fc27658d7b67b3a758f16bcf47a': 28,
 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': 28}
>>> c.nbytes(summary=True)  
{'inc': 84}
ncores(workers=None, **kwargs)

每个 worker 节点上可用的线程/核心数

参数
workerslist (可选)

我们特别关心的工作节点列表。留空以接收所有工作节点的信息。

**kwargsdict

远程函数的可选关键字参数

示例

>>> c.nthreads()  
{'192.168.1.141:46784': 8,
 '192.167.1.142:47548': 8,
 '192.167.1.143:47329': 8,
 '192.167.1.144:37297': 8}
normalize_collection(collection)[source]

如果集合的任务已经存在 Future,则替换它们

这会将集合的任务图中的任务与调度器中已知的 Futures 进行规范化。它返回集合的副本,其任务图包含重叠的 Futures。

参数
collectiondask object

类似 dask.array、dataframe 或 dask.value 对象的集合

返回值
collectiondask object

集合,其任务已替换为任何现有 Futures。

另请参见

Client.persist

触发集合任务的计算

示例

>>> len(x.__dask_graph__())  # x is a dask collection with 100 tasks  
100
>>> set(client.futures).intersection(x.__dask_graph__())  # some overlap exists  
10
>>> x = client.normalize_collection(x)  
>>> len(x.__dask_graph__())  # smaller computational graph  
20
nthreads(workers=None, **kwargs)[source]

每个 worker 节点上可用的线程/核心数

参数
workerslist (可选)

我们特别关心的工作节点列表。留空以接收所有工作节点的信息。

**kwargsdict

远程函数的可选关键字参数

示例

>>> c.nthreads()  
{'192.168.1.141:46784': 8,
 '192.167.1.142:47548': 8,
 '192.167.1.143:47329': 8,
 '192.167.1.144:37297': 8}
persist(collections, optimize_graph=True, workers=None, allow_other_workers=None, resources=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[source]

在集群上持久化 dask 集合

开始在集群后台计算集合。提供一个新的 dask 集合,它在语义上与之前的集合相同,但现在基于当前正在执行的 Futures。

参数
collections序列或单个 dask 对象

例如 dask.array 或 dataframe 或 dask.value 对象之类的集合

optimize_graphbool

是否优化底层图

workersstring 或 string 的可迭代对象

可以在其上执行计算的 worker 主机名集合。留空默认为所有 worker(常见情况)

allow_other_workersbool (默认为 False)

workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。

retriesint (默认为 0)

如果计算结果失败,允许的自动重试次数

priorityNumber

可选的任务优先级。零是默认值。更高的优先级具有优先权

fifo_timeouttimedelta str (默认为 ’60s’)

允许在调用之间考虑相同优先级的时间量

resourcesdict (默认为 {})

定义每个映射任务实例在 worker 上所需的 resources;例如 {'GPU': 2}。有关定义资源的详细信息,请参见 worker resources

actorsbool 或 dict (默认为 None)

这些任务是否应作为有状态 actor 存在于 worker 上。指定为全局(True/False)或按任务({'x': True, 'y': False})进行。有关其他详细信息,请参见 Actors

返回值
集合列表,或单个集合,取决于输入的类型。

另请参见

Client.compute

示例

>>> xx = client.persist(x)  
>>> xx, yy = client.persist([x, y])  
processing(workers=None)[source]

当前在每个 worker 上运行的任务

参数
workerslist (可选)

工作节点地址列表,默认为所有工作节点

示例

>>> x, y, z = c.map(inc, [1, 2, 3])  
>>> c.processing()  
{'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea',
                         'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b',
                         'inc-1e297fc27658d7b67b3a758f16bcf47a']}
profile(key=None, start=None, stop=None, workers=None, merge_workers=True, plot=False, filename=None, server=False, scheduler=False)[source]

收集有关近期工作的统计分析信息

参数
keystr

要选择的键前缀,这通常是函数名,如 ‘inc’。留空 None 以收集所有数据

start时间
stop时间
workerslist

限制性能分析信息的工作节点列表

serverbool

如果为 true,则返回工作节点的管理线程的性能分析,而不是工作线程的。这对于分析 Dask 本身而非用户代码非常有用。

schedulerbool

如果为 true,则返回调度器的管理线程的性能分析信息,而不是工作节点的。这对于分析 Dask 的调度本身非常有用。

plotboolean or string

是否返回绘图对象

filenamestr

保存绘图的文件名

示例

>>> client.profile()  # call on collections
>>> client.profile(filename='dask-profile.html')  # save to html file
publish_dataset(*args, **kwargs)[source]

向调度器发布命名数据集

这会在调度器上存储 dask 集合或 Future 列表的命名引用。其他客户端可以使用 get_dataset 下载这些集合或 Future。

数据集不会立即计算。您可能希望在发布数据集之前调用 Client.persist

参数
args要发布为名称的对象列表
kwargsdict

要在调度器上发布的命名集合

返回值
None

示例

发布客户端

>>> df = dd.read_csv('s3://...')  
>>> df = c.persist(df) 
>>> c.publish_dataset(my_dataset=df)  

另一种调用方式 >>> c.publish_dataset(df, name=’my_dataset’)

接收客户端

>>> c.list_datasets()  
['my_dataset']
>>> df2 = c.get_dataset('my_dataset')  
rebalance(futures=None, workers=None, **kwargs)[source]

在网络内重新平衡数据

在工作节点之间移动数据以大致平衡内存负担。这会影响键/工作节点的一个子集或整个网络,具体取决于关键字参数。

有关算法和配置选项的详细信息,请参阅对应的调度器端方法 rebalance()

警告

此操作通常未针对调度器的正常运行进行充分测试。不建议在等待计算时使用它。

参数
futureslist, 可选

要平衡的 Future 列表,默认为所有数据

workerslist, 可选

要在其上平衡的工作节点列表,默认为所有工作节点

**kwargsdict

函数的额外关键字参数

register_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.SchedulerPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, idempotent: bool | None = None)[source]

注册一个插件。

请参阅 https://distributed.dask.org.cn/en/stable/plugins.html

参数
plugin

要注册的伴随进程、调度器或工作节点插件。

name

插件的名称;如果为 None,则从插件实例获取名称,如果不存在则自动生成。

idempotent

如果给定名称的插件已存在,则不再注册。如果为 None,则在定义时使用 plugin.idempotent,否则使用 False。

register_scheduler_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None)[source]

注册一个调度器插件。

从版本 2023.9.2 开始已弃用: 请改用 Client.register_plugin()

请参阅 https://distributed.dask.org.cn/en/stable/plugins.html#scheduler-plugins

参数
pluginSchedulerPlugin

要传递给调度器的 SchedulerPlugin 实例。

namestr

插件的名称;如果为 None,则从插件实例获取名称,如果不存在则自动生成。

idempotentbool

如果给定名称的插件已存在,则不再注册。

register_worker_callbacks(setup=None)[source]

为所有当前和未来的 worker 注册一个 setup 回调函数。

这会为集群中的工作节点注册新的设置函数。该函数将立即在所有当前连接的工作节点上运行。它也将在未来添加的任何工作节点连接时运行。可以注册多个设置函数 - 这些函数将按添加顺序调用。

如果函数接受名为 dask_worker 的输入参数,则该变量将填充工作节点本身。

参数
setupcallable(dask_worker: Worker) -> None

要在所有工作节点上注册和运行的函数

register_worker_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, nanny: bool | None = None)[source]

为所有当前和未来的 worker 注册一个生命周期 worker 插件。

从版本 2023.9.2 开始已弃用: 请改用 Client.register_plugin()

这会注册一个新对象,用于处理此集群中工作节点的设置、任务状态转换和拆卸。插件将在所有当前连接的工作节点上自行实例化。它也将在未来连接的任何工作节点上运行。

插件可以包含 setupteardowntransitionrelease_key 方法。有关接口和文档字符串,请参阅 dask.distributed.WorkerPlugin 类或下面的示例。它必须可以使用 pickle 或 cloudpickle 模块序列化。

如果插件具有 name 属性,或者使用了 name= 关键字,则这将控制幂等性。如果具有该名称的插件已注册,则会将其移除并替换为新插件。

对于插件的替代方案,您可能还希望查看预加载脚本。

参数
pluginWorkerPlugin or NannyPlugin

要注册的 WorkerPlugin 或 NannyPlugin 实例。

namestr, 可选

插件的名称。注册具有相同名称的插件将无效。如果插件没有名称属性,则使用随机名称。

nannybool, 可选

是否向工作节点或伴随进程注册插件。

另请参见

distributed.WorkerPlugin
unregister_worker_plugin

示例

>>> class MyPlugin(WorkerPlugin):
...     def __init__(self, *args, **kwargs):
...         pass  # the constructor is up to you
...     def setup(self, worker: dask.distributed.Worker):
...         pass
...     def teardown(self, worker: dask.distributed.Worker):
...         pass
...     def transition(self, key: str, start: str, finish: str,
...                    **kwargs):
...         pass
...     def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool):
...         pass
>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_plugin(plugin)

您可以使用 get_worker 函数访问插件

>>> client.register_plugin(other_plugin, name='my-plugin')
>>> def f():
...    worker = get_worker()
...    plugin = worker.plugins['my-plugin']
...    return plugin.my_state
>>> future = client.run(f)
replicate(futures, n=None, workers=None, branching_factor=2, **kwargs)[source]

设置网络中 future 的复制

将数据复制到多个工作节点上。这有助于广播频繁访问的数据,并提高弹性。

这会在整个网络中对每块数据单独执行树状复制。此操作会阻塞直到完成。它不保证将数据复制到将来的工作节点。

注意

此方法与活动内存管理器 (Active Memory Manager) 的 ReduceReplicas 策略不兼容。如果您希望使用它,必须首先禁用该策略或完全禁用 AMM。

参数
futuresFuture 列表

我们要复制的 Futures

nint, 可选

要在集群上复制数据的进程数。默认为所有进程。

workers工作节点地址列表

我们希望限制复制到的工作节点。默认为所有工作节点。

branching_factorint, 可选

每一代可以复制数据的工作节点数

**kwargsdict

远程函数的可选关键字参数

另请参见

Client.rebalance

示例

>>> x = c.submit(func, *args)  
>>> c.replicate([x])  # send to all workers  
>>> c.replicate([x], n=3)  # send to three workers  
>>> c.replicate([x], workers=['alice', 'bob'])  # send to specific  
>>> c.replicate([x], n=1, workers=['alice', 'bob'])  # send to one of specific workers  
>>> c.replicate([x], n=1)  # reduce replications 
restart(timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, wait_for_workers: bool = True)[source]

重启所有工作节点。重置本地状态。可选地等待工作节点返回。

没有伴随进程的工作节点会被关闭,期望外部部署系统会重启它们。因此,如果不使用伴随进程且您的部署系统不会自动重启工作节点,则 restart 只会关闭所有工作节点,然后超时!

restart 后,所有连接的工作节点都是新的,无论是否抛出 TimeoutError。任何未及时关闭的工作节点都会被移除,并且将来可能或可能不会自行关闭。

参数
timeout

如果 wait_for_workers 为 True,则等待工作节点关闭并重新连接的时长,否则仅等待工作节点关闭的时长。如果超出此时间,则抛出 asyncio.TimeoutError

wait_for_workers

是否等待所有工作节点重新连接,或者仅等待它们关闭(默认 True)。结合 Client.wait_for_workers() 使用 restart(wait_for_workers=False) 可以更精细地控制要等待的工作节点数量。

另请参见

Scheduler.restart
Client.restart_workers
restart_workers(workers: list[str], timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, raise_for_error: bool = True)[source]

重启指定的 worker 集合

注意

只有由 distributed.Nanny 监控的工作节点才能重启。更多详细信息请参阅 Nanny.restart

参数
workerslist[str]

要重启的工作节点。这可以是工作节点地址列表、名称列表或两者皆有。

timeoutint | float | None

等待的秒数

raise_for_error: bool (默认 True)

如果在 timeout 内未能完成重启工作节点,或者因重启工作节点导致了其他异常时,是否抛出 TimeoutError

返回值
dict[str, “OK” | “removed” | “timed out”]

工作节点和重启状态的映射,键将与通过 workers 传入的原始值匹配。

另请参见

Client.restart

注意事项

此方法与 Client.restart() 的区别在于,此方法仅重启指定的工作节点集合,而 Client.restart 将重启所有工作节点并重置集群上的本地状态(例如释放所有键)。

此外,此方法不会优雅地处理在工作节点重启时正在执行的任务。这些任务可能会失败或其可疑计数会增加。

示例

您可以使用以下方法获取活动工作节点的信息

>>> workers = client.scheduler_info()['workers']

从该列表中,您可能需要选择一些工作节点进行重启

>>> client.restart_workers(workers=['tcp://address:port', ...])
retire_workers(workers: list[str] | None = None, close_workers: bool = True, **kwargs)[source]

在调度器上淘汰某些 worker

有关完整文档字符串,请参阅 distributed.Scheduler.retire_workers()

参数
workers
close_workers
**kwargsdict

远程函数的可选关键字参数

另请参见

dask.distributed.Scheduler.retire_workers

示例

您可以使用以下方法获取活动工作节点的信息

>>> workers = client.scheduler_info()['workers']

从该列表中,您可能需要选择一些工作节点进行关闭

>>> client.retire_workers(workers=['tcp://address:port', ...])
retry(futures, asynchronous=None)[source]

重试失败的 future

参数
futuresFuture 列表

Future 列表

asynchronous: bool

如果为 True,客户端处于异步模式

run(function, *args, workers: list[str] | None = None, wait: bool = True, nanny: bool = False, on_error: Literal['raise', 'return', 'ignore'] = 'raise', **kwargs)[source]

在任务调度系统之外在所有 worker 上运行函数

这会立即在所有当前已知的工作节点上调用函数,阻塞直到结果返回,并将结果作为以工作节点地址为键的字典异步返回。此方法通常用于副作用,例如收集诊断信息或安装库。

如果您的函数接受名为 dask_worker 的输入参数,则该变量将填充工作节点本身。

参数
functioncallable

要运行的函数

*argstuple

远程函数的可选参数

**kwargsdict

远程函数的可选关键字参数

workerslist

要在其上运行函数的工作节点。默认为所有已知工作节点。

waitboolean (可选)

如果函数是异步的,是否等待该函数完成。

nannybool, 默认 False

是否在伴随进程上运行 function。默认情况下,函数在工作节点进程上运行。如果指定此项,workers 中的地址仍应是工作节点地址,而不是伴随进程地址。

on_error: “raise” | “return” | “ignore”

如果函数在工作节点上抛出错误

raise

(默认) 在客户端上重新抛出异常。其他工作节点的输出将丢失。

return

返回 Exception 对象而不是工作节点的函数输出

ignore

忽略异常并从结果字典中移除工作节点

示例

>>> c.run(os.getpid)  
{'192.168.0.100:9000': 1234,
 '192.168.0.101:9000': 4321,
 '192.168.0.102:9000': 5555}

使用 workers= 关键字参数限制计算到特定的工作节点。

>>> c.run(os.getpid, workers=['192.168.0.100:9000',
...                           '192.168.0.101:9000'])  
{'192.168.0.100:9000': 1234,
 '192.168.0.101:9000': 4321}
>>> def get_status(dask_worker):
...     return dask_worker.status
>>> c.run(get_status)  
{'192.168.0.100:9000': 'running',
 '192.168.0.101:9000': 'running}

在后台运行异步函数

>>> async def print_state(dask_worker):  
...    while True:
...        print(dask_worker.status)
...        await asyncio.sleep(1)
>>> c.run(print_state, wait=False)  
run_on_scheduler(function, *args, **kwargs)[source]

在调度器进程上运行函数

这通常用于实时调试。函数应接受关键字参数 dask_scheduler=,该参数将填充调度器对象本身。

参数
functioncallable

要在调度器进程上运行的函数

*argstuple

函数的可选参数

**kwargsdict

函数的额外关键字参数

另请参见

Client.run

在所有工作节点上运行函数

示例

>>> def get_number_of_tasks(dask_scheduler=None):
...     return len(dask_scheduler.tasks)
>>> client.run_on_scheduler(get_number_of_tasks)  
100

在后台运行异步函数

>>> async def print_state(dask_scheduler):  
...    while True:
...        print(dask_scheduler.status)
...        await asyncio.sleep(1)
>>> c.run(print_state, wait=False)  
scatter(data, workers=None, broadcast=False, direct=None, hash=True, timeout=_NoDefault.no_default, asynchronous=None)[source]

将数据分散到分布式内存中

这将数据从本地客户端进程移动到分布式调度器的工作节点中。请注意,通常最好提交作业到您的工作节点以让它们加载数据,而不是在本地加载数据然后再分发给它们。

参数
datalist, dict, or object

要分发到工作节点的数据。输出类型与输入类型匹配。

workerstuple 列表 (可选)

可选地限制数据的位置。将工作节点指定为主机名/端口对,例如 ('127.0.0.1', 8787)

broadcastbool (默认为 False)

是否将每个数据元素发送到所有工作节点。默认情况下,我们根据核心数进行轮询。

注意

将此标志设置为 True 与活动内存管理器 (Active Memory Manager) 的 ReduceReplicas 策略不兼容。如果您希望使用它,必须首先禁用该策略或完全禁用 AMM。

directbool (默认为自动检查)

是否直接连接到 worker,或请求调度器充当中介。这也可以在创建 Client 时设置。

hashbool (可选)

是否对数据进行哈希以确定键。如果为 False,则使用随机键

timeoutnumber, 可选

超时秒数,之后将引发 dask.distributed.TimeoutError

asynchronous: bool

如果为 True,客户端处于异步模式

返回值
Future 列表、字典、迭代器或队列,类型与输入匹配。

另请参见

Client.gather

将数据收集回本地进程

注意事项

分发字典使用 dict 键创建 Future 键。当前的任务图解析实现会搜索 key 的出现并将其替换为相应的 Future 结果。如果传递给任务的字符串参数与集群上已存在的某个 key 匹配,这可能导致意外替换。为避免这种情况,如果手动设置 key,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪解决此问题的进展。

示例

>>> c = Client('127.0.0.1:8787')  
>>> c.scatter(1) 
<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>
>>> c.scatter([1, 2, 3])  
[<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>,
 <Future: status: finished, key: 58e78e1b34eb49a68c65b54815d1b158>,
 <Future: status: finished, key: d3395e15f605bc35ab1bac6341a285e2>]
>>> c.scatter({'x': 1, 'y': 2, 'z': 3})  
{'x': <Future: status: finished, key: x>,
 'y': <Future: status: finished, key: y>,
 'z': <Future: status: finished, key: z>}

将数据位置限制到工作节点的子集

>>> c.scatter([1, 2, 3], workers=[('hostname', 8788)])   

将数据广播到所有工作节点

>>> [future] = c.scatter([element], broadcast=True)  

使用客户端 Futures 接口将分发的数据发送到并行化函数

>>> data = c.scatter(data, broadcast=True)  
>>> res = [c.submit(func, data, i) for i in range(100)]
scheduler_info(n_workers: int = 5, **kwargs: Any) distributed.objects.SchedulerInfo[source]

关于集群中 worker 的基本信息

参数
n_workers: int

要获取信息的工作节点数量。要获取所有,请使用 -1。

**kwargsdict

远程函数的可选关键字参数

示例

>>> c.scheduler_info()  
{'id': '2de2b6da-69ee-11e6-ab6a-e82aea155996',
 'services': {},
 'type': 'Scheduler',
 'workers': {'127.0.0.1:40575': {'active': 0,
                                 'last-seen': 1472038237.4845693,
                                 'name': '127.0.0.1:40575',
                                 'services': {},
                                 'stored': 0,
                                 'time-delay': 0.0061032772064208984}}}
set_metadata(key, value)[source]

在调度器中设置任意元数据

这允许您在中央调度器进程上存储少量数据用于管理目的。数据应是可 msgpack 序列化的(整数、字符串、列表、字典)。

如果键对应于某个任务,则当调度器忘记该任务时,该键将被清理。

如果键是列表,则假定您希望使用这些键索引到嵌套的字典结构中。例如,如果您调用以下内容

>>> client.set_metadata(['a', 'b', 'c'], 123)

则这与设置以下内容相同

>>> scheduler.task_metadata['a']['b']['c'] = 123

较低级别的字典将按需创建。

另请参见

get_metadata

示例

>>> client.set_metadata('x', 123)  
>>> client.get_metadata('x')  
123
>>> client.set_metadata(['x', 'y'], 123)  
>>> client.get_metadata('x')  
{'y': 123}
>>> client.set_metadata(['x', 'w', 'z'], 456)  
>>> client.get_metadata('x')  
{'y': 123, 'w': {'z': 456}}
>>> client.get_metadata(['x', 'w'])  
{'z': 456}
shutdown()[source]

关闭连接的调度器和 worker

请注意,这可能会中断使用相同调度器和工作节点的其他客户端。

另请参见

Client.close

仅关闭此客户端

start(**kwargs)[source]

在单独线程中启动调度器运行

story(*keys_or_stimuli, on_error=None)[source]

返回给定键或 stimulus_id 的集群范围事件链

submit(func, *args, key=None, workers=None, resources=None, retries=None, priority=0, fifo_timeout='100 ms', allow_other_workers=False, actor=False, actors=False, pure=True, **kwargs)[source]

向调度器提交函数应用

参数
funccallable

要调度为 func(*args **kwargs) 的可调用对象。如果 func 返回协程,它将在工作节点的主事件循环上运行。否则,func 将在工作节点的任务执行器池中运行(更多信息请参见 Worker.executors)。

*argstuple

可选位置参数

keystr

任务的唯一标识符。默认为函数名和哈希值

workersstring 或 string 的可迭代对象

可以在其上执行计算的 worker 地址或主机名集合。留空默认为所有 worker(常见情况)

resourcesdict (默认为 {})

定义每个映射任务实例在 worker 上所需的 resources;例如 {'GPU': 2}。有关定义资源的详细信息,请参见 worker resources

retriesint (默认为 0)

如果任务失败,允许的自动重试次数

priorityNumber

可选的任务优先级。零是默认值。更高的优先级具有优先权

fifo_timeoutstr timedelta (默认 ‘100ms’)

允许在调用之间考虑相同优先级的时间量

allow_other_workersbool (默认为 False)

workers 一起使用。指示计算是否可以在不在 workers 集合中的 worker 上执行。

actorbool (默认 False)

此任务是否应作为有状态的 actor 存在于工作节点上。更多详细信息请参见Actors

actorsbool (默认 False)

actor 的别名

purebool (默认为 True)

函数是否是纯函数。对于 np.random.random 等不纯函数,请设置 pure=False。注意,如果 actorpure kwargs 都设置为 True,则 pure 的值将恢复为 False,因为 actor 是有状态的。更多详细信息请参见Pure Functions by Default

**kwargs
返回值
Future

如果在异步模式下运行,则返回 Future。否则返回具体值

引发
TypeError

如果 ‘func’ 不可调用,则抛出 TypeError

ValueError

如果 ‘allow_other_workers’ 为 True 且 ‘workers’ 为 None,则抛出 ValueError

另请参见

Client.map

一次提交多个参数

注意事项

当前的任务图解析实现会搜索 key 的出现并将其替换为相应的 Future 结果。如果传递给任务的字符串参数与集群上已存在的某个 key 匹配,这可能导致意外替换。为避免这种情况,如果手动设置 key,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪解决此问题的进展。

示例

>>> c = client.submit(add, a, b)  
subscribe_topic(topic, handler)[source]

订阅主题并为每个接收到的事件执行处理程序

参数
topic: str

主题名称

handler: callable or coroutine function

每次收到事件时调用的处理函数。处理函数必须接受一个参数 event,它是一个元组 (timestamp, msg),其中 timestamp 指的是调度器上的时钟。

另请参见

dask.distributed.Client.unsubscribe_topic
dask.distributed.Client.get_events
dask.distributed.Client.log_event

示例

>>> import logging
>>> logger = logging.getLogger("myLogger")  # Log config not shown
>>> client.subscribe_topic("topic-name", lambda: logger.info)
unforward_logging(logger_name=None)[source]

停止将给定 logger(默认为 root)从 worker 任务转发到客户端进程。

unpublish_dataset(name, **kwargs)[source]

从调度器移除命名数据集

参数
namestr

要取消发布的数据集的名称

另请参见

Client.publish_dataset

示例

>>> c.list_datasets()  
['my_dataset']
>>> c.unpublish_dataset('my_dataset')  
>>> c.list_datasets()  
[]
unregister_scheduler_plugin(name)[source]

注销一个调度器插件

请参阅 https://distributed.dask.org.cn/en/stable/plugins.html#scheduler-plugins

参数
namestr

要取消注册的插件名称。更多信息请参见 Client.register_scheduler_plugin() 文档字符串。

示例

>>> class MyPlugin(SchedulerPlugin):
...     def __init__(self, *args, **kwargs):
...         pass  # the constructor is up to you
...     async def start(self, scheduler: Scheduler) -> None:
...         pass
...     async def before_close(self) -> None:
...         pass
...     async def close(self) -> None:
...         pass
...     def restart(self, scheduler: Scheduler) -> None:
...         pass
>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_plugin(plugin, name='foo')
>>> client.unregister_scheduler_plugin(name='foo')
unregister_worker_plugin(name, nanny=None)[source]

注销一个生命周期 worker 插件

这会取消注册现有的工作节点插件。作为取消注册过程的一部分,将调用插件的 teardown 方法。

参数
namestr

要取消注册的插件名称。更多信息请参见 Client.register_plugin() 文档字符串。

另请参见

register_plugin

示例

>>> class MyPlugin(WorkerPlugin):
...     def __init__(self, *args, **kwargs):
...         pass  # the constructor is up to you
...     def setup(self, worker: dask.distributed.Worker):
...         pass
...     def teardown(self, worker: dask.distributed.Worker):
...         pass
...     def transition(self, key: str, start: str, finish: str, **kwargs):
...         pass
...     def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool):
...         pass
>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_plugin(plugin, name='foo')
>>> client.unregister_worker_plugin(name='foo')
unsubscribe_topic(topic)[source]

取消订阅主题并移除事件处理程序

另请参见

dask.distributed.Client.subscribe_topic
dask.distributed.Client.get_events
dask.distributed.Client.log_event
upload_file(filename, load: bool = True)[source]

将本地包上传到调度器和 worker

这将本地文件发送到调度器和所有工作节点。此文件会放置在每个节点的工作目录中,请参阅配置选项 temporary-directory(默认为 tempfile.gettempdir())。

此目录将添加到 Python 的系统路径中,因此任何 .py.egg.zip 文件都将可导入。

参数
filenamestring

要发送到工作节点的 .py.egg.zip 文件的文件名

loadbool, 可选

是否将模块作为上传过程的一部分导入。默认为 True

示例

>>> client.upload_file('mylibrary.egg')  
>>> from mylibrary import myfunc  
>>> L = client.map(myfunc, seq)  
>>>
>>> # Where did that file go? Use `dask_worker.local_directory`.
>>> def where_is_mylibrary(dask_worker):
>>>     path = pathlib.Path(dask_worker.local_directory) / 'mylibrary.egg'
>>>     assert path.exists()
>>>     return str(path)
>>>
>>> client.run(where_is_mylibrary)  
wait_for_workers(n_workers: int, timeout: float | None = None) None[source]

阻塞调用,等待 n 个 worker 后继续

参数
n_workersint

工作节点的数量。

timeoutnumber, 可选

超时秒数,之后将引发 dask.distributed.TimeoutError

who_has(futures=None, **kwargs)[source]

存储每个 Future 数据的那些工作节点。

参数
futureslist (可选)

一个 Future 列表,默认为所有数据。

**kwargsdict

远程函数的可选关键字参数

示例

>>> x, y, z = c.map(inc, [1, 2, 3])  
>>> wait([x, y, z])  
>>> c.who_has()  
{'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784'],
 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': ['192.168.1.141:46784']}
>>> c.who_has([x, y])  
{'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']}
write_scheduler_file(scheduler_file)[source]

将调度器信息写入 json 文件。

这有助于使用文件系统轻松共享调度器信息。调度器文件可用于实例化使用同一调度器的第二个客户端。

参数
scheduler_filestr

写入调度器文件的路径。

示例

>>> client = Client()  
>>> client.write_scheduler_file('scheduler.json')  
# connect to previous client's scheduler
>>> client2 = Client(scheduler_file='scheduler.json')  
class distributed.recreate_tasks.ReplayTaskClient(client)[source]

客户端插件,允许在本地重放远程任务。

为给定的客户端添加以下方法。

  • recreate_error_locally:用于重放失败任务的主要用户方法。

  • recreate_task_locally:用于重放任何任务的主要用户方法。

recreate_error_locally(future)[source]

对于失败的计算,在本地执行导致错误的任务以进行调试。

如果堆栈跟踪信息不足以诊断问题,则在 future(gathercompute 等函数的结果)返回“error”状态后应执行此操作。负责错误的特定任务(图中指向 future 的部分)将连同其输入值一起从调度器中获取。然后将执行该函数,以便可以使用 pdb 进行调试。

参数
future失败的 Future 或集合。

与传递给 gather 的内容相同,但返回异常/堆栈跟踪。也可以是包含任何错误 Future 的(持久化)dask 集合。

返回值
无;函数运行并应引发异常,以便
调试器运行。

示例

>>> future = c.submit(div, 1, 0)         
>>> future.status                        
'error'
>>> c.recreate_error_locally(future)     
ZeroDivisionError: division by zero

如果您在 IPython 中,可以趁此机会使用 pdb

>>> %pdb                                 
Automatic pdb calling has been turned ON
>>> c.recreate_error_locally(future)     
ZeroDivisionError: division by zero
      1 def div(x, y):
----> 2     return x / y
ipdb>
recreate_task_locally(future)[source]

对于任何计算,无论成功或失败,都在本地执行任务以进行调试。

在 future(gathercompute 等函数的结果)返回“pending”以外的状态后,应执行此操作。需要调试成功完成的 future 的情况可能包括返回意外结果的计算。常见的调试过程可能包括在调试模式下使用 pdb.runcall 在本地运行任务。

参数
futurefuture

与传递给 gather 的内容相同。

返回值
任何;将返回任务 Future 的结果。

示例

>>> import pdb                                    
>>> future = c.submit(div, 1, 1)                  
>>> future.status                                 
'finished'
>>> pdb.runcall(c.recreate_task_locally, future)  

Future

class distributed.Future(key, client=None, state=None, _id=None)[source]

远程运行的计算

Future 是远程工作节点上运行的结果的本地代理。用户在本地 Python 进程中管理 Future 对象,以决定大型集群中发生的事情。

注意

用户不应手动实例化 Future。这可能导致状态损坏和集群死锁。

参数
key: str 或 tuple

此 Future 引用的远程数据的键。

client: Client

应拥有此 Future 的客户端。默认为 _get_global_client()。

inform: bool

我们是否通知调度器需要更新此 Future 的状态。

state: FutureState

Future 的状态。

另请参见

Client

创建 Future。

示例

Future 通常由客户端计算产生。

>>> my_future = client.submit(add, 1, 2)  

我们可以跟踪 Future 的进度和结果。

>>> my_future  
<Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>

我们可以从 Future 中获取结果或异常和堆栈跟踪。

>>> my_future.result()  
add_done_callback(fn)[source]

当 future 完成时,在 future 上调用回调

回调函数 fn 应将 Future 作为其唯一参数。无论 Future 是成功完成、出错还是被取消,都将调用此函数。

回调函数在单独的线程中执行。

参数
fn可调用对象。

要调用的方法或函数。

cancel(reason=None, msg=None, **kwargs)[source]

取消运行此 future 的请求

另请参见

Client.cancel
cancelled()[source]

如果 future 已被取消,则返回 True

返回值
布尔值

如果 Future 已“取消”,则为 True,否则为 False。

done()[source]

返回计算是否完成。

返回值
布尔值

如果计算已完成,则为 True,否则为 False。

exception(timeout=None, **kwargs)[source]

返回失败任务的异常

参数
timeoutnumber, 可选

超时秒数,之后将引发 dask.distributed.TimeoutError

**kwargsdict

函数的额外关键字参数

返回值
异常

抛出的异常。如果在返回前经过了 *timeout* 秒,则抛出 dask.distributed.TimeoutError

另请参见

Future.traceback
property executor

返回执行器,即客户端。

返回值
Client

执行器。

release()[source]

注意事项

此方法可以从不同的线程调用(例如参见 Client.get() 或 Future.__del__())。

result(timeout=None)[source]

等待计算完成,将结果收集到本地进程。

参数
timeoutnumber, 可选

超时秒数,之后将引发 dask.distributed.TimeoutError

返回值
结果。

计算结果。如果客户端是异步的,则为协程。

引发
dask.distributed.TimeoutError

如果在返回前经过了 *timeout* 秒,则抛出 dask.distributed.TimeoutError

retry(**kwargs)[source]

如果此 future 失败则重试

另请参见

Client.retry
property status

返回状态。

返回值
str

状态。

traceback(timeout=None, **kwargs)[source]

返回失败任务的回溯

这将返回一个堆栈跟踪对象。您可以使用 traceback 模块检查此对象。或者,如果您调用 future.result(),此堆栈跟踪将伴随抛出的异常。

参数
timeoutnumber, 可选

在经过指定秒数后抛出 dask.distributed.TimeoutError。如果在返回前经过了 *timeout* 秒,则抛出 dask.distributed.TimeoutError

返回值
堆栈跟踪。

堆栈跟踪对象。如果客户端是异步的,则为协程。

另请参见

Future.exception

示例

>>> import traceback  
>>> tb = future.traceback()  
>>> traceback.format_tb(tb)  
[...]
property type

返回类型。

同步

class distributed.Event(name=None, client=None)[source]

分布式集中式 Event,相当于 asyncio.Event

事件存储一个标志,该标志在开始时设置为 False。可以使用 set() 调用将其设置为 True,或使用 clear() 调用将其设置回 False。每次调用 wait() 都会阻塞,直到事件标志设置为 True。

参数
name: string (可选)

事件的名称。选择相同的名称允许两个不相关的进程协调一个事件。如果未给出,将生成一个随机名称。

client: Client(可选)

用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。

示例

>>> event_1 = Event('a')  
>>> event_1.wait(timeout=1)  
>>> # in another process
>>> event_2 = Event('a')  
>>> event_2.set() 
>>> # now event_1 will stop waiting
clear()[source]

清除事件(将其标志设置为 False)。

现在所有等待者都将阻塞。

is_set()[source]

检查事件是否已设置。

set()[source]

设置事件(将其标志设置为 True)。

现在所有等待者都将被释放。

wait(timeout=None)[source]

等待直到事件被设置。

参数
timeout数字、字符串或 timedelta,可选。

在调度器中等待事件的秒数。这不包括本地协程时间、网络传输时间等。除了秒数之外,还可以指定字符串格式的 timedelta,例如“200ms”。

返回值
如果事件已设置则为 True,如果发生超时则为 False。

示例

>>> event = Event('a')  
>>> event.wait(timeout="1s")  
class distributed.Lock(name=None, client=<object object>, scheduler_rpc=None, loop=None)[source]

分布式集中式 Lock

警告

此实现使用 distributed.Semaphore 作为后端,该后端容易受到租约超额预订的影响。对于锁而言,这意味着如果租约超时,则两个或多个实例可能同时获取锁。要禁用租约超时,请将 distributed.scheduler.locks.lease-timeout 设置为 inf,例如。

with dask.config.set({"distributed.scheduler.locks.lease-timeout": "inf"}):
    lock = Lock("x")
    ...

请注意,如果没有租约超时,在集群缩容或工作节点失败的情况下,锁可能会死锁。

参数
name: string (可选)

要获取的锁的名称。选择相同的名称允许两个不相关的进程协调一个锁。如果未给出,将生成一个随机名称。

client: Client(可选)

用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。

示例

>>> lock = Lock('x')  
>>> lock.acquire(timeout=1)  
>>> # do things with protected resource
>>> lock.release()  
acquire(blocking=True, timeout=None)[source]

获取锁。

参数
blocking布尔值,可选。

如果为 False,则完全不在调度器中等待锁。

timeout字符串、数字或 timedelta,可选。

在调度器中等待锁的秒数。这不包括本地协程时间、网络传输时间等。当 blocking 为 False 时禁止指定超时。除了秒数之外,还可以指定字符串格式的 timedelta,例如“200ms”。

返回值
是否成功获取锁的布尔值(True 或 False)。

示例

>>> lock = Lock('x')  
>>> lock.acquire(timeout="1s")  
class distributed.MultiLock(names: list[str] | None = None, client: distributed.client.Client | None = None)[source]

分布式集中式 Lock

参数
names

要获取的锁的名称。选择相同的名称允许两个不相关的进程协调一个锁。

client

用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。

示例

>>> lock = MultiLock(['x', 'y'])  
>>> lock.acquire(timeout=1)  
>>> # do things with protected resource 'x' and 'y'
>>> lock.release()  
acquire(blocking=True, timeout=None, num_locks=None)[source]

获取锁。

参数
blocking布尔值,可选。

如果为 False,则完全不在调度器中等待锁。

timeout字符串、数字或 timedelta,可选。

在调度器中等待锁的秒数。这不包括本地协程时间、网络传输时间等。当 blocking 为 False 时禁止指定超时。除了秒数之外,还可以指定字符串格式的 timedelta,例如“200ms”。

num_locks整数,可选。

所需锁的数量。如果为 None,则需要所有锁。

返回值
是否成功获取锁的布尔值(True 或 False)。

示例

>>> lock = MultiLock(['x', 'y'])  
>>> lock.acquire(timeout="1s")  
release()[source]

如果已获取,则释放锁。

class distributed.Semaphore(max_leases=1, name=None, scheduler_rpc=None, loop=None)[source]

信号量将在调度器上跟踪租约,此类实例可以获取和释放这些租约。如果已获取最大数量的租约,则无法获取更多,调用者将等待直到另一个租约被释放。

租约的生命周期通过超时进行控制。此超时由该实例的 Client 定期刷新,并在工作节点失败的情况下提供死锁或资源饥饿保护。超时可以使用配置选项 distributed.scheduler.locks.lease-timeout 进行控制,调度器验证超时的间隔使用选项 distributed.scheduler.locks.lease-validation-interval 进行设置。

与 Python 标准库的信号量的一个显著区别是,此实现不允许释放次数多于获取次数。如果发生这种情况,将发出警告,但内部状态不会被修改。

警告

在发生租约超时的情况下,此实现容易受到租约超额预订的影响。建议监控日志信息,并根据用户应用程序调整上述配置选项为合适的值。

参数
max_leases: int(可选)

可以同时授予的最大租约数量。这有效地设置了对特定资源并行访问数量的上限。默认为 1。

name: string (可选)

要获取的信号量的名称。选择相同的名称允许两个不相关的进程进行协调。如果未给出,将生成一个随机名称。

register: bool

如果为 True,则向调度器注册信号量。在获取任何租约之前必须完成此操作。如果初始化时未完成,也可以通过调用此类的 register 方法来完成。注册时,需要等待。

scheduler_rpc: ConnectionPool

用于连接调度器的 ConnectionPool。如果提供 None,则使用工作节点或客户端连接池。此参数主要用于测试。

loop: IOLoop

此实例正在使用的事件循环。如果提供 None,则重用活动工作节点或客户端的循环。

注意事项

如果客户端尝试释放信号量但未获取租约,将引发异常。

dask 默认假定函数是纯函数来执行它们,当在此类函数内部使用信号量的获取/释放时,必须注意实际上 *存在* 副作用,因此,该函数不能再被认为是纯函数。如果不考虑这一点,可能会导致意外行为。

示例

>>> from distributed import Semaphore
... sem = Semaphore(max_leases=2, name='my_database')
...
... def access_resource(s, sem):
...     # This automatically acquires a lease from the semaphore (if available) which will be
...     # released when leaving the context manager.
...     with sem:
...         pass
...
... futures = client.map(access_resource, range(10), sem=sem)
... client.gather(futures)
... # Once done, close the semaphore to clean up the state on scheduler side.
... sem.close()
acquire(timeout=None)[source]

获取信号量。

如果内部计数器大于零,则将其减一并立即返回 True。如果为零,则等待直到调用 release() 并返回 True。

参数
timeout数字、字符串或 timedelta,可选。

等待获取信号量的秒数。这不包括本地协程时间、网络传输时间等。除了秒数之外,还可以指定字符串格式的 timedelta,例如“200ms”。

get_value()[source]

返回当前注册的租约数量。

release()[source]

释放信号量。

返回值
布尔值

此值指示租约是否立即释放。请注意,用户 *不应* 重试此操作。在某些情况下(例如调度器过载),租约可能不会立即释放,但它将始终在使用“distributed.scheduler.locks.lease-validation-interval”和“distributed.scheduler.locks.lease-timeout”配置的特定间隔后自动释放。

class distributed.Queue(name=None, client=None, maxsize=0)[source]

分布式 Queue

这允许多个客户端使用多生产者/多消费者队列相互共享 Future 或少量数据。所有元数据都通过调度器进行序列化。

队列的元素必须是 Future 或 msgpack 可编码数据(整数、字符串、列表、字典)。所有数据都通过调度器发送,因此发送大型对象是不明智的。要共享大型对象,请分散数据并改为共享 Future。

警告

此对象是实验性的。

参数
name: string (可选)

其他客户端和调度器用于标识队列的名称。如果未给出,将生成一个随机名称。

client: Client(可选)

用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。

maxsize: int(可选)

队列中允许的最大项目数。如果为 0(默认值),则队列大小不受限制。

另请参见

Variable

客户端之间共享的变量。

示例

>>> from dask.distributed import Client, Queue  
>>> client = Client()  
>>> queue = Queue('x')  
>>> future = client.submit(f, x)  
>>> queue.put(future)  
get(timeout=None, batch=False, **kwargs)[source]

从队列中获取数据。

参数
timeout数字、字符串或 timedelta,可选。

超时前等待的秒数。除了秒数之外,还可以指定字符串格式的 timedelta,例如“200ms”。

batch布尔值,整数(可选)。

如果为 True,则返回当前在队列中等待的所有元素。如果是整数,则返回队列中的该数量元素。如果为 False(默认值),则一次返回一个项目。

put(value, timeout=None, **kwargs)[source]

将数据放入队列。

参数
timeout数字、字符串或 timedelta,可选。

超时前等待的秒数。除了秒数之外,还可以指定字符串格式的 timedelta,例如“200ms”。

qsize(**kwargs)[source]

队列中当前元素的数量。

class distributed.Variable(name=None, client=None)[source]

分布式全局 Variable

这允许多个客户端使用单个可变变量相互共享 Future 和数据。所有元数据都通过调度器进行序列化。可能发生竞态条件。

值必须是 Future 或 msgpack 可编码数据(整数、列表、字符串等)。所有数据都将保存在调度器并通过其发送,因此不宜发送太多数据。如果要共享大量数据,请 scatter 它并改为共享 Future。

参数
name: string (可选)

其他客户端和调度器用于标识变量的名称。如果未给出,将生成一个随机名称。

client: Client(可选)

用于与调度器通信的客户端。如果未给出,将使用默认的全局客户端。

另请参见

Queue

客户端之间共享的多生产者/多消费者队列。

示例

>>> from dask.distributed import Client, Variable 
>>> client = Client()  
>>> x = Variable('x')  
>>> x.set(123)  # docttest: +SKIP
>>> x.get()  # docttest: +SKIP
123
>>> future = client.submit(f, x)  
>>> x.set(future)  
delete()[source]

删除此变量。

注意,这会影响所有当前指向此变量的客户端。

get(timeout=None, **kwargs)[source]

获取此变量的值。

参数
timeout数字、字符串或 timedelta,可选。

超时前等待的秒数。除了秒数之外,还可以指定字符串格式的 timedelta,例如“200ms”。

set(value, timeout='30 s', **kwargs)[source]

设置此变量的值。

参数
valueFuture 或对象。

必须是 Future 或 msgpack 可编码的值。

集群

与集群创建和管理相关的类。其他库(如 dask-jobqueuedask-gatewaydask-kubernetesdask-yarn 等)提供额外的集群对象。

LocalCluster([名称, n_workers, ...])

创建本地调度器和工作节点。

SpecCluster([工作节点, 调度器, 工作节点, ...])

需要工作节点完整规格说明的集群。

class distributed.LocalCluster(name=None, n_workers=None, threads_per_worker=None, processes=None, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, scheduler_kwargs=None, scheduler_sync_interval=1, **worker_kwargs)[source]

创建本地调度器和工作节点。

这将在本地机器上创建一个由调度器和工作节点组成的“集群”。

参数
n_workers: int

要启动的工作节点数量。

memory_limit: str, float, int, 或 None, 默认为 “auto”

设置*每个工作节点*的内存限制。

参数数据类型注意事项

  • 如果为 None 或 0,则不应用限制。

  • 如果为“auto”,则总系统内存将在工作节点之间平均分配。

  • 如果为浮点数,则该比例的系统内存用于*每个工作节点*。

  • 如果是表示字节数的字符串(如 "1GiB"),则该数量用于*每个工作节点*。

  • 如果为整数,则该字节数用于*每个工作节点*。

请注意,仅当 processes=True 时才会强制执行此限制,且此限制仅以尽力而为的方式强制执行——工作节点仍然可能超过此限制。

processes: bool

是使用进程(True)还是线程(False)。默认为 True,除非 worker_class=Worker,此时默认为 False。

threads_per_worker: int

每个工作节点的线程数量。

scheduler_port: int

调度器的端口。使用 0 选择随机端口(默认)。8786 是常用选择。

silence_logs: 日志级别

打印到标准输出的日志级别。默认为 logging.WARN。使用 False 或 None 等假值表示不更改。

host: string

调度器将监听的主机地址,默认为仅 localhost。

ip: string

已弃用。请参见上面的 host

dashboard_address: str

Bokeh 诊断服务器监听的地址,例如“localhost:8787”或“0.0.0.0:8787”。默认为“:8787”。设置为 None 以禁用仪表板。使用“:0”表示随机端口。仅指定端口(如“:8787”)时,仪表板将绑定到 host 参数指定的接口。如果 host 为空,则将在所有接口“0.0.0.0”上绑定。为避免在本地部署时的防火墙问题,请将 host 设置为“localhost”。

worker_dashboard_address: str

Bokeh 工作节点诊断服务器监听的地址,例如“localhost:8787”或“0.0.0.0:8787”。默认为 None,禁用仪表板。使用“:0”表示随机端口。

diagnostics_port: int

已弃用。请参见 dashboard_address。

asynchronous: bool (默认为 False)

asynchronous: 如果在 async/await 函数或 Tornado gen.coroutines 中使用此集群,则设置为 True。正常使用时应保持为 False。

blocked_handlers: List[str]

一个字符串列表,指定调度器上不允许的处理程序黑名单,例如 ['feed', 'run_function']

service_kwargs: Dict[str, Dict]

传递给正在运行服务的额外关键字参数。

securitySecurity 或布尔值,可选。

配置此集群中的通信安全。可以是 security 对象或 True。如果为 True,则会自动创建临时的自签名凭据。

protocol: str(可选)

要使用的协议,如 tcp://tls://inproc://。根据 processessecurity 等其他关键字参数,此参数默认为合理的选择。

interface: str(可选)

要使用的网络接口。默认为 lo/localhost。

worker_class: Worker

用于实例化工作节点的工作节点类。如果 processes=False,默认为 Worker;如果 processes=True 或省略,默认为 Nanny。

**worker_kwargs

额外的工作节点参数。任何额外的关键字参数都将传递给 Worker 类构造函数。

示例

>>> cluster = LocalCluster()  # Create a local cluster  
>>> cluster  
LocalCluster("127.0.0.1:8786", workers=8, threads=8)
>>> c = Client(cluster)  # connect to local cluster  

将集群扩展到三个工作节点。

>>> cluster.scale(3)  

将额外关键字参数传递给 Bokeh。

>>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})  
class distributed.SpecCluster(workers=None, scheduler=None, worker=None, asynchronous=False, loop=None, security=None, silence_logs=False, name=None, shutdown_on_close=True, scheduler_sync_interval=1, shutdown_scheduler=True)[source]

需要工作节点完整规格说明的集群。

SpecCluster 类需要使用的调度器和工作节点的完整规格说明。它移除了对用户输入(如线程 vs 进程、核心数量等)的任何处理,以及对集群资源管理器(如 pod、作业等)的任何处理。相反,它期望在调度器和工作节点规格说明中传递这些信息。此类负责处理在适当时间异步地干净地设置和拆除的所有逻辑。希望它可以成为其他更以用户为中心的类的基础。

参数
workers: dict

将名称映射到工作节点类及其规格说明的字典。参见下面的示例。

scheduler: dict,可选。

调度器的类似映射。

worker: dict

单个工作节点的规格说明。这用于创建的任何新工作节点。

asynchronous: bool

asynchronous: 如果在 async/await 函数或 Tornado gen.coroutines 中使用此集群,则设置为 True。正常使用时应保持为 False。

silence_logs: bool

设置集群时是否应静默日志记录。

name: str,可选。

打印集群时使用的名称,默认为类型名称。

shutdown_on_close: bool

程序退出时是否关闭集群。

shutdown_scheduler: bool

关闭集群时是否关停调度器。

示例

要创建 SpecCluster,您需要指定如何设置调度器和工作节点。

>>> from dask.distributed import Scheduler, Worker, Nanny
>>> scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
>>> workers = {
...     'my-worker': {"cls": Worker, "options": {"nthreads": 1}},
...     'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}},
... }
>>> cluster = SpecCluster(scheduler=scheduler, workers=workers)

工作节点规格说明存储为 .worker_spec 属性。

>>> cluster.worker_spec
{
   'my-worker': {"cls": Worker, "options": {"nthreads": 1}},
   'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}},
}

而此规格说明的实例化存储在 .workers 属性中。

>>> cluster.workers
{
    'my-worker': <Worker ...>
    'my-nanny': <Nanny ...>
}

如果规格说明发生变化,我们可以 await 集群或调用 ._correct_state 方法使实际状态与指定状态对齐。

我们也可以对集群进行 .scale(...) 操作,这将添加给定形式的新工作节点。

>>> worker = {'cls': Worker, 'options': {}}
>>> cluster = SpecCluster(scheduler=scheduler, worker=worker)
>>> cluster.worker_spec
{}
>>> cluster.scale(3)
>>> cluster.worker_spec
{
    0: {'cls': Worker, 'options': {}},
    1: {'cls': Worker, 'options': {}},
    2: {'cls': Worker, 'options': {}},
}

注意,上面我们使用的是标准的 WorkerNanny 类,但在实践中可以使用处理资源管理的其他类,如 KubernetesPodSLURMJob。规格说明不需要符合标准 Dask Worker 类的预期。它只需要使用提供的选项调用,支持 __await__close 方法以及 worker_address 属性。

另请注意,规格说明无需统一。可以在外部(子类中)添加其他 API,将不同规格说明的工作节点添加到同一个字典中。

如果规格说明中的单个条目将生成多个 dask 工作节点,请向规格说明提供一个 “group” 元素,其中包含将添加到每个名称的后缀(这应由您的工作节点类处理)。

>>> cluster.worker_spec
{
    0: {"cls": MultiWorker, "options": {"processes": 3}, "group": ["-0", "-1", -2"]}
    1: {"cls": MultiWorker, "options": {"processes": 2}, "group": ["-0", "-1"]}
}

这些后缀应与工作节点部署时使用的名称相对应。

>>> [ws.name for ws in cluster.scheduler.workers.values()]
["0-0", "0-1", "0-2", "1-0", "1-1"]
adapt(Adaptive: type[distributed.deploy.adaptive.Adaptive] = <class 'distributed.deploy.adaptive.Adaptive'>, minimum: float = 0, maximum: float = inf, minimum_cores: int | None = None, maximum_cores: int | None = None, minimum_memory: str | None = None, maximum_memory: str | None = None, **kwargs: typing.Any) distributed.deploy.adaptive.Adaptive[source]

开启适应性。

这会根据调度器活动自动缩放 Dask 集群。

参数
minimum整数。

最小工作节点数量。

maximum整数。

最大工作节点数量。

minimum_cores整数。

集群中保留的最小核心/线程数量。

maximum_cores整数。

集群中保留的最大核心/线程数量。

minimum_memory字符串。

集群中保留的最小内存量,表示为字符串,例如“100 GiB”。

maximum_memory字符串。

集群中保留的最大内存量,表示为字符串,例如“100 GiB”。

另请参见

dask.distributed.Adaptive

更多关键字参数请参见。

示例

>>> cluster.adapt(minimum=0, maximum_memory="100 GiB", interval='500ms')
classmethod from_name(name: str) distributed.deploy.spec.ProcessInterface[source]

创建此类的实例以按名称表示现有集群。

new_worker_spec()[source]

返回下一个工作节点的名称和规格说明。

返回值
d: 将名称映射到工作节点规格说明的字典。

另请参见

scale
scale(n=0, memory=None, cores=None)[source]

将集群扩展到 n 个工作节点。

参数
nint

目标工作节点数量。

示例

>>> cluster.scale(10)  # scale cluster to ten workers
scale_up(n=0, memory=None, cores=None)

将集群扩展到 n 个工作节点。

参数
nint

目标工作节点数量。

示例

>>> cluster.scale(10)  # scale cluster to ten workers

其他

class distributed.as_completed(futures=None, loop=None, with_results=False, raise_errors=True, *, timeout=None)[source]

按 future 完成的顺序返回它们

这返回一个迭代器,该迭代器按照输入 Future 对象完成的顺序产生这些对象。对迭代器调用 next 将阻塞直到下一个 Future 完成,无论顺序如何。

此外,您还可以在计算过程中使用 .add 方法向此对象添加更多 Future。

参数
futures: Future 集合。

一个 Future 对象列表,将按照它们完成的顺序进行迭代。

with_results: bool (False)

是否等待并包含 Future 的结果;在这种情况下,as_completed 会产生一个 (future, result) 元组。

raise_errors: bool (True)

当 Future 的结果引发异常时是否应抛出异常;仅在 with_results=True 时影响行为。

timeout: int(可选)

如果调用 __next__()__anext__(),并且从最初调用 as_completed() 起经过 timeout 秒后结果仍不可用,则返回的迭代器会抛出 dask.distributed.TimeoutError。如果未指定 timeout 或为 None,则等待时间没有限制。

示例

>>> x, y, z = client.map(inc, [1, 2, 3])  
>>> for future in as_completed([x, y, z]):  
...     print(future.result())  
3
2
4

在计算过程中添加更多 Future。

>>> x, y, z = client.map(inc, [1, 2, 3])  
>>> ac = as_completed([x, y, z])  
>>> for future in ac:  
...     print(future.result())  
...     if random.random() < 0.5:  
...         ac.add(c.submit(double, future))  
4
2
8
3
6
12
24

选择性地等待直到结果也已收集。

>>> ac = as_completed([x, y, z], with_results=True)  
>>> for future, result in ac:  
...     print(result)  
2
4
3
add(future)[source]

将 Future 添加到集合中。

此 Future 完成后将从迭代器中发出。

batches()[source]

一次产生所有已完成的 Future,而不是一个接一个地产生。

这返回一个 Future 列表或 (Future, 结果) 元组列表的迭代器,而不是单个 Future 或单个 (Future, 结果) 元组。它将尽快产生这些,无需等待。

示例

>>> for batch in as_completed(futures).batches():  
...     results = client.gather(batch)
...     print(results)
[4, 2]
[1, 3, 7]
[5]
[6]
clear()[source]

清除所有已提交的 Future。

count()[source]

返回尚未返回的 Future 数量。

这包括仍在计算的 Future 数量,以及已完成但尚未从此迭代器返回的 Future 数量。

has_ready()[source]

如果有可用的已完成 Future,则返回 True。

is_empty()[source]

如果没有已完成或正在计算的 Future,则返回 True。

next_batch(block=True)[source]

获取下一批已完成的 Future。

参数
block布尔值,可选。

如果为 True,则等待直到有结果,否则立即返回,即使列表为空。默认为 True。

返回值
Future 列表或 (Future, 结果) 元组列表。

示例

>>> ac = as_completed(futures)  
>>> client.gather(ac.next_batch())  
[4, 1, 3]
>>> client.gather(ac.next_batch(block=False))  
[]
update(futures)[source]

向集合添加多个 Future。

添加的 Future 完成后将从迭代器中发出。

distributed.diagnostics.progressbar.progress(*futures, notebook=None, multi=True, complete=True, group_by='prefix', **kwargs)[source]

跟踪 future 的进度

这在 Notebook 和控制台中操作方式不同。

  • Notebook: 它立即返回,在屏幕上留下一个 IPython 小部件。

  • 控制台: 它会阻塞,直到计算完成。

参数
futuresFuture 对象

要跟踪的 Future 对象列表或键列表。

notebook布尔值 (可选)

是否在 Notebook 中运行 (默认为自动检测)。

multi布尔值 (可选)

独立跟踪不同的函数 (默认为 True)。

complete布尔值 (可选)

跟踪所有键 (True) 或仅跟踪尚未运行的键 (False) (默认为 True)。

group_by可调用对象 | Literal[“spans”] | Literal[“prefix”]

使用 spans 而不是任务键名称对任务进行分组 (默认为“prefix”)。

注意事项

在 Notebook 中,progress 的输出必须是单元格中的最后一条语句。通常,这意味着在单元格的末尾调用 progress

示例

>>> progress(futures)  
[########################################] | 100% Completed |  1.7s
distributed.wait(fs, timeout=None, return_when='ALL_COMPLETED')[source]

等待所有/任何 future 完成

参数
fsFuture 对象列表
timeout数字,字符串,可选

在此时间后引发 dask.distributed.TimeoutError。可以是像 "10 minutes" 这样的字符串或要等待的秒数。

return_when字符串,可选

ALL_COMPLETEDFIRST_COMPLETED 中的一个。

返回值
包含已完成和未完成项的命名元组。
distributed.fire_and_forget(obj)[source]

即使我们释放 future,也要至少运行任务一次

正常操作下,Dask 不会运行任何没有活跃 Future 对象的任务(这在许多情况下避免了不必要的工作)。然而,有时您可能只想触发一个任务,而不跟踪其 Future 对象,并期望它最终完成。您可以在一个或一组 Future 对象上使用此函数,要求 Dask 完成任务,即使没有活跃的客户端正在跟踪它。

任务完成后,结果不会保留在内存中(除非有活跃的 Future 对象),因此这仅对依赖副作用的任务有用。

参数
objFuture 对象,列表,字典,dask 集合

您希望至少运行一次的 Future 对象。

示例

>>> fire_and_forget(client.submit(func, *args))  
distributed.futures_of(o, client=None)[source]

集合中的 Future 对象

参数
o集合

Dask 对象的可能嵌套集合。

clientClient 对象,可选

客户端对象。

返回值
futuresList[Future]

这些集合持有的 Future 对象列表。

引发
CancelledError

如果其中一个 Future 对象被取消,则会引发 CancelledError。

示例

>>> futures_of(my_dask_dataframe)
[<Future: finished key: ...>,
 <Future: pending  key: ...>]
distributed.worker_client(timeout=None, separate_thread=True)[source]

获取此线程的客户端

此上下文管理器旨在在我们在 Worker 上运行的函数中调用。作为上下文管理器运行时,它提供一个客户端 Client 对象,该对象可以直接从该 Worker 提交其他任务。

参数
timeout数字或字符串

超时时间,在此时间后会出错。默认为 distributed.comm.timeouts.connect 配置值。

separate_thread布尔值,可选

是否在普通线程池之外运行此函数,默认为 True。

示例

>>> def func(x):
...     with worker_client() as c:  # connect from worker back to scheduler
...         a = c.submit(inc, x)     # this task can submit more tasks
...         b = c.submit(dec, x)
...         result = c.gather([a, b])  # and gather results
...     return result
>>> future = client.submit(func, 1)  # submit func(1) on cluster
distributed.get_worker() distributed.worker.Worker[source]

获取当前运行此任务的 worker

另请参见

get_client
worker_client

示例

>>> def f():
...     worker = get_worker()  # The worker on which this task is running
...     return worker.address
>>> future = client.submit(f)  
>>> future.result()  
'tcp://127.0.0.1:47373'
distributed.get_client(address=None, timeout=None, resolve_address=True) Client[source]

在任务中获取客户端。

此客户端连接到与 Worker 连接的同一个调度器。

参数
address字符串,可选

要连接的调度器地址。默认为 Worker 连接的调度器地址。

timeout整数或字符串

获取客户端的超时时间(以秒为单位)。默认为 distributed.comm.timeouts.connect 配置值。

resolve_address布尔值,默认为 True

是否将 address 解析为其规范形式。

返回值
Client

示例

>>> def f():
...     client = get_client(timeout="10s")
...     futures = client.map(lambda x: x + 1, range(10))  # spawn many tasks
...     results = client.gather(futures)
...     return sum(results)
>>> future = client.submit(f)  
>>> future.result()  
55
distributed.secede()[source]

让此任务脱离 Worker 的线程池。

这会为新任务腾出一个新的调度槽和一个新的线程。这使得客户端能够在此节点上调度任务,在等待其他作业完成时(例如,使用 client.gather)尤其有用。

另请参见

get_client
get_worker

示例

>>> def mytask(x):
...     # do some work
...     client = get_client()
...     futures = client.map(...)  # do some remote work
...     secede()  # while that work happens, remove ourself from the pool
...     return client.gather(futures)  # return gathered results
distributed.rejoin()[source]

使此线程重新加入 ThreadPoolExecutor

这将阻塞,直到执行器中出现一个新的槽。下一个完成任务的线程将离开池,以允许此线程加入。

另请参见

secede

离开线程池。

distributed.print(*args, sep: str | None = ' ', end: str | None = '\n', file: TextIO | None = None, flush: bool = False) None[source]

这是内置 print 函数的替代品,用于从 Worker 向客户端进行远程打印。如果在 Dask Worker 外部调用,其参数将直接传递给 builtins.print()。如果由在 Worker 上运行的代码调用,则除了本地打印外,连接到管理此 Worker 的调度器的任何客户端(可能远程)将接收一个事件,指示它们将相同的输出打印到自己的标准输出或标准错误流。例如,用户可以通过在提交的代码中包含对此 print 函数的调用,并在本地 Jupyter Notebook 或解释器会话中检查输出来对远程计算进行简单调试。

所有参数的行为与 builtins.print() 的参数相同,但有一个例外:如果指定了关键字参数 file,则它必须是 sys.stdoutsys.stderr;不允许任意的文件类对象。

所有非关键字参数都使用 str() 转换为字符串,并写入流中,由 sep 分隔,后跟 endsepend 都必须是字符串;它们也可以是 None,表示使用默认值。如果没有给出对象,print() 将只写入 end

参数
sep字符串,可选

插入在值之间的字符串,默认为一个空格。

end字符串,可选

附加在最后一个值之后的字符串,默认为换行符。

filesys.stdoutsys.stderr,可选

默认为当前的 sys.stdout。

flush布尔值,默认为 False

是否强制刷新流。

示例

>>> from dask.distributed import Client, print
>>> client = distributed.Client(...)
>>> def worker_function():
...     print("Hello from worker!")
>>> client.submit(worker_function)
<Future: finished, type: NoneType, key: worker_function-...>
Hello from worker!
distributed.warn(message: str | Warning, category: type[Warning] | None = <class 'UserWarning'>, stacklevel: int = 1, source: typing.Any = None) None[source]

内置 warnings.warn() 函数的替代品,用于从 worker 远程发出警告到客户端。

如果在 Dask Worker 外部调用,其参数将直接传递给 warnings.warn()。如果由在 Worker 上运行的代码调用,则除了本地发出警告外,连接到管理此 Worker 的调度器的任何客户端(可能远程)将接收一个事件,指示它们发出相同的警告(取决于其本地过滤器等)。在实现可能在 Worker 上运行的计算时,用户可以调用此 warn 函数,以确保任何远程客户端会话将看到其警告,例如在 Jupyter 输出单元格中。

虽然本地发出的警告会尊重所有参数(含义与 warnings.warn() 中的相同),但客户端会忽略 stacklevelsource,因为它们在客户端线程中没有意义。

示例

>>> from dask.distributed import Client, warn
>>> client = Client()
>>> def do_warn():
...    warn("A warning from a worker.")
>>> client.submit(do_warn).result()
/path/to/distributed/client.py:678: UserWarning: A warning from a worker.
class distributed.Reschedule[source]

重新调度此任务

引发此异常将停止当前任务的执行,并请求调度器重新调度此任务,可能在不同的机器上。

这不保证任务会迁移到不同的机器。调度器将遵循其正常的启发式算法来确定接受此任务的最佳机器。如果自从首次调度任务以来集群的负载发生显著变化,机器可能会改变。

class distributed.get_task_stream(client=None, plot=False, filename='task-stream.html')[source]

在上下文块内收集任务流

这提供了在此块处于活动状态期间运行的每个任务的诊断信息。

这必须用作上下文管理器。

参数
plot: 布尔值, 字符串

如果为 true,则也返回一个 Bokeh 图;如果 plot == ‘save’,则将图保存到文件

filename: 字符串 (可选)

如果您设置 plot='save',要保存到的文件名。

另请参见

Client.get_task_stream

此上下文管理器对应的函数版本。

示例

>>> with get_task_stream() as ts:
...     x.compute()
>>> ts.data
[...]

获取 Bokeh 图并可选择保存到文件。

>>> with get_task_stream(plot='save', filename='task-stream.html') as ts:
...    x.compute()
>>> ts.figure
<Bokeh Figure>

要与他人共享此文件,您可能希望将其上传并在线提供服务。一种常见的方法是将其作为 gist 上传,然后在 https://raw.githack.com 上提供服务。

$ python -m pip install gist
$ gist task-stream.html
https://gist.github.com/8a5b3c74b10b413f612bb5e250856ceb

然后您可以导航到该网站,点击 task-stream.html 文件右侧的“Raw”按钮,然后将该 URL 提供给 https://raw.githack.com。此过程应提供一个可共享的链接,他人可以使用该链接查看您的任务流图。

class distributed.get_task_metadata[source]

在上下文块内收集任务元数据

这会从调度器收集在此上下文管理器范围内提交和完成的任务的 TaskState 元数据和最终状态。

示例

>>> with get_task_metadata() as tasks:
...     x.compute()
>>> tasks.metadata
{...}
>>> tasks.state
{...}
class distributed.performance_report(filename='dask-report.html', stacklevel=1, mode=None, storage_options=None)[source]

收集性能报告

这会创建一个静态 HTML 文件,其中包含许多与仪表板相同的图,供以后查看。

生成的文件使用 JavaScript,因此必须用网页浏览器查看。在本地,我们建议使用 python -m http.server 或在线托管文件。

参数
filename: 字符串,可选

本地保存性能报告的文件名。

stacklevel: 整数,可选

用于填充报告中“Calling Code”部分的程序执行帧。默认为 1,即调用 performance_report 的帧。

mode: 字符串,可选

传递给 bokeh.io.output.output_file() 的模式参数。默认为 None

storage_options: 字典,可选

写入 URL 时传递给 fsspec.open() 的任何附加参数。

示例

>>> with performance_report(filename="myfile.html", stacklevel=1):
...     x.compute()

工具函数

class distributed.utils.Log[source]

用于存储按换行符分隔的日志条目字符串的容器

class distributed.utils.Logs[source]

用于存储将名称映射到日志条目字符串的字典的容器

distributed.diagnostics.memray.memray_scheduler(directory: str | pathlib.Path = 'memray-profiles', report_args: Union[collections.abc.Sequence[str], Literal[False]] = ('flamegraph', '--temporal', '--leaks'), **memray_kwargs: Any) collections.abc.Iterator[None][source]

在调度器上生成 Memray 分析报告并下载生成的报告。

示例

with memray_scheduler():
    client.submit(my_function).result()

# Or even while the computation is already running

fut = client.submit(my_function)

with memray_scheduler():
    time.sleep(10)

fut.result()
参数
directory字符串

保存报告的目录。

report_argstuple[str]

特别是对于 native_traces=True,报告必须在生成配置文件的同一主机上使用相同的 Python 解释器生成。否则,native traces 将产生不可用的结果。因此,我们在调度器上生成报告,然后下载它们。您可以通过提供额外的参数来修改报告生成过程,我们将按以下方式生成报告:

memray *report_args -f <filename> -o <filename>.html

如果应抓取原始数据而不是报告,请将其设置为 False。

**memray_kwargs

要传递给 memray.Tracker 的关键字参数,例如 {“native_traces”: True}。

distributed.diagnostics.memray.memray_workers(directory: str | pathlib.Path = 'memray-profiles', workers: int | None | list[str] = None, report_args: Union[collections.abc.Sequence[str], Literal[False]] = ('flamegraph', '--temporal', '--leaks'), fetch_reports_parallel: bool | int = True, **memray_kwargs: Any) collections.abc.Iterator[None][source]

在 worker 上生成 Memray 分析报告并下载生成的报告。

示例

with memray_workers():
    client.submit(my_function).result()

# Or even while the computation is already running

fut = client.submit(my_function)

with memray_workers():
    time.sleep(10)

fut.result()
参数
directory字符串

保存报告的目录。

workersint | None | list[str]

要分析性能的 Worker。如果为整数,则使用前 n 个 Worker。如果为 None,则使用所有 Worker。如果为 list[str],则使用具有给定地址的 Worker。

report_argstuple[str]

特别是对于 native_traces=True,报告必须在生成配置文件的同一主机上使用相同的 Python 解释器生成。否则,native traces 将产生不可用的结果。因此,我们在 Worker 上生成报告,然后下载它们。您可以通过提供额外的参数来修改报告生成过程,我们将按以下方式生成报告:

memray *report_args -f <filename> -o <filename>.html

如果应抓取原始数据而不是报告,请将其设置为 False。

fetch_reports_parallelbool | int

抓取结果有时很慢,有时也不希望等待所有 Worker 完成后才接收第一批报告。这控制了同时抓取的 Worker 数量。

int: 同时抓取的 Worker 数量 True: 所有 Worker 同时抓取 False: 一次一个 Worker 抓取

**memray_kwargs

要传递给 memray.Tracker 的关键字参数,例如 {“native_traces”: True}。

自适应

class distributed.deploy.Adaptive(cluster: Cluster, interval: str | float | timedelta | None = None, minimum: int | None = None, maximum: int | float | None = None, wait_count: int | None = None, target_duration: str | float | timedelta | None = None, worker_key: Callable[[distributed.scheduler.WorkerState], Hashable] | None = None, **kwargs: Any)[source]

根据调度器负载自适应地分配 Worker。一个超类。

包含根据当前使用情况动态调整 Dask 集群大小的逻辑。此类需要与一个能够使用集群资源管理器创建和销毁 Dask Worker 的系统配对。通常它被内置到现有解决方案中,而不是由用户直接使用。它最常用于各种 Dask 集群类的 .adapt(...) 方法。

参数
cluster: 对象

必须具有 scale 和 scale_down 方法/协程。

intervaltimedelta 或字符串,默认为 “1000 ms”

两次检查之间的毫秒数。

wait_count: 整数,默认为 3

在移除 Worker 之前,连续建议移除该 Worker 的次数。

target_duration: timedelta 或字符串,默认为 “5s”

我们希望计算花费的时间。这会影响我们扩容的积极程度。

worker_key: 可调用对象[WorkerState]

缩容时将 Worker 分组的函数。有关更多信息,请参阅 Scheduler.workers_to_close。

minimum: 整数

要保留的最小 Worker 数量。

maximum: 整数

要保留的最大 Worker 数量。

**kwargs

传递给 Scheduler.workers_to_close 的额外参数。

注意事项

子类可以覆盖 Adaptive.target()Adaptive.workers_to_close() 来控制何时调整集群大小。默认实现会检查每个 Worker 上的任务是否过多或可用内存是否过少(请参阅 distributed.Scheduler.adaptive_target())。interval、min、max、wait_count 和 target_duration 的值可以在 dask 配置中,在 distributed.adaptive 键下指定。

示例

这通常在现有的 Dask 类中使用,例如 KubeCluster。

>>> from dask_kubernetes import KubeCluster
>>> cluster = KubeCluster()
>>> cluster.adapt(minimum=10, maximum=100)

或者,您可以通过继承 Dask 的 Cluster 超类,从您自己的 Cluster 类中使用它。

>>> from distributed.deploy import Cluster
>>> class MyCluster(Cluster):
...     def scale_up(self, n):
...         """ Bring worker count up to n """
...     def scale_down(self, workers):
...        """ Remove worker addresses from cluster """
>>> cluster = MyCluster()
>>> cluster.adapt(minimum=10, maximum=100)
property loop: tornado.ioloop.IOLoop

覆盖 Adaptive.loop。

async recommendations(target: int) dict[source]

根据当前状态和目标提出扩容/缩容建议。

state: AdaptiveStateState

此自适应策略是否定期进行自适应。

async target()[source]

确定应该存在的 Worker 目标数量。

返回值
目标工作节点数量。

另请参见

Scheduler.adaptive_target

注意事项

Adaptive.target 转发给 Scheduler.adaptive_target(),但可以在子类中被覆盖。

async workers_to_close(target: int) list[str][source]

确定哪些 Worker(如果有)应从集群中移除。

返回值
要关闭的 Worker 名称列表,如果有的话。

另请参见

Scheduler.workers_to_close

注意事项

Adaptive.workers_to_close 转发给 Scheduler.workers_to_close(),但可以在子类中被覆盖。