分享

关于YARN capacity调度器模式下队列超额资源的释放的疑问

YARN的调度器设置为capacity调度器,设置两个队列a和b,无嵌套的子队列,capacity分别配置为40%和60%,max capacity分别设置为40%和100%,即队列a如果没有用到40%的资源,队列b可以“借用”其空闲的部分,而队列a不能超额使用资源


现在有一个疑问,假设队列a完全空闲,队列b很繁忙(执行一个动态分配资源的spark application,不停地提交规模较大的job),
此时队列b会占用YARN的100%资源,然后向队列a中提交另一个规模较大的spark application,这时候队列b需要将“借用”的资源释放出来,
现在的疑问是这部分资源是以什么策略、在什么时机释放的

在某些文档资料中看到capacity调度器模式下,队列内部的调度默认是fifo,也支持fair,另一个疑问是怎么将队列内部的调度配置成fair模式,
如果能配置成fair模式,是否支持抢占?

对于这部分的问题,欢迎大家给出宝贵意见,十分感谢!

已有(4)人评论

跳转到指定楼层
tntzbzc 发表于 2018-6-6 21:14:09
这个问题问的有深度,明天研究下,给楼主探讨
回复

使用道具 举报

tntzbzc 发表于 2018-6-7 18:11:47

现在有一个疑问,假设队列a完全空闲,队列b很繁忙(执行一个动态分配资源的spark application,不停地提交规模较大的job),
此时队列b会占用YARN的100%资源,然后向队列a中提交另一个规模较大的spark application,这时候队列b需要将“借用”的资源释放出来,
现在的疑问是这部分资源是以什么策略、在什么时机释放的

回答:
由于A队列资源正在被别B队列使用,因此调度器必须等待B队列释放资源后,才能将这些资源收回,这通常需要等待一段不确定时间。为了防止等待时间过长,调度器发现等待一段时间后若发现资源并未得到释放,则会强制进行资源回收。
上面其实是策略,那么相信楼主想看代码实现:这里贴出来

[mw_shl_code=java,true]/**
   * This method selects and tracks containers to be preempted. If a container
   * is in the target list for more than maxWaitTime it is killed.
   *
   * @param root the root of the CapacityScheduler queue hierarchy
   * @param clusterResources the total amount of resources in the cluster
   */
  private void containerBasedPreemptOrKill(CSQueue root,
      Resource clusterResources) {

    // extract a summary of the queues from scheduler

    // ---------------------------------- 第一步 -------------------------------------
    // 从ROOT队列开始递归拷贝子队列信息,此步骤目的是为了生成所有队列快照信息,防止集群
    // 队列信息变化导致资源抢占过程中出现问题。
    TempQueue tRoot;
    synchronized (scheduler) {
      tRoot = cloneQueues(root, clusterResources);
    }

    // compute the ideal distribution of resources among queues
    // updates cloned queues state accordingly

    // ---------------------------------- 第二步 -------------------------------------
    // 将用户设定的最低资源作为最开始的理想最低资源,递归计算各队列实际理想最低资源,作为
    // 第三步进行具体资源抢占的依据。
    tRoot.idealAssigned = tRoot.guaranteed;
    Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
        percentageClusterPreemptionAllowed);
    List<TempQueue> queues =
      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);

    // based on ideal allocation select containers to be preempted from each

    // ---------------------------------- 第三步---------------------------------------
    // 如果叶子队列已使用资源量大于第二步计算出的理想最低资源量,则此队列将根据多出的
    // 资源量选取队列中需被抢占的Container,并根据一定规则选取队列中哪些应用的哪些
    // container需要被抢占。如果在预设时间内Container没有被释放,则强制收回container。
    // queue and each application
    Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
        getContainersToPreempt(queues, clusterResources);

    if (LOG.isDebugEnabled()) {
      logToCSV(queues);
    }

    // if we are in observeOnly mode return before any action is taken
    if (observeOnly) {
      return;
    }

    // preempt (or kill) the selected containers
    for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
         : toPreempt.entrySet()) {
      for (RMContainer container : e.getValue()) {
        // if we tried to preempt this for more than maxWaitTime
        if (preempted.get(container) != null &&
            preempted.get(container) + maxWaitTime < clock.getTime()) {
          // kill it
          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
                ContainerPreemptEventType.KILL_CONTAINER));
          preempted.remove(container);
        } else {
          //otherwise just send preemption events
          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
                ContainerPreemptEventType.PREEMPT_CONTAINER));
          if (preempted.get(container) == null) {
            preempted.put(container, clock.getTime());
          }
        }
      }
    }

    // Keep the preempted list clean
    for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){
      RMContainer id = i.next();
      // garbage collect containers that are irrelevant for preemption
      if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
        i.remove();
      }
    }
  }
[/mw_shl_code]
我们来英文和中文
* This method selects and tracks containers to be preempted. If a container
* is in the target list for more than maxWaitTime it is killed.

container需要被抢占。如果在预设时间内Container没有被释放,则强制收回container。

上面都提到了,maxWaitTime 也就是中文的预设时间,天那,这个预设时间能否配置那?也就是代码里面的maxWaitTime ,是从哪读取到的。他的时间是多少?我们再来看下面代码

https://svn.apache.org/repos/asf/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

我这里截图

1.png

也就是下面代码
[mw_shl_code=java,true] dispatcher = disp;
    scheduler = (CapacityScheduler) sched;
    maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
    naturalTerminationFactor =
      config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2);
    maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000);
    monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000);
    percentageClusterPreemptionAllowed =
      config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
    observeOnly = config.getBoolean(OBSERVE_ONLY, false);
    rc = scheduler.getResourceCalculator();[/mw_shl_code]

源文件
ProportionalCapacityPreemptionPolicy.rar (8.19 KB, 下载次数: 0)
回复

使用道具 举报

desehawk 发表于 2018-6-7 18:55:32
楼主想了解更详细,推荐下面文章
推荐这篇文章
Yarn调度之CapacityScheduler源码分析资源抢占
http://www.aboutyun.com/forum.php?mod=viewthread&tid=24628


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条