分享

大数据架构:企业解决方案可以使用flume+kafka+storm+mysql组合

sstutu 发表于 2014-3-10 21:12:51 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 0 35212
本帖最后由 sstutu 于 2014-3-10 21:15 编辑

大数据库现在如火如荼,但是如此好的事情,咱们却只能望眼欲穿,为什么那,缺这方面的人才,我们知道它好,却不知道如何下手,这里给一些企业一个解决方案,仅供参考。
我们可以带着下面问题来
flume + kafka + storm + mysql 组合在这个系统中各自的作用是什么?


对于 flume + kafka + storm + mysql 这条数据流走通了,只是一个简单的测试例子,但是依据这条数据流可以做的事情很多。
先简单看一下这几个工具的架构吧,架构图会更好说明:

flume的架构图:
1.jpg

kafka的架构图:
2.jpg


storm的架构图:
3.png

我们使用的  flume + kafka + storm +mysql的数据流架构图:
4.png

下面介绍一下kafka到storm的配置:
其实这些都是通过java代码实现的,这里用到了 KafkaSpout类,RDBMSDumperBolt类(以后这些可以作为工具类打包上传到集群中)
storm作业中,我们写了一个KafkaStormRdbms类,作业具体配置如下:
首先设置连接mysql的参数:
  1. ArrayList<String> columnNames = new ArrayList<String>();  
  2. ArrayList<String> columnTypes = new ArrayList<String>();  
  3. String tableName = "stormTestTable_01";  
  4. // Note: if the rdbms table need not to have a primary key, set the variable 'primaryKey' to 'N/A'  
  5. // else set its value to the name of the tuple field which is to be treated as primary key  
  6. String primaryKey = "N/A";  
  7. String rdbmsUrl = "jdbc:mysql://$hostname:3306/fuqingwuDB" ;  
  8. String rdbmsUserName = "fuqingwu";  
  9. String rdbmsPassword = "password";  
  10.   
  11. //add the column names and the respective types in the two arraylists  
  12. columnNames.add("word");  
  13.   
  14. //add the types  
  15. columnTypes.add("varchar (100)");  
复制代码
配置 KafkaSpout 及 Topology:
  1. TopologyBuilder builder = new TopologyBuilder();  
  2.          
  3.         List<String> hosts = new ArrayList<String>();  
  4.         hosts.add("hadoop01");  
  5.         SpoutConfig spoutConf = SpoutConfig.fromHostStrings(hosts, 1, "flume_kafka", "/root", "id");  
  6.         spoutConf.scheme = new StringScheme();  
  7.         spoutConf.forceStartOffsetTime(-2);  
  8.          
  9.         spoutConf.zkServers = new ArrayList<String>() {{  
  10.                       add("hadoop01");   
  11.                     }};  
  12.         spoutConf.zkPort = 2181;  
  13.          
  14.         //set the spout for the topology  
  15.         builder.setSpout("spout",  new KafkaSpout(spoutConf), 1);  
  16.   
  17.         //dump the stream data into rdbms table      
  18.         RDBMSDumperBolt dumperBolt = new RDBMSDumperBolt(primaryKey, tableName, columnNames, columnTypes, rdbmsUrl, rdbmsUserName, rdbmsPassword);  
  19.         builder.setBolt("dumperBolt",dumperBolt, 1).shuffleGrouping("spout");  
复制代码



欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条