
A Worked Example of Using Spark Streaming with Flume NG


Guiding questions:
1. How do you connect Flume NG to Spark?
2. How does Spark Streaming work with multiple data sources?

Spark Streaming is a high-throughput, fault-tolerant stream-processing system built on Spark. It can consume data from many sources (such as Kafka, Flume, Twitter, ZeroMQ, and plain TCP sockets), apply complex operations such as map, reduce, join, and window to the stream, and save the results to external file systems, databases, or live dashboards.



The main characteristics of Spark Streaming are:
  • streaming computation is broken down into a series of small batch jobs (micro-batches)
  • failed or slow tasks are re-executed in parallel on other nodes
  • strong fault tolerance, based on RDD lineage
  • the same semantics as ordinary RDD operations (a minimal sketch follows this list)
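
To make the "same semantics as RDDs" point concrete, here is a minimal, self-contained sketch (not part of the original post) that uses a plain TCP socket source and a windowed word count with the Spark Streaming 1.x Java API; the host localhost and port 9999 are placeholders.

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class SocketWindowWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SocketWindowWordCount");
        // 2-second micro-batches
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(2000));

        // A TCP socket is just another receiver-based source, like Flume or Kafka
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);

        // RDD-style transformations, applied to every micro-batch
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) {
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> counts = words
                .mapToPair(new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String w) {
                        return new Tuple2<String, Integer>(w, 1);
                    }
                })
                // sliding window: 30-second window, recomputed every 10 seconds
                .reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer a, Integer b) {
                        return a + b;
                    }
                }, new Duration(30000), new Duration(10000));

        counts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}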

In this post we connect Spark Streaming to Flume NG: using the JavaFlumeEventCount example from the Spark source tree as a reference, we build a Maven project, package it, and run it on a Spark standalone cluster.


I. Steps

1. Create a Maven project and write the pom.xml
The project needs the Spark Streaming Flume integration jar. Its Maven coordinates are as follows; add them to pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>1.1.0</version>
</dependency>


The complete pom.xml (spark-streaming-flume is the only dependency that has to be declared; it pulls in spark-streaming and spark-core transitively):

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>test</groupId>
    <artifactId>hq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>.</classpathPrefix>
                            <mainClass>JavaFlumeEventCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>



2. Write the code and package it

Java code:


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public final class JavaFlumeEventCount {
    private JavaFlumeEventCount() {
    }

    public static void main(String[] args) {
        // Arguments: <host> <port> <batch interval in ms>
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        Duration batchInterval = new Duration(Integer.parseInt(args[2]));

        SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
                batchInterval);

        // Receiver that listens on host:port for events pushed by a Flume avro sink
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils
                .createStream(ssc, host, port);

        // Count the events in each batch and print the result
        flumeStream.count().map(new Function<Long, String>() {
            private static final long serialVersionUID = -572435064083746235L;

            public String call(Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}
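
The example above only counts events. As a possible extension (a sketch, not part of the original post), each Flume event body can be decoded to a String through the wrapped Avro event. The snippet below would go inside main, right after FlumeUtils.createStream(...), and assumes JavaDStream is imported from org.apache.spark.streaming.api.java:

// Decode each event body; SparkFlumeEvent.event() exposes the underlying AvroFlumeEvent
JavaDStream<String> bodies = flumeStream.map(new Function<SparkFlumeEvent, String>() {
    public String call(SparkFlumeEvent flumeEvent) {
        java.nio.ByteBuffer body = flumeEvent.event().getBody();
        byte[] bytes = new byte[body.remaining()];
        body.get(bytes);
        return new String(bytes, java.nio.charset.Charset.forName("UTF-8"));
    }
});
bodies.print();  // show a few decoded lines from every batch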


Maven build: in Eclipse use Run As -> Maven assembly:assembly (the command-line equivalent is mvn assembly:assembly).
This produces the jar hq-0.0.1-SNAPSHOT.jar in the project's target directory.


3. Upload the 3 jars to the server
Besides the jar we just built, two more jars are needed at runtime: spark-streaming-flume_2.10-1.1.0.jar and flume-ng-sdk-1.4.0.jar (the Flume NG version used here is 1.4.0).
Upload all three jars to ~/spark/test/ on the server.


4. Submit the job from the command line
[ebupt@eb174 test]$ spark-submit --master spark://eb174:7077 --name FlumeStreaming --class JavaFlumeEventCount --executor-memory 1G --total-executor-cores 2 --jars spark-streaming-flume_2.10-1.1.0.jar,flume-ng-sdk-1.4.0.jar hq.jar eb174 11000 5000
Notes: see spark-submit --help for the full list of options; adjust the memory settings to your needs to avoid OOM. --jars can take several jars at once, separated by commas. The jar given as the positional argument (hq.jar) is the application jar, and it is followed by the three application arguments: the host and port the receiver listens on (eb174 11000) and the batch interval in milliseconds (5000).


5. Start flume-ng to feed in data
Write the Flume agent configuration file spark-flumeng.conf with the following content:
#Agent5
#List the sources, sinks and channels for the agent
agent5.sources =  source1
agent5.sinks =  hdfs01
agent5.channels = channel1

#set channel for sources and sinks
agent5.sources.source1.channels = channel1
agent5.sinks.hdfs01.channel = channel1

#properties of someone source
agent5.sources.source1.type = spooldir
agent5.sources.source1.spoolDir = /home/hadoop/huangq/spark-flumeng-data/
agent5.sources.source1.ignorePattern = .*(\\.index|\\.tmp|\\.xml)$
agent5.sources.source1.fileSuffix = .1
agent5.sources.source1.fileHeader = true
agent5.sources.source1.fileHeaderKey = filename

# set interceptors
agent5.sources.source1.interceptors = i1 i2
agent5.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent5.sources.source1.interceptors.i1.preserveExisting = false
agent5.sources.source1.interceptors.i1.hostHeader = hostname
agent5.sources.source1.interceptors.i1.useIP=false
agent5.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#properties of mem-channel-1
agent5.channels.channel1.type = memory
agent5.channels.channel1.capacity = 100000
agent5.channels.channel1.transactionCapacity = 100000
agent5.channels.channel1.keep-alive = 30

#properties of sink
agent5.sinks.hdfs01.type = avro
agent5.sinks.hdfs01.hostname = eb174
agent5.sinks.hdfs01.port = 11000



Start flume-ng: [hadoop@eb170 flume]$ bin/flume-ng agent -n agent5 -c conf -f conf/spark-flumeng.conf

Notes:
① The Flume sink must be an avro sink pointing at a node of the Spark cluster, here eb174:11000 (the same host and port passed to the application).
② If the Flume SDK jar is not supplied, the job fails with java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent. That class lives in the Flume SDK jar; pointing --jars at it fixes the error.
③ Pass your own application jar as the separate positional argument of spark-submit; do not list it under --jars, otherwise errors are thrown as well.


6. Results
On the client where the Spark job was submitted you will see a large amount of log output; for every batch the number of Flume events it received is counted and printed, for example:
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/10/13 19:00:44 INFO SecurityManager: Changing view acls to: ebupt,
14/10/13 19:00:44 INFO SecurityManager: Changing modify acls to: ebupt,
14/10/13 19:00:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ebupt, ); users with modify permissions: Set(ebupt, )
14/10/13 19:00:45 INFO Slf4jLogger: Slf4jLogger started
14/10/13 19:00:45 INFO Remoting: Starting remoting
14/10/13 19:00:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@eb174:51147]
14/10/13 19:00:45 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@eb174:51147]
14/10/13 19:00:45 INFO Utils: Successfully started service 'sparkDriver' on port 51147.
14/10/13 19:00:45 INFO SparkEnv: Registering MapOutputTracker
14/10/13 19:00:45 INFO SparkEnv: Registering BlockManagerMaster
....
.....
14/10/13 19:09:21 INFO DAGScheduler: Missing parents: List()
14/10/13 19:09:21 INFO DAGScheduler: Submitting Stage 145 (MappedRDD[291] at map at MappedDStream.scala:35), which has no missing parents
14/10/13 19:09:21 INFO MemoryStore: ensureFreeSpace(3400) called with curMem=13047, maxMem=278302556
14/10/13 19:09:21 INFO MemoryStore: Block broadcast_110 stored as values in memory (estimated size 3.3 KB, free 265.4 MB)
14/10/13 19:09:21 INFO MemoryStore: ensureFreeSpace(2020) called with curMem=16447, maxMem=278302556
14/10/13 19:09:21 INFO MemoryStore: Block broadcast_110_piece0 stored as bytes in memory (estimated size 2020.0 B, free 265.4 MB)
14/10/13 19:09:21 INFO BlockManagerInfo: Added broadcast_110_piece0 in memory on eb174:41187 (size: 2020.0 B, free: 265.4 MB)
14/10/13 19:09:21 INFO BlockManagerMaster: Updated info of block broadcast_110_piece0
14/10/13 19:09:21 INFO DAGScheduler: Submitting 1 missing tasks from Stage 145 (MappedRDD[291] at map at MappedDStream.scala:35)
14/10/13 19:09:21 INFO TaskSchedulerImpl: Adding task set 145.0 with 1 tasks
14/10/13 19:09:21 INFO TaskSetManager: Starting task 0.0 in stage 145.0 (TID 190, eb175, PROCESS_LOCAL, 1132 bytes)
14/10/13 19:09:21 INFO BlockManagerInfo: Added broadcast_110_piece0 in memory on eb175:57696 (size: 2020.0 B, free: 519.6 MB)
14/10/13 19:09:21 INFO TaskSetManager: Finished task 0.0 in stage 145.0 (TID 190) in 25 ms on eb175 (1/1)
14/10/13 19:09:21 INFO DAGScheduler: Stage 145 (take at DStream.scala:608) finished in 0.026 s
14/10/13 19:09:21 INFO TaskSchedulerImpl: Removed TaskSet 145.0, whose tasks have all completed, from pool
14/10/13 19:09:21 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.036589357 s
-------------------------------------------
Time: 1413198560000 ms
-------------------------------------------
Received 35300 flume events.

14/10/13 19:09:55 INFO JobScheduler: Finished job streaming job 1413198595000 ms.0 from job set of time 1413198595000 ms
14/10/13 19:09:55 INFO JobScheduler: Total delay: 0.126 s for time 1413198595000 ms (execution: 0.112 s)
14/10/13 19:09:55 INFO MappedRDD: Removing RDD 339 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 339
14/10/13 19:09:55 INFO MappedRDD: Removing RDD 338 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 338
14/10/13 19:09:55 INFO MappedRDD: Removing RDD 337 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 337
14/10/13 19:09:55 INFO ShuffledRDD: Removing RDD 336 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 336
14/10/13 19:09:55 INFO UnionRDD: Removing RDD 335 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 335
14/10/13 19:09:55 INFO MappedRDD: Removing RDD 333 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 333
14/10/13 19:09:55 INFO BlockRDD: Removing RDD 332 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 332
...
...
14/10/13 19:10:00 INFO TaskSchedulerImpl: Adding task set 177.0 with 1 tasks
14/10/13 19:10:00 INFO TaskSetManager: Starting task 0.0 in stage 177.0 (TID 215, eb175, PROCESS_LOCAL, 1132 bytes)
14/10/13 19:10:00 INFO BlockManagerInfo: Added broadcast_134_piece0 in memory on eb175:57696 (size: 2021.0 B, free: 530.2 MB)
14/10/13 19:10:00 INFO TaskSetManager: Finished task 0.0 in stage 177.0 (TID 215) in 24 ms on eb175 (1/1)
14/10/13 19:10:00 INFO DAGScheduler: Stage 177 (take at DStream.scala:608) finished in 0.024 s
14/10/13 19:10:00 INFO TaskSchedulerImpl: Removed TaskSet 177.0, whose tasks have all completed, from pool
14/10/13 19:10:00 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.033844743 s
-------------------------------------------
Time: 1413198600000 ms
-------------------------------------------
Received 0 flume events.



II. Conclusions

  • The Flume NG / Spark integration works; you can write whatever classes you need on top of it to process the data delivered by Flume NG in real time.
  • Spark Streaming combines with many different data sources, giving real-time computation over all of them (a sketch of a Kafka variant follows this list).
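
For example, switching from the Flume receiver to a Kafka receiver only changes how the input DStream is created. The snippet below is a hypothetical sketch, not part of the original post: it assumes the spark-streaming-kafka_2.10:1.1.0 artifact is on the classpath, a ZooKeeper quorum at zkhost:2181, a consumer group spark-group, and a topic named mytopic; the rest of the program (counting, printing, starting the context) stays the same as in JavaFlumeEventCount.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// ... inside main(), after the JavaStreamingContext ssc has been created:
Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("mytopic", 1);  // topic name -> number of receiver threads

// Receiver-based Kafka stream instead of the Flume stream
JavaPairReceiverInputDStream<String, String> kafkaStream =
        KafkaUtils.createStream(ssc, "zkhost:2181", "spark-group", topics);

// Same count-and-print logic as with the Flume stream
kafkaStream.count().print();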


III. References

Source: http://www.cnblogs.com/byrhuangqiang/p/4022940.html


Comments (4)

pengsuyun, 2014-12-26 08:41:34

小南3707, 2014-12-26 08:44:23: Nice!

hbu126, 2014-12-26 09:57:25: thank you very much

yinuo2016, 2016-6-12 18:09:42: If the logs come in as many small files, how do you make sure that when Flume reads these small log files, each file's contents end up as one data stream?
