Spans
目录
Spans¶
注意
这是一个实验性功能,可能会快速更改而无需弃用周期。
Dask 提供各种诊断信息和关于任务的精细性能指标,通过任务前缀(通常是正在调用的函数名称)对其进行分组。这可能并非最优:
您的客户端代码可能非常复杂,难以将客户端上的代码行与集群上运行的任务关联起来;
同一函数可能应用于工作流的不同部分,具有不同的性能特征;
您可能处于多租户设置中,集群上的部分负载并非由您的客户端代码引起。
在这些情况下,为工作流或其片段附加有意义的标签可能会很有用。为此,您应在客户端代码中使用span()
上下文管理器。
例如:
import dask.config
import dask.array as da
from distributed import Client, span
# Read important note below
dask.config.set({"optimization.fuse.active": False})
client = Client()
with span("Alice's workflow"):
with span("data load"):
a = da.read_zarr(...)
with span("ML preprocessing"):
a = preprocess(a)
with span("Model training"):
model = train(a)
model = model.compute()
注意span()
上下文管理器可以嵌套。该示例将在调度器上创建以下 spans:
("Alice's workflow", )
("Alice's workflow", "data load")
("Alice's workflow", "ML preprocessing")
("Alice's workflow", "Model training")
每个 span 都将映射到与其上下文管理器内定义的图片段相匹配的任务。父 span 将映射到其所有子任务。
标签是任意的,没有任何东西能阻止您对其进行参数化;例如:
>>> with span(f"{user}'s workflow"):
... ...
这可能会为您提供:
("Alice's workflow", "data load")
("Bob's workflow", "data load")
等等。
这非常有用,例如当您想要观察 Alice 提交的所有工作负载,同时隐藏 Bob 的活动,或者反之,观察所有数据加载活动,无论谁提交的。
可能性几乎是无限的——除了顶部的用户名之外,您还可以存储有关您正在处理的数据集等信息。
查看 Spans¶
您可以在精细性能指标仪表板小部件中按 span 标签过滤您的工作负载。

此外,可以使用调度器扩展或run_on_scheduler()
查询 spans;请参阅Dask 开发者 API。
用户 API¶
重要
Dataframes 的最小粒度是单个 compute() 或 persist() 调用,无法进一步细分为操作组 - 如果上面的示例使用了 Dataframes,则所有内容都将被统一标记为“Alice's Workflow”,因为这是 compute() 期间活动的 span。
在其他集合(如数组和延迟对象)中,未封装 compute() 或 persist() 调用的 spans 可能会在优化阶段丢失。为防止此问题,您必须设置:
>>> dask.config.set({"optimization.fuse.active": False})
或在 dask.yaml 中:
optimization:
fuse:
active: false
一个可能的解决方法(也适用于 Dataframes)是执行中间的 persist() 调用。但是请注意,这可能会显著影响优化并降低整体性能。
with span("Alice's workflow"):
with span("data load"):
a = dd.read_parquet(...).persist()
with span("ML preprocessing"):
a = preprocess(a).persist()
del a # Release distributed memory for a as soon as possible
with span("Model training"):
model = train(b).persist()
del b # Release distributed memory for b as soon as possible
model = model.compute()
- distributed.span(*tags: str) collections.abc.Iterator[str] [源代码]¶
将任务组标记为特定组的一部分,称为 span。
此上下文管理器可以嵌套,从而创建子 spans。如果您关闭并使用相同标签重新打开 span 上下文管理器,您将得到两个独立的 spans。
每个集群在客户端未定义任何 span 时都会定义一个全局“default” span;当与 default span关联的所有任务都完成后,default span会自动关闭并重新打开;换句话说,集群除了明确由 span 注释的任务外,处于空闲状态。请注意,在某些边缘情况下,您可能会遇到重叠的 default spans,例如当工作节点崩溃并且其内存中的所有唯一任务需要重新计算时。
您可以在客户端捕获 span 的 ID,以便将其与调度器上的
Span
对象匹配。>>> client = Client() >>> with span("my workflow") as span_id: ... client.submit(lambda: "Hello world!").result() >>> client.cluster.scheduler.extensions["spans"].spans[span_id] Span<name=('my workflow',), id=5dc9b908-116b-49a5-b0d7-5a681f49a111>
注意
您可以使用
dask.get_annotations().get("span")
检索当前 span。您可以在客户端代码中以及任务内部执行此操作。
Dask 开发者 API¶
目标受众
本节仅对维护 Dask 或编写调度器扩展的开发者感兴趣,例如创建替代仪表板或长期存储指标。
Spans 可以在调度器上通过Scheduler.extensions["spans"]
访问,它包含SpansSchedulerExtension
的单例实例。反过来,扩展包含所有Span
对象的映射,以及各种便捷方法来访问和聚合它们。
请注意,Span
对象提供了仪表板当前未使用的各种方法 - 例如开始/停止时间、任务计数和输出大小。
- class distributed.spans.Span(name: tuple[str, ...], id_: str, parent: distributed.spans.Span | None, total_nthreads_history: list[tuple[float, int]])[源代码]¶
- property active_cpu_seconds: float¶
返回此 Span 运行时在集群上可用的 CPU 秒数;换句话说,
(Span.stop - Span.enqueued) * Scheduler.total_nthreads
。这考虑了在此 Span 活动期间加入和离开集群的工作节点。如果此 Span 是
merge()
的输出,则不计算输入 spans 之间的间隔。另请参阅
enqueued
stop
nthreads_intervals
distributed.scheduler.SchedulerState.total_nthreads
- property all_durations: dict[str, float]¶
此 span 树中所有已完成操作的累计持续时间(按操作分组)
另请参阅
duration
distributed.scheduler.TaskGroup.all_durations
- property annotation: dict[str, tuple[str, ...]] | None¶
重建包含完整 id 历史记录的 dask 图注释
请注意,如果 TaskGroup 冲突,这可能与原始注释不匹配。
- property code: list[tuple[SourceCode, ...]]¶
客户端在 compute()、persist() 和 submit() 时发送的代码片段。
仅在
distributed.diagnostics.computations.nframes
非零时填充。
- property cumulative_worker_metrics: dict[tuple[collections.abc.Hashable, ...], float]¶
Worker.digests_total
和Scheduler.cumulative_worker_metrics
的副本,但仅针对可归因于当前 span 树的指标。span id 已从键中移除。目前,所有键都是
("execute", <task prefix>, <activity>, <unit>)
或("p2p", <where>, <activity>, <unit>)
,但将来可能会添加更多不同格式的键;请测试例如k[0] == "execute"
。
- property done: bool¶
如果此 span 树中的所有任务都已完成,则返回 True;否则返回 False。
另请参阅
distributed.scheduler.TaskGroup.done
注意
此属性可能从 True 转换为 False,例如添加新的子 span 时,或者当包含任务唯一副本的工作节点崩溃并且任务需要重新计算时。
- property duration: float¶
在此 span 树中所有任务花费的总时间
另请参阅
all_durations
distributed.scheduler.TaskGroup.duration
- groups: set[TaskGroup]¶
注意
当最后一个任务被遗忘时,TaskGroups 会被调度器遗忘,但会在这里被无限期引用。如果用户对同一集合调用两次 compute(),您将在此集合中有多个具有相同 tg.name 的组!出于同样的原因,虽然同一个 TaskGroup 对象保证只附加到一个 Span,但您可能有具有相同键的不同 TaskGroups 附加到不同的 Spans。
- id: str¶
由
span()
生成的唯一 ID,取自TaskState.annotations["span"]["id"][-1]
。匹配distributed.scheduler.TaskState.group.span_id
和distributed.worker_state_machine.TaskState.span_id
。
- static merge(*items: distributed.spans.Span) distributed.spans.Span [源代码]¶
将多个 spans 合并为一个合成 span。输入的 spans 之间不能有关系。
- property nthreads_intervals: list[tuple[float, float, int]]¶
- 返回
- 元组列表
- 开始时间戳
- 结束时间戳
- 在此间隔内的 Scheduler.total_nthreads
- 当 Span 是
merge()
的输出时,间隔可能不是 - 连续的。
另请参阅
enqueued
stop
active_cpu_seconds
distributed.scheduler.SchedulerState.total_nthreads
- property start: float¶
属于此 span 树的任务最早开始计算的时间;如果尚未有任务完成计算,则为 0。
注意
此值在至少有一个任务完成计算之前不会更新。它可能随着任务完成而回退。
- property states: dict[TaskStateState, int]¶
此 span 树中当前处于每种状态的任务数量;例如
{"memory": 10, "processing": 3, "released": 4, ...}
。另请参阅
distributed.scheduler.TaskGroup.states
- traverse_spans() collections.abc.Iterator[distributed.spans.Span] [源代码]¶
此 span 树分支中所有 spans 的自顶向下递归,包括自身
- class distributed.spans.SpansSchedulerExtension(scheduler: Scheduler)[源代码]¶
用于支持 spans 的调度器扩展
- find_by_tags(*tags: str) collections.abc.Iterator[distributed.spans.Span] [源代码]¶
返回包含给定任意标签的所有 spans。当一个标签被某个 span 及其(孙)子 spans 共享时,只返回父级。
- heartbeat(ws: scheduler_module.WorkerState, data: dict[tuple[Hashable, ...], float]) None [源代码]¶
由
SpansWorkerExtension.heartbeat()
触发。使用来自工作节点的数据填充
Span.cumulative_worker_metrics()
。
- merge_all() distributed.spans.Span [源代码]¶
返回一个表示所有 spans 总和的合成 Span
- merge_by_tags(*tags: str) distributed.spans.Span [源代码]¶
返回一个表示包含给定标签的所有 spans 总和的合成 Span
- observe_tasks(tss: Iterable[scheduler_module.TaskState], code: tuple[SourceCode, ...], span_metadata: SpanMetadata) dict[Key, dict] [源代码]¶
确认调度器上存在可运行任务。这些任务可能是新任务、之前无法运行的任务,或者已经输入过此方法的任务。
将新观察到的任务附加到所需的 span 或 ("default", )。更新 TaskGroup.span_id 并清除 TaskState.annotations["span"]。
- 返回
- 更新后的 'span' 注释: {key: {"name": (…, …), "ids": (…, …)}}
- class distributed.spans.SpansWorkerExtension(worker: Worker)[源代码]¶
用于支持 spans 的工作节点扩展
- heartbeat() dict[tuple[collections.abc.Hashable, ...], float] [源代码]¶
将具有 span 的指标分配给调度器上的 Spans
- 返回
{(context, span_id, prefix, activity, unit): value}}
另请参阅
SpansSchedulerExtension.heartbeat
Span.cumulative_worker_metrics
distributed.worker.Worker.get_metrics