工作优先级

工作优先级

当工作多于工作节点时,Dask 必须决定哪些任务比其他任务拥有更高的优先级。Dask 可以自动确定这些优先级以优化性能,或者用户可以根据他们的需求手动指定优先级。

Dask 按照以下顺序使用优先级

  1. 用户优先级: 用户定义的优先级通过函数(如 compute()persist()submit()map())的 priority= 关键字参数提供。优先级较高的任务在优先级较低的任务之前运行,默认优先级为零。

    future = client.submit(func, *args, priority=10)  # high priority task
    future = client.submit(func, *args, priority=-10)  # low priority task
    
    df = df.persist(priority=10)  # high priority computation
    

    优先级也可以使用 dask 注解机制指定

    with dask.annotate(priority=10):
        future = client.submit(func, *args)  # high priority task
    with dask.annotate(priority=-10):
        future = client.submit(func, *args)  # low priority task
    
    with dask.annotate(priority=10):
        df = df.persist()  # high priority computation
    
  2. 先入先出按时间顺序: Dask 优先处理早期提交的计算。由于用户可以异步提交计算,因此可能会有几个不同的计算同时在工作节点上运行。通常,Dask 优先处理先提交的任务组。

    一个细微之处是,在紧密的时间窗口内提交的任务通常被认为是在同一时间提交的。

    x = x.persist()  # submitted first and so has higher priority
    # wait a while
    x = x.persist()  # submitted second and so has lower priority
    

    在这种情况下,“一段时间”取决于计算的类型。通常用于批量处理的操作(如 computepersist)认为在同一六十秒内提交的任何两个计算具有相同的优先级。通常用于实时处理的操作(如 submitmap)如果它们在彼此的 100 毫秒内提交,则被认为具有相同的优先级。此行为可以通过 fifo_timeout= 关键字控制

    x = x.persist()
    # wait one minute
    x = x.persist(fifo_timeout='10 minutes')  # has the same priority
    
    a = client.submit(func, *args)
    # wait no time at all
    b = client.submit(func, *args, fifo_timeout='0ms')  # is lower priority
    
  3. 图结构: 在任何给定的计算(compute 或 persist 调用)中,Dask 以最小化计算内存占用空间的方式对任务进行排序。这在 任务排序文档 中有更深入的讨论。

如果多个任务的优先级完全相同(如上所述),则任务到达工作节点的顺序(以后进先出的方式)用于确定任务运行的顺序。