工作优先级
工作优先级¶
当工作多于工作节点时,Dask 必须决定哪些任务比其他任务拥有更高的优先级。Dask 可以自动确定这些优先级以优化性能,或者用户可以根据他们的需求手动指定优先级。
Dask 按照以下顺序使用优先级
用户优先级: 用户定义的优先级通过函数(如
compute()
、persist()
、submit()
或map()
)的priority=
关键字参数提供。优先级较高的任务在优先级较低的任务之前运行,默认优先级为零。future = client.submit(func, *args, priority=10) # high priority task future = client.submit(func, *args, priority=-10) # low priority task df = df.persist(priority=10) # high priority computation
优先级也可以使用 dask 注解机制指定
with dask.annotate(priority=10): future = client.submit(func, *args) # high priority task with dask.annotate(priority=-10): future = client.submit(func, *args) # low priority task with dask.annotate(priority=10): df = df.persist() # high priority computation
先入先出按时间顺序: Dask 优先处理早期提交的计算。由于用户可以异步提交计算,因此可能会有几个不同的计算同时在工作节点上运行。通常,Dask 优先处理先提交的任务组。
一个细微之处是,在紧密的时间窗口内提交的任务通常被认为是在同一时间提交的。
x = x.persist() # submitted first and so has higher priority # wait a while x = x.persist() # submitted second and so has lower priority
在这种情况下,“一段时间”取决于计算的类型。通常用于批量处理的操作(如
compute
和persist
)认为在同一六十秒内提交的任何两个计算具有相同的优先级。通常用于实时处理的操作(如submit
或map
)如果它们在彼此的 100 毫秒内提交,则被认为具有相同的优先级。此行为可以通过fifo_timeout=
关键字控制x = x.persist() # wait one minute x = x.persist(fifo_timeout='10 minutes') # has the same priority a = client.submit(func, *args) # wait no time at all b = client.submit(func, *args, fifo_timeout='0ms') # is lower priority
图结构: 在任何给定的计算(compute 或 persist 调用)中,Dask 以最小化计算内存占用空间的方式对任务进行排序。这在 任务排序文档 中有更深入的讨论。
如果多个任务的优先级完全相同(如上所述),则任务到达工作节点的顺序(以后进先出的方式)用于确定任务运行的顺序。