分享

Twitter Storm的新利器Pluggable Scheduler

xioaxu790 发表于 2014-9-25 19:22:23 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 8726
问题导读
1、Twitter Storm的新特性有哪些?
2、什么是Pluggable Scheduler?
3、让Storm知道我们的Scheduler,配置如何做?






最近把Twitter Storm的新特性:可插拔式的任务分配器(Pluggable Scheduler)给实现了,将在0.8.0版本里面跟大家见面。这篇文章先给大家尝尝鲜,介绍下这个新特性。

在Pluggable Scheduler之前,Twitter Storm里面对于用户提交的每个Topology进行任务分配是由nimbus来做的,nimbus的任务分配算法可是非常牛逼的哦,主要特点如下

在slot充沛的情况下,能够保证所有topology的task被均匀的分配到整个机器的所有机器上
在slot不足的情况下,它会把topology的所有的task分配到仅有的slot上去,这时候其实不是理想状态,所以。。
在nimbus发现有多余slot的时候,它会重新分配topology的task分配到空余的slot上去以达到理想状态。
在没有slot的时候,它什么也不做
一般情况下,用这种默认的task分配机制就已经足够了。但是也会有一些应用场景是默认的task分配机制所搞定不了的,比如

如果你想你的spout分配到固定的机器上去 — 比如你的数据就在那上面
如果你有两个topology都很耗CPU,你不想他们运行在同一台机器上

这些情况下我们默认的task分配机制虽然强大,却是搞不定的,因为它根本就不考虑这些。所以我们设计了新的Pluggable Scheduler机制,使得用户可以编写自己的task分配算法 — Scheduler来实现自己特定的需求。下面我们就来亲自动手来看看怎么才能实现上面提到的默认Scheduler搞不定的第一个场景,为了后面叙述的方便,我们来细化一下这个需求:让我们的名为special-spout的组件分配到名为special-supervisor的supervisor上去

要实现一个Scheduler其实很简单,只要实现IScheduler
  1. public interface IScheduler {
  2.     /**
  3.      * Set assignments for the topologies which needs scheduling. The new assignments is available
  4.      * through <code>cluster.getAssignments()</code>
  5.      *
  6.      *@param topologies all the topologies in the cluster, some of them need schedule. Topologies object here
  7.      *       only contain static information about topologies. Information like assignments, slots are all in
  8.      *       the <code>cluster</code>object.
  9.      *@param cluster the cluster these topologies are running in. <code>cluster</code> contains everything user
  10.      *       need to develop a new scheduling logic. e.g. supervisors information, available slots, current
  11.      *       assignments for all the topologies etc. User can set the new assignment for topologies using
  12.      *       <code>cluster.setAssignmentById</code>
  13.      */
  14.     public void schedule(Topologies topologies, Cluster cluster);
  15. }
复制代码


这个接口会提供两个参数,其中Topologies包含当前集群里面运行的所有Topology的信息:StormTopology对象,配置信息,以及从task到组件(bolt, spout)id的映射信息。Cluster对象则包含了当前集群的所有状态信息:对于系统所有Topology的task分配信息,所有的supervisor信息等等 — 已经足够我们实现上面的那个需求了,让我们动手吧


找出我们的目标Topology

首先我们要确定我们的topology是否已经提交到集群了,很简单,到topologies对象里面找找看,找到了的话就说明已经提交了。
  1. // Gets the topology which we want to schedule
  2. TopologyDetails topology = topologies.getByName("special-topology");
复制代码

只要这个topology不为null的话就说明这个topology已经提交了。


目标Topology是否需要分配

紧接着我们要看看这个topology需不需要进行task分配 — 有可能之前分配过了。怎么弄呢?很简单,Cluster对象已经提供了api可以使用
  1. boolean needsScheduling = cluster.needsScheduling(topology);
复制代码


这里要说明的一点是,有关Scheduler编写的几乎所有api都是定义在Cluster类里面,大家只要把这个类搞熟悉,编写起Scheduler起来应该就得心应手了。如果这个topology需要进行task分配我们还要看下有那些task需要进行分配 — 因为可能有部分task已经被分配过了
  1. // find out all the needs-scheduling components of this topology
  2. Map<String, List<Integer>> componentToTasks = cluster.getNeedsSchedulingComponentToTasks(topology);
复制代码



我们的目标spout是否需要分配?
因为我们的目标是让名为special-spout的组件运行在名为special-supervisor的supervisor上,所以我们要看看这些task里面有没有是属于special-spout的task,很简单,上面返回的componentToTasks就是从component-id到task-ids的一个映射。所以要找出special-spout就很简单了
  1. List<Integer> tasks = componentToTasks.get("special-spout");
复制代码


找出目标supervisor
找到我们要分配的task之后,我们还要把我们的special-supervisor找出来,Cluster同样提供了方便的方法:
  1. // find out the our "special-supervisor" from the supervisor metadata
  2. Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
  3. SupervisorDetails specialSupervisor = null;
  4. for (SupervisorDetails supervisor : supervisors) {
  5.     Map meta = (Map) supervisor.getSchedulerMeta();
  6.     if (meta.get("name").equals("special-supervisor")) {
  7.        specialSupervisor = supervisor;
  8.        break;
  9.     }
  10. }
复制代码


这里要特别说明一下Map meta = (Map) supervisor.getSchedulerMeta();, 我们前面说名为special-supervisor的supevisor,其实在storm里面supervisor是没有名字的,这里我们所谓的名字是从supervisor.getSchedulerMeta里面找出来的,这个schedulerMeta是supervisor上面配置的给scheduler使用的一些meta信息,你可以配置任意信息!比如在这个例子里面,我在storm.yaml里面配置了:
  1. supervisor.scheduler.meta:
  2.   name: "special-supervisor"
复制代码


这样我们才能用meta.get("name").equals("special-supervisor")找到我们的special-supervisor到这里我们就找到了我们的special-supervisor,但是要记住一点的是,我们的集群里面有很多topology,这个supervisor的slot很可能已经被别的topology占用掉了。所以我们要检查下有没有slot了
  1. List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);
复制代码


判断上面的availableSlots是不是空就知道有没有空余的slot了,如果没有slot了怎么办?没别的topology占用掉了怎么办?很简单!把它赶走
  1. // if there is no available slots on this supervisor, free some.
  2. if (availableSlots.isEmpty() && !tasks.isEmpty()) {
  3.     for (Integer task : specialSupervisor.getAllPorts()) {
  4.         cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(), task));
  5.     }
  6. }
复制代码



最后一步:分配
到这里为止呢,我们要分配的tasks已经有了,要分配到的slot也搞定了,剩下的就分配下就好了(注意,这里因为为了保持例子简单,代码做了简化)
  1. // re-get the aviableSlots
  2. availableSlots = cluster.getAvailableSlots(specialSupervisor);
  3. // since it is just a demo, to keep things simple, we assign all the
  4. // tasks into one slot.
  5. cluster.assign(availableSlots.get(0), topology.getId(), tasks);
复制代码


我们的目标实现了! 随着cluster.assign的调用,我们已经把我们的special-spout分配到special-supervisor上去了。不难吧 :)

别的任务谁来分配?
不过有件事情别忘了,我们只给special-spout分配了task, 别的task谁来分配啊?你可能会说我不关心啊,没关系,把这个交给系统默认的分配器吧:我们已经把系统的默认分配器包装到backtype.storm.scheduler.EvenScheduler里面去了,所以你简单调用下就好了
  1. new backtype.storm.scheduler.EvenScheduler().schedule(topologies, cluster);
复制代码



让Storm知道我们的Scheduler
哦,有一件事情忘记说了,我们完成了我们的自定义Scheduler,怎么让storm知道并且使用我们的Scheduler呢?两件事情:

把包含这个Scheduler的jar包放到$STORM_HOME/lib下面去
在storm.yaml 里面作如下配置:

  1. storm.scheduler: "storm.DemoScheduler"
复制代码

这样Storm在做任务分配的时候就会用你的storm.DemoScheduler, 而不会使用默认的系统Scheduler

已有(1)人评论

跳转到指定楼层
jlon 发表于 2016-11-4 16:35:18
有一个地方不明白,在这里例子里,你分配的是task,我在别处看到过直接分配Executors。你在第0个slot上分配了所有的task。假如我的task有100个,那是不是这100个task都在这个slot上执行,那么我如何把这些task均匀的分配到我指定的supervisor上呢?还有能解释下这段代码吗?
SchedulerAssignment currentAssignment = cluster.getAssignmentById(topology.getId()); 这个的方法获取到的是什么?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条