日志记录

在整个 Dask 集群中,可以通过几种方式记录状态和其他活动。

日志

调度器、工作节点和客户端都使用 Python 标准的 logging 模块记录各种管理事件。日志记录级别和处理程序都可以自定义。有关更多信息,请参阅 调试文档

任务状态转换日志

调度器会跟踪每个任务的所有状态转换。这有助于了解任务是如何完成计算的,在调试时特别有价值。要检索给定任务的状态转换日志,请将其键传递给 Scheduler.story() 方法。

>>> f = client.submit(inc, 123)
>>> f
<Future: finished, type: builtins.int, key: inc-aad7bbea25dc61c8e53d929c7ec50bed>
>>> s.story(f.key)
[('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'released', 'waiting', {'inc-aad7bbea25dc61c8e53d929c7ec50bed': 'processing'}, 1605143345.7283862),
 ('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'waiting', 'processing', {}, 1605143345.7284858),
 ('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'processing', 'memory', {}, 1605143345.731495)]

结构化日志

调度器、工作节点和客户端都支持将结构化事件记录到一个集中式账本,该账本按主题索引。默认情况下,Dask 会将一些管理事件(例如,工作节点加入和离开集群时)记录到此系统,但可以使用 Scheduler.log_event()Worker.log_event()Client.log_event() 方法记录自定义事件。

例如,下面我们使用工作节点的 log_event 方法将开始时间和停止时间记录到 "runtimes" 主题:

>>> def myfunc(x):
...     start = time()
...     ...
...     stop = time()
...     dask.distributed.get_worker().log_event("runtimes", {"start": start, "stop": stop})
>>> futures = client.map(myfunc, range(10))
>>> client.get_events("runtimes")
((1605207481.77175, {'start': 1605207481.769397, 'stop': 1605207481.769397}),
 (1605207481.772021, {'start': 1605207481.770036, 'stop': 1605207481.770037}),
 ...
)

可以使用 Client.get_events() 方法检索给定主题的事件。在上面的示例中,我们使用 client.get_events("runtimes") 检索了记录的开始时间和停止时间。请注意,Client.get_events 为每个记录的事件返回一个元组,其中包含记录的消息以及事件记录时的时间戳。

当与调度器和工作节点插件结合使用时,结构化事件系统可以生成丰富的日志记录/诊断系统。