分享

MapReduce作业的生命周期源码分析之任务分配

sstutu 发表于 2014-10-31 23:05:17 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 14099
问题导读
1.作业的生命周期大概分几个阶段,你认为分几个阶段?
2.JobTracker通过什么方法分配任务的?
3.Hadoop中数据本地性有哪三个等级?
4.JobTracker向TaskTracker下达命令,具体有哪些命令?







一个MapReduce作业的生命周期大体分为5个阶段:
1. 作业提交与初始化
2. 任务调度与监控
3. 任务运行环境准备
4. 任务执行
5. 作业完成

当JobTracker收到了来自TaskTracker的心跳后,是如何选择任务的呢?是通过assignTasks方法。下面详细分析该方法。在分析之前,首先提一下Hadoop的调度器调度模型。通常情况下,Hadoop会以队列为单位管理作业和资源。有了队列就产生所谓三级调度模型:调度器依次选择一个队列,队列中的一个作业,作业中的一个任务,最终将任务分配给有空闲slot的TaskTracker。assignTasks的实现也遵循这个模型:

  1. Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
复制代码



   

对于FIFO调度器而言,队列即为对应监听器中使用的作业队列。然后,声明一个列表,用于保存选择的任务:
  1.     // Assigned tasks
  2.     List<Task> assignedTasks = new ArrayList<Task>();
复制代码




接下来,计算队列中正在运行的和等待运行的map和reduce任务的数量:
  1.     // Compute (running + pending) map and reduce task numbers across pool
  2.     int remainingReduceLoad = 0;
  3.     int remainingMapLoad = 0;
  4.     synchronized (jobQueue) {
  5.       for (JobInProgress job : jobQueue) {
  6.         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
  7.           remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
  8.           if (job.scheduleReduces()) {
  9.             remainingReduceLoad +=
  10.               (job.desiredReduces() - job.finishedReduces());
  11.           }
  12.         }
  13.       }
  14.     }
复制代码




其中,job.scheduleReduces方法判断当前map任务的总体进度是否满足reduce任务开始调度的条件,map任务完成的比例是否超过变量mapred.reduce.slowstart.completed.maps的值,若超过则计算reduce任务的剩余任务数。接下来,计算map和reduce任务的负载因子:
  1.     // Compute the 'load factor' for maps and reduces
  2.     double mapLoadFactor = 0.0;
  3.     if (clusterMapCapacity > 0) {
  4.       mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
  5.     }
  6.     double reduceLoadFactor = 0.0;
  7.     if (clusterReduceCapacity > 0) {
  8.       reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
  9.     }
复制代码




map任务负载因子定义为当前剩余的(正在执行的和等待开始的)map任务的总数与集群总的map资源数(map slot数目)的商值。reduce任务负载因子同理。计算负载因子的目的是根据TaskTracker的负载情况和集群总的负载情况将所有任务均衡地调度到各个TaskTracker以便均衡地使用各个结点上的资源。根据这种思想,可以计算出某个TaskTracker当前可用的slot数目:
  1.   final int trackerCurrentMapCapacity =
  2.       Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
  3.                               trackerMapCapacity);
  4.     int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
  5.     boolean exceededMapPadding = false;
  6.     if (availableMapSlots > 0) {
  7.       exceededMapPadding =
  8.         exceededPadding(true, clusterStatus, trackerMapCapacity);
  9.     }
复制代码




由此可见,可用slot定义为:根据集群总体负载均衡还有多少slot应该可用的数目减去实际已经在用的slot数目。注意,exceededMapPadding表示是否有足够的slot预留给推测执行的任务。所谓推测执行,是Hadoop为了防止某些任务执行过慢,为一些较慢任务启动一个备份任务,让该任务做相同的事情,并最终选用最先成功运行完成的任务计算结果为最终结果。推测执行机制日后关注。下面就是任务选择过程:
  1.   int numLocalMaps = 0;
  2.     int numNonLocalMaps = 0;
  3.     scheduleMaps:
  4.     for (int i=0; i < availableMapSlots; ++i) {
  5.       synchronized (jobQueue) {
  6.         for (JobInProgress job : jobQueue) {
  7.           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
  8.             continue;
  9.           }
  10.           Task t = null;
  11.           // Try to schedule a node-local or rack-local Map task
  12.           t =
  13.             job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus,
  14.                 numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
  15.           if (t != null) {
  16.             assignedTasks.add(t);
  17.             ++numLocalMaps;
  18.             // Don't assign map tasks to the hilt!
  19.             // Leave some free slots in the cluster for future task-failures,
  20.             // speculative tasks etc. beyond the highest priority job
  21.             if (exceededMapPadding) {
  22.               break scheduleMaps;
  23.             }
  24.             // Try all jobs again for the next Map task
  25.             break;
  26.           }
  27.           // Try to schedule a node-local or rack-local Map task
  28.           t =
  29.             job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
  30.                                    taskTrackerManager.getNumberOfUniqueHosts());
  31.           if (t != null) {
  32.             assignedTasks.add(t);
  33.             ++numNonLocalMaps;
  34.             
  35.             // We assign at most 1 off-switch or speculative task
  36.             // This is to prevent TaskTrackers from stealing local-tasks
  37.             // from other TaskTrackers.
  38.             break scheduleMaps;
  39.           }
  40.         }
  41.       }
  42.     }
  43.     int assignedMaps = assignedTasks.size();
复制代码




对于某个空闲的slot,从队列中选择一个正在执行的作业,并调用obtainNewNodeOrRackLocalMapTask方法获得一个具有数据本地性地任务。若找到了这样的任务,将其放入结果列表中,并检查刚才获得的exceedingMapPadding的值。若不满足,则跳出最外层循环,重新为每个slot分配任务,以期有新的空闲slot产生,从而满足推测执行的需求。当找到一个数据本地性任务后,马上跳出对队列的遍历,为下一个slot分配任务。

若没有找到具有数据本地性的任务,就调用obtainNewNonLocalMapTask方法获取一个非本地性的任务。如果找到了这样的任务,就将其放入结果列表中,然后跳出最外层循环,重新为每个slot分配任务。也就是说,一旦找到了一个非本地性任务,那么不能再继续获取任务,防止对于其他slot来说具有本地性地任务被抢夺。

这里解释一下数据本地性。在分布式环境中,为了减少任务执行过程中的网络传输开销,通常将任务调度到输入数据所在的计算节点,也就是让数据在本地进行计算【1】。Hadoop中数据本地性有三个等级:node-local(同节点),rack-local(同机架)和off-switch(跨机架)。选择任务时即按照上述顺序依次进行。

  1.   int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel,
  2.                                 status.mapProgress());
  3.     if (target == -1) {
  4.       return null;
  5.     }
  6.     Task result = maps[target].getTaskToRun(tts.getTrackerName());
  7.     if (result != null) {
  8.       addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
  9.       resetSchedulingOpportunities();
  10.     }
  11.     return result;
复制代码





其中,findNewMapTask方法的第四个参数指定了获取任务的本地性等级,maxLevel表示最高。在obtainNewNonLocalMapTask方法中则使用的是

NON_LOCAL_CACHE_LEVEL。在findNewMapTask方法中可以看到,运行失败的任务总是被优先选择,让它们能够快速重新执行;然后按照数据本地性选择尚未运行的任务;最后是查找正在运行的任务,为较慢的任务启动备份(推测执行)。有兴趣可以看源码这里不展示了。
对于reduce任务来说选择过程十分类似,只不过reduce任务不涉及数据本地性,因为它的输入来自map任务的输出,来自所有map任务的结点。


  1. synchronized (jobQueue) {
  2.         for (JobInProgress job : jobQueue) {
  3.           if (job.getStatus().getRunState() != JobStatus.RUNNING ||
  4.               job.numReduceTasks == 0) {
  5.             continue;
  6.           }
  7.           Task t =
  8.             job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
  9.                                     taskTrackerManager.getNumberOfUniqueHosts()
  10.                                     );
  11.           if (t != null) {
  12.             assignedTasks.add(t);
  13.             break;
  14.           }         
  15.           // Don't assign reduce tasks to the hilt!
  16.           // Leave some free slots in the cluster for future task-failures,
  17.           // speculative tasks etc. beyond the highest priority job
  18.           if (exceededReducePadding) {
  19.             break;
  20.           }
  21.         }
  22.       }
复制代码




注意,每一次心跳只分配一个reduce任务。
最后,我们关注一下当要执行的任务获得以后,如何返回给TaskTracker,以及JobTracker下达的一些命令。
重新来看心跳方法heartbeat。它的返回值是一个HeartbeatResponse类型,其中有一个重要的字段:


  1. TaskTrackerAction[] actions;
复制代码



这个数组就用于JobTracker向TaskTracker下达命令,包括执行刚刚选择的任务的指令。具体的命令种类有以下五种:

1. ReinitTrackerAction
2. LaunchTaskAction
3. KillTaskAction
4. KillJobAction
5. CommitTaskAction
两种情况下JobTracker会下达ReinitTrackerAction命令:丢失上次心跳应答信息或者丢失TaskTracker状态信息。这两种状态为不一致状态。

  1.     short newResponseId = (short)(responseId + 1);
  2.     status.setLastSeen(now);
  3.     if (!processHeartbeat(status, initialContact, now)) {
  4.       if (prevHeartbeatResponse != null) {
  5.         trackerToHeartbeatResponseMap.remove(trackerName);
  6.       }
  7.       return new HeartbeatResponse(newResponseId,
  8.                    new TaskTrackerAction[] {new ReinitTrackerAction()});
  9.     }
复制代码



LaunchTaskAction命令即包含了需要执行的任务。JobTracker在选择任务时首先选择的是辅助型任务,例如job-cleanup task,task-cleanup task和job-setup task。这些任务在调用assignTasks方法之前就已经选择,因此优先级最高。
  1. List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
  2.     if (tasks == null ) {
  3.       tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
  4.     }
  5.     if (tasks != null) {
  6.       for (Task task : tasks) {
  7.         expireLaunchingTasks.addNewTask(task.getTaskID());
  8.         actions.add(new LaunchTaskAction(task));
  9.       }
  10.     }
复制代码



KillTaskAction封装了需要杀死的任务。杀死的原因可能是任务失败,用户通过kill命令杀死等。KillJobAction封装了待清理的作业。清理的工作主要是删除临时目录。作业完成或失败时都会导致该作业被清理。最后,CommitTaskAction封装了需要提交的任务。Hadoop将一个成功运行完成的Task Attempt(一个任务的多个备份任务)结果文件从临时目录转移到最终目录的过程称为任务提交。后三种命令生成的代码如下:


  1. // Check for tasks to be killed
  2.     List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
  3.     if (killTasksList != null) {
  4.       actions.addAll(killTasksList);
  5.     }
  6.      
  7.     // Check for jobs to be killed/cleanedup
  8.     List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
  9.     if (killJobsList != null) {
  10.       actions.addAll(killJobsList);
  11.     }
  12.     // Check for tasks whose outputs can be saved
  13.     List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
  14.     if (commitTasksList != null) {
  15.       actions.addAll(commitTasksList);
  16.     }
复制代码



至此,任务调度功流程大体框架全部结束,接下来就是任务在TaskTracker上的具体执行过程了。



欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

已有(4)人评论

跳转到指定楼层
gwgyk 发表于 2014-11-7 21:45:54
hadoop中,
  1. t = job.obtainNewNonLocalMapTask()
复制代码
这个方法的注释是这样的:
  1. // Try to schedule a node-local or rack-local
复制代码
这个注释对不对啊?应该是分配非本地的task吧?
回复

使用道具 举报

sstutu 发表于 2014-11-13 17:13:25
gwgyk 发表于 2014-11-7 21:45
hadoop中,这个方法的注释是这样的:
这个注释对不对啊?应该是分配非本地的task吧?
看的很细致哦,这个应该是非本地

  1. t = job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
复制代码



回复

使用道具 举报

xuezhiji 发表于 2016-5-4 16:48:36
不错,学习了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条