分享

从源码角度解析SparkStreaming运行流程

问题导读:
1. StreamingContext 如何获取数据?
2. receiverTrackerstart 底层如何实现?
3. receiverExecutorstart 底层如何实现?
4. StreamingContext 如何加载数据?




解决方案:

获取数据

启动了receiverTracker.start去获取数据

jobGenerator启动job去使用数据计算


2017-02-08_122307.jpg

receiverTracker.start

2017-02-08_122451.jpg

在这里的receiverinputStreams其实已经实例化了。在inputDStream的时候已经往ssc.graph中添加了实例

2017-02-08_122707.jpg

实例化ReceiverTrackerActor,它负责RegisterReceiver(注册Receiver)、AddBlock、ReportError(报告错误)、DeregisterReceiver(注销Receiver)等事件的处理。

启动receiverExecutor(实际类是ReceiverLauncher,这名字起得。。),它主要负责启动Receiver,start方法里面调用了startReceivers方法吧。



receiverExecutor.start()

分发receiver到各个work上去,看上面的英文解释
看上面英文注释,看上面英文注释,看上面英文注释,看上面英文注释


2017-02-08_122801.jpg

分发各个receiver,然后启动receiver来接收数据,然后在receiver里面将数据存储在blockManage里面


加载数据

启动jobGenerator。

2017-02-08_122855.jpg

第一次启动会调用startFirstTime()

2017-02-08_122934.jpg

启动一个定时器Timer。定时器每隔一定时间eventActor发送GenerateJobs消息

2017-02-08_123007.jpg

启动定时器。里面是使用的一个线程,然后线程调用上面的传入的方法,(方法的功能就是,向eventActor发送一个GenerateJobs)。那个!是eventActor的一个方法,就是发送

2017-02-08_123048.jpg

下面是线程的方法,循环调用了callback(也就是)上面的向eventActor发消息

2017-02-08_123202.jpg

下面是线程的方法,循环调用了callback(也就是)上面的向eventActor发消息

2017-02-08_123248.jpg

如果事件触发了,就调用下面对应的方法

2017-02-08_123316.jpg

2017-02-08_123343.jpg

1、DStreamGraph生成jobs。
2、从stream那里获取接收到的Block信息。
3、调用submitJobSet方法提交作业。(其实只是作业的完成情况,在 graph.generateJobs(time)里面其实把作业完成了)
4、提交完作业之后,做一个CheckPoint。
先看DStreamGraph是怎么生成的jobs:(不同的DStream调用的outputStrem.generateJob方法不一样。看看DStream里面的union和print等方法。print调用的是ForEachDStream的generateJob。union(UnionDStream)调用的是DStream自带的)


2017-02-08_123440.jpg

union调用的是:大部分是调用的下面的这个:(DStream原生的方法)

2017-02-08_123527.jpg

print调用的是:

2017-02-08_123604.jpg

生成一个RDD。调用重写的compute。我们可以看到下面的
generatedRDDs.get(time).orElse 。如果这个时间time段的rdd已经准备好了。就不用去执行compute了。而是直接取。这个generatedRDDs 是一个hashmap是每个时间段的rdd。
dstream.foreachRDD什么时候会有多个rdd呢?
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()


2017-02-08_123715.jpg

返回头看:

2017-02-08_123749.jpg

总结

到这里就算结束了,最后来个总结吧,图例在下一章补上,这一章只是过程分析:

一个Receiver监控一个ip和port,如果要多机子并行监控的话,肯定每台机子都是一样的,所以这边是一个Receiver的Array,里面是每个Receiver,如果在开始的时候只定义了一个inputstream,那其实是启动一个机子在监控ip和port,如果要多机子并行监控,只能是通过不同的ip或不同的port来创建多个inputstream来实现并行。(源码里面是把Receiver做成一个RDD来分发到不同的机子上的,这个RDD里面的Receiver就是你多个Receiver的集合)

1、可以有多个输入,我们可以通过StreamingContext定义多个输入,比如我们监听多个(host,ip),可以给它们定义各自的处理逻辑和输出,输出方式不仅限于print方法,还可以有别的方法,saveAsTextFiles和saveAsObjectFiles。这块的设计是支持共享StreamingContext的。
(可以从InputDStream源码的 ssc.graph.addInputStream(this))看出,可以有多个输入
所以一个sparkstreaming程序,可以有多个监控输入。只要增加一个生成inputDStream就可以
2、StreamingContext启动了JobScheduler,JobScheduler启动ReceiverTracker和JobGenerator。
(ReceiverTracker用于分发接收器,在work上实时接收数据;JobGenerator启动一个线程(这个线程在driver端)在每隔一段时间就去使用Receiver接收的数据(使用数据的方法是启动一个job,job操作的rdd是从inputDStream的compute里面产生的))
3、ReceiverTracker是通过把Receiver包装成RDD的方式,发送到Executor端运行起来的,Receiver起来之后向ReceiverTracker发送RegisterReceiver消息。
3、Receiver把接收到的数据,通过ReceiverSupervisor保存。
4、ReceiverSupervisorImpl把数据写入到BlockGenerator的一个ArrayBuffer当中。
5、BlockGenerator内部每个一段时间(默认是200毫秒)就把这个ArrayBuffer构造成Block添加到blocksForPushing当中。
6、BlockGenerator的另外一条线程则不断的把加入到blocksForPushing当中的Block写入到BlockManager当中,并向ReceiverTracker发送AddBlock消息。
7、JobGenerator内部有个定时器,定期生成Job,通过DStream的id,把ReceiverTracker接收到的Block信息从BlockManager上抓取下来进行处理,这个间隔时间是我们在实例化StreamingContext的时候传进去的那个时间,在这个例子里面是Seconds(1)。


转自:csdn
作者:Spark的自由牧场

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条