about云开发

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

查看: 30102|回复: 15

discuz论坛apache日志hadoop大数据分析项目:hive以及hbase是如何入库以及代码实现

[复制链接]
发表于 2014-8-10 12:41:00 | 显示全部楼层 |阅读模式
about云discuz论坛apache日志hadoop大数据分析项目: 数据时如何导入hbase与hive的到了这里项目的基本核心功能已经完成。这里介绍一下hive以及hbase是如何入库以及代码实现。
首先我们将hbase与hive整合,详细参考

about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的


about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的


整合完毕,我们就可以通过mapreduce把数据导入hbase,当然在导入hbase的同时,hive数据同时也可以查询出结果。

那么我们是如何导入hbase的,思路前面已经介绍,这里采用的是hbase put。以后的版本中,我们将采用多种方法来实现此功能包括hive分区、hbase后面如果遇到问题,我们可能还会重构。

开发环境介绍:
1.Eclipse
2.Hadoop2.2
3.hbase-0.98.3-hadoop2



思路:
在导入hbase的过程中,我们直接使用了mapreduce中的map函数,reduce在这里对我们没有太大的用处,我们这里借助的是mapreduce的分布式,提高查询效率。

mapreduce中map函数主要实现了哪些功能
1.清洗数据

通过
  1. public static void StringResolves(String line, Context context)
复制代码

函数实现

2.数据的导入
通过public static void addData(String rowKey, String tableName,        String[] column1, String[] value1, Context context)
函数实现



下面贴上代码:
HbaseMain.java代码

  1. package www.aboutyun.com;
  2. import java.io.IOException;

  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  11. public class HbaseMain {

  12.         static final String INPUT_PATH = "hdfs://master:8020/test.txt";
  13.         static final String OUT_PATH = "hdfs://master:8020/Output";

  14.         public static void main(String[] args) throws IOException,
  15.                         InterruptedException, ClassNotFoundException {

  16.                 // 主类
  17.                 Configuration conf = new Configuration();
  18.                 Job job = Job.getInstance(conf, HbaseMain.class.getSimpleName());
  19.                 job.setJarByClass(HbaseMain.class);
  20.                 // 寻找输入
  21.                 FileInputFormat.setInputPaths(job, INPUT_PATH);
  22.                 // 1.2对输入数据进行格式化处理的类
  23.                 job.setInputFormatClass(TextInputFormat.class);
  24.                 job.setMapperClass(HbaseMap.class);
  25.                 // 1.2指定map输出类型<key,value>类型
  26.                 job.setMapOutputKeyClass(Text.class);
  27.                 job.setMapOutputValueClass(LongWritable.class);
  28.                 job.setNumReduceTasks(0);
  29.                 // 指定输出路径
  30.                 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  31.                
  32.                 job.waitForCompletion(true);

  33.         }
  34. }
复制代码

HbaseMap.java代码

  1. package www.aboutyun.com;

  2. import java.io.IOException;
  3. import java.text.DateFormat;
  4. import java.text.ParseException;
  5. import java.text.SimpleDateFormat;
  6. import java.util.Date;
  7. import java.util.Locale;
  8. import java.util.Random;

  9. import org.apache.hadoop.conf.Configuration;
  10. import org.apache.hadoop.hbase.HBaseConfiguration;
  11. import org.apache.hadoop.hbase.HColumnDescriptor;
  12. import org.apache.hadoop.hbase.client.HTable;
  13. import org.apache.hadoop.hbase.client.Put;
  14. import org.apache.hadoop.hbase.util.Bytes;
  15. import org.apache.hadoop.io.IntWritable;
  16. import org.apache.hadoop.io.LongWritable;
  17. import org.apache.hadoop.io.Text;
  18. import org.apache.hadoop.mapreduce.Mapper;
  19. import org.apache.hadoop.mapreduce.Mapper.Context;
  20. import org.apache.commons.logging.Log;
  21. import org.apache.commons.logging.LogFactory;

  22. public class HbaseMap extends Mapper<LongWritable, Text, Text, IntWritable> {
  23.         private static Configuration conf = null;
  24.         /**
  25.          * 初始化配置
  26.          */

  27.         static {
  28.                 conf = HBaseConfiguration.create();
  29.                 conf.set("hbase.zookeeper.quorum", "master");// 使用eclipse时必须添加这个,否则无法定位
  30.                 conf.set("hbase.zookeeper.property.clientPort", "2181");
  31.         }

  32.         /**************************************************************************/
  33.         public void map(LongWritable key, Text line, Context context)
  34.                         throws IOException, InterruptedException {

  35.                 try {
  36.                         StringResolves(line.toString(), context);
  37.                 } catch (ParseException e) {
  38.                         // TODO Auto-generated catch block
  39.                         e.printStackTrace();
  40.                 }

  41.         }

  42.         /**************************************************************************/
  43.         // 字符串解析

  44.         public static void StringResolves(String line, Context context)
  45.                         throws ParseException {
  46.                 String ipField, dateField, urlField, browserField;

  47.                 // 获取ip地址
  48.                 ipField = line.split("- -")[0].trim();

  49.                 // 获取时间,并转换格式
  50.                 int getTimeFirst = line.indexOf("[");
  51.                 int getTimeLast = line.indexOf("]");
  52.                 String time = line.substring(getTimeFirst + 1, getTimeLast).trim();
  53.                 Date dt = null;
  54.                 DateFormat df1 = DateFormat.getDateTimeInstance(DateFormat.LONG,
  55.                                 DateFormat.LONG);
  56.                 dt = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US)
  57.                                 .parse(time);
  58.                 dateField = df1.format(dt);
  59.                 SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHMM");
  60.                 String dateField1 = sdf.format(dt);
  61.                 // 获取url
  62.                 String[] getUrl = line.split("\"");

  63.                 String firtGeturl = getUrl[1].substring(3).trim();

  64.                 String secondGeturl = getUrl[3].trim();
  65.                 urlField = firtGeturl + "分隔符" + secondGeturl;

  66.                 // 获取浏览器
  67.                 String[] getBrowse = line.split("\"");
  68.                 String strBrowse = getBrowse[5].toString();
  69.                 String str = "(KHTML, like Gecko)";
  70.                 int i = strBrowse.indexOf(str);
  71.                 strBrowse = strBrowse.substring(i);
  72.                 String strBrowse1[] = strBrowse.split("\\/");
  73.                 strBrowse = strBrowse1[0].toString();
  74.                 String strBrowse2[] = strBrowse.split("\\)");
  75.                 browserField = strBrowse2[1].trim();

  76.                 // 添加到数据库

  77.                 String rowKey = ipField + dateField1 + urlField
  78.                                 + new Random().nextInt();
  79.                 String[] cols = new String[] { "IpAddress", "AccressTime", "Url",
  80.                                 "UserBrowser", };
  81.                 String[] colsValue = new String[] { ipField, dateField, urlField,
  82.                                 browserField };

  83.                 try {
  84.                         addData(rowKey, "LogTable", cols, colsValue, context);
  85.                         context.write(new Text("1"), new IntWritable(1));

  86.                 } catch (IOException | InterruptedException e) {
  87.                         // TODO Auto-generated catch block
  88.                         e.printStackTrace();
  89.                 }
  90.         }

  91.         /*
  92.          * 为表添加数据(适合知道有多少列族的固定表)
  93.          *
  94.          * @rowKey rowKey
  95.          *
  96.          * @tableName 表名
  97.          *
  98.          * @column1 第一个列族列表
  99.          *
  100.          * @value1 第一个列的值的列表
  101.          */
  102.         public static void addData(String rowKey, String tableName,
  103.                         String[] column1, String[] value1, Context context)
  104.                         throws IOException {

  105.                 Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey
  106.                 HTable table = new HTable(conf, Bytes.toBytes(tableName));// HTabel负责跟记录相关的操作如增删改查等//
  107.                                                                                                                                         // 获取表
  108.                 HColumnDescriptor[] columnFamilies = table.getTableDescriptor() // 获取所有的列族
  109.                                 .getColumnFamilies();

  110.                 for (int i = 0; i < columnFamilies.length; i++) {
  111.                         String familyName = columnFamilies[i].getNameAsString(); // 获取列族名
  112.                         if (familyName.equals("Info")) { // info列族put数据
  113.                                 for (int j = 0; j < column1.length; j++) {
  114.                                         put.add(Bytes.toBytes(familyName),
  115.                                                         Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));
  116.                                 }
  117.                         }

  118.                 }
  119.                 table.put(put);
  120.                 // context.write(new Text(rowKey), null);
  121.                 System.out.println("add data Success!");
  122.         }

  123. }
复制代码

后面我们将会不断完善此功能。

上面的一些准备工作,就不要说了,这里展现一下运行后的效果:
hive效果图

hive.png


Hbase效果图
hbase.png


这样就达到了效果。后面我们使用hive统计,然后通过将统计结果展示,项目基本完成,后面就不断完善即可。
上文中test.txt数据
   static final String INPUT_PATH = "hdfs://master:8020/test.txt";
test.zip (1.04 KB, 下载次数: 74, 售价: 1 云币)

35

主题

11

听众

1

收听

高级会员

Rank: 4

积分
2657
发表于 2014-8-11 09:39:01 | 显示全部楼层
好文章,是干货,希望能给大家学习带来帮助。

1

主题

13

听众

16

收听

中级会员

Rank: 3Rank: 3

积分
513
发表于 2014-11-5 17:52:19 | 显示全部楼层
下载下来后,发现Job job = Job.getInstance(conf, HbaseMain.class.getSimpleName());这句有问题,我查看API没有发现Job有getInstance方法

1

主题

13

听众

16

收听

中级会员

Rank: 3Rank: 3

积分
513
发表于 2014-11-5 18:02:27 | 显示全部楼层
我自己找到问题了,版本不同,2.2有

1

主题

13

听众

16

收听

中级会员

Rank: 3Rank: 3

积分
513
发表于 2014-11-6 16:14:11 | 显示全部楼层
在window下测试上面的代码一直报:Error: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration;本地开发环境Hbase包已引入,集群环境在hadoop下的lib下建了个lib_hbase文件夹来存放HBase的包,还是报上面的错误
大神给看看.

1

主题

3

听众

1

收听

中级会员

Rank: 3Rank: 3

积分
421
发表于 2014-12-17 12:27:41 | 显示全部楼层
为什么我测试后Hive中的数据中文乱码了?

hive中文乱码

hive中文乱码


 楼主| 发表于 2014-12-17 12:31:23 | 显示全部楼层
admln 发表于 2014-12-17 12:27
为什么我测试后Hive中的数据中文乱码了?
检测自己的程序编码

1

主题

3

听众

1

收听

中级会员

Rank: 3Rank: 3

积分
421
发表于 2014-12-30 18:29:10 | 显示全部楼层
pig2 发表于 2014-12-17 12:31
检测自己的程序编码

QQ截图20141230182722.png
hbase 中的数据是一样的,但是用hive查就是了乱码,hdfs上文件的编码,程序的编码,都改过了。
QQ截图20141230182903.png

1

主题

3

听众

1

收听

中级会员

Rank: 3Rank: 3

积分
421
发表于 2014-12-30 18:40:52 | 显示全部楼层
admln 发表于 2014-12-30 18:29
hbase 中的数据是一样的,但是用hive查就是了乱码,hdfs上文件的编码,程序的编码,都改过了。

不好意思,问了这么傻的问题,是secureCRT的编码问题。
QQ截图20141230184046.png

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条

QQ|小黑屋|about云开发-学问论坛|社区 ( 京ICP备12023829号

GMT+8, 2018-11-17 02:22 , Processed in 0.418666 second(s), 37 queries , Gzip On.

Powered by Discuz! X3.2 Licensed

快速回复 返回顶部 返回列表