日志记录
目录
日志记录¶
在整个 Dask 集群中,可以通过几种方式记录状态和其他活动。
任务状态转换日志¶
调度器会跟踪每个任务的所有状态转换。这有助于了解任务是如何完成计算的,在调试时特别有价值。要检索给定任务的状态转换日志,请将其键传递给 Scheduler.story()
方法。
>>> f = client.submit(inc, 123)
>>> f
<Future: finished, type: builtins.int, key: inc-aad7bbea25dc61c8e53d929c7ec50bed>
>>> s.story(f.key)
[('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'released', 'waiting', {'inc-aad7bbea25dc61c8e53d929c7ec50bed': 'processing'}, 1605143345.7283862),
('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'waiting', 'processing', {}, 1605143345.7284858),
('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'processing', 'memory', {}, 1605143345.731495)]
结构化日志¶
调度器、工作节点和客户端都支持将结构化事件记录到一个集中式账本,该账本按主题索引。默认情况下,Dask 会将一些管理事件(例如,工作节点加入和离开集群时)记录到此系统,但可以使用 Scheduler.log_event()
、Worker.log_event()
或 Client.log_event()
方法记录自定义事件。
例如,下面我们使用工作节点的 log_event
方法将开始时间和停止时间记录到 "runtimes"
主题:
>>> def myfunc(x):
... start = time()
... ...
... stop = time()
... dask.distributed.get_worker().log_event("runtimes", {"start": start, "stop": stop})
>>> futures = client.map(myfunc, range(10))
>>> client.get_events("runtimes")
((1605207481.77175, {'start': 1605207481.769397, 'stop': 1605207481.769397}),
(1605207481.772021, {'start': 1605207481.770036, 'stop': 1605207481.770037}),
...
)
可以使用 Client.get_events()
方法检索给定主题的事件。在上面的示例中,我们使用 client.get_events("runtimes")
检索了记录的开始时间和停止时间。请注意,Client.get_events
为每个记录的事件返回一个元组,其中包含记录的消息以及事件记录时的时间戳。
当与调度器和工作节点插件结合使用时,结构化事件系统可以生成丰富的日志记录/诊断系统。