Spans

注意

这是一个实验性功能,可能会快速更改而无需弃用周期。

Dask 提供各种诊断信息和关于任务的精细性能指标,通过任务前缀(通常是正在调用的函数名称)对其进行分组。这可能并非最优:

  • 您的客户端代码可能非常复杂,难以将客户端上的代码行与集群上运行的任务关联起来;

  • 同一函数可能应用于工作流的不同部分,具有不同的性能特征;

  • 您可能处于多租户设置中,集群上的部分负载并非由您的客户端代码引起。

在这些情况下,为工作流或其片段附加有意义的标签可能会很有用。为此,您应在客户端代码中使用span()上下文管理器。

例如:

import dask.config
import dask.array as da
from distributed import Client, span

# Read important note below
dask.config.set({"optimization.fuse.active": False})
client = Client()

with span("Alice's workflow"):
    with span("data load"):
        a = da.read_zarr(...)
    with span("ML preprocessing"):
        a = preprocess(a)
    with span("Model training"):
        model = train(a)
    model = model.compute()

注意span()上下文管理器可以嵌套。该示例将在调度器上创建以下 spans:

  • ("Alice's workflow", )

  • ("Alice's workflow", "data load")

  • ("Alice's workflow", "ML preprocessing")

  • ("Alice's workflow", "Model training")

每个 span 都将映射到与其上下文管理器内定义的图片段相匹配的任务。父 span 将映射到其所有子任务。

标签是任意的,没有任何东西能阻止您对其进行参数化;例如:

>>> with span(f"{user}'s workflow"):
...     ...

这可能会为您提供:

  • ("Alice's workflow", "data load")

  • ("Bob's workflow", "data load")

  • 等等。

这非常有用,例如当您想要观察 Alice 提交的所有工作负载,同时隐藏 Bob 的活动,或者反之,观察所有数据加载活动,无论谁提交的。

可能性几乎是无限的——除了顶部的用户名之外,您还可以存储有关您正在处理的数据集等信息。

默认 span

如果您不使用span()上下文管理器,您的任务将自动归属于("default", ) span。

查看 Spans

您可以在精细性能指标仪表板小部件中按 span 标签过滤您的工作负载。

Span selection in the Fine Performance Metrics dashboard

此外,可以使用调度器扩展或run_on_scheduler()查询 spans;请参阅Dask 开发者 API

用户 API

重要

Dataframes 的最小粒度是单个 compute()persist() 调用,无法进一步细分为操作组 - 如果上面的示例使用了 Dataframes,则所有内容都将被统一标记为“Alice's Workflow”,因为这是 compute() 期间活动的 span。

在其他集合(如数组和延迟对象)中,未封装 compute()persist() 调用的 spans 可能会在优化阶段丢失。为防止此问题,您必须设置:

>>> dask.config.set({"optimization.fuse.active": False})

或在 dask.yaml 中:

optimization:
  fuse:
    active: false

一个可能的解决方法(也适用于 Dataframes)是执行中间的 persist() 调用。但是请注意,这可能会显著影响优化并降低整体性能。

with span("Alice's workflow"):
    with span("data load"):
        a = dd.read_parquet(...).persist()
    with span("ML preprocessing"):
        a = preprocess(a).persist()
        del a  # Release distributed memory for a as soon as possible
    with span("Model training"):
        model = train(b).persist()
        del b  # Release distributed memory for b as soon as possible
        model = model.compute()
distributed.span(*tags: str) collections.abc.Iterator[str][源代码]

将任务组标记为特定组的一部分,称为 span。

此上下文管理器可以嵌套,从而创建子 spans。如果您关闭并使用相同标签重新打开 span 上下文管理器,您将得到两个独立的 spans。

每个集群在客户端未定义任何 span 时都会定义一个全局“default” span;当与 default span关联的所有任务都完成后,default span会自动关闭并重新打开;换句话说,集群除了明确由 span 注释的任务外,处于空闲状态。请注意,在某些边缘情况下,您可能会遇到重叠的 default spans,例如当工作节点崩溃并且其内存中的所有唯一任务需要重新计算时。

您可以在客户端捕获 span 的 ID,以便将其与调度器上的Span对象匹配。

>>> client = Client()
>>> with span("my workflow") as span_id:
...     client.submit(lambda: "Hello world!").result()
>>> client.cluster.scheduler.extensions["spans"].spans[span_id]
Span<name=('my workflow',), id=5dc9b908-116b-49a5-b0d7-5a681f49a111>

注意

您可以使用dask.get_annotations().get("span")检索当前 span。您可以在客户端代码中以及任务内部执行此操作。

Dask 开发者 API

目标受众

本节仅对维护 Dask 或编写调度器扩展的开发者感兴趣,例如创建替代仪表板或长期存储指标。

Spans 可以在调度器上通过Scheduler.extensions["spans"]访问,它包含SpansSchedulerExtension的单例实例。反过来,扩展包含所有Span对象的映射,以及各种便捷方法来访问和聚合它们。

请注意,Span对象提供了仪表板当前未使用的各种方法 - 例如开始/停止时间、任务计数和输出大小。

class distributed.spans.Span(name: tuple[str, ...], id_: str, parent: distributed.spans.Span | None, total_nthreads_history: list[tuple[float, int]])[源代码]
property active_cpu_seconds: float

返回此 Span 运行时在集群上可用的 CPU 秒数;换句话说,(Span.stop - Span.enqueued) * Scheduler.total_nthreads

这考虑了在此 Span 活动期间加入和离开集群的工作节点。如果此 Span 是merge()的输出,则不计算输入 spans 之间的间隔。

另请参阅

enqueued
stop
nthreads_intervals
distributed.scheduler.SchedulerState.total_nthreads
add_metadata(metadata: distributed.spans.SpanMetadata) None[源代码]

向 span 添加元数据,例如代码片段

property all_durations: dict[str, float]

此 span 树中所有已完成操作的累计持续时间(按操作分组)

另请参阅

duration
distributed.scheduler.TaskGroup.all_durations
property annotation: dict[str, tuple[str, ...]] | None

重建包含完整 id 历史记录的 dask 图注释

请注意,如果 TaskGroup 冲突,这可能与原始注释不匹配。

children: list[Span]

此 span 的直接子级,按创建时间排序

property code: list[tuple[SourceCode, ...]]

客户端在 compute()、persist() 和 submit() 时发送的代码片段。

仅在 distributed.diagnostics.computations.nframes 非零时填充。

property cumulative_worker_metrics: dict[tuple[collections.abc.Hashable, ...], float]

Worker.digests_totalScheduler.cumulative_worker_metrics 的副本,但仅针对可归因于当前 span 树的指标。span id 已从键中移除。

目前,所有键都是 ("execute", <task prefix>, <activity>, <unit>)("p2p", <where>, <activity>, <unit>),但将来可能会添加更多不同格式的键;请测试例如 k[0] == "execute"

property done: bool

如果此 span 树中的所有任务都已完成,则返回 True;否则返回 False。

另请参阅

distributed.scheduler.TaskGroup.done

注意

此属性可能从 True 转换为 False,例如添加新的子 span 时,或者当包含任务唯一副本的工作节点崩溃并且任务需要重新计算时。

property duration: float

在此 span 树中所有任务花费的总时间

另请参阅

all_durations
distributed.scheduler.TaskGroup.duration
enqueued: float

Span 第一次出现在调度器上的时间。父 spans 上的相同属性总是小于或等于此值。

另请参阅

start
stop
groups: set[TaskGroup]

注意

当最后一个任务被遗忘时,TaskGroups 会被调度器遗忘,但会在这里被无限期引用。如果用户对同一集合调用两次 compute(),您将在此集合中有多个具有相同 tg.name 的组!出于同样的原因,虽然同一个 TaskGroup 对象保证只附加到一个 Span,但您可能有具有相同键的不同 TaskGroups 附加到不同的 Spans。

id: str

span()生成的唯一 ID,取自TaskState.annotations["span"]["id"][-1]。匹配distributed.scheduler.TaskState.group.span_iddistributed.worker_state_machine.TaskState.span_id

static merge(*items: distributed.spans.Span) distributed.spans.Span[源代码]

将多个 spans 合并为一个合成 span。输入的 spans 之间不能有关系。

name: tuple[str, ...]

(<tag>, <tag>, …) 匹配TaskState.annotations["span"]["name"],在调度器和工作节点上都是如此。

property nbytes_total: int

此 span 树产生的总字节数

另请参阅

distributed.scheduler.TaskGroup.nbytes_total
property nthreads_intervals: list[tuple[float, float, int]]
返回
元组列表
  • 开始时间戳
  • 结束时间戳
  • 在此间隔内的 Scheduler.total_nthreads
当 Span 是merge()的输出时,间隔可能不是
连续的。

另请参阅

enqueued
stop
active_cpu_seconds
distributed.scheduler.SchedulerState.total_nthreads
property start: float

属于此 span 树的任务最早开始计算的时间;如果尚未有任务完成计算,则为 0。

另请参阅

enqueued
stop
distributed.scheduler.TaskGroup.start

注意

此值在至少有一个任务完成计算之前不会更新。它可能随着任务完成而回退。

property states: dict[TaskStateState, int]

此 span 树中当前处于每种状态的任务数量;例如{"memory": 10, "processing": 3, "released": 4, ...}

另请参阅

distributed.scheduler.TaskGroup.states
property stop: float

此 span 树完成计算的时间,如果尚未完成,则为当前时间戳。

另请参阅

enqueued
start
done
distributed.scheduler.TaskGroup.stop

注意

当没有未完成的任务时,此值与TaskGroup.stop不同;它也永远不会为零。

traverse_groups() Iterator[TaskGroup][源代码]

属于此 span 树分支的所有 TaskGroups

traverse_spans() collections.abc.Iterator[distributed.spans.Span][源代码]

此 span 树分支中所有 spans 的自顶向下递归,包括自身

class distributed.spans.SpansSchedulerExtension(scheduler: Scheduler)[源代码]

用于支持 spans 的调度器扩展

find_by_tags(*tags: str) collections.abc.Iterator[distributed.spans.Span][源代码]

返回包含给定任意标签的所有 spans。当一个标签被某个 span 及其(孙)子 spans 共享时,只返回父级。

heartbeat(ws: scheduler_module.WorkerState, data: dict[tuple[Hashable, ...], float]) None[源代码]

SpansWorkerExtension.heartbeat()触发。

使用来自工作节点的数据填充Span.cumulative_worker_metrics()

merge_all() distributed.spans.Span[源代码]

返回一个表示所有 spans 总和的合成 Span

merge_by_tags(*tags: str) distributed.spans.Span[源代码]

返回一个表示包含给定标签的所有 spans 总和的合成 Span

observe_tasks(tss: Iterable[scheduler_module.TaskState], code: tuple[SourceCode, ...], span_metadata: SpanMetadata) dict[Key, dict][源代码]

确认调度器上存在可运行任务。这些任务可能是新任务、之前无法运行的任务,或者已经输入过此方法的任务。

将新观察到的任务附加到所需的 span 或 ("default", )。更新 TaskGroup.span_id 并清除 TaskState.annotations["span"]。

返回
更新后的 'span' 注释: {key: {"name": (…, …), "ids": (…, …)}}
root_spans: list[Span]

仅包含没有父级的 spans,按创建时间排序。这是一个方便的辅助结构,用于加速搜索。

spans: dict[str, Span]

所有 Span 对象,按 id 键控

spans_search_by_name: defaultdict[tuple[str, ...], list[Span]]

所有 spans,按其全名键控并按创建时间排序。这是一个方便的辅助结构,用于加速搜索。

spans_search_by_tag: defaultdict[str, list[Span]]

所有 spans,按构成其名称的单个标签键控并按创建时间排序。这是一个方便的辅助结构,用于加速搜索。

class distributed.spans.SpansWorkerExtension(worker: Worker)[源代码]

用于支持 spans 的工作节点扩展

heartbeat() dict[tuple[collections.abc.Hashable, ...], float][源代码]

将具有 span 的指标分配给调度器上的 Spans

返回
{(context, span_id, prefix, activity, unit): value}}

另请参阅

SpansSchedulerExtension.heartbeat
Span.cumulative_worker_metrics
distributed.worker.Worker.get_metrics