This post was last edited by sstutu on 2014-3-10 21:15
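Big data is booming right now, but many of us can only watch from the sidelines. Why? Because people with these skills are scarce: we know it is valuable, but we don't know where to start. Here is one possible solution for companies, offered for reference only.
You can read on with the following question in mind:
What role does each part of the flume + kafka + storm + mysql combination play in this system?
The flume + kafka + storm + mysql data flow now works end to end. It is only a simple test example, but a lot can be built on top of this data flow.
First, a quick look at the architecture of each of these tools; the architecture diagrams explain it best: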
flume architecture diagram:
kafka architecture diagram:
storm architecture diagram:
Architecture diagram of the flume + kafka + storm + mysql data flow we use:
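To make the roles in this data flow a little more concrete, here is a minimal sketch in plain Java. It does not use any of the real tools: a queue stands in for the kafka topic, a list stands in for the mysql table, and the names `PipelineSketch`, `collect` and `processAll` are made up for illustration. The general idea it assumes is that flume collects events, kafka buffers them, storm processes them one at a time, and mysql stores the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the whole pipeline; none of this is real flume/kafka/storm API.
public class PipelineSketch {
    // kafka role: a buffer that decouples the producer side from the consumer side.
    final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    // mysql role: a table with a single "word" column.
    final List<String> table = new ArrayList<>();

    // flume role: collect raw events and push them into the buffer.
    void collect(List<String> events) {
        topic.addAll(events);
    }

    // storm role: take messages off the buffer one by one, process each
    // (here: trim whitespace), and write the result to the "table".
    int processAll() {
        int processed = 0;
        String message;
        while ((message = topic.poll()) != null) {
            table.add(message.trim());
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch();
        pipeline.collect(Arrays.asList(" hello ", "storm"));
        int rows = pipeline.processAll();
        System.out.println(rows + " rows: " + pipeline.table); // prints "2 rows: [hello, storm]"
    }
}
```

In the real system each stage runs as its own service, so the buffer in the middle is what lets collection and processing run at different speeds.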
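Next, the configuration from kafka to storm:
All of this is implemented in Java code, using the KafkaSpout class and the RDBMSDumperBolt class (later these can be packaged as utility classes and uploaded to the cluster).
For the storm job we wrote a KafkaStormRdbms class. The job is configured as follows:
First, set the parameters for connecting to mysql: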
    import java.util.ArrayList;

    ArrayList<String> columnNames = new ArrayList<String>();
    ArrayList<String> columnTypes = new ArrayList<String>();
    String tableName = "stormTestTable_01";
    // Note: if the rdbms table does not need a primary key, set 'primaryKey' to "N/A";
    // otherwise set it to the name of the tuple field to be treated as the primary key
    String primaryKey = "N/A";
    // replace $hostname with the host running mysql
    String rdbmsUrl = "jdbc:mysql://$hostname:3306/fuqingwuDB";
    String rdbmsUserName = "fuqingwu";
    String rdbmsPassword = "password";

    // add the column names and their respective types to the two lists
    columnNames.add("word");

    // add the types
    columnTypes.add("varchar (100)");
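The post does not show what RDBMSDumperBolt does with these parameters. As a rough guess, a bolt like this needs to turn the table name, column names and column types into SQL statements. The sketch below shows one way that could look; `SqlSketch`, `createTableSql` and `insertSql` are hypothetical names, and the actual class may build its statements differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; not taken from RDBMSDumperBolt.
public class SqlSketch {
    // Build a CREATE TABLE statement from parallel lists of column names and types.
    // A primaryKey of "N/A" means "no primary key", following the convention in the post.
    static String createTableSql(String table, List<String> names, List<String> types, String primaryKey) {
        if (names.size() != types.size()) {
            throw new IllegalArgumentException("each column name needs exactly one type");
        }
        List<String> columns = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            columns.add(names.get(i) + " " + types.get(i));
        }
        if (!"N/A".equals(primaryKey)) {
            columns.add("PRIMARY KEY (" + primaryKey + ")");
        }
        return "CREATE TABLE IF NOT EXISTS " + table + " (" + String.join(", ", columns) + ")";
    }

    // Build a parameterized INSERT with one "?" placeholder per column,
    // suitable for a java.sql.PreparedStatement.
    static String insertSql(String table, List<String> names) {
        String placeholders = String.join(", ", Collections.nCopies(names.size(), "?"));
        return "INSERT INTO " + table + " (" + String.join(", ", names) + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("word");
        List<String> types = Arrays.asList("varchar (100)");
        // prints "CREATE TABLE IF NOT EXISTS stormTestTable_01 (word varchar (100))"
        System.out.println(createTableSql("stormTestTable_01", names, types, "N/A"));
        // prints "INSERT INTO stormTestTable_01 (word) VALUES (?)"
        System.out.println(insertSql("stormTestTable_01", names));
    }
}
```

Using "?" placeholders instead of concatenating values keeps the tuple contents out of the SQL text, which matters when the words come from untrusted log data.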
Configure the KafkaSpout and the Topology:
    // requires java.util.ArrayList and java.util.List, plus the storm and storm-kafka classes used below
    TopologyBuilder builder = new TopologyBuilder();

    List<String> hosts = new ArrayList<String>();
    hosts.add("hadoop01");
    // kafka broker hosts, partitions per host, topic name, ZooKeeper root path, consumer id
    SpoutConfig spoutConf = SpoutConfig.fromHostStrings(hosts, 1, "flume_kafka", "/root", "id");
    // decode each kafka message as a string
    spoutConf.scheme = new StringScheme();
    // -2 = start reading from the earliest offset available in kafka
    spoutConf.forceStartOffsetTime(-2);

    spoutConf.zkServers = new ArrayList<String>() {{
        add("hadoop01");
    }};
    spoutConf.zkPort = 2181;

    // set the spout for the topology
    builder.setSpout("spout", new KafkaSpout(spoutConf), 1);

    // dump the stream data into the rdbms table
    RDBMSDumperBolt dumperBolt = new RDBMSDumperBolt(primaryKey, tableName, columnNames, columnTypes, rdbmsUrl, rdbmsUserName, rdbmsPassword);
    builder.setBolt("dumperBolt", dumperBolt, 1).shuffleGrouping("spout");
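The bolt above subscribes to the spout with shuffleGrouping, which storm describes as distributing tuples randomly across the bolt's tasks while giving each task an equal share. With a parallelism of 1, as in this test, every tuple simply goes to the single dumperBolt task. The sketch below illustrates the idea for more than one task; it is not storm's own implementation, and `ShuffleGroupingSketch` and `chooseTask` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Illustration only: random order within each pass, equal counts overall.
public class ShuffleGroupingSketch {
    private final List<Integer> tasks = new ArrayList<>();
    private final Random random;
    private int next = 0;

    ShuffleGroupingSketch(int numTasks, long seed) {
        for (int i = 0; i < numTasks; i++) {
            tasks.add(i);
        }
        random = new Random(seed);
        Collections.shuffle(tasks, random);
    }

    // Return the task index that should receive the next tuple.
    // After every full pass over the tasks the order is reshuffled.
    int chooseTask() {
        if (next == tasks.size()) {
            Collections.shuffle(tasks, random);
            next = 0;
        }
        return tasks.get(next++);
    }

    public static void main(String[] args) {
        ShuffleGroupingSketch grouping = new ShuffleGroupingSketch(4, 42L);
        int[] counts = new int[4];
        for (int i = 0; i < 100; i++) {
            counts[grouping.chooseTask()]++;
        }
        System.out.println(Arrays.toString(counts)); // prints "[25, 25, 25, 25]"
    }
}
```

If the dumperBolt parallelism is raised later, this even spread is what keeps any single task from becoming the bottleneck for the mysql writes.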